
A look into Oozie internals

November 24, 2013

I have been looking into the Oozie code base trying to understand its underlying architecture.
Here are some of my findings:

The Oozie web app contains a number of services, among them ActionService, AuthorizationService, CoordinatorStoreService, DagEngineService, SchemaService and UserGroupInformationService. They live in the org.apache.oozie.service package and all implement the Service interface below.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

/**
 * A service is component managed by the {@link Services} singleton.
 */
public interface Service {
    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.service.default.lock.timeout";

    /**
     * Prefix for all services configuration properties.
     */
    public static final String CONF_PREFIX = "oozie.service.";

    /**
     * Constant for XCommand
     */
    public static final String USE_XCOMMAND = "oozie.useXCommand";

    /**
     * Initialize the service. <p/> Invoked by the {@link Services} singleton at start up time.
     *
     * @param services services singleton initializing the service.
     * @throws ServiceException thrown if the service could not initialize.
     */
    public void init(Services services) throws ServiceException;

    /**
     * Destroy the service. <p/> Invoked by the {@link Services} singleton at shutdown time.
     */
    public void destroy();

    /**
     * Return the public interface of the service. <p/> Services are retrieved by their public interface. Specializations
     * of services must return the public interface.
     *
     * @return the interface of the service.
     */
    public Class<? extends Service> getInterface();

    /**
     * Lock timeout value if service is only allowed to have one single running instance.
     */
    public static long lockTimeout = Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);

}
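
To make the init/destroy/getInterface contract concrete, here is a minimal sketch of a registry in the spirit of the Services singleton. It assumes (from my reading of the code) that each service is stored under the class returned by getInterface(), so callers look it up by interface rather than by implementation. SimpleService, SimpleServices, GreetingService and DefaultGreetingService are hypothetical names, and destroying services in reverse start-up order is a choice made for this sketch, not a claim about Oozie.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for org.apache.oozie.service.Service.
// (The real init() throws ServiceException; this sketch uses Exception.)
interface SimpleService {
    void init(SimpleServices services) throws Exception;

    void destroy();

    Class<? extends SimpleService> getInterface();
}

// Hypothetical registry loosely modelled on the Services singleton.
class SimpleServices {
    private final Map<Class<? extends SimpleService>, SimpleService> byInterface = new LinkedHashMap<>();
    private final List<SimpleService> initOrder = new ArrayList<>();

    // Initialize a service, then register it under the interface it reports.
    void add(SimpleService service) {
        try {
            service.init(this);
        } catch (Exception e) {
            throw new IllegalStateException("Could not initialize " + service.getClass().getName(), e);
        }
        byInterface.put(service.getInterface(), service);
        initOrder.add(service);
    }

    // Callers ask for the public interface, never the implementation class.
    <T extends SimpleService> T get(Class<T> iface) {
        return iface.cast(byInterface.get(iface));
    }

    // Tear down in reverse start-up order so later services can still rely on earlier ones.
    void destroy() {
        List<SimpleService> reversed = new ArrayList<>(initOrder);
        Collections.reverse(reversed);
        for (SimpleService service : reversed) {
            service.destroy();
        }
        byInterface.clear();
        initOrder.clear();
    }
}

// Hypothetical public interface and implementation used to exercise the registry.
interface GreetingService extends SimpleService {
    String greet(String name);
}

class DefaultGreetingService implements GreetingService {
    static final List<String> EVENTS = new ArrayList<>();

    public void init(SimpleServices services) {
        EVENTS.add("init");
    }

    public void destroy() {
        EVENTS.add("destroy");
    }

    public Class<? extends SimpleService> getInterface() {
        return GreetingService.class;
    }

    public String greet(String name) {
        return "hello " + name;
    }
}
```

Registering an implementation under the interface it returns from getInterface() is what lets a specialized implementation replace the default one without any caller changing its lookup.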

Oozie code mostly obtains these services through the Services singleton, e.g. Services.get().get(JPAService.class).
Incoming requests are handled by different servlets, which invoke the appropriate services to get things done.

There are also various action executors, each a subclass of the abstract ActionExecutor class; EmailActionExecutor and HiveActionExecutor are two examples.
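
The split between a shared abstract base and per-type executors can be sketched as follows. This is a heavily simplified, hypothetical model: the real ActionExecutor methods (start, end, check, kill) take a Context and a WorkflowAction, and the registry below only approximates how I understand ActionService to pick an executor from an action's type. SimpleActionExecutor, SketchEmailExecutor, SketchHiveExecutor and ExecutorRegistry are illustrative names, and the status strings are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ActionExecutor.
abstract class SimpleActionExecutor {
    private final String type;

    protected SimpleActionExecutor(String type) {
        this.type = type;
    }

    // Action type as it would appear in a workflow definition, e.g. "email" or "hive".
    String getType() {
        return type;
    }

    // Start the action and return its (placeholder) external status.
    abstract String start(String actionConf);

    // Whether the given external status means the action has finished.
    abstract boolean isCompleted(String externalStatus);
}

// Synchronous example: the work is done by the time start() returns.
class SketchEmailExecutor extends SimpleActionExecutor {
    SketchEmailExecutor() {
        super("email");
    }

    String start(String actionConf) {
        return "OK";
    }

    boolean isCompleted(String externalStatus) {
        return true;
    }
}

// Asynchronous example: start() only submits work, which must be polled later.
class SketchHiveExecutor extends SimpleActionExecutor {
    SketchHiveExecutor() {
        super("hive");
    }

    String start(String actionConf) {
        return "RUNNING";
    }

    boolean isCompleted(String externalStatus) {
        return "SUCCEEDED".equals(externalStatus)
                || "FAILED".equals(externalStatus)
                || "KILLED".equals(externalStatus);
    }
}

// Registry keyed by action type, roughly the role ActionService plays.
class ExecutorRegistry {
    private final Map<String, SimpleActionExecutor> executors = new HashMap<>();

    void register(SimpleActionExecutor executor) {
        executors.put(executor.getType(), executor);
    }

    SimpleActionExecutor get(String type) {
        SimpleActionExecutor executor = executors.get(type);
        if (executor == null) {
            throw new IllegalArgumentException("No executor for action type: " + type);
        }
        return executor;
    }
}
```

Keeping the type string on the base class means adding a new action type only requires a new subclass plus one registration call.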

There is another kind of executor, JPAExecutor, which is a separate interface unrelated to ActionExecutor.
JPAExecutor implementations are run by JPAService, the basic JPA service that provides database execution, e.g. executing select queries, update queries, or JPAExecutor operations. OpenJPA is used in the ORM layer.
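
As far as I can tell, a JPAExecutor is a named unit of database work with an execute method that receives an EntityManager, and JPAService runs it inside a transaction, committing on success and rolling back on failure. The sketch below models that shape without a JPA dependency: FakeEntityManager is a hypothetical map-backed stand-in for javax.persistence.EntityManager, and SketchJPAExecutor, SketchJPAService, PutStatusExecutor and GetStatusExecutor are illustrative names, not Oozie classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an EntityManager: writes are staged and only
// become visible in the shared store on commit.
class FakeEntityManager {
    private final Map<String, String> committed;
    private final Map<String, String> staged = new HashMap<>();

    FakeEntityManager(Map<String, String> committed) {
        this.committed = committed;
    }

    String find(String id) {
        return staged.containsKey(id) ? staged.get(id) : committed.get(id);
    }

    void persist(String id, String value) {
        staged.put(id, value);
    }

    void commit() {
        committed.putAll(staged);
        staged.clear();
    }

    void rollback() {
        staged.clear();
    }
}

// Modelled on the shape of JPAExecutor: a named operation returning T.
interface SketchJPAExecutor<T> {
    String getName();

    T execute(FakeEntityManager em);
}

// Modelled on JPAService.execute: open, run the executor, commit or roll back.
class SketchJPAService {
    private final Map<String, String> db = new HashMap<>();

    <T> T execute(SketchJPAExecutor<T> executor) {
        FakeEntityManager em = new FakeEntityManager(db);
        try {
            T result = executor.execute(em);
            em.commit();
            return result;
        } catch (RuntimeException e) {
            em.rollback();
            throw new IllegalStateException("Executor " + executor.getName() + " failed", e);
        }
    }
}

// Example executors: one write, one read.
class PutStatusExecutor implements SketchJPAExecutor<Void> {
    private final String id;
    private final String status;

    PutStatusExecutor(String id, String status) {
        this.id = id;
        this.status = status;
    }

    public String getName() {
        return "PutStatusExecutor";
    }

    public Void execute(FakeEntityManager em) {
        em.persist(id, status);
        return null;
    }
}

class GetStatusExecutor implements SketchJPAExecutor<String> {
    private final String id;

    GetStatusExecutor(String id) {
        this.id = id;
    }

    public String getName() {
        return "GetStatusExecutor";
    }

    public String execute(FakeEntityManager em) {
        return em.find(id);
    }
}
```

The appeal of this design is that each executor only describes the query it runs, while transaction handling lives in one place.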

Database entities are represented by standard JPA entity beans that carry named queries.
WorkflowJobBean, shown below, is one example.


package org.apache.oozie;

import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.IOException;
import java.io.DataOutput;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.persistence.Entity;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Basic;
import javax.persistence.Lob;
import javax.persistence.Table;
import javax.persistence.Transient;

import java.sql.Timestamp;

import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

@Entity

@NamedQueries({

    @NamedQuery(name = "UPDATE_WORKFLOW", query = "update WorkflowJobBean w set w.appName = :appName, w.appPath = :appPath, w.conf = :conf, w.group = :groupName, w.run = :run, w.user = :user, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.externalId = :externalId, w.lastModifiedTimestamp = :lastModTime,w.logToken = :logToken, w.protoActionConf = :protoActionConf, w.slaXml =:slaXml, w.startTimestamp = :startTime, w.statusStr = :status, w.wfInstance = :wfInstance where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_MODTIME", query = "update WorkflowJobBean w set w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_MODTIME", query = "update WorkflowJobBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_PARENT_MODIFIED", query = "update WorkflowJobBean w set w.parentId = :parentId, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.endTimestamp = :endTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.startTimestamp = :startTime, w.endTimestamp = :endTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_RERUN", query = "update WorkflowJobBean w set w.appName = :appName, w.protoActionConf = :protoActionConf, w.appPath = :appPath, w.conf = :conf, w.logToken = :logToken, w.user = :user, w.group = :group, w.externalId = :externalId, w.endTimestamp = :endTime, w.run = :run, w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from WorkflowJobBean w order by w.startTimestamp desc"),

    @NamedQuery(name = "GET_WORKFLOWS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w order by w.createdTimestamp desc"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT", query = "select count(w) from WorkflowJobBean w"),

    @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_OLDER_THAN", query = "select w from WorkflowJobBean w where w.endTimestamp < :endTime"),

    @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_WITH_NO_PARENT_OLDER_THAN", query = "select w.id from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId is null"),

    @NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_USER_GROUP", query = "select w.user, w.group from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance  from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_DEFINITION", query = "select w.id, w.user, w.group, w.appName, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_ACTION_OP", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_RESUME", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.statusStr, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select  w.id from WorkflowJobBean w where w.externalId = :externalId"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status and w.lastModifiedTimestamp > :lastModTime"),

    @NamedQuery(name = "GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"),

    @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")

    @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_STATUS", query = "select w.statusStr from WorkflowJobBean w where w.id = :id")
        })
@Table(name = "WF_JOBS")
public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {

    @Id
    private String id;

    @Basic
    @Column(name = "proto_action_conf")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob protoActionConf;

    @Basic
    @Column(name = "log_token")
    private String logToken = null;

    @Basic
    @Index
    @Column(name = "external_id")
    private String externalId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String statusStr = WorkflowJob.Status.PREP.toString();

    @Basic
    @Column(name = "created_time")
    private java.sql.Timestamp createdTimestamp = null;

    @Basic
    @Column(name = "start_time")
    private java.sql.Timestamp startTimestamp = null;

    @Basic
    @Index
    @Column(name = "end_time")
    private java.sql.Timestamp endTimestamp = null;

    @Basic
    @Index
    @Column(name = "last_modified_time")
    private java.sql.Timestamp lastModifiedTimestamp = null;

    @Basic
    @Column(name = "wf_instance")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.BinaryBlobValueHandler")
    private BinaryBlob wfInstance ;

    @Basic
    @Column(name = "sla_xml")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob slaXml;


    @Basic
    @Column(name = "app_name")
    private String appName = null;

    @Basic
    @Column(name = "app_path")
    private String appPath = null;

    @Basic
    @Column(name = "conf")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob conf;

    @Basic
    @Column(name = "user_name")
    private String user = null;

    @Basic
    @Column(name = "group_name")
    private String group;

    @Basic
    @Column(name = "run")
    private int run = 1;

    @Basic
    @Index
    @Column(name = "parent_id")
    private String parentId;

    @Transient
    private String consoleUrl;

    @Transient
    private List<WorkflowActionBean> actions;


    /**
     * Default constructor.
     */
    public WorkflowJobBean() {
        actions = new ArrayList<WorkflowActionBean>();
    }

    /**
     * Serialize the workflow bean to a data output.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the workflow bean could not be serialized.
     */
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getAppPath());
        WritableUtils.writeStr(dataOutput, getAppName());
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getParentId());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
        dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
        dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
        WritableUtils.writeStr(dataOutput, getUser());
        WritableUtils.writeStr(dataOutput, getGroup());
        dataOutput.writeInt(getRun());
        WritableUtils.writeStr(dataOutput, logToken);
        WritableUtils.writeStr(dataOutput, getProtoActionConf());
    }

    /**
     * Deserialize a workflow bean from a data input.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the workflow bean could not be deserialized.
     */
    public void readFields(DataInput dataInput) throws IOException {
        setAppPath(WritableUtils.readStr(dataInput));
        setAppName(WritableUtils.readStr(dataInput));
        setId(WritableUtils.readStr(dataInput));
        setParentId(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatus(WorkflowJob.Status.valueOf(WritableUtils.readStr(dataInput)));
        // setStatus(WritableUtils.readStr(dataInput));
        long d = dataInput.readLong();
        if (d != -1) {
            setCreatedTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setStartTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setLastModifiedTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setEndTime(new Date(d));
        }
        setUser(WritableUtils.readStr(dataInput));
        setGroup(WritableUtils.readStr(dataInput));
        setRun(dataInput.readInt());
        logToken = WritableUtils.readStr(dataInput);
        setProtoActionConf(WritableUtils.readStr(dataInput));
        // externalId is not part of the serialized form written by write(), so it is left unchanged here.
    }

    public boolean inTerminalState() {
        boolean inTerminalState = false;
        switch (WorkflowJob.Status.valueOf(statusStr)) {
            case FAILED:
            case KILLED:
            case SUCCEEDED:
                inTerminalState = true;
                break;
            default:
                break;
        }
        return inTerminalState;
    }

    public String getLogToken() {
        return logToken;
    }

    public void setLogToken(String logToken) {
        this.logToken = logToken;
    }

    public String getSlaXml() {
        return slaXml == null ? null : slaXml.getString();
    }

    public void setSlaXml(String slaXml) {
        if (this.slaXml == null) {
            this.slaXml = new StringBlob(slaXml);
        }
        else {
            this.slaXml.setString(slaXml);
        }
    }

    public void setSlaXmlBlob(StringBlob slaXml) {
        this.slaXml = slaXml;
    }

    public StringBlob getSlaXmlBlob() {
        return this.slaXml;
    }

    public WorkflowInstance getWorkflowInstance() {
        return wfInstance == null ? null : get(wfInstance.getBytes());
    }

    public BinaryBlob getWfInstanceBlob() {
        return this.wfInstance;
    }

    public void setWorkflowInstance(WorkflowInstance workflowInstance) {
        if (this.wfInstance == null) {
            this.wfInstance = new BinaryBlob(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance), true);
        }
        else {
            this.wfInstance.setBytes(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance));
        }
    }

    public void setWfInstanceBlob(BinaryBlob wfInstance) {
        this.wfInstance = wfInstance;
    }

    public String getProtoActionConf() {
        return protoActionConf == null ? null : protoActionConf.getString();
    }

    public void setProtoActionConf(String protoActionConf) {
        if (this.protoActionConf == null) {
            this.protoActionConf = new StringBlob(protoActionConf);
        }
        else {
            this.protoActionConf.setString(protoActionConf);
        }
    }

    public void setProtoActionConfBlob (StringBlob protoBytes) {
        this.protoActionConf = protoBytes;
    }

    public StringBlob getProtoActionConfBlob() {
        return this.protoActionConf;
    }

    public String getlogToken() {
        return logToken;
    }

    public Timestamp getLastModifiedTimestamp() {
        return lastModifiedTimestamp;
    }

    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    public Timestamp getCreatedTimestamp() {
        return createdTimestamp;
    }

    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }

    public void setStatusStr (String statusStr) {
        this.statusStr = statusStr;
    }

    public void setStatus(Status val) {
        this.statusStr = val.toString();
    }

    @Override
    public Status getStatus() {
        return Status.valueOf(statusStr);
    }

    public String getStatusStr() {
        return statusStr;
    }

    public void setExternalId(String externalId) {
        this.externalId = externalId;
    }

    @Override
    public String getExternalId() {
        return externalId;
    }

    public void setLastModifiedTime(Date lastModifiedTime) {
        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
    }

    public Date getLastModifiedTime() {
        return DateUtils.toDate(lastModifiedTimestamp);
    }

    public Date getCreatedTime() {
        return DateUtils.toDate(createdTimestamp);
    }

    public void setCreatedTime(Date createdTime) {
        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
    }

    @Override
    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    public void setStartTime(Date startTime) {
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    public void setEndTime(Date endTime) {
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }

    private WorkflowInstance get(byte[] array) {
        LiteWorkflowInstance pInstance = WritableUtils.fromByteArray(array, LiteWorkflowInstance.class);
        return pInstance;
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSONObject() {
        return toJSONObject("GMT");
    }

    @SuppressWarnings("unchecked")
    public JSONObject toJSONObject(String timeZoneId) {
        JSONObject json = new JSONObject();
        json.put(JsonTags.WORKFLOW_APP_PATH, getAppPath());
        json.put(JsonTags.WORKFLOW_APP_NAME, getAppName());
        json.put(JsonTags.WORKFLOW_ID, getId());
        json.put(JsonTags.WORKFLOW_EXTERNAL_ID, getExternalId());
        json.put(JsonTags.WORKFLOW_PARENT_ID, getParentId());
        json.put(JsonTags.WORKFLOW_CONF, getConf());
        json.put(JsonTags.WORKFLOW_STATUS, getStatus().toString());
        json.put(JsonTags.WORKFLOW_LAST_MOD_TIME, JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_USER, getUser());
        json.put(JsonTags.WORKFLOW_GROUP, getGroup());
        json.put(JsonTags.WORKFLOW_ACL, getAcl());
        json.put(JsonTags.WORKFLOW_RUN, (long) getRun());
        json.put(JsonTags.WORKFLOW_CONSOLE_URL, getConsoleUrl());
        json.put(JsonTags.WORKFLOW_ACTIONS, WorkflowActionBean.toJSONArray(actions, timeZoneId));
        json.put(JsonTags.TO_STRING, toString());
        return json;
    }

    public String getAppPath() {
        return appPath;
    }

    public void setAppPath(String appPath) {
        this.appPath = appPath;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getConf() {
        return conf == null ? null : conf.getString();
    }

    public void setConf(String conf) {
        if (this.conf == null) {
            this.conf = new StringBlob(conf);
        }
        else {
            this.conf.setString(conf);
        }
    }

    public void setConfBlob(StringBlob conf) {
        this.conf = conf;
    }

    public StringBlob getConfBlob() {
        return this.conf;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getGroup() {
        return group;
    }

    @Override
    public String getAcl() {
        return getGroup();
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public int getRun() {
        return run;
    }

    public void setRun(int run) {
        this.run = run;
    }

    /**
     * Return the workflow job console URL.
     *
     * @return the workflow job console URL.
     */
    public String getConsoleUrl() {
        return consoleUrl;
    }

    /**
     * Return the corresponding Action ID, if any.
     *
     * @return the coordinator Action Id.
     */
    public String getParentId() {
        return parentId;
    }

    /**
     * Set coordinator action id
     *
     * @param parentId : coordinator action id
     */
    public void setParentId(String parentId) {
        this.parentId = parentId;
    }

    /**
     * Set the workflow job console URL.
     *
     * @param consoleUrl the workflow job console URL.
     */
    public void setConsoleUrl(String consoleUrl) {
        this.consoleUrl = consoleUrl;
    }

    @SuppressWarnings("unchecked")
    public List<WorkflowAction> getActions() {
        return (List) actions;
    }

    public void setActions(List<WorkflowActionBean> nodes) {
        this.actions = (nodes != null) ? nodes : new ArrayList<WorkflowActionBean>();
    }

    @Override
    public String toString() {
        return MessageFormat.format("Workflow id[{0}] status[{1}]", getId(), getStatus());
    }

    /**
     * Convert a workflows list into a JSONArray.
     *
     * @param workflows workflows list.
     * @param timeZoneId time zone to use for dates in the JSON array.
     * @return the corresponding JSON array.
     */
    @SuppressWarnings("unchecked")
    public static JSONArray toJSONArray(List<WorkflowJobBean> workflows, String timeZoneId) {
        JSONArray array = new JSONArray();
        if (workflows != null) {
            for (WorkflowJobBean node : workflows) {
                array.add(node.toJSONObject(timeZoneId));
            }
        }
        return array;
    }



}
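
The write()/readFields() pair encodes a missing date as -1 and must only build a Date when that sentinel is absent; creating the Date outside the check would turn a null start time into 1969-12-31T23:59:59.999Z. Here is a minimal round-trip sketch of that convention using only java.io. TimesBean and its fields are hypothetical and cover just two of the bean's timestamps.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical cut-down bean using the same nullable-date convention.
class TimesBean {
    Date startTime;
    Date endTime;

    // A null Date is written as the sentinel -1.
    void write(DataOutput out) throws IOException {
        out.writeLong(startTime != null ? startTime.getTime() : -1);
        out.writeLong(endTime != null ? endTime.getTime() : -1);
    }

    // Fields must be read back in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        startTime = readDate(in);
        endTime = readDate(in);
    }

    // Only construct a Date when the sentinel is absent; otherwise leave it null.
    private static Date readDate(DataInput in) throws IOException {
        long d = in.readLong();
        return d != -1 ? new Date(d) : null;
    }

    // Serialize to bytes and read them back into a fresh bean.
    static TimesBean roundTrip(TimesBean bean) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            bean.write(new DataOutputStream(bytes));
            TimesBean copy = new TimesBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

One limitation of a -1 sentinel is that a genuine timestamp of -1 ms cannot be distinguished from "no date", which is harmless for job times.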

If you are interested in the relational table schema used in Oozie, take a look at OozieSchema in the org.apache.oozie.store package, which encapsulates the schema in Java code.

It defines the following DB tables: Process Instance, WorkflowJob, Actions and Version. Their columns are:

        // Process Instance Table
        PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
        PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),

        // WorkflowJob Table
        WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
        WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
        WF_conf(OozieTable.WORKFLOWS, String.class, false),
        WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
        WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_run(OozieTable.WORKFLOWS, Long.class, false),
        WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_authToken(OozieTable.WORKFLOWS, String.class, false),

        // Actions Table
        ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
        ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
        ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
        ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
        ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_data(OozieTable.ACTIONS, String.class, false),
        ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
        ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
        ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),

        // Version Table
        VER_versionNumber(OozieTable.VERSION, String.class, false, 255);
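
Each column constant above carries a table, a Java type and, judging from the values, a primary-key flag and an optional maximum length. The sketch below re-declares a few columns in that shape, groups them by table the way OozieSchema's static initializer builds TABLE_COLUMNS, and renders a rough CREATE TABLE string. The meaning of the boolean and length arguments, the -1 "no length" default, and the SQL type mapping are all assumptions for illustration; SketchTable, SketchColumn and SketchSchema are hypothetical names.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

enum SketchTable { WORKFLOWS, ACTIONS }

// Assumed shape: (table, Java type, isPrimaryKey[, length]); -1 means no length given.
enum SketchColumn {
    WF_id(SketchTable.WORKFLOWS, String.class, true, 100),
    WF_status(SketchTable.WORKFLOWS, String.class, false, 100),
    WF_run(SketchTable.WORKFLOWS, Long.class, false),
    WF_endTime(SketchTable.WORKFLOWS, Timestamp.class, false),
    ACTIONS_id(SketchTable.ACTIONS, String.class, true, 100),
    ACTIONS_wfId(SketchTable.ACTIONS, String.class, false, 100);

    final SketchTable table;
    final Class<?> type;
    final boolean primaryKey;
    final int length;

    SketchColumn(SketchTable table, Class<?> type, boolean primaryKey) {
        this(table, type, primaryKey, -1);
    }

    SketchColumn(SketchTable table, Class<?> type, boolean primaryKey, int length) {
        this.table = table;
        this.type = type;
        this.primaryKey = primaryKey;
        this.length = length;
    }

    // Column name without the table prefix, e.g. "WF_status" -> "status".
    String columnName() {
        return name().substring(name().indexOf('_') + 1);
    }
}

class SketchSchema {
    // Group columns by table, in declaration order.
    static Map<SketchTable, List<SketchColumn>> columnsByTable() {
        Map<SketchTable, List<SketchColumn>> result = new EnumMap<>(SketchTable.class);
        for (SketchColumn c : SketchColumn.values()) {
            result.computeIfAbsent(c.table, t -> new ArrayList<>()).add(c);
        }
        return result;
    }

    // Rough DDL; the SQL type mapping here is illustrative only.
    static String createTable(SketchTable table) {
        List<String> defs = new ArrayList<>();
        for (SketchColumn c : columnsByTable().getOrDefault(table, new ArrayList<>())) {
            String sqlType;
            if (c.type == String.class) {
                sqlType = c.length > 0 ? "VARCHAR(" + c.length + ")" : "TEXT";
            } else if (c.type == Long.class) {
                sqlType = "BIGINT";
            } else {
                sqlType = "TIMESTAMP";
            }
            defs.add(c.columnName() + " " + sqlType + (c.primaryKey ? " PRIMARY KEY" : ""));
        }
        return "CREATE TABLE " + table + " (" + String.join(", ", defs) + ")";
    }
}
```

For the WORKFLOWS subset above, createTable returns "CREATE TABLE WORKFLOWS (id VARCHAR(100) PRIMARY KEY, status VARCHAR(100), run BIGINT, endTime TIMESTAMP)".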


Here is the OozieSchema class, which defines the DB tables used in Oozie.


package org.apache.oozie.store;

import java.sql.Blob;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.oozie.util.db.Schema;
import org.apache.oozie.util.db.Schema.Column;
import org.apache.oozie.util.db.Schema.DBType;
import org.apache.oozie.util.db.Schema.Index;
import org.apache.oozie.util.db.Schema.Table;

public class OozieSchema {

    private static String oozieDbName;

    private static final String OOZIE_VERSION = "0.1";

    public static final Map<Table, List<Column>> TABLE_COLUMNS = new HashMap<Table, List<Column>>();

    static {
        for (Column column : OozieColumn.values()) {
            List<Column> tColumns = TABLE_COLUMNS.get(column.table());
            if (tColumns == null) {
                tColumns = new ArrayList<Column>();
                TABLE_COLUMNS.put(column.table(), tColumns);
            }
            tColumns.add(column);
        }
    }

    public static void setOozieDbName(String dbName) {
        oozieDbName = dbName;
    }

    public static enum OozieTable implements Table {
        WORKFLOWS,
        ACTIONS,
        WF_PROCESS_INSTANCE,
        VERSION;

        @Override
        public String toString() {
            return oozieDbName + "." + name().toUpperCase();
        }
    }

    public static enum OozieIndex implements Index {
        IDX_WF_APPNAME(OozieColumn.WF_appName),
        IDX_WF_USER(OozieColumn.WF_userName),
        IDX_WF_GROUP(OozieColumn.WF_groupName),
        IDX_WF_STATUS(OozieColumn.WF_status),
        IDX_WF_EXTERNAL_ID(OozieColumn.WF_externalId),

        IDX_ACTIONS_BEGINTIME(OozieColumn.ACTIONS_pendingAge),
        IDX_ACTIONS_WFID(OozieColumn.ACTIONS_wfId);

        final Column column;

        OozieIndex(Column column) {
            this.column = column;
        }

        public Column column() {
            return column;
        }
    }

    public static enum OozieColumn implements Column {
        // Process Instance Table
        PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
        PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),

        // WorkflowJob Table
        WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
        WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
        WF_conf(OozieTable.WORKFLOWS, String.class, false),
        WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
        WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_run(OozieTable.WORKFLOWS, Long.class, false),
        WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_authToken(OozieTable.WORKFLOWS, String.class, false),

        // Actions Table
        ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
        ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
        ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
        ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
        ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_data(OozieTable.ACTIONS, String.class, false),
        ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
        ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
        ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),

        // Version Table
        VER_versionNumber(OozieTable.VERSION, String.class, false, 255);

        final Table table;
        final Class<?> type;
        int length = -1;
        final boolean isPrimaryKey;

        OozieColumn(Table table, Class<?> type, boolean isPrimaryKey) {
            this(table, type, isPrimaryKey, -1);
        }

        OozieColumn(Table table, Class<?> type, boolean isPrimaryKey, int length) {
            this.table = table;
            this.type = type;
            this.isPrimaryKey = isPrimaryKey;
            this.length = length;
        }

        private String getName() {
            String tName = table.name();
            return tName + "." + columnName();
        }

        public String columnName() {
            return name().split("_")[1].toLowerCase();
        }

        @Override
        public String toString() {
            return getName();
        }

        public Table table() {
            return table;
        }

        public Class<?> getType() {
            return type;
        }

        public int getLength() {
            return length;
        }

        public String asLabel() {
            return name().toUpperCase();
        }

        public boolean isPrimaryKey() {
            return isPrimaryKey;
        }
    }

    /**
     * Generates the CREATE TABLE SQL statement for the given table.
     *
     * @param table table to create
     * @param dbType database type to generate the statement for
     * @return SQL Statement to create the table
     */
    public static String generateCreateTableScript(Table table, DBType dbType) {
        return Schema.generateCreateTableScript(table, dbType, TABLE_COLUMNS.get(table));
    }

    /**
     * Gets the query that will be used to validate the connection.
     *
     * @param dbName database (schema) name
     * @return a count query against the VERSION table
     */
    public static String getValidationQuery(String dbName) {
        return "select count(" + OozieColumn.VER_versionNumber.columnName() + ") from " + dbName + "."
                + OozieTable.VERSION.name().toUpperCase();
    }

    /**
     * Generates the INSERT statement that writes OOZIE_VERSION into the VERSION table.
     *
     * @param dbName database (schema) name
     * @return the SQL INSERT statement
     */
    public static String generateInsertVersionScript(String dbName) {
        return "INSERT INTO " + dbName + "." + OozieTable.VERSION.name().toUpperCase() + "("
                + OozieColumn.VER_versionNumber.columnName() + ") VALUES('" + OOZIE_VERSION + "')"; // String column: quote the value
    }

    /**
     * Gets the Oozie schema version.
     *
     * @return the schema version (OOZIE_VERSION)
     */
    public static String getOozieVersion() {
        return OOZIE_VERSION;
    }
}
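OozieSchema leans on a naming convention: each OozieColumn constant is named <table prefix>_<column>, columnName() keeps the part after the underscore in lower case, and the static initializer groups all constants into TABLE_COLUMNS by table. The following is a minimal, self-contained sketch of just that mechanism. DemoTable, DemoColumn and DemoSchema are hypothetical names, and the grouping is written with Map.computeIfAbsent instead of the explicit null check used above.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for OozieTable.
enum DemoTable { WORKFLOWS, ACTIONS }

// Hypothetical stand-in for OozieColumn: constants are named <TABLE-PREFIX>_<column>.
enum DemoColumn {
    WF_id(DemoTable.WORKFLOWS),
    WF_appName(DemoTable.WORKFLOWS),
    ACTIONS_id(DemoTable.ACTIONS),
    ACTIONS_wfId(DemoTable.ACTIONS);

    final DemoTable table;

    DemoColumn(DemoTable table) {
        this.table = table;
    }

    // Same rule as OozieColumn.columnName(): the part after the underscore, lower-cased.
    String columnName() {
        return name().split("_")[1].toLowerCase();
    }

    // Same shape as OozieColumn.toString(): <TABLE>.<column>.
    @Override
    public String toString() {
        return table.name() + "." + columnName();
    }
}

// Hypothetical stand-in for OozieSchema's TABLE_COLUMNS map and its static initializer.
final class DemoSchema {
    static final Map<DemoTable, List<DemoColumn>> TABLE_COLUMNS = new EnumMap<>(DemoTable.class);

    static {
        // One list per table, filled in enum declaration order.
        for (DemoColumn column : DemoColumn.values()) {
            TABLE_COLUMNS.computeIfAbsent(column.table, t -> new ArrayList<>()).add(column);
        }
    }

    private DemoSchema() {
    }
}
```

With this sketch, DemoColumn.WF_appName.columnName() returns "appname" and DemoColumn.ACTIONS_wfId.toString() returns "ACTIONS.wfid". One consequence of the convention is that the column part must not itself contain an underscore, because split("_")[1] would cut it off.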

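The two VERSION-table helpers, getValidationQuery and generateInsertVersionScript, just concatenate SQL strings. The sketch below rebuilds those strings with the table and column names hard-coded to what OozieTable.VERSION and OozieColumn.VER_versionNumber resolve to ("VERSION" and "versionnumber"). VersionSql is a hypothetical class; it writes the version as a quoted SQL string literal, doubling any embedded single quotes, because VER_versionNumber is declared as a String column.

```java
// Hypothetical helper rebuilding the two VERSION-table SQL strings; the table and column
// names are hard-coded to what OozieTable.VERSION and OozieColumn.VER_versionNumber resolve to.
final class VersionSql {
    static final String VERSION_TABLE = "VERSION";
    static final String VERSION_COLUMN = "versionnumber";

    private VersionSql() {
    }

    // Connection validation query: counts the version values in <dbName>.VERSION.
    static String validationQuery(String dbName) {
        return "select count(" + VERSION_COLUMN + ") from " + dbName + "." + VERSION_TABLE;
    }

    // Insert statement; the version is emitted as a SQL string literal with ' doubled to ''.
    static String insertVersion(String dbName, String version) {
        String literal = "'" + version.replace("'", "''") + "'";
        return "INSERT INTO " + dbName + "." + VERSION_TABLE + "(" + VERSION_COLUMN + ") VALUES(" + literal + ")";
    }
}
```

For example, insertVersion("oozie", "0.1") yields INSERT INTO oozie.VERSION(versionnumber) VALUES('0.1'). The database name is still concatenated as-is, which is only acceptable because it presumably comes from server configuration rather than user input.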
In Oozie there are several DB stores, e.g. a Coord Store, a Workflow Store and an SLA Store.
They are all in the package org.apache.oozie.store.

Store is the abstract class that separates the entities from the actual store implementation.
It provides access to the EntityManager and the JDBC connection (getConnection), plus transaction control and rollback.
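The transaction control a Store offers boils down to the usual begin / commit / rollback discipline. Store's own method names are not part of this excerpt, so the sketch below uses hypothetical names (RecordingTransaction, TxTemplate.inTransaction). In a JPA-based store the transaction would come from EntityManager.getTransaction(), whose EntityTransaction provides begin(), commit(), rollback() and isActive(); here a recording stand-in only logs the calls so the control flow can be checked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JPA EntityTransaction: it records each call
// so the begin/commit/rollback sequence can be inspected.
final class RecordingTransaction {
    final List<String> log = new ArrayList<>();
    private boolean active;

    void begin() {
        active = true;
        log.add("begin");
    }

    void commit() {
        active = false;
        log.add("commit");
    }

    void rollback() {
        active = false;
        log.add("rollback");
    }

    boolean isActive() {
        return active;
    }
}

// Hypothetical helper: run a unit of work inside one transaction,
// committing on success and rolling back if the work throws.
final class TxTemplate {
    private TxTemplate() {
    }

    static void inTransaction(RecordingTransaction tx, Runnable work) {
        tx.begin();
        try {
            work.run();
            tx.commit();
        } finally {
            // Still active here only if the work threw, so commit() was never reached.
            if (tx.isActive()) {
                tx.rollback();
            }
        }
    }
}
```

A successful unit of work leaves the log as begin, work, commit; a failing one leaves begin, rollback, and the exception still propagates to the caller.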

The DB implementations are:
CoordinatorStore, the DB implementation of the Coord Store
WorkflowStore, the DB implementation of the Workflow Store
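Every WorkflowStore method in the listing below follows the same pattern: validate the arguments with ParamChecker, then pass the actual JPA work as a Callable to doOperation(name, callable). The body of doOperation is not part of this excerpt; judging from the imported Instrumentation and InstrumentationService classes and the INSTR_GROUP = "db" constant, it likely times each call under its operation name and converts failures into StoreException, but that is an inference. The sketch below shows that assumed pattern with hypothetical classes (TimedStore, InMemoryWorkflowStore, SketchStoreException), lambdas instead of anonymous classes, and a map standing in for the database.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical checked exception standing in for Oozie's StoreException.
class SketchStoreException extends Exception {
    SketchStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical base class: every DB call goes through doOperation, which times it
// under the operation name and wraps any other failure in SketchStoreException.
abstract class TimedStore {
    // Total elapsed nanoseconds per operation name, a stand-in for real instrumentation.
    final Map<String, Long> timings = new LinkedHashMap<>();

    protected <V> V doOperation(String name, Callable<V> command) throws SketchStoreException {
        long start = System.nanoTime();
        try {
            return command.call();
        } catch (SketchStoreException e) {
            throw e; // already the store's own exception type, rethrow unchanged
        } catch (Exception e) {
            throw new SketchStoreException(name + ": " + e.getMessage(), e);
        } finally {
            timings.merge(name, System.nanoTime() - start, Long::sum);
        }
    }
}

// Hypothetical in-memory store written in the WorkflowStore style:
// each public method builds a Callable and hands it to doOperation.
class InMemoryWorkflowStore extends TimedStore {
    private final Map<String, String> statusById = new LinkedHashMap<>();

    void insertWorkflow(String id, String status) throws SketchStoreException {
        doOperation("insertWorkflow", () -> statusById.put(id, status));
    }

    String getStatus(String id) throws SketchStoreException {
        return doOperation("getStatus", () -> {
            String status = statusById.get(id);
            if (status == null) {
                throw new IllegalArgumentException("no workflow " + id);
            }
            return status;
        });
    }
}
```

Centralising the try/catch and timing in one helper keeps each store method down to its query logic, which is why the WorkflowStore methods below look so uniform.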


package org.apache.oozie.store;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob.Status;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.openjpa.persistence.OpenJPAEntityManager;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;

/**
 * DB Implementation of Workflow Store
 */
public class WorkflowStore extends Store {
    private Connection conn;
    private EntityManager entityManager;
    private boolean selectForUpdate;
    private static final String INSTR_GROUP = "db";
    public static final int LOCK_TIMEOUT = 50000;
    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
    private static final String countStr = "Select count(w) from WorkflowJobBean w";

    public WorkflowStore() {
    }

    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
        super();
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
        super(store);
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(boolean selectForUpdate) throws StoreException {
        super();
        entityManager = getEntityManager();
        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
        conn = (Connection) kem.getConnection();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
        super(store);
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Insert the given Workflow bean into the store. It also creates the process instance for the job.
     *
     * @param workflow workflow bean
     * @throws StoreException
     */
    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
        ParamChecker.notNull(workflow, "workflow");

        doOperation("insertWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(workflow);
                return null;
            }
        });
    }

    /**
     * Load the Workflow into a bean, including its Workflow Instance, and return it. The Workflow is locked
     * if the locking parameter is true.
     *
     * @param id Workflow ID
     * @param locking true if Workflow is to be locked
     * @return WorkflowJobBean
     * @throws StoreException
     */
    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowOnly(id, locking);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                /*
                 * WorkflowInstance wfInstance; //krishna and next line
                 * wfInstance = workflowLib.get(id); wfInstance =
                 * wfBean.get(wfBean.getWfInstance());
                 * wfBean.setWorkflowInstance(wfInstance);
                 * wfBean.setWfInstance(wfInstance);
                 */
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Get the number of Workflows with the given status.
     *
     * @param status Workflow Status.
     * @return number of Workflows with given status.
     * @throws StoreException
     */
    public int getWorkflowCountWithStatus(final String status) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
                q.setParameter("status", status);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Get the number of Workflows with the given status that were modified within the last secs seconds.
     *
     * @param status Workflow Status.
     * @param secs No. of seconds within which the workflow got modified.
     * @return number of Workflows modified within given time with given status.
     * @throws StoreException
     */
    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        if (secs < 0) {
            throw new IllegalArgumentException("secs cannot be negative: " + secs);
        }
        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000L)); // long math avoids int overflow
                q.setParameter("status", status);
                q.setParameter("lastModTime", ts);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Write the Workflow bean, including its workflow instance data, to the DB. The ACTIONS table is not updated.
     *
     * @param wfBean Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
        ParamChecker.notNull(wfBean, "WorkflowJobBean");
        doOperation("updateWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
                WorkflowJobQueryExecutor.getInstance().executeUpdate(
                        WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
                return null;
            }
        });
    }

    /**
     * Create a new Action record in the ACTIONS table with the given Bean.
     *
     * @param action WorkflowActionBean
     * @throws StoreException If the action is already present
     */
    public void insertAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("insertAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(action);
                return null;
            }
        });
    }

    /**
     * Load the action data and return it as a bean.
     *
     * @param id Action Id
     * @param locking true if the action is to be locked
     * @return Action Bean
     * @throws StoreException If action doesn't exist
     */
    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
                    InterruptedException {
                Query q = entityManager.createNamedQuery("GET_ACTION");
                /*
                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                 * FetchPlan fetch = oq.getFetchPlan();
                 * fetch.setReadLockMode(LockModeType.WRITE);
                 * fetch.setLockTimeout(1000); // 1 seconds }
                 */
                WorkflowActionBean action = null;
                q.setParameter("id", id);
                List<WorkflowActionBean> actions = q.getResultList();
                // action = (WorkflowActionBean) q.getSingleResult();
                if (actions.size() > 0) {
                    action = actions.get(0);
                }
                else {
                    throw new StoreException(ErrorCode.E0605, id);
                }

                /*
                 * if (locking) return action; else
                 */
                // return action;
                return getBeanForRunningAction(action);
            }
        });
        return action;
    }

    /**
     * Update the given action bean to DB.
     *
     * @param action Action Bean
     * @throws StoreException if action doesn't exist
     */
    public void updateAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("updateAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
                WorkflowActionQueryExecutor.getInstance().executeUpdate(
                        WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
                return null;
            }
        });
    }

    /**
     * Delete the Action with the given id. Nothing happens if no such Action exists.
     *
     * @param id Action ID
     * @throws StoreException if the delete operation fails
     */
    public void deleteAction(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        doOperation("deleteAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                /*
                 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
                 * q.setParameter("id", id); q.executeUpdate();
                 */
                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
                if (action != null) {
                    entityManager.remove(action);
                }
                return null;
            }
        });
    }

    /**
     * Loads all the actions for the given Workflow. The locking code is commented out, so the actions are
     * currently not locked regardless of the locking flag.
     *
     * @param wfId Workflow ID
     * @param locking true if Actions are to be locked (currently ignored)
     * @return A List of WorkflowActionBean
     * @throws StoreException
     */
    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
                new Callable<List<WorkflowActionBean>>() {
                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                            InterruptedException {
                        List<WorkflowActionBean> actions;
                        List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                        try {
                            Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");

                            /*
                             * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                             * if (locking) { //
                             * q.setHint("openjpa.FetchPlan.ReadLockMode"
                             * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
                             * fetch.setReadLockMode(LockModeType.WRITE);
                             * fetch.setLockTimeout(1000); // 1 seconds }
                             */
                            q.setParameter("wfId", wfId);
                            actions = q.getResultList();
                            for (WorkflowActionBean a : actions) {
                                WorkflowActionBean aa = getBeanForRunningAction(a);
                                actionList.add(aa);
                            }
                        }
                        catch (IllegalStateException e) {
                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                        }
                        /*
                         * if (locking) { return actions; } else {
                         */
                        return actionList;
                        // }
                    }
                });
        return actions;
    }

    /**
     * Loads the given number of actions for the given Workflow, starting at the given offset.
     *
     * @param wfId Workflow ID
     * @param start 1-based offset of the first action to return
     * @param len number of Workflow Actions to be returned
     * @return A List of WorkflowActionBean
     * @throws StoreException
     */
    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
                new Callable<List<WorkflowActionBean>>() {
                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                            InterruptedException {
                        List<WorkflowActionBean> actions;
                        List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                        try {
                            Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
                            OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                            q.setParameter("wfId", wfId);
                            q.setFirstResult(start - 1);
                            q.setMaxResults(len);
                            actions = q.getResultList();
                            for (WorkflowActionBean a : actions) {
                                WorkflowActionBean aa = getBeanForRunningAction(a);
                                actionList.add(aa);
                            }
                        }
                        catch (IllegalStateException e) {
                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                        }
                        return actionList;
                    }
                });
        return actions;
    }

    /**
     * Load all the actions that have been pending for longer than the given time.
     *
     * @param minimumPendingAgeSecs Minimum Pending age in seconds
     * @return List of action beans
     * @throws StoreException
     */
    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
                List<WorkflowActionBean> actionList = null;
                try {
                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
                    q.setParameter("pendingAge", ts);
                    actionList = q.getResultList();
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
                return actionList;
            }
        });
        return actions;
    }

    /**
     * Load all the actions that are running and were last checked after now - checkAgeSecs.
     *
     * @param checkAgeSecs check age in seconds.
     * @return List of action beans.
     * @throws StoreException
     */
    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {

            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
                try {
                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
                    q.setParameter("lastCheckTime", ts);
                    actions = q.getResultList();
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }

                return actions;
            }
        });
        return actions;
    }

    /**
     * Load all the actions whose status is START_RETRY, START_MANUAL, END_RETRY or END_MANUAL.
     *
     * @param wfId Workflow ID
     * @return List of action beans
     * @throws StoreException
     */
    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
                new Callable<List<WorkflowActionBean>>() {
                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                        List<WorkflowActionBean> actionList = null;
                        try {
                            Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
                            q.setParameter("wfId", wfId);
                            actionList = q.getResultList();
                        }
                        catch (IllegalStateException e) {
                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                        }

                        return actionList;
                    }
                });
        return actions;
    }

    /**
     * Loads all the jobs that satisfy the given filter condition. Filters can be applied on user, group,
     * appName and status.
     *
     * @param filter Filter condition
     * @param start offset for select statement
     * @param len number of Workflows to be returned
     * @return a WorkflowsInfo holding the matching workflows
     * @throws StoreException
     */
    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
            throws StoreException {

        WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
            @SuppressWarnings("unchecked")
            public WorkflowsInfo call() throws SQLException, StoreException {

                List<String> orArray = new ArrayList<String>();
                List<String> colArray = new ArrayList<String>();
                List<String> valArray = new ArrayList<String>();
                StringBuilder sb = new StringBuilder("");
                boolean isStatus = false;
                boolean isGroup = false;
                boolean isAppName = false;
                boolean isUser = false;
                boolean isEnabled = false;
                int index = 0;
                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
                    String colName = null;
                    String colVar = null;
                    if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
                        List<String> values = filter.get(OozieClient.FILTER_GROUP);
                        colName = "group";
                        for (int i = 0; i < values.size(); i++) {
                            colVar = "group";
                            colVar = colVar + index;
                            if (!isEnabled && !isGroup) {
                                sb.append(seletStr).append(" where w.group IN (:group" + index);
                                isGroup = true;
                                isEnabled = true;
                            }
                            else {
                                if (isEnabled && !isGroup) {
                                    sb.append(" and w.group IN (:group" + index);
                                    isGroup = true;
                                }
                                else {
                                    if (isGroup) {
                                        sb.append(", :group" + index);
                                    }
                                }
                            }
                            if (i == values.size() - 1) {
                                sb.append(")");
                            }
                            index++;
                            valArray.add(values.get(i));
                            orArray.add(colName);
                            colArray.add(colVar);
                        }
                    }
                    else {
                        if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
                            List<String> values = filter.get(OozieClient.FILTER_STATUS);
                            colName = "status";
                            for (int i = 0; i < values.size(); i++) {
                                colVar = "status";
                                colVar = colVar + index;
                                if (!isEnabled && !isStatus) {
                                    sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
                                    isStatus = true;
                                    isEnabled = true;
                                }
                                else {
                                    if (isEnabled && !isStatus) {
                                        sb.append(" and w.statusStr IN (:status" + index);
                                        isStatus = true;
                                    }
                                    else {
                                        if (isStatus) {
                                            sb.append(", :status" + index);
                                        }
                                    }
                                }
                                if (i == values.size() - 1) {
                                    sb.append(")");
                                }
                                index++;
                                valArray.add(values.get(i));
                                orArray.add(colName);
                                colArray.add(colVar);
                            }
                        }
                        else {
                            if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
                                List<String> values = filter.get(OozieClient.FILTER_NAME);
                                colName = "appName";
                                for (int i = 0; i < values.size(); i++) {
                                    colVar = "appName";
                                    colVar = colVar + index;
                                    if (!isEnabled && !isAppName) {
                                        sb.append(seletStr).append(" where w.appName IN (:appName" + index);
                                        isAppName = true;
                                        isEnabled = true;
                                    }
                                    else {
                                        if (isEnabled && !isAppName) {
                                            sb.append(" and w.appName IN (:appName" + index);
                                            isAppName = true;
                                        }
                                        else {
                                            if (isAppName) {
                                                sb.append(", :appName" + index);
                                            }
                                        }
                                    }
                                    if (i == values.size() - 1) {
                                        sb.append(")");
                                    }
                                    index++;
                                    valArray.add(values.get(i));
                                    orArray.add(colName);
                                    colArray.add(colVar);
                                }
                            }
                            else {
                                if (entry.getKey().equals(OozieClient.FILTER_USER)) {
                                    List<String> values = filter.get(OozieClient.FILTER_USER);
                                    colName = "user";
                                    for (int i = 0; i < values.size(); i++) {
                                        colVar = "user";
                                        colVar = colVar + index;
                                        if (!isEnabled && !isUser) {
                                            sb.append(seletStr).append(" where w.user IN (:user" + index);
                                            isUser = true;
                                            isEnabled = true;
                                        }
                                        else {
                                            if (isEnabled && !isUser) {
                                                sb.append(" and w.user IN (:user" + index);
                                                isUser = true;
                                            }
                                            else {
                                                if (isUser) {
                                                    sb.append(", :user" + index);
                                                }
                                            }
                                        }
                                        if (i == values.size() - 1) {
                                            sb.append(")");
                                        }
                                        index++;
                                        valArray.add(values.get(i));
                                        orArray.add(colName);
                                        colArray.add(colVar);
                                    }
                                }
                            }
                        }
                    }
                }

                int realLen = 0;

                Query q = null;
                Query qTotal = null;
                if (orArray.size() == 0) {
                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
                    q.setFirstResult(start - 1);
                    q.setMaxResults(len);
                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
                }
                else {
                    if (orArray.size() > 0) {
                        StringBuilder sbTotal = new StringBuilder(sb);
                        sb.append(" order by w.startTimestamp desc ");
                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
                        q = entityManager.createQuery(sb.toString());
                        q.setFirstResult(start - 1);
                        q.setMaxResults(len);
                        qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
                        for (int i = 0; i < orArray.size(); i++) {
                            q.setParameter(colArray.get(i), valArray.get(i));
                            qTotal.setParameter(colArray.get(i), valArray.get(i));
                        }
                    }
                }

                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
                fetch.setFetchBatchSize(20);
                fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
                fetch.setFetchDirection(FetchDirection.FORWARD);
                fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
                List<?> resultList = q.getResultList();
                List<Object[]> objectArrList = (List<Object[]>) resultList;
                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();

                for (Object[] arr : objectArrList) {
                    WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);
                    wfBeansList.add(ww);
                }

                realLen = ((Long) qTotal.getSingleResult()).intValue();

                return new WorkflowsInfo(wfBeansList, start, len, realLen);
            }
        });
        return workFlowsInfo;

    }

    /**
     * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
     *
     * @param id Workflow Id
     * @return Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                else {
                    wfBean.setActions(getActionsForWorkflow(id, false));
                }
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
     *
     * @param id Workflow Id
     * @param start offset for select statement for actions
     * @param len number of Workflow Actions to be returned
     * @return Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                else {
                    wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
                }
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Get the Workflow ID for the given external ID, which is assigned to sub-workflows.
     *
     * @param externalId external ID
     * @return Workflow ID (the last row returned by the query if several jobs match), or an
     *         empty string if there is no job with the external ID
     * @throws StoreException if the lookup fails
     */
    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
        ParamChecker.notEmpty(externalId, "externalId");
        String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
            public String call() throws SQLException, StoreException {
                String id = "";
                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
                q.setParameter("externalId", externalId);
                List<String> w = q.getResultList();
                if (w.size() == 0) {
                    id = "";
                }
                else {
                    int index = w.size() - 1;
                    id = w.get(index);
                }
                return id;
            }
        });
        return wfId;
    }

    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;

    /**
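     * Purge the completed Workflows that ended more than the given number of days ago.
     *
     * @param olderThanDays number of days for which to preserve the workflows
     * @param limit maximum number of workflows to purge in this call
     * @throws StoreException if the purge fails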
     */
    public void purge(final long olderThanDays, final int limit) throws StoreException {
        doOperation("purge", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
                q.setParameter("endTime", maxEndTime);
                q.setMaxResults(limit);
                List<WorkflowJobBean> workflows = q.getResultList();
                int actionDeleted = 0;
                if (workflows.size() != 0) {
                    for (WorkflowJobBean w : workflows) {
                        String wfId = w.getId();
                        entityManager.remove(w);
                        Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
                        g.setParameter("wfId", wfId);
                        actionDeleted += g.executeUpdate();
                    }
                }
                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
                return null;
            }
        });
    }

    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
        try {
            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            V retVal;
            try {
                retVal = command.call();
            }
            finally {
                cron.stop();
            }
            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
            return retVal;
        }
        catch (StoreException ex) {
            throw ex;
        }
        catch (SQLException ex) {
            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
        }
        catch (Exception e) {
            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
        }
    }

    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        WorkflowJobBean wfBean = null;
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        /*
         * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
         * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
         * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
         * fetch.setLockTimeout(-1); // unlimited }
         */
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.size() > 0) {
            wfBean = w.get(0);
        }
        return wfBean;
        // return getBeanForRunningWorkflow(wfBean);
    }

    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        WorkflowJobBean wfBean = null;
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.size() > 0) {
            wfBean = w.get(0);
            return getBeanForRunningWorkflow(wfBean);
        }
        return null;
    }

    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId(w.getId());
        wfBean.setAppName(w.getAppName());
        wfBean.setAppPath(w.getAppPath());
        wfBean.setConfBlob(w.getConfBlob());
        wfBean.setGroup(w.getGroup());
        wfBean.setRun(w.getRun());
        wfBean.setUser(w.getUser());
        wfBean.setCreatedTime(w.getCreatedTime());
        wfBean.setEndTime(w.getEndTime());
        wfBean.setExternalId(w.getExternalId());
        wfBean.setLastModifiedTime(w.getLastModifiedTime());
        wfBean.setLogToken(w.getLogToken());
        wfBean.setProtoActionConfBlob(w.getProtoActionConfBlob());
        wfBean.setSlaXmlBlob(w.getSlaXmlBlob());
        wfBean.setStartTime(w.getStartTime());
        wfBean.setStatus(w.getStatus());
        wfBean.setWfInstanceBlob(w.getWfInstanceBlob());
        return wfBean;
    }

    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {

        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId((String) arr[0]);
        if (arr[1] != null) {
            wfBean.setAppName((String) arr[1]);
        }
        if (arr[2] != null) {
            wfBean.setStatus(Status.valueOf((String) arr[2]));
        }
        if (arr[3] != null) {
            wfBean.setRun((Integer) arr[3]);
        }
        if (arr[4] != null) {
            wfBean.setUser((String) arr[4]);
        }
        if (arr[5] != null) {
            wfBean.setGroup((String) arr[5]);
        }
        if (arr[6] != null) {
            wfBean.setCreatedTime((Timestamp) arr[6]);
        }
        if (arr[7] != null) {
            wfBean.setStartTime((Timestamp) arr[7]);
        }
        if (arr[8] != null) {
            wfBean.setLastModifiedTime((Timestamp) arr[8]);
        }
        if (arr[9] != null) {
            wfBean.setEndTime((Timestamp) arr[9]);
        }
        return wfBean;
    }

    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
        if (a != null) {
            WorkflowActionBean action = new WorkflowActionBean();
            action.setId(a.getId());
            action.setConfBlob(a.getConfBlob());
            action.setConsoleUrl(a.getConsoleUrl());
            action.setDataBlob(a.getDataBlob());
            action.setStatsBlob(a.getStatsBlob());
            action.setExternalChildIDsBlob(a.getExternalChildIDsBlob());
            action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
            action.setExternalId(a.getExternalId());
            action.setExternalStatus(a.getExternalStatus());
            action.setName(a.getName());
            action.setCred(a.getCred());
            action.setRetries(a.getRetries());
            action.setTrackerUri(a.getTrackerUri());
            action.setTransition(a.getTransition());
            action.setType(a.getType());
            action.setEndTime(a.getEndTime());
            action.setExecutionPath(a.getExecutionPath());
            action.setLastCheckTime(a.getLastCheckTime());
            action.setLogToken(a.getLogToken());
            if (a.isPending() == true) {
                action.setPending();
            }
            action.setPendingAge(a.getPendingAge());
            action.setSignalValue(a.getSignalValue());
            action.setSlaXmlBlob(a.getSlaXmlBlob());
            action.setStartTime(a.getStartTime());
            action.setStatus(a.getStatus());
            action.setJobId(a.getWfId());
            action.setUserRetryCount(a.getUserRetryCount());
            action.setUserRetryInterval(a.getUserRetryInterval());
            action.setUserRetryMax(a.getUserRetryMax());
            return action;
        }
        return null;
    }
}
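The long if/else chain at the top of getWorkflowsInfo does the same thing for every filter key: it emits a `w.<column> IN (:param0, :param1, ...)` clause, joins the clauses with ` and `, and records which value binds to which numbered parameter. The condensed sketch below reproduces that logic without JPA so the generated string is easy to inspect. The class and method names are hypothetical, and the filter keys ("group", "status", "name", "user") are assumed to match the OozieClient.FILTER_* constants; the column and parameter names are taken from the listing above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, JPA-free sketch of the where-clause building in getWorkflowsInfo.
final class WorkflowFilterSketch {

    // Assumed filter keys mapped to {JPQL column, named-parameter prefix}.
    private static final Map<String, String[]> COLUMNS = new LinkedHashMap<>();
    static {
        COLUMNS.put("group", new String[] {"w.group", "group"});
        COLUMNS.put("status", new String[] {"w.statusStr", "status"});
        COLUMNS.put("name", new String[] {"w.appName", "appName"});
        COLUMNS.put("user", new String[] {"w.user", "user"});
    }

    /** Appends "where col IN (...) and ..." to selectStr and fills params (name -> value). */
    static String build(String selectStr, Map<String, List<String>> filter,
                        Map<String, String> params) {
        List<String> clauses = new ArrayList<>();
        int index = 0; // one counter across all keys keeps parameter names unique
        for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
            String[] col = COLUMNS.get(entry.getKey());
            if (col == null || entry.getValue().isEmpty()) {
                continue; // unknown key or no values: no clause, as in the original
            }
            List<String> placeholders = new ArrayList<>();
            for (String value : entry.getValue()) {
                String name = col[1] + index++;
                placeholders.add(":" + name);
                params.put(name, value);
            }
            clauses.add(col[0] + " IN (" + String.join(", ", placeholders) + ")");
        }
        // With no clauses Oozie switches to a named query; this sketch just returns selectStr.
        return clauses.isEmpty() ? selectStr
                : selectStr + " where " + String.join(" and ", clauses);
    }
}
```

The single counter is what keeps `:status0` and a later `:user2` distinct when several filters are combined, which is why the original increments `index` across all four branches.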

In my next blog post, I will talk more about the LauncherMapper, DagEngine, etc. Stay tuned.

Categories: Hadoop, Oozie

Interesting paper on accelerating Hadoop via Network Levitated Merge

Categories: Hadoop, Paper

YARN RPC Part II

YarnRPC is the abstract base class that defines the abstract methods for obtaining an RPC client proxy and an RPC server. HadoopYarnProtoRPC extends YarnRPC and provides concrete implementations of these methods.


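package org.apache.hadoop.yarn.ipc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;

public abstract class YarnRPC {

  public abstract Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf);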

  public abstract void stopProxy(Object proxy, Configuration conf);

  public abstract Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig);

  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers) {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }

}
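Only the full getServer signature is abstract; the shorter overload is concrete and simply forwards with a null portRangeConfig, so every subclass gets it for free. The sketch below reproduces that shape with hypothetical, simplified types (RpcSketch, EchoRpc, and a String standing in for the returned server, with the Configuration and SecretManager parameters dropped) so it runs without Hadoop on the classpath. It illustrates the pattern only and is not HadoopYarnProtoRPC.

```java
import java.net.InetSocketAddress;

// Hypothetical stand-in for YarnRPC: one abstract "full" method plus a convenience overload.
abstract class RpcSketch {

    // Subclasses supply the real implementation (HadoopYarnProtoRPC plays this role in YARN).
    abstract String getServer(Class<?> protocol, Object instance,
                              InetSocketAddress addr, int numHandlers,
                              String portRangeConfig);

    // Convenience overload: the full method with no port-range configuration.
    String getServer(Class<?> protocol, Object instance,
                     InetSocketAddress addr, int numHandlers) {
        return getServer(protocol, instance, addr, numHandlers, null);
    }
}

// Hypothetical concrete implementation that only describes the server it would create.
final class EchoRpc extends RpcSketch {
    @Override
    String getServer(Class<?> protocol, Object instance,
                     InetSocketAddress addr, int numHandlers,
                     String portRangeConfig) {
        return protocol.getSimpleName() + "@" + addr.getPort()
                + " handlers=" + numHandlers + " portRange=" + portRangeConfig;
    }
}
```

Keeping the default in the base class means a concrete RPC implementation only has to write one server-creation method.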

Internally, HadoopYarnProtoRPC uses RpcFactoryProvider, which acts as a singleton factory that returns implementations of the RpcServerFactory and RpcClientFactory interfaces.

Currently, YARN has RpcServerFactoryPBImpl, which implements the RpcServerFactory interface, and RpcClientFactoryPBImpl, which implements the RpcClientFactory interface. These PB (Protocol Buffers) factories in turn let us inject a different Protocol Buffers implementation for each protocol class when the underlying Hadoop RPC creates the RPC server and the client proxy.

[Figure: RpcClientAndServerFactory]

Some examples of Protocol Buffers protocol implementations are:

org.apache.hadoop.yarn.api.impl.pb.service
AMRMProtocolPBServiceImpl (injected when the protocol is AMRMProtocol)
ClientRMProtocolPBServiceImpl (injected when the protocol is ClientRMProtocol)

org.apache.hadoop.yarn.api.impl.pb.client
AMRMProtocolPBClientImpl
ClientRMProtocolPBClientImpl
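Which class gets injected is decided purely by naming convention: the factory appends a suffix to the protocol's simple name and places it in an `impl.pb.service` (or `impl.pb.client`) subpackage, as the getPbServiceImplClassName and getProtoClassName helpers in the listing that follows do. Below is a standalone sketch of that string manipulation. PbNaming is a hypothetical class, and the client-side suffix `PBClientImpl` is inferred from the class names listed above rather than taken from RpcClientFactoryPBImpl's source.

```java
// Standalone sketch of the naming convention used by RpcServerFactoryPBImpl.
// Works on fully qualified names, so the YARN classes need not be on the classpath.
final class PbNaming {
    static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";

    private static String simpleName(String fqName) {
        return fqName.substring(fqName.lastIndexOf('.') + 1);
    }

    private static String packageName(String fqName) {
        return fqName.substring(0, fqName.lastIndexOf('.'));
    }

    // e.g. ...api.AMRMProtocol -> ...api.impl.pb.service.AMRMProtocolPBServiceImpl
    static String serviceImplName(String protocolFqName) {
        return packageName(protocolFqName) + ".impl.pb.service."
                + simpleName(protocolFqName) + "PBServiceImpl";
    }

    // Client side: same idea, different subpackage and suffix (inferred, see lead-in).
    static String clientImplName(String protocolFqName) {
        return packageName(protocolFqName) + ".impl.pb.client."
                + simpleName(protocolFqName) + "PBClientImpl";
    }

    // Generated protobuf service: outer class "<Name>", nested class "<Name>Service".
    static String protoServiceName(String protocolFqName) {
        String name = simpleName(protocolFqName);
        return PROTO_GEN_PACKAGE_NAME + "." + name + "$" + name + "Service";
    }
}
```

Because the mapping is mechanical, adding a new protocol only requires generating and naming its PB classes correctly; no factory code changes.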


/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.factories.impl.pb;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factories.RpcServerFactory;

import com.google.protobuf.BlockingService;

public class RpcServerFactoryPBImpl implements RpcServerFactory {

  private static final Log LOG = LogFactory.getLog(RpcServerFactoryPBImpl.class);
  private static final String PROTO_GEN_PACKAGE_NAME = "org.apache.hadoop.yarn.proto";
  private static final String PROTO_GEN_CLASS_SUFFIX = "Service";
  private static final String PB_IMPL_PACKAGE_SUFFIX = "impl.pb.service";
  private static final String PB_IMPL_CLASS_SUFFIX = "PBServiceImpl";
  
  private static final RpcServerFactoryPBImpl self = new RpcServerFactoryPBImpl();

  private Configuration localConf = new Configuration();
  private ConcurrentMap<Class<?>, Constructor<?>> serviceCache = new ConcurrentHashMap<Class<?>, Constructor<?>>();
  private ConcurrentMap<Class<?>, Method> protoCache = new ConcurrentHashMap<Class<?>, Method>();
  
  public static RpcServerFactoryPBImpl get() {
    return RpcServerFactoryPBImpl.self;
  }
  

  private RpcServerFactoryPBImpl() {
  }
  
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
      throws YarnException {
    return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
        null);
  }
  
  @Override
  public Server getServer(Class<?> protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
      String portRangeConfig)
      throws YarnException {
    
    Constructor<?> constructor = serviceCache.get(protocol);
    if (constructor == null) {
      Class<?> pbServiceImplClazz = null;
      try {
        pbServiceImplClazz = localConf
            .getClassByName(getPbServiceImplClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getPbServiceImplClassName(protocol) + "]", e);
      }
      try {
        constructor = pbServiceImplClazz.getConstructor(protocol);
        constructor.setAccessible(true);
        serviceCache.putIfAbsent(protocol, constructor);
      } catch (NoSuchMethodException e) {
        throw new YarnException("Could not find constructor with param: "
            + protocol.getName() + " in class "
            + pbServiceImplClazz.getName(), e);
      }
    }
    
    Object service = null;
    try {
      service = constructor.newInstance(instance);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (InstantiationException e) {
      throw new YarnException(e);
    }

    Class<?> pbProtocol = service.getClass().getInterfaces()[0];
    Method method = protoCache.get(protocol);
    if (method == null) {
      Class<?> protoClazz = null;
      try {
        protoClazz = localConf.getClassByName(getProtoClassName(protocol));
      } catch (ClassNotFoundException e) {
        throw new YarnException("Failed to load class: ["
            + getProtoClassName(protocol) + "]", e);
      }
      try {
        method = protoClazz.getMethod("newReflectiveBlockingService",
            pbProtocol.getInterfaces()[0]);
        method.setAccessible(true);
        protoCache.putIfAbsent(protocol, method);
      } catch (NoSuchMethodException e) {
        throw new YarnException(e);
      }
    }
    
    try {
      return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
          (BlockingService)method.invoke(null, service), portRangeConfig);
    } catch (InvocationTargetException e) {
      throw new YarnException(e);
    } catch (IllegalAccessException e) {
      throw new YarnException(e);
    } catch (IOException e) {
      throw new YarnException(e);
    }
  }
  
  private String getProtoClassName(Class<?> clazz) {
    String srcClassName = getClassName(clazz);
    return PROTO_GEN_PACKAGE_NAME + "." + srcClassName + "$" + srcClassName + PROTO_GEN_CLASS_SUFFIX;  
  }
  
  private String getPbServiceImplClassName(Class<?> clazz) {
    String srcPackagePart = getPackageName(clazz);
    String srcClassName = getClassName(clazz);
    String destPackagePart = srcPackagePart + "." + PB_IMPL_PACKAGE_SUFFIX;
    String destClassPart = srcClassName + PB_IMPL_CLASS_SUFFIX;
    return destPackagePart + "." + destClassPart;
  }
  
  private String getClassName(Class<?> clazz) {
    String fqName = clazz.getName();
    return (fqName.substring(fqName.lastIndexOf(".") + 1, fqName.length()));
  }
  
  private String getPackageName(Class<?> clazz) {
    return clazz.getPackage().getName();
  }

  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }
}
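Stripped of the protobuf and Hadoop RPC details, getServer combines three ideas: an eagerly created singleton (`self` plus a private constructor), a reflective lookup of the implementation class by naming convention, and a ConcurrentMap that caches the resolved Constructor per protocol so the class lookup happens once. The sketch below keeps only those ideas. Greeter, GreeterPBServiceImpl and ImplFactory are hypothetical, and for simplicity the suffix is appended to the protocol's own binary name, so the implementation lives next to the protocol instead of in an `impl.pb.service` subpackage as in YARN.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical protocol.
interface Greeter {
    String greet(String name);
}

// Hypothetical implementation found by naming convention; wraps the protocol instance.
final class GreeterPBServiceImpl {
    private final Greeter delegate;

    public GreeterPBServiceImpl(Greeter delegate) { // constructor takes the protocol type
        this.delegate = delegate;
    }

    String handle(String request) {
        return delegate.greet(request);
    }
}

final class ImplFactory {
    private static final ImplFactory SELF = new ImplFactory(); // eager singleton
    private final ConcurrentMap<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<>();

    private ImplFactory() {}

    static ImplFactory get() {
        return SELF;
    }

    Object wrap(Class<?> protocol, Object instance) {
        try {
            Constructor<?> ctor = cache.get(protocol);
            if (ctor == null) {
                // Binary name + suffix, e.g. "Greeter" -> "GreeterPBServiceImpl".
                String implName = protocol.getName() + "PBServiceImpl";
                Class<?> implClass = Class.forName(implName, true, protocol.getClassLoader());
                ctor = implClass.getConstructor(protocol);
                cache.putIfAbsent(protocol, ctor); // a racing thread may win; both are equal
            }
            return ctor.newInstance(instance);
        } catch (ReflectiveOperationException e) {
            // YARN wraps these in YarnException; an unchecked exception plays that role here.
            throw new IllegalStateException("Cannot create implementation for "
                    + protocol.getName(), e);
        }
    }

    int cachedProtocols() {
        return cache.size();
    }
}
```

As in the original, a lost putIfAbsent race is harmless: both threads resolved the same Constructor, and only the map entry differs in which instance is kept.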

Categories: Hadoop, Hadoop Yarn

Events in MapReduce V2

In the last post, we looked into the YARN event structure. Today, we will look at the events introduced in MapReduce V2. They are defined in the org.apache.hadoop.mapreduce.v2.app.job.event package.

[Figure: AllEventTypes]

The event types are defined in the JobEventType, TaskEventType, and TaskAttemptEventType enums. Here is JobEventType, with each group of constants labeled by the component that produces it:


/**
 * Event types handled by Job.
 */
public enum JobEventType {

  //Producer:Client
  JOB_KILL,

  //Producer:MRAppMaster
  JOB_INIT,
  JOB_START,

  //Producer:Task
  JOB_TASK_COMPLETED,
  JOB_MAP_TASK_RESCHEDULED,
  JOB_TASK_ATTEMPT_COMPLETED,

  //Producer:CommitterEventHandler
  JOB_SETUP_COMPLETED,
  JOB_SETUP_FAILED,
  JOB_COMMIT_COMPLETED,
  JOB_COMMIT_FAILED,
  JOB_ABORT_COMPLETED,

  //Producer:Job
  JOB_COMPLETED,

  //Producer:Any component
  JOB_DIAGNOSTIC_UPDATE,
  INTERNAL_ERROR,
  JOB_COUNTER_UPDATE,

  //Producer:TaskAttemptListener
  JOB_TASK_ATTEMPT_FETCH_FAILURE,

  //Producer:RMContainerAllocator
  JOB_UPDATED_NODES

}

Hadoop YARN Event design

Hadoop YARN uses many kinds of events. They all extend the abstract class AbstractEvent, which implements the Event interface. Handling is defined separately by the EventHandler interface: to build the logic for handling a specific event, you implement EventHandler.
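A minimal, self-contained re-creation of that structure is sketched below: an event carries a typed enum plus a timestamp, and the handling logic lives in a separate handler class. These are simplified copies written for illustration, not the YARN sources; JobEvent and JobEventLogger are hypothetical, and the enum here is a three-constant subset of JobEventType.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified event contract: an enum type plus a timestamp.
interface Event<TYPE extends Enum<TYPE>> {
    TYPE getType();
    long getTimestamp();
}

// Shared plumbing for all events (illustrative; not YARN's exact AbstractEvent).
abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {
    private final TYPE type;
    private final long timestamp;

    protected AbstractEvent(TYPE type) {
        this(type, System.currentTimeMillis());
    }

    protected AbstractEvent(TYPE type, long timestamp) {
        this.type = type;
        this.timestamp = timestamp;
    }

    @Override public TYPE getType() { return type; }
    @Override public long getTimestamp() { return timestamp; }
    @Override public String toString() { return "EventType: " + type; }
}

// Handling logic is kept out of the event classes.
interface EventHandler<T extends Event<?>> {
    void handle(T event);
}

// A subset of the JobEventType constants.
enum JobEventType { JOB_INIT, JOB_START, JOB_KILL }

// Hypothetical concrete event carrying the job it refers to.
final class JobEvent extends AbstractEvent<JobEventType> {
    private final String jobId;

    JobEvent(String jobId, JobEventType type) {
        super(type);
        this.jobId = jobId;
    }

    String getJobId() { return jobId; }
}

// Hypothetical handler that just records what it saw.
final class JobEventLogger implements EventHandler<JobEvent> {
    final List<String> log = new ArrayList<>();

    @Override
    public void handle(JobEvent event) {
        log.add(event.getType() + " " + event.getJobId());
    }
}
```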


Hadoop YARN Event Bus

Hadoop YARN uses an event bus to centralize the dispatching of events to the event handlers registered for them. This means less plumbing code to write and maintain. It also pulls the event-handling logic out of the interacting components and into one place, which decouples those components from one another.

AsyncDispatcher is such an event bus. It uses a LinkedBlockingQueue to queue up incoming events, and an internal thread, eventHandlingThread, listens on that queue, takes each event off it and processes it by calling dispatch(event). The dispatch method looks up the event handler registered for the enum class of the event's type and passes the event to it.
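The sketch below reduces that design to its core: a LinkedBlockingQueue, one consumer thread, and a map from the event type's enum class to a handler. It is a simplified illustration, not AsyncDispatcher itself: the names MiniDispatcher, TypedEvent and Handler are hypothetical, there is no service lifecycle, and events whose type has no registered handler are simply counted. Also, unlike the stop() in the listing below, which interrupts the thread and can leave queued events unprocessed, this stop() enqueues a sentinel so that everything posted earlier is dispatched first.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical event contract: every event exposes an enum type.
interface TypedEvent {
    Enum<?> getType();
}

interface Handler {
    void handle(TypedEvent event);
}

// Simplified single-threaded event bus in the spirit of AsyncDispatcher.
final class MiniDispatcher {
    private static final TypedEvent POISON = () -> null; // sentinel that ends the loop

    private final BlockingQueue<TypedEvent> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Handler> handlers = new ConcurrentHashMap<>();
    private final AtomicInteger unhandled = new AtomicInteger();
    private Thread worker;

    // One handler per enum class, like eventDispatchers keyed by the type's class.
    void register(Class<? extends Enum<?>> typeClass, Handler handler) {
        handlers.put(typeClass, handler);
    }

    // Producers only enqueue; handler code always runs on the worker thread.
    void post(TypedEvent event) {
        queue.add(event);
    }

    void start() {
        worker = new Thread(() -> {
            try {
                while (true) {
                    TypedEvent event = queue.take();
                    if (event == POISON) {
                        return; // everything posted before stop() has been dispatched
                    }
                    dispatch(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit if interrupted externally
            }
        }, "MiniDispatcher event handler");
        worker.start();
    }

    private void dispatch(TypedEvent event) {
        // getDeclaringClass() also works for enum constants that have their own bodies.
        Handler h = handlers.get(event.getType().getDeclaringClass());
        if (h != null) {
            h.handle(event);
        } else {
            unhandled.incrementAndGet();
        }
    }

    void stop() {
        queue.add(POISON);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int unhandledCount() {
        return unhandled.get();
    }
}
```

A single consumer thread means handlers never run concurrently with each other, which keeps handler code simple; the source comment below notes that per-type channels and a thread pool would be the way to scale it.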

[Figure: AsyncDispatcherUML]

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;

/**
 * Dispatches events in a separate thread. Currently only single thread does
 * that. Potentially there could be multiple channels for each event type
 * class and a thread pool can be used to dispatch the events.
 */
@SuppressWarnings("rawtypes")
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue<Event> eventQueue;
  private volatile boolean stopped = false;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  public AsyncDispatcher() {
    this(new LinkedBlockingQueue<Event>());
  }

  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  @Override
  public synchronized void init(Configuration conf) {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.init(conf);
  }

  @Override
  public void start() {
    //start all the components
    super.start();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  @Override
  public void stop() {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.stop();
  }

  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler registeredHandler = (EventHandler)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  @Override
  public EventHandler getEventHandler() {
    return new GenericEventHandler();
  }

  class GenericEventHandler implements EventHandler {
    public void handle(Event event) {
      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new YarnException(e);
      }
    };
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param <T> the type of event these multiple handlers are interested in.
   */
  static class MultiListenerHandler implements EventHandler {
    List<EventHandler> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler>();
    }

    @Override
    public void handle(Event event) {
      for (EventHandler handler: listofHandlers) {
        handler.handle(event);
      }
    }

    void addHandler(EventHandler handler) {
      listofHandlers.add(handler);
    }

  }
}
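One detail of register() worth isolating is the fan-out: the first handler for an event type is stored directly, and registering a second one swaps in a MultiListenerHandler that forwards each event to all of them. Here is a Hadoop-free sketch of that logic; FanOutRegistry, MultiListener and JobEventType are hypothetical names, Consumer again stands in for EventHandler, and for brevity the enum constants themselves serve as the events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, Hadoop-free model of AsyncDispatcher.register().
public class FanOutRegistry<E> {

  /** Counterpart of MultiListenerHandler: forwards each event to every child. */
  static final class MultiListener<T> implements Consumer<T> {
    final List<Consumer<T>> handlers = new ArrayList<>();

    @Override
    public void accept(T event) {
      for (Consumer<T> h : handlers) {
        h.accept(event); // no try/catch: one failing child skips the rest
      }
    }
  }

  enum JobEventType { JOB_INIT, JOB_KILL }

  private final Map<Class<?>, Consumer<E>> byType = new HashMap<>();

  public void register(Class<?> eventType, Consumer<E> handler) {
    Consumer<E> existing = byType.get(eventType);
    if (existing == null) {
      // First listener for this type: store it directly.
      byType.put(eventType, handler);
    } else if (!(existing instanceof MultiListener)) {
      // Second listener: wrap the existing one and the new one together.
      MultiListener<E> multi = new MultiListener<>();
      multi.handlers.add(existing);
      multi.handlers.add(handler);
      byType.put(eventType, multi);
    } else {
      // Already fanned out: just append.
      ((MultiListener<E>) existing).handlers.add(handler);
    }
  }

  public Consumer<E> handlerFor(Class<?> eventType) {
    return byType.get(eventType);
  }

  /** Registers three listeners for one enum type and returns what they recorded. */
  public static List<String> runDemo() {
    FanOutRegistry<JobEventType> registry = new FanOutRegistry<>();
    List<String> log = new ArrayList<>();
    registry.register(JobEventType.class, e -> log.add("history:" + e));
    registry.register(JobEventType.class, e -> log.add("metrics:" + e));
    registry.register(JobEventType.class, e -> log.add("audit:" + e));
    registry.handlerFor(JobEventType.class).accept(JobEventType.JOB_INIT);
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

runDemo() returns [history:JOB_INIT, metrics:JOB_INIT, audit:JOB_INIT], so listeners fire in registration order. As in MultiListenerHandler.handle(), there is no try/catch around each child, so an exception thrown by one listener means the listeners after it never see the event.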

Hadoop YARN RPC (part I)

I have spent some time digging into the YARN RPC source code. Personally, I like its use of the Factory pattern to plug different RPC client proxy and server implementations into the framework. It is much cleaner than the RPC code in older versions of Hadoop.

For example, RpcServerFactoryPBImpl is the implementation of the RpcServerFactory interface that creates Protocol Buffers (protobuf) RPC servers. It delegates the actual creation to the standard Hadoop RPC class.

Look at the following snippet from RpcServerFactoryPBImpl:

private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
    BlockingService blockingService, String portRangeConfig) throws IOException {
  RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
  RPC.Server server = RPC.getServer(pbProtocol, blockingService,
      addr.getHostName(), addr.getPort(), numHandlers, false, conf,
      secretManager, portRangeConfig);
  LOG.info("Adding protocol " + pbProtocol.getCanonicalName() + " to the server");
  server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
  return server;
}

We could create a new RPC server factory, say RpcServerFactoryMyOwnImpl, that implements RpcServerFactory and whose server-creation logic (the counterpart of createServer above) returns our own RPC server.
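Stripped of Hadoop types, the point is that callers depend only on the factory interface, so swapping in our own factory changes which server object gets created without touching the calling code. In this sketch Server, ServerFactory, PbServerFactory and MyOwnServerFactory are hypothetical stand-ins, getServer is cut down to two parameters, and "DemoProtocol" and port 12345 are arbitrary illustration values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for YARN's RPC factory types.
public class FactorySketch {

  /** Stand-in for an RPC server object. */
  public interface Server {
    String describe();
  }

  /** Stand-in for RpcServerFactory (the real getServer takes many more parameters). */
  public interface ServerFactory {
    Server getServer(String protocol, int port);
  }

  /** Plays the role of RpcServerFactoryPBImpl. */
  public static class PbServerFactory implements ServerFactory {
    @Override
    public Server getServer(String protocol, int port) {
      return () -> "protobuf server for " + protocol + " on port " + port;
    }
  }

  /** Plays the role of the proposed RpcServerFactoryMyOwnImpl. */
  public static class MyOwnServerFactory implements ServerFactory {
    @Override
    public Server getServer(String protocol, int port) {
      return () -> "my own server for " + protocol + " on port " + port;
    }
  }

  /** Caller code is written once against the interface and never names a concrete factory. */
  static String bootService(ServerFactory factory) {
    return factory.getServer("DemoProtocol", 12345).describe();
  }

  public static List<String> runDemo() {
    List<String> out = new ArrayList<>();
    out.add(bootService(new PbServerFactory()));
    out.add(bootService(new MyOwnServerFactory()));
    return out;
  }

  public static void main(String[] args) {
    runDemo().forEach(System.out::println);
  }
}
```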

In the YARN framework, HadoopYarnProtoRPC is the class that uses these factories. It calls RpcFactoryProvider.getServerFactory(conf) to obtain the configured RpcServerFactory implementation, and RpcFactoryProvider.getClientFactory(conf) for the client side.
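The provider side can be sketched the same way: read a factory class name from configuration, fall back to a default, and instantiate it reflectively. This is my approximation, not RpcFactoryProvider's actual code: a plain Map stands in for Hadoop's Configuration, the key example.ipc.server.factory.class is made up, and creating the factory through a public no-arg constructor is a simplification.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of configuration-driven factory lookup; not Hadoop code.
public class FactoryProviderSketch {

  /** Made-up configuration key naming the server factory class. */
  public static final String SERVER_FACTORY_KEY = "example.ipc.server.factory.class";

  public interface ServerFactory {
    String kind();
  }

  /** Default, standing in for RpcServerFactoryPBImpl. */
  public static class PbServerFactory implements ServerFactory {
    @Override
    public String kind() {
      return "protobuf";
    }
  }

  /** Drop-in replacement, standing in for RpcServerFactoryMyOwnImpl. */
  public static class MyOwnServerFactory implements ServerFactory {
    @Override
    public String kind() {
      return "my-own";
    }
  }

  /** Reads the factory class name (with a default) and instantiates it reflectively. */
  public static ServerFactory getServerFactory(Map<String, String> conf) {
    String className = conf.getOrDefault(SERVER_FACTORY_KEY, PbServerFactory.class.getName());
    try {
      // asSubclass throws ClassCastException if the class is not a ServerFactory.
      return Class.forName(className)
          .asSubclass(ServerFactory.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot create server factory " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(getServerFactory(conf).kind()); // protobuf (default)
    conf.put(SERVER_FACTORY_KEY, MyOwnServerFactory.class.getName());
    System.out.println(getServerFactory(conf).kind()); // my-own
  }
}
```

With an empty config the default protobuf factory is used; setting the key to MyOwnServerFactory's class name switches the implementation without recompiling any caller, which is what makes the factory approach pluggable.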


public class HadoopYarnProtoRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
        addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + 
        " with " + numHandlers + " handlers");
    
    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, 
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);

  }

}

To be continued…

Categories: Hadoop, Hadoop Yarn