g2.8xlarge does not support NVIDIA GPUDirect peer to peer transfers

Go to


run make and then execute


[./simpleP2P] - Starting...
Checking for multiple GPUs...
CUDA-capable device count: 4
> GPU0 = " GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU1 = " GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU2 = " GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU3 = " GRID K520" IS capable of Peer-to-Peer (P2P)

Checking GPU(s) for support of peer to peer memory access...
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU2) : No
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU2) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU2) : No
Two or more GPUs with SM 2.0 or higher capability are required for ./simpleP2P.
Peer to Peer access is not available amongst GPUs in the system, waiving test.

As the output shows, every GPU pair reports "No" for peer access, so g2.8xlarge does not support NVIDIA GPUDirect peer-to-peer transfers. Only P2 instances have that support.


Sqoop Mysql Data Import

In Sqoop, we can import data from MySQL and write it to HDFS. The fast path uses a Mapper that opens a pipe to mysqldump and pulls data directly. In the MySQLDumpMapper listed below, a CopyingAsyncSink processes the mysqldump stream on a reader thread, CopyingStreamThread. The thread reads lines from mysqldump and strips the extra leading and trailing characters from each one before writing it to HDFS.
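The stripping step can be illustrated outside Hadoop. The following is a minimal sketch, not Sqoop's code: the class name DumpLineStripper and its methods are hypothetical, the records are collected into a list instead of being written to HDFS, and the input is assumed to contain one "INSERT ... VALUES (...);" statement per line.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; it is not part of Sqoop.
public class DumpLineStripper {
  private static final String RECORD_START = "VALUES (";

  /** Returns the text between "VALUES (" and the trailing ");" of one line. */
  static String stripPreamble(String line) {
    int mark = line.indexOf(RECORD_START);
    if (mark < 0 || !line.endsWith(");")) {
      throw new IllegalArgumentException("Not an INSERT line: " + line);
    }
    return line.substring(mark + RECORD_START.length(), line.length() - 2);
  }

  /** Reads the stream on a background thread, one record per INSERT line. */
  static List<String> drain(InputStream in) {
    List<String> records = Collections.synchronizedList(new ArrayList<>());
    AtomicReference<IOException> failure = new AtomicReference<>();
    Thread reader = new Thread(() -> {
      try (BufferedReader r = new BufferedReader(
          new InputStreamReader(in, StandardCharsets.UTF_8))) {
        String line;
        while ((line = r.readLine()) != null) {
          // Comments and empty lines are ignored.
          if (line.trim().isEmpty() || line.startsWith("--")) {
            continue;
          }
          records.add(stripPreamble(line));
        }
      } catch (IOException e) {
        failure.set(e); // flag the error so the caller can see it
      }
    });
    reader.start();
    try {
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while draining", e);
    }
    if (failure.get() != null) {
      throw new UncheckedIOException(failure.get());
    }
    return records;
  }

  public static void main(String[] args) {
    String dump = "-- MySQL dump\n\n"
        + "INSERT INTO `t` VALUES (1,'alice');\n"
        + "INSERT INTO `t` VALUES (2,'bob');\n";
    InputStream in =
        new ByteArrayInputStream(dump.getBytes(StandardCharsets.UTF_8));
    for (String record : drain(in)) {
      System.out.println(record);
    }
  }
}
```

Unlike the mapper, which computes the preamble length once and reuses it for every line, this sketch searches for the marker on each line to keep the example short.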

Conversely, to export data from HDFS back to MySQL, Sqoop uses MySQLExportMapper, which starts a mysqlimport process and uses it to export rows from HDFS to a MySQL database at high speed. It creates a FIFO, that is, a named pipe (using mknod --mode=0), to interface with mysqlimport behind the scenes. A BufferedOutputStream writes bytes straight to the mysqlimport process through that named pipe, which can be thought of as a file.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.util.AsyncSink;
import org.apache.sqoop.util.JdbcUrl;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.manager.MySQLUtils;
import com.cloudera.sqoop.util.ErrorableAsyncSink;
import com.cloudera.sqoop.util.ErrorableThread;
import com.cloudera.sqoop.util.LoggingAsyncSink;

/**
 * Mapper that opens up a pipe to mysqldump and pulls data directly.
 */
public class MySQLDumpMapper
    extends SqoopMapper<String, NullWritable, String, NullWritable> {

  public static final Log LOG = LogFactory.getLog(
      MySQLDumpMapper.class.getName());

  private Configuration conf;

  // AsyncSinks used to import data from mysqldump directly into HDFS.

  /**
   * Copies data directly from mysqldump into HDFS, after stripping some
   * header and footer characters that are attached to each line in mysqldump.
   */
  public static class CopyingAsyncSink extends ErrorableAsyncSink {
    private final MySQLDumpMapper.Context context;
    private final PerfCounters counters;

    protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
        final PerfCounters ctrs) {
      this.context = context;
      this.counters = ctrs;
    }

    public void processStream(InputStream is) {
      child = new CopyingStreamThread(is, context, counters);
      child.start();
    }

    private static class CopyingStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(
          CopyingStreamThread.class.getName());

      private final MySQLDumpMapper.Context context;
      private final InputStream stream;
      private final PerfCounters counters;

      CopyingStreamThread(final InputStream is,
          final Context c, final PerfCounters ctrs) {
        this.context = c;
        this.stream = is;
        this.counters = ctrs;
      }

      public void run() {
        BufferedReader r = null;

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // Actually do the read/write transfer loop here.
          int preambleLen = -1; // set to this for "undefined"
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF.
            }

            if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
              continue; // comments and empty lines are ignored
            }

            // this line is of the form "INSERT .. VALUES ( actual value text
            // );" strip the leading preamble up to the '(' and the trailing
            // ');'.
            if (preambleLen == -1) {
              // we haven't determined how long the preamble is. It's constant
              // across all lines, so just figure this out once.
              String recordStartMark = "VALUES (";
              preambleLen = inLine.indexOf(recordStartMark)
                  + recordStartMark.length();
            }

            // chop off the leading and trailing text as we write the
            // output to HDFS.
            int len = inLine.length() - 2 - preambleLen;
            context.write(inLine.substring(preambleLen, inLine.length() - 2)
                + "\n", null);
            counters.addBytes(1 + len);
          }
        } catch (IOException ioe) {
          LOG.error("IOException reading from mysqldump: " + ioe.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } catch (InterruptedException ie) {
          LOG.error("InterruptedException reading from mysqldump: "
              + ie.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } finally {
          if (null != r) {
            try {
              r.close();
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());
            }
          }
        }
      }
    }
  }

   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
   * output, and re-emit the text in the user's specified output format.
  public static class ReparsingAsyncSink extends ErrorableAsyncSink {
    private final MySQLDumpMapper.Context context;
    private final Configuration conf;
    private final PerfCounters counters;

    protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
        final Configuration conf, final PerfCounters ctrs) {
      this.context = c;
      this.conf = conf;
      this.counters = ctrs;

    public void processStream(InputStream is) {
      child = new ReparsingStreamThread(is, context, conf, counters);

    private static class ReparsingStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(

      private final MySQLDumpMapper.Context context;
      private final Configuration conf;
      private final InputStream stream;
      private final PerfCounters counters;

      ReparsingStreamThread(final InputStream is,
          final MySQLDumpMapper.Context c, Configuration conf,
          final PerfCounters ctrs) {
        this.context = c;
        this.conf = conf;
        this.stream = is;
        this.counters = ctrs;

      private static final char MYSQL_FIELD_DELIM = ',';
      private static final char MYSQL_RECORD_DELIM = '\n';
      private static final char MYSQL_ENCLOSE_CHAR = '\'';
      private static final char MYSQL_ESCAPE_CHAR = '\\';
      private static final boolean MYSQL_ENCLOSE_REQUIRED = false;

      private static final RecordParser MYSQLDUMP_PARSER;

      static {
        // build a record parser for mysqldump's format
        MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);

      public void run() {
        BufferedReader r = null;

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // Configure the output with the user's delimiters.
          char outputFieldDelim = (char) conf.getInt(
          String outputFieldDelimStr = "" + outputFieldDelim;
          char outputRecordDelim = (char) conf.getInt(
          String outputRecordDelimStr = "" + outputRecordDelim;
          char outputEnclose = (char) conf.getInt(
          char outputEscape = (char) conf.getInt(
          boolean outputEncloseRequired = conf.getBoolean(
              MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

          DelimiterSet delimiters = new DelimiterSet(

          // Actually do the read/write transfer loop here.
          int preambleLen = -1; // set to this for "undefined"
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF.

            if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
              continue; // comments and empty lines are ignored

            // this line is of the form "INSERT .. VALUES ( actual value text
            // );" strip the leading preamble up to the '(' and the trailing
            // ');'.
            if (preambleLen == -1) {
              // we haven't determined how long the preamble is. It's constant
              // across all lines, so just figure this out once.
              String recordStartMark = "VALUES (";
              preambleLen = inLine.indexOf(recordStartMark)
                  + recordStartMark.length();

            // Wrap the input string in a char buffer that ignores the leading
            // and trailing text.
            CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
                inLine.length() - 2);

            // Pass this along to the parser
            List fields = null;
            try {
              fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
            } catch (RecordParser.ParseError pe) {
              LOG.warn("ParseError reading from mysqldump: "
                  + pe.toString() + "; record skipped");
              continue; // Skip emitting this row.

            // For all of the output fields, emit them using the delimiters
            // the user chooses.
            boolean first = true;
            StringBuilder sb = new StringBuilder();
            int recordLen = 1; // for the delimiter.
            for (String field : fields) {
              if (!first) {
              } else {
                first = false;

              String fieldStr = FieldFormatter.escapeAndEnclose(field,
              recordLen += fieldStr.length();

            context.write(sb.toString(), null);
        } catch (IOException ioe) {
          LOG.error("IOException reading from mysqldump: " + ioe.toString());
          // flag this error so the parent can handle it appropriately.
        } catch (InterruptedException ie) {
          LOG.error("InterruptedException reading from mysqldump: "
              + ie.toString());
          // flag this error so we get an error status back in the caller.
        } finally {
          if (null != r) {
            try {
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());

  // TODO(aaron): Refactor this method to be much shorter.
   * Import the table into HDFS by using mysqldump to pull out the data from
   * the database and upload the files directly to HDFS.
  public void map(String splitConditions, NullWritable val, Context context)
      throws IOException, InterruptedException {

    LOG.info("Beginning mysqldump fast path import");

    ArrayList args = new ArrayList();
    String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);

    // We need to parse the connect string URI to determine the database name.
    // Using java.net.URL directly on the connect string will fail because
    // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
    // scheme (everything before '://') and replace it with 'http', which we
    // know will work.
    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
    String databaseName = JdbcUrl.getDatabaseName(connectString);
    String hostname = JdbcUrl.getHostName(connectString);
    int port = JdbcUrl.getPort(connectString);

    if (null == databaseName) {
      throw new IOException("Could not determine database name");

    LOG.info("Performing import of table " + tableName + " from database "
        + databaseName);

    args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.

    String password = conf.get(MySQLUtils.PASSWORD_KEY);
    String passwordFile = null;

    Process p = null;
    AsyncSink sink = null;
    AsyncSink errSink = null;
    PerfCounters counters = new PerfCounters();
    try {
      // --defaults-file must be the first argument.
      if (null != password && password.length() > 0) {
        passwordFile = MySQLUtils.writePasswordFile(conf);
        args.add("--defaults-file=" + passwordFile);

      // Don't use the --where="" version because spaces in it can
      // confuse Java, and adding in surrounding quotes confuses Java as well.
      String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
          + " AND (" + splitConditions + ")";

      args.add("--host=" + hostname);
      if (-1 != port) {
        args.add("--port=" + Integer.toString(port));
      args.add("--quick"); // no buffering

      String username = conf.get(MySQLUtils.USERNAME_KEY);
      if (null != username) {
        args.add("--user=" + username);

      // If the user supplied extra args, add them here.
      String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
      if (null != extra) {
        for (String arg : extra) {


      // begin the import in an external process.
      LOG.debug("Starting mysqldump with arguments:");
      for (String arg : args) {
        LOG.debug("  " + arg);

      // Actually start the mysqldump.
      p = Runtime.getRuntime().exec(args.toArray(new String[0]));

      // read from the stdout pipe into the HDFS writer.
      InputStream is = p.getInputStream();

      if (MySQLUtils.outputDelimsAreMySQL(conf)) {
        LOG.debug("Output delimiters conform to mysqldump; "
            + "using straight copy");
        sink = new CopyingAsyncSink(context, counters);
      } else {
        LOG.debug("User-specified delimiters; using reparsing import");
        LOG.info("Converting data to use specified delimiters.");
        LOG.info("(For the fastest possible import, use");
        LOG.info("--mysql-delimiters to specify the same field");
        LOG.info("delimiters as are used by mysqldump.)");
        sink = new ReparsingAsyncSink(context, conf, counters);

      // Start an async thread to read and upload the whole stream.

      // Start an async thread to send stderr to log4j.
      errSink = new LoggingAsyncSink(LOG);
    } finally {

      // block until the process is done.
      int result = 0;
      if (null != p) {
        while (true) {
          try {
            result = p.waitFor();
          } catch (InterruptedException ie) {
            // interrupted; loop around.


      // Remove the password file.
      if (null != passwordFile) {
        if (!new File(passwordFile).delete()) {
          LOG.error("Could not remove mysql password file " + passwordFile);
          LOG.error("You should remove this file to protect your credentials.");

      // block until the stream sink is done too.
      int streamResult = 0;
      if (null != sink) {
        while (true) {
          try {
            streamResult = sink.join();
          } catch (InterruptedException ie) {
            // interrupted; loop around.


      // Try to wait for stderr to finish, but regard any errors as advisory.
      if (null != errSink) {
        try {
          if (0 != errSink.join()) {
            LOG.info("Encountered exception reading stderr stream");
        } catch (InterruptedException ie) {
          LOG.info("Thread interrupted waiting for stderr to complete: "
              + ie.toString());

      LOG.info("Transfer loop complete.");

      if (0 != result) {
        throw new IOException("mysqldump terminated with status "
            + Integer.toString(result));

      if (0 != streamResult) {
        throw new IOException("Encountered exception in stream sink");

      LOG.info("Transferred " + counters.toString());

  protected void setup(Context context)
    throws IOException, InterruptedException {
    this.conf = context.getConfiguration();


A look into Oozie internals

November 24, 2013

I have been looking into the Oozie code base trying to understand its underlying architecture.
Here are some of my findings:

The Oozie web app is built from a number of services, among them ActionService, AuthorizationService, CoordinatorStoreService, DagEngineService, SchemaService and UserGroupInformationService. They live in the org.apache.oozie.service package and implement the Service interface, shown below.

package org.apache.oozie.service;

/**
 * A service is component managed by the {@link Services} singleton.
 */
public interface Service {
    public static final String DEFAULT_LOCK_TIMEOUT = "oozie.service.default.lock.timeout";

    /**
     * Prefix for all services configuration properties.
     */
    public static final String CONF_PREFIX = "oozie.service.";

    /**
     * Constant for XCommand
     */
    public static final String USE_XCOMMAND = "oozie.useXCommand";

    /**
     * Initialize the service. <p/> Invoked by the {@link Service} singleton at start up time.
     *
     * @param services services singleton initializing the service.
     * @throws ServiceException thrown if the service could not initialize.
     */
    public void init(Services services) throws ServiceException;

    /**
     * Destroy the service. <p/> Invoked by the {@link Service} singleton at shutdown time.
     */
    public void destroy();

    /**
     * Return the public interface of the service. <p/> Services are retrieved by its public interface. Specializations
     * of services must return the public interface.
     *
     * @return the interface of the service.
     */
    public Class<? extends Service> getInterface();

    /**
     * Lock timeout value if service is only allowed to have one single running instance.
     */
    public static long lockTimeout = Services.get().getConf().getLong(DEFAULT_LOCK_TIMEOUT, 5 * 1000);
}


Oozie mostly reaches these services through the app.getService call.
Incoming requests are handled by different servlets, which invoke the services they need to get the work done.
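To make the lookup-by-interface idea concrete, here is a minimal sketch of a registry keyed by the class that each service reports from getInterface(). It is an assumption-laden simplification, not Oozie's implementation: ServiceRegistryDemo, Registry and GreetingService are hypothetical names, and init() takes no arguments here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; it is not part of Oozie.
public class ServiceRegistryDemo {

  /** Simplified stand-in for the Service interface shown above. */
  interface Service {
    void init();
    void destroy();
    Class<? extends Service> getInterface();
  }

  /** Holds one instance per public service interface. */
  static class Registry {
    private final Map<Class<? extends Service>, Service> services =
        new LinkedHashMap<>();

    /** Initializes the service and stores it under its public interface. */
    void register(Service service) {
      service.init();
      services.put(service.getInterface(), service);
    }

    /** Returns the registered service for the given type, or null. */
    <T extends Service> T get(Class<T> type) {
      return type.cast(services.get(type));
    }

    /** Destroys every registered service and empties the registry. */
    void destroyAll() {
      for (Service service : services.values()) {
        service.destroy();
      }
      services.clear();
    }
  }

  /** Example service that a request handler might look up. */
  static class GreetingService implements Service {
    private boolean started;

    public void init() {
      started = true;
    }

    public void destroy() {
      started = false;
    }

    public Class<? extends Service> getInterface() {
      return GreetingService.class;
    }

    String greet(String who) {
      if (!started) {
        throw new IllegalStateException("service not initialized");
      }
      return "hello, " + who;
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    registry.register(new GreetingService());
    // A servlet-like caller asks the registry for the service by type.
    System.out.println(registry.get(GreetingService.class).greet("oozie"));
    registry.destroyAll();
  }
}
```

Keying the map by the public interface is what lets a specialization replace a default service without callers changing the type they ask for.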

There are also different action executors, each extending the abstract ActionExecutor class. EmailActionExecutor and HiveActionExecutor are two of them.

There is another kind of executor, JPAExecutor, which implements a different interface.
JPAExecutors are used by JPAService, the basic JPA service that provides database execution, e.g. running a select query, an update query, or a JPAExecutor operation. OpenJPA is used in the ORM layer.
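The executor arrangement resembles a command pattern: each executor object packages one database operation, and the service runs it against the persistence context. Below is a minimal sketch under stated assumptions. Store is a hypothetical in-memory stand-in for the JPA persistence layer, and StoreExecutor, StoreService, SetStatus and GetStatus are illustrative names rather than Oozie classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; it is not part of Oozie.
public class ExecutorSketch {

  /** In-memory stand-in for the persistence layer (assumption). */
  static class Store {
    final Map<String, String> statusById = new HashMap<>();
  }

  /** One unit of database work that returns a result of type T. */
  interface StoreExecutor<T> {
    String getName();
    T execute(Store store);
  }

  /** Runs executors against the single store it owns. */
  static class StoreService {
    private final Store store = new Store();

    <T> T execute(StoreExecutor<T> executor) {
      return executor.execute(store);
    }
  }

  /** Update-style operation: records the status of a workflow id. */
  static class SetStatus implements StoreExecutor<Void> {
    private final String id;
    private final String status;

    SetStatus(String id, String status) {
      this.id = id;
      this.status = status;
    }

    public String getName() {
      return "SetStatus";
    }

    public Void execute(Store store) {
      store.statusById.put(id, status);
      return null;
    }
  }

  /** Select-style operation: reads the status of a workflow id. */
  static class GetStatus implements StoreExecutor<String> {
    private final String id;

    GetStatus(String id) {
      this.id = id;
    }

    public String getName() {
      return "GetStatus";
    }

    public String execute(Store store) {
      return store.statusById.get(id);
    }
  }

  public static void main(String[] args) {
    StoreService service = new StoreService();
    service.execute(new SetStatus("wf-1", "RUNNING"));
    System.out.println(service.execute(new GetStatus("wf-1")));
  }
}
```

Keeping each query in its own small class means callers never touch the persistence context directly, so a service like this can wrap every operation with the same transaction and error handling.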

Standard ORM entity beans with named queries represent the database entities.
WorkflowJobBean, shown below, is one example.

package org.apache.oozie;

import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.IOException;
import java.io.DataOutput;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.persistence.Entity;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Basic;
import javax.persistence.Lob;
import javax.persistence.Table;
import javax.persistence.Transient;

import java.sql.Timestamp;

import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;



    @NamedQuery(name = "UPDATE_WORKFLOW", query = "update WorkflowJobBean w set w.appName = :appName, w.appPath = :appPath, w.conf = :conf, w.group = :groupName, w.run = :run, w.user = :user, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.externalId = :externalId, w.lastModifiedTimestamp = :lastModTime,w.logToken = :logToken, w.protoActionConf = :protoActionConf, w.slaXml =:slaXml, w.startTimestamp = :startTime, w.statusStr = :status, w.wfInstance = :wfInstance where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_MODTIME", query = "update WorkflowJobBean w set w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_MODTIME", query = "update WorkflowJobBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_PARENT_MODIFIED", query = "update WorkflowJobBean w set w.parentId = :parentId, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.endTimestamp = :endTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END", query = "update WorkflowJobBean w set w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime, w.startTimestamp = :startTime, w.endTimestamp = :endTime where w.id = :id"),

    @NamedQuery(name = "UPDATE_WORKFLOW_RERUN", query = "update WorkflowJobBean w set w.appName = :appName, w.protoActionConf = :protoActionConf, w.appPath = :appPath, w.conf = :conf, w.logToken = :logToken, w.user = :user, w.group = :group, w.externalId = :externalId, w.endTimestamp = :endTime, w.run = :run, w.statusStr = :status, w.wfInstance = :wfInstance, w.lastModifiedTimestamp = :lastModTime where w.id = :id"),

    @NamedQuery(name = "DELETE_WORKFLOW", query = "delete from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOWS", query = "select OBJECT(w) from WorkflowJobBean w order by w.startTimestamp desc"),

    @NamedQuery(name = "GET_WORKFLOWS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp, w.externalId from WorkflowJobBean w order by w.createdTimestamp desc"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT", query = "select count(w) from WorkflowJobBean w"),

    @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_OLDER_THAN", query = "select w from WorkflowJobBean w where w.endTimestamp < :endTime"),

    @NamedQuery(name = "GET_COMPLETED_WORKFLOWS_WITH_NO_PARENT_OLDER_THAN", query = "select w.id from WorkflowJobBean w where w.endTimestamp < :endTime and w.parentId is null"),

    @NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_STARTTIME", query = "select w.id, w.startTimestamp from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_USER_GROUP", query = "select w.user, w.group from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance  from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_DEFINITION", query = "select w.id, w.user, w.group, w.appName, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_ACTION_OP", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_RESUME", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_FOR_UPDATE", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_FOR_SLA", query = "select w.id, w.statusStr, w.startTimestamp, w.endTimestamp from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_ID_FOR_EXTERNAL_ID", query = "select  w.id from WorkflowJobBean w where w.externalId = :externalId"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS", query = "select count(w) from WorkflowJobBean w where w.statusStr = :status and w.lastModifiedTimestamp > :lastModTime"),

    @NamedQuery(name = "GET_WORKFLOWS_WITH_WORKFLOW_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId = :parentId"),

    @NamedQuery(name = "GET_WORKFLOWS_WITH_COORD_PARENT_ID", query = "select w.id from WorkflowJobBean w where w.parentId like :parentId"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_WORKFLOW_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId = :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"),

    @NamedQuery(name = "GET_WORKFLOWS_COUNT_WITH_COORD_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from WorkflowJobBean w where w.parentId like :parentId and (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'SUSPENDED' OR w.endTimestamp >= :endTime)"), // when setting parentId parameter, make sure to append a '%' (percent symbol) at the end (e.g. 0000004-130709155224435-oozie-rkan-C%")

    @NamedQuery(name = "GET_WORKFLOW_FOR_USER", query = "select w.user from WorkflowJobBean w where w.id = :id"),

    @NamedQuery(name = "GET_WORKFLOW_STATUS", query = "select w.statusStr from WorkflowJobBean w where w.id = :id")
@Table(name = "WF_JOBS")
public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {

    private String id;

    @Column(name = "proto_action_conf")
    private StringBlob protoActionConf;

    @Column(name = "log_token")
    private String logToken = null;

    @Column(name = "external_id")
    private String externalId = null;

    @Column(name = "status")
    private String statusStr = WorkflowJob.Status.PREP.toString();

    @Column(name = "created_time")
    private java.sql.Timestamp createdTimestamp = null;

    @Column(name = "start_time")
    private java.sql.Timestamp startTimestamp = null;

    @Column(name = "end_time")
    private java.sql.Timestamp endTimestamp = null;

    @Column(name = "last_modified_time")
    private java.sql.Timestamp lastModifiedTimestamp = null;

    @Column(name = "wf_instance")
    private BinaryBlob wfInstance ;

    @Column(name = "sla_xml")
    private StringBlob slaXml;

    @Column(name = "app_name")
    private String appName = null;

    @Column(name = "app_path")
    private String appPath = null;

    @Column(name = "conf")
    private StringBlob conf;

    @Column(name = "user_name")
    private String user = null;

    @Column(name = "group_name")
    private String group;

    @Column(name = "run")
    private int run = 1;

    @Column(name = "parent_id")
    private String parentId;

    private String consoleUrl;

    private List<WorkflowActionBean> actions;

    /**
     * Default constructor.
     */
    public WorkflowJobBean() {
        actions = new ArrayList<WorkflowActionBean>();
    }

    /**
     * Serialize the workflow bean to a data output.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the workflow bean could not be serialized.
     */
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getAppPath());
        WritableUtils.writeStr(dataOutput, getAppName());
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getParentId());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
        dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
        dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
        WritableUtils.writeStr(dataOutput, getUser());
        WritableUtils.writeStr(dataOutput, getGroup());
        WritableUtils.writeStr(dataOutput, logToken);
        WritableUtils.writeStr(dataOutput, getProtoActionConf());
    }

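    /**
     * Deserialize a workflow bean from a data input.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the workflow bean could not be deserialized.
     */
    public void readFields(DataInput dataInput) throws IOException {
        // Fields are read back in the same order write() emits them.
        setAppPath(WritableUtils.readStr(dataInput));
        setAppName(WritableUtils.readStr(dataInput));
        setId(WritableUtils.readStr(dataInput));
        setParentId(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatusStr(WritableUtils.readStr(dataInput));
        // setStatus(WritableUtils.readStr(dataInput));
        // Each date was written as a long, with -1 standing for "not set".
        long d = dataInput.readLong();
        if (d != -1) {
            setCreatedTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setStartTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setLastModifiedTime(new Date(d));
        }
        d = dataInput.readLong();
        if (d != -1) {
            setEndTime(new Date(d));
        }
        setUser(WritableUtils.readStr(dataInput));
        setGroup(WritableUtils.readStr(dataInput));
        logToken = WritableUtils.readStr(dataInput);
        setProtoActionConf(WritableUtils.readStr(dataInput));
    }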

    public boolean inTerminalState() {
        boolean inTerminalState = false;
        switch (WorkflowJob.Status.valueOf(statusStr)) {
            case FAILED:
            case KILLED:
            case SUCCEEDED:
                inTerminalState = true;
                break;
            default:
                break;
        }
        return inTerminalState;
    }

    public String getLogToken() {
        return logToken;
    }

    public void setLogToken(String logToken) {
        this.logToken = logToken;
    }

    public String getSlaXml() {
        return slaXml == null ? null : slaXml.getString();
    }

    public void setSlaXml(String slaXml) {
        if (this.slaXml == null) {
            this.slaXml = new StringBlob(slaXml);
        }
        else {
            this.slaXml.setString(slaXml);
        }
    }

    public void setSlaXmlBlob(StringBlob slaXml) {
        this.slaXml = slaXml;
    }

    public StringBlob getSlaXmlBlob() {
        return this.slaXml;
    }

    public WorkflowInstance getWorkflowInstance() {
        return wfInstance == null ? null : get(wfInstance.getBytes());
    }

    public BinaryBlob getWfInstanceBlob() {
        return this.wfInstance;
    }

    public void setWorkflowInstance(WorkflowInstance workflowInstance) {
        if (this.wfInstance == null) {
            this.wfInstance = new BinaryBlob(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance), true);
        }
        else {
            this.wfInstance.setBytes(WritableUtils.toByteArray((LiteWorkflowInstance) workflowInstance));
        }
    }

    public void setWfInstanceBlob(BinaryBlob wfInstance) {
        this.wfInstance = wfInstance;
    }

    public String getProtoActionConf() {
        return protoActionConf == null ? null : protoActionConf.getString();
    }

    public void setProtoActionConf(String protoActionConf) {
        if (this.protoActionConf == null) {
            this.protoActionConf = new StringBlob(protoActionConf);
        }
        else {
            this.protoActionConf.setString(protoActionConf);
        }
    }

    public void setProtoActionConfBlob(StringBlob protoBytes) {
        this.protoActionConf = protoBytes;
    }

    public StringBlob getProtoActionConfBlob() {
        return this.protoActionConf;
    }

    public String getlogToken() {
        return logToken;
    }

    public Timestamp getLastModifiedTimestamp() {
        return lastModifiedTimestamp;
    }

    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    public Timestamp getCreatedTimestamp() {
        return createdTimestamp;
    }

    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }

    public void setStatusStr(String statusStr) {
        this.statusStr = statusStr;
    }

    public void setStatus(Status val) {
        this.statusStr = val.toString();
    }

    public Status getStatus() {
        return Status.valueOf(statusStr);
    }

    public String getStatusStr() {
        return statusStr;
    }

    public void setExternalId(String externalId) {
        this.externalId = externalId;
    }

    public String getExternalId() {
        return externalId;
    }

    public void setLastModifiedTime(Date lastModifiedTime) {
        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
    }

    public Date getLastModifiedTime() {
        return DateUtils.toDate(lastModifiedTimestamp);
    }

    public Date getCreatedTime() {
        return DateUtils.toDate(createdTimestamp);
    }

    public void setCreatedTime(Date createdTime) {
        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
    }

    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    public void setStartTime(Date startTime) {
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    public void setEndTime(Date endTime) {
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }

    private WorkflowInstance get(byte[] array) {
        LiteWorkflowInstance pInstance = WritableUtils.fromByteArray(array, LiteWorkflowInstance.class);
        return pInstance;
    }

    public JSONObject toJSONObject() {
        return toJSONObject("GMT");
    }

    public JSONObject toJSONObject(String timeZoneId) {
        JSONObject json = new JSONObject();
        json.put(JsonTags.WORKFLOW_APP_PATH, getAppPath());
        json.put(JsonTags.WORKFLOW_APP_NAME, getAppName());
        json.put(JsonTags.WORKFLOW_ID, getId());
        json.put(JsonTags.WORKFLOW_EXTERNAL_ID, getExternalId());
        json.put(JsonTags.WORKFLOW_PARENT_ID, getParentId());
        json.put(JsonTags.WORKFLOW_CONF, getConf());
        json.put(JsonTags.WORKFLOW_STATUS, getStatus().toString());
        json.put(JsonTags.WORKFLOW_LAST_MOD_TIME, JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
        json.put(JsonTags.WORKFLOW_USER, getUser());
        json.put(JsonTags.WORKFLOW_GROUP, getGroup());
        json.put(JsonTags.WORKFLOW_ACL, getAcl());
        json.put(JsonTags.WORKFLOW_RUN, (long) getRun());
        json.put(JsonTags.WORKFLOW_CONSOLE_URL, getConsoleUrl());
        json.put(JsonTags.WORKFLOW_ACTIONS, WorkflowActionBean.toJSONArray(actions, timeZoneId));
        json.put(JsonTags.TO_STRING, toString());
        return json;
    }

    public String getAppPath() {
        return appPath;
    }

    public void setAppPath(String appPath) {
        this.appPath = appPath;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getConf() {
        return conf == null ? null : conf.getString();
    }

    public void setConf(String conf) {
        if (this.conf == null) {
            this.conf = new StringBlob(conf);
        }
        else {
            this.conf.setString(conf);
        }
    }

    public void setConfBlob(StringBlob conf) {
        this.conf = conf;
    }

    public StringBlob getConfBlob() {
        return this.conf;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getGroup() {
        return group;
    }

    public String getAcl() {
        return getGroup();
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public int getRun() {
        return run;
    }

    public void setRun(int run) {
        this.run = run;
    }

    /**
     * Return the workflow job console URL.
     *
     * @return the workflow job console URL.
     */
    public String getConsoleUrl() {
        return consoleUrl;
    }

    /**
     * Return the corresponding Action ID, if any.
     *
     * @return the coordinator Action Id.
     */
    public String getParentId() {
        return parentId;
    }

    /**
     * Set coordinator action id
     *
     * @param parentId : coordinator action id
     */
    public void setParentId(String parentId) {
        this.parentId = parentId;
    }

    /**
     * Set the workflow job console URL.
     *
     * @param consoleUrl the workflow job console URL.
     */
    public void setConsoleUrl(String consoleUrl) {
        this.consoleUrl = consoleUrl;
    }

    public List<WorkflowAction> getActions() {
        return (List) actions;
    }

    public void setActions(List<WorkflowActionBean> nodes) {
        this.actions = (nodes != null) ? nodes : new ArrayList<WorkflowActionBean>();
    }

    public String toString() {
        return MessageFormat.format("Workflow id[{0}] status[{1}]", getId(), getStatus());
    }

    /**
     * Convert a workflows list into a JSONArray.
     *
     * @param workflows workflows list.
     * @param timeZoneId time zone to use for dates in the JSON array.
     * @return the corresponding JSON array.
     */
    public static JSONArray toJSONArray(List<WorkflowJobBean> workflows, String timeZoneId) {
        JSONArray array = new JSONArray();
        if (workflows != null) {
            for (WorkflowJobBean node : workflows) {
                array.add(node.toJSONObject(timeZoneId));
            }
        }
        return array;
    }
}
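
The write() and readFields() pair above stores each nullable date as a long and uses -1 to mean "not set". Here is a minimal, self-contained sketch of just that convention, using plain java.io streams. The class and method names (DateSentinelDemo, writeDate, readDate, roundTrip) are made up for illustration and are not part of Oozie.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical helper, not an Oozie class: shows the "-1 means null" date encoding.
class DateSentinelDemo {

    // Write the epoch millis, or -1 when the date is absent.
    static void writeDate(DataOutput out, Date date) throws IOException {
        out.writeLong(date != null ? date.getTime() : -1L);
    }

    // Read a long and map the -1 sentinel back to null.
    static Date readDate(DataInput in) throws IOException {
        long millis = in.readLong();
        return millis != -1L ? new Date(millis) : null;
    }

    // Serialize two dates into a byte buffer and read them back in the same order.
    static Date[] roundTrip(Date first, Date second) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            writeDate(out, first);
            writeDate(out, second);
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            return new Date[] { readDate(in), readDate(in) };
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Date[] result = roundTrip(new Date(1000L), null);
        System.out.println(result[0].getTime() + " " + result[1]); // prints "1000 null"
    }
}
```

One caveat of the sentinel: a date exactly one millisecond before the epoch also has getTime() == -1 and would come back as null. That is harmless for workflow timestamps, but worth knowing if you reuse the trick elsewhere.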


If you are interested in the relational table schema used in Oozie, take a look at OozieSchema in the org.apache.oozie.store package, which encapsulates the schema in Java code.

Oozie has the following DB tables:
Process Instance table, WorkflowJob table, Actions table, Version table

        // Process Instance Table
        PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
        PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),

        // WorkflowJob Table
        WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
        WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
        WF_conf(OozieTable.WORKFLOWS, String.class, false),
        WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
        WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_run(OozieTable.WORKFLOWS, Long.class, false),
        WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_authToken(OozieTable.WORKFLOWS, String.class, false),

        // Actions Table
        ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
        ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
        ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
        ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
        ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_data(OozieTable.ACTIONS, String.class, false),
        ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
        ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
        ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),

        // Version Table
        VER_versionNumber(OozieTable.VERSION, String.class, false, 255);

Here is the full OozieSchema class, which defines the DB tables used in Oozie.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.store;

import java.sql.Blob;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.oozie.util.db.Schema;
import org.apache.oozie.util.db.Schema.Column;
import org.apache.oozie.util.db.Schema.DBType;
import org.apache.oozie.util.db.Schema.Index;
import org.apache.oozie.util.db.Schema.Table;

public class OozieSchema {

    private static String oozieDbName;

    private static final String OOZIE_VERSION = "0.1";

    public static final Map<Table, List<Column>> TABLE_COLUMNS = new HashMap<Table, List<Column>>();

    static {
        // Group every column under the table it belongs to.
        for (Column column : OozieColumn.values()) {
            List<Column> tColumns = TABLE_COLUMNS.get(column.table());
            if (tColumns == null) {
                tColumns = new ArrayList<Column>();
                TABLE_COLUMNS.put(column.table(), tColumns);
            }
            tColumns.add(column);
        }
    }

    public static void setOozieDbName(String dbName) {
        oozieDbName = dbName;
    }

    public static enum OozieTable implements Table {
        // Constants as referenced by OozieColumn below.
        WORKFLOWS, ACTIONS, WF_PROCESS_INSTANCE, VERSION;

        public String toString() {
            return oozieDbName + "." + name().toUpperCase();
        }
    }

    public static enum OozieIndex implements Index {
        // The index constants are missing from this listing.
        ;

        final Column column;

        OozieIndex(Column column) {
            this.column = column;
        }

        public Column column() {
            return column;
        }
    }

     public static enum OozieColumn implements Column {
        // Process Instance Table
        PI_wfId(OozieTable.WF_PROCESS_INSTANCE, String.class, true, 100),
        PI_state(OozieTable.WF_PROCESS_INSTANCE, Blob.class, false),

        // WorkflowJob Table
        WF_id(OozieTable.WORKFLOWS, String.class, true, 100),
        WF_externalId(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_appPath(OozieTable.WORKFLOWS, String.class, false, 255),
        WF_conf(OozieTable.WORKFLOWS, String.class, false),
        WF_protoActionConf(OozieTable.WORKFLOWS, String.class, false),
        WF_logToken(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_status(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_run(OozieTable.WORKFLOWS, Long.class, false),
        WF_lastModTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_createdTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_startTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_endTime(OozieTable.WORKFLOWS, Timestamp.class, false),
        WF_userName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_groupName(OozieTable.WORKFLOWS, String.class, false, 100),
        WF_authToken(OozieTable.WORKFLOWS, String.class, false),

        // Actions Table
        ACTIONS_id(OozieTable.ACTIONS, String.class, true, 100),
        ACTIONS_name(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_type(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_wfId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_conf(OozieTable.ACTIONS, String.class, false),
        ACTIONS_status(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_externalStatus(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorCode(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_errorMessage(OozieTable.ACTIONS, String.class, false),
        ACTIONS_transition(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_retries(OozieTable.ACTIONS, Long.class, false),
        ACTIONS_startTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_endTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_lastCheckTime(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_data(OozieTable.ACTIONS, String.class, false),
        ACTIONS_externalId(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_trackerUri(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_consoleUrl(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_executionPath(OozieTable.ACTIONS, String.class, false, 255),
        ACTIONS_pending(OozieTable.ACTIONS, Boolean.class, false),
        ACTIONS_pendingAge(OozieTable.ACTIONS, Timestamp.class, false),
        ACTIONS_signalValue(OozieTable.ACTIONS, String.class, false, 100),
        ACTIONS_logToken(OozieTable.ACTIONS, String.class, false, 100),

        // Version Table
        VER_versionNumber(OozieTable.VERSION, String.class, false, 255);

        final Table table;
        final Class<?> type;
        int length = -1;
        final boolean isPrimaryKey;

        OozieColumn(Table table, Class<?> type, boolean isPrimaryKey) {
            this(table, type, isPrimaryKey, -1);
        }

        OozieColumn(Table table, Class<?> type, boolean isPrimaryKey, int length) {
            this.table = table;
            this.type = type;
            this.isPrimaryKey = isPrimaryKey;
            this.length = length;
        }

        private String getName() {
            String tName = table.name();
            return tName + "." + columnName();
        }

        public String columnName() {
            return name().split("_")[1].toLowerCase();
        }

        public String toString() {
            return getName();
        }

        public Table table() {
            return table;
        }

        public Class<?> getType() {
            return type;
        }

        public int getLength() {
            return length;
        }

        public String asLabel() {
            return name().toUpperCase();
        }

        public boolean isPrimaryKey() {
            return isPrimaryKey;
        }
    }

    /**
     * Generates the create table SQL Statement
     *
     * @param table table to create
     * @param dbType target database type
     * @return SQL Statement to create the table
     */
    public static String generateCreateTableScript(Table table, DBType dbType) {
        return Schema.generateCreateTableScript(table, dbType, TABLE_COLUMNS.get(table));
    }

    /**
     * Gets the query that will be used to validate the connection
     *
     * @param dbName database name
     * @return the validation query
     */
    public static String getValidationQuery(String dbName) {
        return "select count(" + OozieColumn.VER_versionNumber.columnName() + ") from " + dbName + "."
                + OozieTable.VERSION.name().toUpperCase();
    }

    /**
     * Generates the Insert statement to insert the OOZIE_VERSION to table
     *
     * @param dbName database name
     * @return the insert statement
     */
    public static String generateInsertVersionScript(String dbName) {
        return "INSERT INTO " + dbName + "." + OozieTable.VERSION.name().toUpperCase() + "("
                + OozieColumn.VER_versionNumber.columnName() + ") VALUES(" + OOZIE_VERSION + ")";
    }

    /**
     * Gets the Oozie Schema Version
     *
     * @return the Oozie schema version
     */
    public static String getOozieVersion() {
        return OOZIE_VERSION;
    }
}
In Oozie, there are different DB stores, e.g. Coord Store, Workflow Store and SLA Store.
They live in the package org.apache.oozie.store.

Store is the abstract class that separates the entities from the actual store implementation.
It lets you get the EntityManager, get the underlying connection, and control transactions, including rollback.

The DB implementations are:
CoordinatorStore, the DB implementation of the Coord Store
WorkflowStore, the DB implementation of the Workflow Store
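
In the WorkflowStore listing, every DB call is wrapped as doOperation(name, new Callable...). The body of doOperation is not shown in this post, so the following is only a guess at the general shape of such a wrapper: run the callable and translate any failure into a single store-level exception. StoreSketch, SketchStoreException and the message format are all hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch of a "run operation, translate failures" wrapper; not Oozie's actual code.
class StoreSketch {

    // Stand-in for a store-level exception type.
    static class SketchStoreException extends Exception {
        SketchStoreException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Run the named operation and wrap anything it throws.
    static <V> V doOperation(String name, Callable<V> command) throws SketchStoreException {
        try {
            return command.call();
        }
        catch (SketchStoreException ex) {
            throw ex; // already the right type, pass it through
        }
        catch (Exception ex) {
            throw new SketchStoreException("operation [" + name + "] failed", ex);
        }
    }

    // Demo helper: returns "ok" on success, or the failure message otherwise.
    static String describe(String name, Callable<?> command) {
        try {
            doOperation(name, command);
            return "ok";
        }
        catch (SketchStoreException ex) {
            return ex.getMessage();
        }
    }

    public static void main(String[] args) throws SketchStoreException {
        Integer count = doOperation("countRows", () -> 42);
        System.out.println(count);
        System.out.println(describe("brokenQuery", () -> { throw new IOException("no table"); }));
    }
}
```

The benefit of this shape is that callers only ever deal with one exception type, and the operation name gives a natural hook for logging or timing each DB call.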

package org.apache.oozie.store;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import javax.persistence.EntityManager;
import javax.persistence.Query;

import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob.Status;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.openjpa.persistence.OpenJPAEntityManager;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;

/**
 * DB Implementation of Workflow Store
 */
public class WorkflowStore extends Store {
    private Connection conn;
    private EntityManager entityManager;
    private boolean selectForUpdate;
    private static final String INSTR_GROUP = "db";
    public static final int LOCK_TIMEOUT = 50000;
    private static final String seletStr = "Select w.id, w.appName, w.statusStr, w.run, w.user, w.group, w.createdTimestamp, "
            + "w.startTimestamp, w.lastModifiedTimestamp, w.endTimestamp from WorkflowJobBean w";
    private static final String countStr = "Select count(w) from WorkflowJobBean w";

    public WorkflowStore() {
    }

    public WorkflowStore(Connection connection, boolean selectForUpdate) throws StoreException {
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(Connection connection, Store store, boolean selectForUpdate) throws StoreException {
        conn = ParamChecker.notNull(connection, "conn");
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(boolean selectForUpdate) throws StoreException {
        entityManager = getEntityManager();
        javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
        OpenJPAEntityManager kem = OpenJPAPersistence.cast(entityManager);
        conn = (Connection) kem.getConnection();
        this.selectForUpdate = selectForUpdate;
    }

    public WorkflowStore(Store store, boolean selectForUpdate) throws StoreException {
        entityManager = getEntityManager();
        this.selectForUpdate = selectForUpdate;
    }

    /**
     * Create a Workflow and return a WorkflowJobBean. It also creates the process instance for the job.
     *
     * @param workflow workflow bean
     * @throws StoreException
     */
    public void insertWorkflow(final WorkflowJobBean workflow) throws StoreException {
        ParamChecker.notNull(workflow, "workflow");

        doOperation("insertWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(workflow);
                return null;
            }
        });
    }

    /**
     * Load the Workflow into a Bean and return it. Also load the Workflow Instance into the bean. And lock the Workflow
     * depending on the locking parameter.
     *
     * @param id Workflow ID
     * @param locking true if Workflow is to be locked
     * @return WorkflowJobBean
     * @throws StoreException
     */
    public WorkflowJobBean getWorkflow(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflow", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, WorkflowException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowOnly(id, locking);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                /*
                 * WorkflowInstance wfInstance; //krishna and next line
                 * wfInstance = workflowLib.get(id); wfInstance =
                 * wfBean.get(wfBean.getWfInstance());
                 * wfBean.setWorkflowInstance(wfInstance);
                 * wfBean.setWfInstance(wfInstance);
                 */
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Get the number of Workflows with the given status.
     *
     * @param status Workflow Status.
     * @return number of Workflows with given status.
     * @throws StoreException
     */
    public int getWorkflowCountWithStatus(final String status) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        Integer cnt = doOperation("getWorkflowCountWithStatus", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS");
                q.setParameter("status", status);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Get the number of Workflows with the given status which was modified in given time limit.
     *
     * @param status Workflow Status.
     * @param secs No. of seconds within which the workflow got modified.
     * @return number of Workflows modified within given time with given status.
     * @throws StoreException
     */
    public int getWorkflowCountWithStatusInLastNSeconds(final String status, final int secs) throws StoreException {
        ParamChecker.notEmpty(status, "status");
        ParamChecker.notEmpty(status, "secs");
        Integer cnt = doOperation("getWorkflowCountWithStatusInLastNSecs", new Callable<Integer>() {
            public Integer call() throws SQLException {
                Query q = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT_WITH_STATUS_IN_LAST_N_SECS");
                // 1000L keeps the multiplication in long arithmetic, so a large window cannot overflow int.
                Timestamp ts = new Timestamp(System.currentTimeMillis() - (secs * 1000L));
                q.setParameter("status", status);
                q.setParameter("lastModTime", ts);
                Long count = (Long) q.getSingleResult();
                return Integer.valueOf(count.intValue());
            }
        });
        return cnt.intValue();
    }

    /**
     * Update the data from Workflow Bean to DB along with the workflow instance data. Action table is not updated
     *
     * @param wfBean Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public void updateWorkflow(final WorkflowJobBean wfBean) throws StoreException {
        ParamChecker.notNull(wfBean, "WorkflowJobBean");
        doOperation("updateWorkflow", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
                WorkflowJobQueryExecutor.getInstance().executeUpdate(
                        WorkflowJobQueryExecutor.WorkflowJobQuery.UPDATE_WORKFLOW, wfBean);
                return null;
            }
        });
    }

    /**
     * Create a new Action record in the ACTIONS table with the given Bean.
     *
     * @param action WorkflowActionBean
     * @throws StoreException If the action is already present
     */
    public void insertAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("insertAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                entityManager.persist(action);
                return null;
            }
        });
    }

    /**
     * Load the action data and returns a bean.
     *
     * @param id Action Id
     * @param locking true if the action is to be locked
     * @return Action Bean
     * @throws StoreException If action doesn't exist
     */
    public WorkflowActionBean getAction(final String id, final boolean locking) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        WorkflowActionBean action = doOperation("getAction", new Callable<WorkflowActionBean>() {
            public WorkflowActionBean call() throws SQLException, StoreException, WorkflowException,
                    InterruptedException {
                Query q = entityManager.createNamedQuery("GET_ACTION");
                /*
                 * if (locking) { OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                 * FetchPlan fetch = oq.getFetchPlan();
                 * fetch.setReadLockMode(LockModeType.WRITE);
                 * fetch.setLockTimeout(1000); // 1 seconds }
                 */
                WorkflowActionBean action = null;
                q.setParameter("id", id);
                List<WorkflowActionBean> actions = q.getResultList();
                // action = (WorkflowActionBean) q.getSingleResult();
                if (actions.size() > 0) {
                    action = actions.get(0);
                }
                else {
                    throw new StoreException(ErrorCode.E0605, id);
                }

                /*
                 * if (locking) return action; else
                 */
                // return action;
                return getBeanForRunningAction(action);
            }
        });
        return action;
    }

    /**
     * Update the given action bean to DB.
     *
     * @param action Action Bean
     * @throws StoreException if action doesn't exist
     */
    public void updateAction(final WorkflowActionBean action) throws StoreException {
        ParamChecker.notNull(action, "WorkflowActionBean");
        doOperation("updateAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException, JPAExecutorException {
                WorkflowActionQueryExecutor.getInstance().executeUpdate(
                        WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action);
                return null;
            }
        });
    }

    /**
     * Delete the Action with given id.
     *
     * @param id Action ID
     * @throws StoreException if Action doesn't exist
     */
    public void deleteAction(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "ActionID");
        doOperation("deleteAction", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                /*
                 * Query q = entityManager.createNamedQuery("DELETE_ACTION");
                 * q.setParameter("id", id); q.executeUpdate();
                 */
                WorkflowActionBean action = entityManager.find(WorkflowActionBean.class, id);
                if (action != null) {
                    entityManager.remove(action);
                }
                return null;
            }
        });
    }

    /**
     * Loads all the actions for the given Workflow. Also locks all the actions if locking is true.
     *
     * @param wfId Workflow ID
     * @param locking true if Actions are to be locked
     * @return A List of WorkflowActionBean
     * @throws StoreException
     */
    public List<WorkflowActionBean> getActionsForWorkflow(final String wfId, final boolean locking)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
                                                       new Callable<List<WorkflowActionBean>>() {
                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                                                                   InterruptedException {
                                                               List<WorkflowActionBean> actions;
                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                                                               try {
                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");

                                                                   * OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                                                                   * if (locking) { //
                                                                   * q.setHint("openjpa.FetchPlan.ReadLockMode"
                                                                   * ,"WRITE"); FetchPlan fetch = oq.getFetchPlan();
                                                                   * fetch.setReadLockMode(LockModeType.WRITE);
                                                                   * fetch.setLockTimeout(1000); // 1 seconds }
                                                                   q.setParameter("wfId", wfId);
                                                                   actions = q.getResultList();
                                                                   for (WorkflowActionBean a : actions) {
                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
                                                               catch (IllegalStateException e) {
                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                                                               * if (locking) { return actions; } else {
                                                               return actionList;
                                                               // }
        return actions;

     * Loads given number of actions for the given Workflow. Also locks all the actions if locking is true.
     * @param wfId Workflow ID
     * @param start offset for select statement
     * @param len number of Workflow Actions to be returned
     * @param locking true if Actions are to be locked
     * @return A List of WorkflowActionBean
     * @throws StoreException
    public List<WorkflowActionBean> getActionsSubsetForWorkflow(final String wfId, final int start, final int len)
            throws StoreException {
        ParamChecker.notEmpty(wfId, "WorkflowID");
        List<WorkflowActionBean> actions = doOperation("getActionsForWorkflow",
                                                       new Callable<List<WorkflowActionBean>>() {
                                                           public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException,
                                                                   InterruptedException {
                                                               List<WorkflowActionBean> actions;
                                                               List<WorkflowActionBean> actionList = new ArrayList<WorkflowActionBean>();
                                                               try {
                                                                   Query q = entityManager.createNamedQuery("GET_ACTIONS_FOR_WORKFLOW");
                                                                   OpenJPAQuery oq = OpenJPAPersistence.cast(q);
                                                                   q.setParameter("wfId", wfId);
                                                                   q.setFirstResult(start - 1);
                                                                   actions = q.getResultList();
                                                                   for (WorkflowActionBean a : actions) {
                                                                       WorkflowActionBean aa = getBeanForRunningAction(a);
                                                               catch (IllegalStateException e) {
                                                                   throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                                                               return actionList;
        return actions;

    /**
     * Load All the actions that are pending for more than given time.
     * @param minimumPendingAgeSecs Minimum Pending age in seconds
     * @return List of action beans
     * @throws StoreException
     */
    public List<WorkflowActionBean> getPendingActions(final long minimumPendingAgeSecs) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("getPendingActions", new Callable<List<WorkflowActionBean>>() {
            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                Timestamp ts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
                List<WorkflowActionBean> actionList = null;
                try {
                    Query q = entityManager.createNamedQuery("GET_PENDING_ACTIONS");
                    q.setParameter("pendingAge", ts);
                    actionList = q.getResultList();
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }
                return actionList;
            }
        });
        return actions;
    }

    /**
     * Load All the actions that are running and were last checked after now - checkAgeSecs
     * @param checkAgeSecs check age in seconds.
     * @return List of action beans.
     * @throws StoreException
     */
    public List<WorkflowActionBean> getRunningActions(final long checkAgeSecs) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("getRunningActions", new Callable<List<WorkflowActionBean>>() {

            public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                List<WorkflowActionBean> actions = new ArrayList<WorkflowActionBean>();
                Timestamp ts = new Timestamp(System.currentTimeMillis() - checkAgeSecs * 1000);
                try {
                    Query q = entityManager.createNamedQuery("GET_RUNNING_ACTIONS");
                    q.setParameter("lastCheckTime", ts);
                    actions = q.getResultList();
                }
                catch (IllegalStateException e) {
                    throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                }

                return actions;
            }
        });
        return actions;
    }

    /**
     * Load All the actions that are START_RETRY or START_MANUAL or END_RETRY or END_MANUAL.
     * @param wfId String
     * @return List of action beans
     * @throws StoreException
     */
    public List<WorkflowActionBean> getRetryAndManualActions(final String wfId) throws StoreException {
        List<WorkflowActionBean> actions = doOperation("GET_RETRY_MANUAL_ACTIONS",
                new Callable<List<WorkflowActionBean>>() {
                    public List<WorkflowActionBean> call() throws SQLException, StoreException, WorkflowException {
                        List<WorkflowActionBean> actionList = null;
                        try {
                            Query q = entityManager.createNamedQuery("GET_RETRY_MANUAL_ACTIONS");
                            q.setParameter("wfId", wfId);
                            actionList = q.getResultList();
                        }
                        catch (IllegalStateException e) {
                            throw new StoreException(ErrorCode.E0601, e.getMessage(), e);
                        }

                        return actionList;
                    }
                });
        return actions;
    }

     * Loads all the jobs that are satisfying the given filter condition. Filters can be applied on user, group,
     * appName, status.
     * @param filter Filter condition
     * @param start offset for select statement
     * @param len number of Workflows to be returned
     * @return A list of workflows
     * @throws StoreException
    public WorkflowsInfo getWorkflowsInfo(final Map<String, List<String>> filter, final int start, final int len)
            throws StoreException {

        WorkflowsInfo workFlowsInfo = doOperation("getWorkflowsInfo", new Callable<WorkflowsInfo>() {
            public WorkflowsInfo call() throws SQLException, StoreException {

                List<String> orArray = new ArrayList<String>();
                List<String> colArray = new ArrayList<String>();
                List<String> valArray = new ArrayList<String>();
                StringBuilder sb = new StringBuilder("");
                boolean isStatus = false;
                boolean isGroup = false;
                boolean isAppName = false;
                boolean isUser = false;
                boolean isEnabled = false;
                int index = 0;
                for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
                    String colName = null;
                    String colVar = null;
                    if (entry.getKey().equals(OozieClient.FILTER_GROUP)) {
                        List<String> values = filter.get(OozieClient.FILTER_GROUP);
                        colName = "group";
                        for (int i = 0; i < values.size(); i++) {
                            colVar = "group";
                            colVar = colVar + index;
                            if (!isEnabled && !isGroup) {
                                sb.append(seletStr).append(" where w.group IN (:group" + index);
                                isGroup = true;
                                isEnabled = true;
                            else {
                                if (isEnabled && !isGroup) {
                                    sb.append(" and w.group IN (:group" + index);
                                    isGroup = true;
                                else {
                                    if (isGroup) {
                                        sb.append(", :group" + index);
                            if (i == values.size() - 1) {
                    else {
                        if (entry.getKey().equals(OozieClient.FILTER_STATUS)) {
                            List<String> values = filter.get(OozieClient.FILTER_STATUS);
                            colName = "status";
                            for (int i = 0; i < values.size(); i++) {
                                colVar = "status";
                                colVar = colVar + index;
                                if (!isEnabled && !isStatus) {
                                    sb.append(seletStr).append(" where w.statusStr IN (:status" + index);
                                    isStatus = true;
                                    isEnabled = true;
                                else {
                                    if (isEnabled && !isStatus) {
                                        sb.append(" and w.statusStr IN (:status" + index);
                                        isStatus = true;
                                    else {
                                        if (isStatus) {
                                            sb.append(", :status" + index);
                                if (i == values.size() - 1) {
                        else {
                            if (entry.getKey().equals(OozieClient.FILTER_NAME)) {
                                List<String> values = filter.get(OozieClient.FILTER_NAME);
                                colName = "appName";
                                for (int i = 0; i < values.size(); i++) {
                                    colVar = "appName";
                                    colVar = colVar + index;
                                    if (!isEnabled && !isAppName) {
                                        sb.append(seletStr).append(" where w.appName IN (:appName" + index);
                                        isAppName = true;
                                        isEnabled = true;
                                    else {
                                        if (isEnabled && !isAppName) {
                                            sb.append(" and w.appName IN (:appName" + index);
                                            isAppName = true;
                                        else {
                                            if (isAppName) {
                                                sb.append(", :appName" + index);
                                    if (i == values.size() - 1) {
                            else {
                                if (entry.getKey().equals(OozieClient.FILTER_USER)) {
                                    List<String> values = filter.get(OozieClient.FILTER_USER);
                                    colName = "user";
                                    for (int i = 0; i < values.size(); i++) {
                                        colVar = "user";
                                        colVar = colVar + index;
                                        if (!isEnabled && !isUser) {
                                            sb.append(seletStr).append(" where w.user IN (:user" + index);
                                            isUser = true;
                                            isEnabled = true;
                                        else {
                                            if (isEnabled && !isUser) {
                                                sb.append(" and w.user IN (:user" + index);
                                                isUser = true;
                                            else {
                                                if (isUser) {
                                                    sb.append(", :user" + index);
                                        if (i == values.size() - 1) {

                int realLen = 0;

                Query q = null;
                Query qTotal = null;
                if (orArray.size() == 0) {
                    q = entityManager.createNamedQuery("GET_WORKFLOWS_COLUMNS");
                    q.setFirstResult(start - 1);
                    qTotal = entityManager.createNamedQuery("GET_WORKFLOWS_COUNT");
                else {
                    if (orArray.size() > 0) {
                        StringBuilder sbTotal = new StringBuilder(sb);
                        sb.append(" order by w.startTimestamp desc ");
                        XLog.getLog(getClass()).debug("Created String is **** " + sb.toString());
                        q = entityManager.createQuery(sb.toString());
                        q.setFirstResult(start - 1);
                        qTotal = entityManager.createQuery(sbTotal.toString().replace(seletStr, countStr));
                        for (int i = 0; i < orArray.size(); i++) {
                            q.setParameter(colArray.get(i), valArray.get(i));
                            qTotal.setParameter(colArray.get(i), valArray.get(i));

                OpenJPAQuery kq = OpenJPAPersistence.cast(q);
                JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
                List<?> resultList = q.getResultList();
                List<Object[]> objectArrList = (List<Object[]>) resultList;
                List<WorkflowJobBean> wfBeansList = new ArrayList<WorkflowJobBean>();

                for (Object[] arr : objectArrList) {
                    WorkflowJobBean ww = getBeanForWorkflowFromArray(arr);

                realLen = ((Long) qTotal.getSingleResult()).intValue();

                return new WorkflowsInfo(wfBeansList, start, len, realLen);
        return workFlowsInfo;


    /**
     * Load the Workflow and all Action details and return a WorkflowJobBean. Workflow Instance is not loaded
     * @param id Workflow Id
     * @return Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public WorkflowJobBean getWorkflowInfo(final String id) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                else {
                    wfBean.setActions(getActionsForWorkflow(id, false));
                }
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Load the Workflow and subset Actions details and return a WorkflowJobBean. Workflow Instance is not loaded
     * @param id Workflow Id
     * @param start offset for select statement for actions
     * @param len number of Workflow Actions to be returned
     * @return Workflow Bean
     * @throws StoreException If Workflow doesn't exist
     */
    public WorkflowJobBean getWorkflowInfoWithActionsSubset(final String id, final int start, final int len) throws StoreException {
        ParamChecker.notEmpty(id, "WorkflowID");
        WorkflowJobBean wfBean = doOperation("getWorkflowInfo", new Callable<WorkflowJobBean>() {
            public WorkflowJobBean call() throws SQLException, StoreException, InterruptedException {
                WorkflowJobBean wfBean = null;
                wfBean = getWorkflowforInfo(id, false);
                if (wfBean == null) {
                    throw new StoreException(ErrorCode.E0604, id);
                }
                else {
                    wfBean.setActions(getActionsSubsetForWorkflow(id, start, len));
                }
                return wfBean;
            }
        });
        return wfBean;
    }

    /**
     * Get the Workflow ID with given external ID which will be assigned for the subworkflows.
     * @param externalId external ID
     * @return Workflow ID
     * @throws StoreException if there is no job with external ID
     */
    public String getWorkflowIdForExternalId(final String externalId) throws StoreException {
        ParamChecker.notEmpty(externalId, "externalId");
        String wfId = doOperation("getWorkflowIdForExternalId", new Callable<String>() {
            public String call() throws SQLException, StoreException {
                String id = "";
                Query q = entityManager.createNamedQuery("GET_WORKFLOW_ID_FOR_EXTERNAL_ID");
                q.setParameter("externalId", externalId);
                List<String> w = q.getResultList();
                if (w.size() == 0) {
                    id = "";
                }
                else {
                    int index = w.size() - 1;
                    id = w.get(index);
                }
                return id;
            }
        });
        return wfId;
    }

    private static final long DAY_IN_MS = 24 * 60 * 60 * 1000;

     * Purge the Workflows Completed older than given days.
     * @param olderThanDays number of days for which to preserve the workflows
     * @throws StoreException
    public void purge(final long olderThanDays, final int limit) throws StoreException {
        doOperation("purge", new Callable<Void>() {
            public Void call() throws SQLException, StoreException, WorkflowException {
                Timestamp maxEndTime = new Timestamp(System.currentTimeMillis() - (olderThanDays * DAY_IN_MS));
                Query q = entityManager.createNamedQuery("GET_COMPLETED_WORKFLOWS_OLDER_THAN");
                q.setParameter("endTime", maxEndTime);
                List<WorkflowJobBean> workflows = q.getResultList();
                int actionDeleted = 0;
                if (workflows.size() != 0) {
                    for (WorkflowJobBean w : workflows) {
                        String wfId = w.getId();
                        Query g = entityManager.createNamedQuery("DELETE_ACTIONS_FOR_WORKFLOW");
                        g.setParameter("wfId", wfId);
                        actionDeleted += g.executeUpdate();
                XLog.getLog(getClass()).debug("ENDED Workflow Purge deleted jobs :" + workflows.size() + " and actions " + actionDeleted);
                return null;

    private <V> V doOperation(String name, Callable<V> command) throws StoreException {
        try {
            // Time the operation so it can be reported to the instrumentation service.
            Instrumentation.Cron cron = new Instrumentation.Cron();
            cron.start();
            V retVal;
            try {
                retVal = command.call();
            }
            finally {
                cron.stop();
            }
            Services.get().get(InstrumentationService.class).get().addCron(INSTR_GROUP, name, cron);
            return retVal;
        }
        catch (StoreException ex) {
            throw ex;
        }
        catch (SQLException ex) {
            throw new StoreException(ErrorCode.E0611, name, ex.getMessage(), ex);
        }
        catch (Exception e) {
            throw new StoreException(ErrorCode.E0607, name, e.getMessage(), e);
        }
    }

    private WorkflowJobBean getWorkflowOnly(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        WorkflowJobBean wfBean = null;
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        /*
         * if (locking) { // q.setHint("openjpa.FetchPlan.ReadLockMode","READ");
         * OpenJPAQuery oq = OpenJPAPersistence.cast(q); FetchPlan fetch =
         * oq.getFetchPlan(); fetch.setReadLockMode(LockModeType.WRITE);
         * fetch.setLockTimeout(-1); // unlimited }
         */
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.size() > 0) {
            wfBean = w.get(0);
        }
        return wfBean;
        // return getBeanForRunningWorkflow(wfBean);
    }

    private WorkflowJobBean getWorkflowforInfo(final String id, boolean locking) throws SQLException,
            InterruptedException, StoreException {
        WorkflowJobBean wfBean = null;
        Query q = entityManager.createNamedQuery("GET_WORKFLOW");
        q.setParameter("id", id);
        List<WorkflowJobBean> w = q.getResultList();
        if (w.size() > 0) {
            wfBean = w.get(0);
            return getBeanForRunningWorkflow(wfBean);
        }
        return null;
    }

    private WorkflowJobBean getBeanForRunningWorkflow(WorkflowJobBean w) throws SQLException {
        WorkflowJobBean wfBean = new WorkflowJobBean();
        return wfBean;

    private WorkflowJobBean getBeanForWorkflowFromArray(Object[] arr) {

        WorkflowJobBean wfBean = new WorkflowJobBean();
        wfBean.setId((String) arr[0]);
        if (arr[1] != null) {
            wfBean.setAppName((String) arr[1]);
        }
        if (arr[2] != null) {
            wfBean.setStatus(Status.valueOf((String) arr[2]));
        }
        if (arr[3] != null) {
            wfBean.setRun((Integer) arr[3]);
        }
        if (arr[4] != null) {
            wfBean.setUser((String) arr[4]);
        }
        if (arr[5] != null) {
            wfBean.setGroup((String) arr[5]);
        }
        if (arr[6] != null) {
            wfBean.setCreatedTime((Timestamp) arr[6]);
        }
        if (arr[7] != null) {
            wfBean.setStartTime((Timestamp) arr[7]);
        }
        if (arr[8] != null) {
            wfBean.setLastModifiedTime((Timestamp) arr[8]);
        }
        if (arr[9] != null) {
            wfBean.setEndTime((Timestamp) arr[9]);
        }
        return wfBean;
    }

    private WorkflowActionBean getBeanForRunningAction(WorkflowActionBean a) throws SQLException {
        if (a != null) {
            WorkflowActionBean action = new WorkflowActionBean();
            action.setErrorInfo(a.getErrorCode(), a.getErrorMessage());
            if (a.isPending() == true) {
            return action;
        return null;
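
The filter handling in getWorkflowsInfo above is easier to follow in isolation. The following is a minimal sketch, not Oozie's code: FilterQuerySketch, build, and the base select string are hypothetical names, and it assumes each filter key can be used directly as the bean property name (the real method maps the status filter to w.statusStr, for example). It only builds the JPQL text and the named-parameter map, so it runs without JPA.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Oozie: builds a JPQL string plus its
// named parameters from a filter map, one IN (...) clause per filter key.
class FilterQuerySketch {
    // Assumed base query; the real select string is defined elsewhere in the store class.
    static final String SELECT = "select w from WorkflowJobBean w";

    static String build(Map<String, List<String>> filter, Map<String, String> params) {
        List<String> clauses = new ArrayList<>();
        int index = 0; // shared across keys so every parameter name is unique
        for (Map.Entry<String, List<String>> entry : filter.entrySet()) {
            if (entry.getValue().isEmpty()) {
                continue; // nothing to match on for this key
            }
            List<String> names = new ArrayList<>();
            for (String value : entry.getValue()) {
                String name = entry.getKey() + index++; // e.g. user0, status1
                names.add(":" + name);
                params.put(name, value);
            }
            clauses.add("w." + entry.getKey() + " IN (" + String.join(", ", names) + ")");
        }
        StringBuilder sb = new StringBuilder(SELECT);
        if (!clauses.isEmpty()) {
            // Clauses for different keys are ANDed; values of one key share an IN list.
            sb.append(" where ").append(String.join(" and ", clauses));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, List<String>> filter = new LinkedHashMap<>();
        filter.put("user", Arrays.asList("alice"));
        filter.put("status", Arrays.asList("RUNNING", "SUSPENDED"));
        Map<String, String> params = new LinkedHashMap<>();
        System.out.println(build(filter, params));
        System.out.println(params);
    }
}
```

With the two filters in main, the generated text is "select w from WorkflowJobBean w where w.user IN (:user0) and w.status IN (:status1, :status2)", and each entry in params would then be bound with q.setParameter(name, value), much as the loop over colArray and valArray does above.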

In my next blog post, I will talk more about the MapperLauncher, the DAGEngine, and related classes. Stay tuned.

Categories: Hadoop, Oozie

HBase Multiversion Concurrency Control

November 12, 2013

For those interested in Multiversion Concurrency Control (MVCC) in HBase, check out the post below; it is a very good introduction.


Also, there is a good paper on the Theory and Algorithms of Multiversion Concurrency Control.

The implementation of Multiversion Concurrency Control in the HBase code base is the class org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.
It is used by org.apache.hadoop.hbase.regionserver.HRegion.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.LinkedList;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * Manages the read/write consistency within memstore. This provides
 * an interface for readers to determine what entries to ignore, and
 * a mechanism for writers to obtain new write numbers, then "commit"
 * the new writes for readers to read (thus forming atomic transactions).
 */
public class MultiVersionConsistencyControl {
  private volatile long memstoreRead = 0;
  private volatile long memstoreWrite = 0;

  private final Object readWaiters = new Object();

  // This is the pending queue of writes.
  private final LinkedList<WriteEntry> writeQueue =
      new LinkedList<WriteEntry>();

  private static final ThreadLocal<Long> perThreadReadPoint =
      new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
          return Long.MAX_VALUE;
        }
      };

  /**
   * Default constructor. Initializes the memstoreRead/Write points to 0.
   */
  public MultiVersionConsistencyControl() {
    this.memstoreRead = this.memstoreWrite = 0;
  }

  /**
   * Initializes the memstoreRead/Write points appropriately.
   * @param startPoint
   */
  public void initialize(long startPoint) {
    synchronized (writeQueue) {
      if (this.memstoreWrite != this.memstoreRead) {
        throw new RuntimeException("Already used this mvcc. Too late to initialize");
      }

      this.memstoreRead = this.memstoreWrite = startPoint;
    }
  }

  /**
   * Get this thread's read point. Used primarily by the memstore scanner to
   * know which values to skip (ie: have not been completed/committed to
   * memstore).
   */
  public static long getThreadReadPoint() {
      return perThreadReadPoint.get();
  }

  /**
   * Set the thread read point to the given value. The thread MVCC
   * is used by the Memstore scanner so it knows which values to skip.
   * Give it a value of 0 if you want everything.
   */
  public static void setThreadReadPoint(long readPoint) {
    perThreadReadPoint.set(readPoint);
  }

  /**
   * Set the thread MVCC read point to whatever the current read point is in
   * this particular instance of MVCC.  Returns the new thread read point value.
   */
  public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
    perThreadReadPoint.set(mvcc.memstoreReadPoint());
    return getThreadReadPoint();
  }

  /**
   * Set the thread MVCC read point to 0 (include everything).
   */
  public static void resetThreadReadPoint() {
    perThreadReadPoint.set(0L);
  }

  /**
   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
   */
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite;
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }

  /**
   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
   */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }

  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) {
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }

  /**
   * Wait for the global readPoint to advance up to
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) {
        try {
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }

  public long memstoreReadPoint() {
    return memstoreRead;
  }

  public static class WriteEntry {
    private long writeNumber;
    private boolean completed = false;
    WriteEntry(long writeNumber) {
      this.writeNumber = writeNumber;
    }
    void markCompleted() {
      this.completed = true;
    }
    boolean isCompleted() {
      return this.completed;
    }
    long getWriteNumber() {
      return this.writeNumber;
    }
  }

  public static final long FIXED_SIZE = ClassSize.align(
      ClassSize.OBJECT +
      2 * Bytes.SIZEOF_LONG +
      2 * ClassSize.REFERENCE);
}
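
To see why a write can be finished but not yet visible, it may help to strip the class above down to its core rule: the read point only advances over the prefix of completed writes. The following is a simplified sketch under that assumption, not the HBase implementation. MvccSketch, Entry, begin, and complete are illustrative names, and it replaces the writeQueue/readWaiters locking with plain synchronized methods and omits the blocking wait.

```java
import java.util.LinkedList;

// Simplified single-file sketch of the write-number / read-point idea.
// Names are illustrative and do not match the HBase API.
class MvccSketch {
    static final class Entry {
        final long writeNumber;
        boolean completed;
        Entry(long writeNumber) { this.writeNumber = writeNumber; }
    }

    private long readPoint = 0;   // highest write number visible to readers
    private long writePoint = 0;  // highest write number handed out so far
    private final LinkedList<Entry> queue = new LinkedList<>();

    // Hand out the next write number and remember the pending write.
    synchronized Entry begin() {
        Entry e = new Entry(++writePoint);
        queue.add(e);
        return e;
    }

    // Mark e complete, then advance the read point over the completed prefix
    // of the queue. Returns true if e is now visible to readers.
    synchronized boolean complete(Entry e) {
        e.completed = true;
        while (!queue.isEmpty() && queue.getFirst().completed) {
            readPoint = queue.removeFirst().writeNumber;
        }
        return readPoint >= e.writeNumber;
    }

    synchronized long readPoint() {
        return readPoint;
    }

    public static void main(String[] args) {
        MvccSketch mvcc = new MvccSketch();
        Entry w1 = mvcc.begin();
        Entry w2 = mvcc.begin();
        System.out.println(mvcc.complete(w2)); // false: w1 is still pending
        System.out.println(mvcc.readPoint());  // 0
        System.out.println(mvcc.complete(w1)); // true: writes 1 and 2 are both done
        System.out.println(mvcc.readPoint());  // 2
    }
}
```

Completing the second write first leaves the read point at 0, because a reader at read point 2 would otherwise also expect to see write 1. In the real class this is the situation where waitForRead blocks until the earlier write commits.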


HBase 0.96.0 release: Protobufs as wire format

HBase 0.96.0 was released recently on October 18th, 2013.

Some notable changes are:
1) Adopted protocol buffers as the wire format
2) Reduced mean time to recovery
3) Removed ROOT table

Let’s take a look at these changes in depth to understand the new design and explore the differences they introduced in the codebase.

In this first post, we will take a look at protocol buffer adoption in HBase and examine the differences it introduced in the code base.
The main impact of the change is that classes which previously implemented the Writable interface
in order to be marshaled over the RPC wire no longer need to implement it.
In other words, the manual marshaling and demarshaling methods, write(final DataOutput out) and readFields(final DataInput in), are no longer required. Instead, HBase relies on protobufs to send requests such as Get over the RPC wire. Take a look at the following.

The Get class from 0.96.0 onward no longer needs to implement the Writable interface:

public class Get extends OperationWithAttributes
  implements Row, Comparable

The old Get class definition implements the Writable interface,
with readFields(final DataInput in) and write(final DataOutput out) written by hand:

public class Get extends OperationWithAttributes
  implements Writable, Row, Comparable {

  public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > GET_VERSION) {
      throw new IOException("unsupported version");
    }
    this.row = Bytes.readByteArray(in);
    this.lockId = in.readLong();
    this.maxVersions = in.readInt();
    boolean hasFilter = in.readBoolean();
    if (hasFilter) {
      this.filter = Classes.createWritableForName(
        Bytes.toString(Bytes.readByteArray(in)));
    }
    this.cacheBlocks = in.readBoolean();
    this.tr = new TimeRange();
    int numFamilies = in.readInt();
    this.familyMap =
      new TreeMap<byte [],NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
    for(int i=0; i<numFamilies; i++) {
      byte [] family = Bytes.readByteArray(in);
      boolean hasColumns = in.readBoolean();
      NavigableSet<byte []> set = null;
      if(hasColumns) {
        int numColumns = in.readInt();
        set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
        for(int j=0; j<numColumns; j++) {
          byte [] qualifier = Bytes.readByteArray(in);
          set.add(qualifier);
        }
      }
      this.familyMap.put(family, set);
    }
  }

  public void write(final DataOutput out)
  throws IOException {
    Bytes.writeByteArray(out, this.row);
    if(this.filter == null) {
    } else {
      Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
    }
    for(Map.Entry<byte [], NavigableSet<byte []>> entry :
      familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      NavigableSet<byte []> columnSet = entry.getValue();
      if(columnSet == null) {
      } else {
        for(byte [] qualifier : columnSet) {
          Bytes.writeByteArray(out, qualifier);
        }
      }
    }
  }

New Put class definition after 0.96.0

public class Put extends Mutation implements HeapSize, Comparable

Old Put class definition

public class Put extends Mutation
  implements HeapSize, Writable, Comparable {
  public void readFields(final DataInput in)
  throws IOException {
    int version = in.readByte();
    if (version > PUT_VERSION) {
      throw new IOException("version not supported");
    }
    this.row = Bytes.readByteArray(in);
    this.ts = in.readLong();
    this.lockId = in.readLong();
    this.writeToWAL = in.readBoolean();
    int numFamilies = in.readInt();
    if (!this.familyMap.isEmpty()) this.familyMap.clear();
    for(int i=0;i<numFamilies;i++) {
      byte [] family = Bytes.readByteArray(in);
      int numKeys = in.readInt();
      List<KeyValue> keys = new ArrayList<KeyValue>(numKeys);
      int totalLen = in.readInt();
      byte [] buf = new byte[totalLen];
      int offset = 0;
      for (int j = 0; j < numKeys; j++) {
        int keyLength = in.readInt();
        in.readFully(buf, offset, keyLength);
        keys.add(new KeyValue(buf, offset, keyLength));
        offset += keyLength;
      }
      this.familyMap.put(family, keys);
    }
    if (version > 1) {
      readAttributes(in);
    }
  }

  public void write(final DataOutput out)
  throws IOException {
    Bytes.writeByteArray(out, this.row);
    for (Map.Entry<byte [], List<KeyValue>> entry : familyMap.entrySet()) {
      Bytes.writeByteArray(out, entry.getKey());
      List<KeyValue> keys = entry.getValue();
      int totalLen = 0;
      for(KeyValue kv : keys) {
        totalLen += kv.getLength();
      }
      for(KeyValue kv : keys) {
        out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
      }
    }
  }

New Scan class definition after 0.96.0

public class Scan extends OperationWithAttributes 

Old Scan class definition

public class Scan extends OperationWithAttributes implements Writable {
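
To make the contrast concrete, here is a minimal sketch of what hand-written, Writable-style marshaling looks like, using only java.io. MiniGet and its fields are hypothetical stand-ins for illustration and are not HBase classes. The point to notice is that write and readFields must agree on field order by convention alone, which is exactly the bookkeeping a generated protobuf message takes over.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Writable-style request; not an HBase class.
final class MiniGet {
  static final byte VERSION = 1;

  byte[] row = new byte[0];
  int maxVersions = 1;
  boolean cacheBlocks = true;

  // Serialize fields in a fixed order; readFields must mirror this order.
  void write(DataOutput out) throws IOException {
    out.writeByte(VERSION);
    out.writeInt(row.length);
    out.write(row);
    out.writeInt(maxVersions);
    out.writeBoolean(cacheBlocks);
  }

  // Deserialize fields in exactly the order write() emitted them.
  void readFields(DataInput in) throws IOException {
    int version = in.readByte();
    if (version > VERSION) {
      throw new IOException("unsupported version");
    }
    row = new byte[in.readInt()];
    in.readFully(row);
    maxVersions = in.readInt();
    cacheBlocks = in.readBoolean();
  }

  // Helper: marshal to a byte array, as an RPC layer would before sending.
  static byte[] toBytes(MiniGet g) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      g.write(new DataOutputStream(bos));
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Helper: unmarshal from a byte array, as the receiving side would.
  static MiniGet fromBytes(byte[] bytes) {
    try {
      MiniGet g = new MiniGet();
      g.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
      return g;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

If a field is added to write but forgotten in readFields, nothing fails at compile time; the stream is simply misread at runtime. That fragility is one practical argument for moving the wire format into generated code.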

You can find the protobuf definition files in the src/main/protobuf folder of the hbase-protocol project.

In HTable, we can see protobuf in action. The get(final Get get) method builds a RegionServerCallable, which uses ProtobufUtil to make the actual protobuf GetRequest to the server.


  public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
  }

ProtobufUtil, in the package org.apache.hadoop.hbase.protobuf, is the utility class that helps build protobuf calls using the auto-generated protobuf classes.

package org.apache.hadoop.hbase.protobuf;

  /**
   * A helper to invoke a Get using client protocol.
   *
   * @param client
   * @param regionName
   * @param get
   * @return the result of the Get
   * @throws IOException
   */
  public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
  }


RequestConverter.buildGetRequest, called above, assembles the protobuf message:

  /**
   * Create a protocol buffer GetRequest for a client Get
   *
   * @param regionName the name of the region to get
   * @param get the client Get
   * @return a protocol buffer GetRequest
   */
  public static GetRequest buildGetRequest(final byte[] regionName,
      final Get get) throws IOException {
    GetRequest.Builder builder = GetRequest.newBuilder();
    RegionSpecifier region = buildRegionSpecifier(
      RegionSpecifierType.REGION_NAME, regionName);
    builder.setRegion(region);
    builder.setGet(ProtobufUtil.toGet(get));
    return builder.build();
  }

A look inside HRegion’s Stores initialization

In HBase, a table is split into multiple regions, and each region server hosts a set of regions.

Each region (org.apache.hadoop.hbase.regionserver.HRegion) maintains one or more Stores, and each Store is responsible for one column family. If you look at the org.apache.hadoop.hbase.regionserver.Store class, you will notice its member variables: a MemStore and a list of StoreFiles.

During initialization, HRegion has to load all of its Stores, and each Store in turn has to initialize its MemStore and load its list of StoreFiles. HRegion performs these initializations in multiple background threads instead of loading the Stores sequentially. It constructs a Callable for each Store and submits it to a thread pool to speed up the process. After submitting the Callables, it uses Future and CompletionService to collect each Store as it finishes loading.

    if (this.htableDescriptor != null &&
        !htableDescriptor.getFamilies().isEmpty()) {
      // initialize the thread pool for opening stores in parallel.
      ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool(
          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
      CompletionService<Store> completionService =
        new ExecutorCompletionService<Store>(storeOpenerThreadPool);

      // initialize each store in parallel
      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
        status.setStatus("Instantiating store for column family " + family);
        completionService.submit(new Callable<Store>() {
          public Store call() throws IOException {
            return instantiateHStore(tableDir, family);
          }
        });
      }
      try {
        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
          Future<Store> future = completionService.take();
          Store store = future.get();

          this.stores.put(store.getColumnFamilyName().getBytes(), store);
          long storeSeqId = store.getMaxSequenceId();
          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
            maxSeqId = storeSeqId;
          }
          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
          if (maxStoreMemstoreTS > maxMemstoreTS) {
            maxMemstoreTS = maxStoreMemstoreTS;
          }
        }
      } catch (InterruptedException e) {
        throw new IOException(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeOpenerThreadPool.shutdownNow();
      }
    }
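
The same submit-then-collect pattern can be tried outside HBase. The following is a minimal sketch using only java.util.concurrent. ParallelOpenSketch, openStore, and the pool size cap of 4 are made up for illustration and are not HBase code; a real store open would do I/O where this one just builds a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelOpenSketch {

  // Hypothetical stand-in for opening one store; returns a label for it.
  static String openStore(String family) {
    return "store:" + family;
  }

  // Opens one "store" per family in parallel and returns them in completion order.
  static List<String> openAll(List<String> families) {
    int threads = Math.max(1, Math.min(4, families.size()));
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CompletionService<String> completionService =
        new ExecutorCompletionService<>(pool);
    try {
      // Submit one task per family.
      for (final String family : families) {
        completionService.submit(() -> openStore(family));
      }
      // take() blocks until the next task finishes, whichever it is.
      List<String> opened = new ArrayList<>();
      for (int i = 0; i < families.size(); i++) {
        opened.add(completionService.take().get());
      }
      return opened;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      // Always release the worker threads, even on failure.
      pool.shutdownNow();
    }
  }
}
```

Because results arrive in completion order rather than submission order, the caller should not assume the returned list lines up with the input list. HRegion sidesteps this by keying each store on its column family name.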

There is another interesting class, org.apache.hadoop.hbase.regionserver.MemStoreLAB. Its main purpose is to prevent heap fragmentation and the long GC pauses that come with it. Take a look at the following excellent blog about the design of MSLAB (MemStore-Local Allocation Buffers) and how it solves the fragmentation issue.


Categories: HBase Tags:

Mesos vs YARN

Categories: Hadoop Yarn, Mesos Tags: ,

Interesting paper on accelerating Hadoop via Network Levitated Merge

Categories: Hadoop, Paper Tags: ,

Bucket in OpenTSDB

OpenTSDB introduces the notion of a time bucket, a way of grouping all data points that fall within a given time range. OpenTSDB partitions data into hourly buckets, and each cell value is either an integer or a float. The same design could easily be extended to other domains that store more complex data types, for example a complex Avro type.

Different bucket sizes could be used as well, such as daily, weekly, or monthly buckets.

In OpenTSDB, the timestamp is split into two parts, one encoded in the row key and the other in the qualifier.

The first part is the hourly base time, encoded in the row key. The second is the delta in seconds from the start of the hourly bucket, encoded in the qualifier.

The following code snippet from OpenTSDB shows how the base_time (hour bucket) is computed.
final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);

private long updateBaseTime(final long timestamp) {
  // We force the starting timestamp to be on a MAX_TIMESPAN boundary
  // so that all TSDs create rows with the same base time.  Otherwise
  // we'd need to coordinate TSDs to avoid creating rows that cover
  // overlapping time periods.
  final long base_time = timestamp - (timestamp % Const.MAX_TIMESPAN);
  // Clone the row key since we're going to change it.  We must clone it
  // because the HBase client may still hold a reference to it in its
  // internal datastructures.
  row = Arrays.copyOf(row, row.length);
  Bytes.setInt(row, (int) base_time, tsdb.metrics.width());
  tsdb.scheduleForCompaction(row, (int) base_time);
  return base_time;
}
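
As a quick worked example of the split, here is a small sketch that derives both parts from one timestamp. TimeBucketSketch is a made-up class name, and the value 3600 for MAX_TIMESPAN is an assumption standing in for Const.MAX_TIMESPAN (one hour, in seconds, to match the hourly bucket described above).

```java
final class TimeBucketSketch {

  // Assumed bucket width in seconds (one hour); stands in for Const.MAX_TIMESPAN.
  static final long MAX_TIMESPAN = 3600;

  // Start of the bucket containing the timestamp; this part goes in the row key.
  static long baseTime(long timestamp) {
    return timestamp - (timestamp % MAX_TIMESPAN);
  }

  // Seconds elapsed since the bucket start; this part goes in the qualifier.
  static long delta(long timestamp) {
    return timestamp % MAX_TIMESPAN;
  }

  public static void main(String[] args) {
    long ts = 1380412345L;
    System.out.println("base_time = " + baseTime(ts)); // 1380409200
    System.out.println("delta     = " + delta(ts));    // 3145
  }
}
```

Adding the two parts back together always recovers the original timestamp, and the delta is always smaller than the bucket width, which is why it fits in a few bytes of the qualifier.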


Categories: HBase, OpenTSDB Tags: ,

HBase Custom Filter

September 29, 2013

HBase ships with many different scan filters. Sometimes, though, none of them fits your needs and you have to build your own. As of HBase 0.94.7, a custom filter can be loaded dynamically by HBase without restarting the cluster.

A custom filter, packaged as a jar, can be dropped into a pre-configured folder (hbase.dynamic.jars.dir), which can be on HDFS. Region servers pick the jar up dynamically, so there is no need to restart the cluster for the new filter to take effect.

If you want to understand the mechanics of how HBase accomplishes this, look at the following classes in the org.apache.hadoop.hbase.util package:

Classes.java, ClassLoaderBase.java, DynamicClassLoader.java

Start with createWritableForName(String className) in Classes.java. It instantiates the filter class via reflection, a very common technique.

public static Filter createWritableForName(String className) {
  try {
    Class<? extends Filter> clazz =
        (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
    return (Filter)WritableFactories.newInstance(clazz, new Configuration());
  } catch (ClassNotFoundException e) {
    throw new RuntimeException("Can't find class " + className);
  }
}
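
The reflection step can be tried in isolation. Below is a minimal sketch, assuming nothing beyond the JDK: ReflectiveFactorySketch and its createForName helper are hypothetical names, and the sketch uses getDeclaredConstructor().newInstance() instead of Hadoop's WritableFactories, so it only illustrates the general load-by-name technique and not HBase's exact code path.

```java
final class ReflectiveFactorySketch {

  // Loads className through the given loader and instantiates it via its
  // no-argument constructor, checking that it is a subtype of expected.
  static <T> T createForName(String className, Class<T> expected, ClassLoader loader) {
    try {
      Class<?> raw = Class.forName(className, true, loader);
      // asSubclass throws ClassCastException if the class is not a T.
      Class<? extends T> clazz = raw.asSubclass(expected);
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Can't find class " + className, e);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Couldn't instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    Object list = createForName(
        "java.util.ArrayList", java.util.List.class,
        ReflectiveFactorySketch.class.getClassLoader());
    System.out.println(list.getClass().getName());
  }
}
```

The interesting part in HBase is the third argument to Class.forName: by passing a custom class loader, the lookup can reach jars that were not on the classpath when the JVM started.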

/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.io.WritableFactories;

/**
 * Utilities for class manipulation.
 */
public class Classes {

  /**
   * Dynamic class loader to load filter/comparators
   */
  private final static ClassLoader CLASS_LOADER;

  static {
    ClassLoader parent = Classes.class.getClassLoader();
    Configuration conf = HBaseConfiguration.create();
    CLASS_LOADER = new DynamicClassLoader(conf, parent);
  }

  /**
   * Equivalent of {@link Class#forName(String)} which also returns classes for
   * primitives like <code>boolean</code>, etc.
   *
   * @param className
   *          The name of the class to retrieve. Can be either a normal class or
   *          a primitive class.
   * @return The class specified by <code>className</code>
   * @throws ClassNotFoundException
   *           If the requested class can not be found.
   */
  public static Class<?> extendedForName(String className)
      throws ClassNotFoundException {
    Class<?> valueType;
    if (className.equals("boolean")) {
      valueType = boolean.class;
    } else if (className.equals("byte")) {
      valueType = byte.class;
    } else if (className.equals("short")) {
      valueType = short.class;
    } else if (className.equals("int")) {
      valueType = int.class;
    } else if (className.equals("long")) {
      valueType = long.class;
    } else if (className.equals("float")) {
      valueType = float.class;
    } else if (className.equals("double")) {
      valueType = double.class;
    } else if (className.equals("char")) {
      valueType = char.class;
    } else {
      valueType = Class.forName(className);
    }
    return valueType;
  }

  public static String stringify(Class[] classes) {
    StringBuilder buf = new StringBuilder();
    if (classes != null) {
      for (Class c : classes) {
        if (buf.length() > 0) {
          buf.append(",");
        }
        buf.append(c.getName());
      }
    } else {
      buf.append("NULL");
    }
    return buf.toString();
  }

  /**
   * Used to dynamically load a filter class, and create a Writable filter.
   * This filter class most likely extends Configurable.
   *
   * @param className the filter class name.
   * @return a filter
   */
  public static Filter createWritableForName(String className) {
    try {
      Class<? extends Filter> clazz =
          (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
      return (Filter)WritableFactories.newInstance(clazz, new Configuration());
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Can't find class " + className);
    }
  }

  /**
   * This method is almost the same as #createWritableForName, except
   * that this one doesn't expect the filter class to extends Configurable.
   *
   * @param className the filter class name.
   * @return a filter
   */
  public static Filter createForName(String className) {
    try {
      Class<? extends Filter> clazz =
          (Class<? extends Filter>)Class.forName(className, true, CLASS_LOADER);
      return (Filter)clazz.newInstance();
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Can't find class " + className);
    } catch (InstantiationException e) {
      throw new RuntimeException("Couldn't instantiate " + className, e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException("No access to " + className, e);
    }
  }
}

package org.apache.hadoop.hbase.util;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;

import com.google.common.base.Preconditions;

/**
 * Base class loader that defines couple shared constants used
 * by sub-classes. It also defined method getClassLoadingLock for parallel
 * class loading and JDK 1.6 support. This method (getClassLoadingLock)
 * is similar to the same method in the base class Java ClassLoader
 * introduced in JDK 1.7, but not in JDK 1.6.
 */
public class ClassLoaderBase extends URLClassLoader {

  // Maps class name to the corresponding lock object
  private final ConcurrentHashMap<String, Object> parallelLockMap
    = new ConcurrentHashMap<String, Object>();

  protected static final String DEFAULT_LOCAL_DIR = "/tmp/hbase-local-dir";
  protected static final String LOCAL_DIR_KEY = "hbase.local.dir";

  /**
   * Parent class loader.
   */
  protected final ClassLoader parent;

  /**
   * Creates a DynamicClassLoader that can load classes dynamically
   * from jar files under a specific folder.
   *
   * @param parent the parent ClassLoader to set.
   */
  public ClassLoaderBase(final ClassLoader parent) {
    super(new URL[]{}, parent);
    Preconditions.checkNotNull(parent, "No parent classloader!");
    this.parent = parent;
  }

  /**
   * Returns the lock object for class loading operations.
   */
  protected Object getClassLoadingLock(String className) {
    Object lock = parallelLockMap.get(className);
    if (lock != null) {
      return lock;
    }

    Object newLock = new Object();
    lock = parallelLockMap.putIfAbsent(className, newLock);
    if (lock == null) {
      lock = newLock;
    }
    return lock;
  }
}
package org.apache.hadoop.hbase.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * This is a class loader that can load classes dynamically from new
 * jar files under a configured folder. The paths to the jar files are
 * converted to URLs, and URLClassLoader logic is actually used to load
 * classes. This class loader always uses its parent class loader
 * to load a class at first. Only if its parent class loader
 * can not load a class, we will try to load it using the logic here.
 *
 * The configured folder can be a HDFS path. In this case, the jar files
 * under that folder will be copied to local at first under ${hbase.local.dir}/jars/.
 * The local copy will be updated if the remote copy is updated, according to its
 * last modified timestamp.
 *
 * We can't unload a class already loaded. So we will use the existing
 * jar files we already know to load any class which can't be loaded
 * using the parent class loader. If we still can't load the class from
 * the existing jar files, we will check if any new jar file is added,
 * if so, we will load the new jar file and try to load the class again.
 * If still failed, a class not found exception will be thrown.
 *
 * Be careful in uploading new jar files and make sure all classes
 * are consistent, otherwise, we may not be able to load your
 * classes properly.
 */
public class DynamicClassLoader extends ClassLoaderBase {
  private static final Log LOG =
      LogFactory.getLog(DynamicClassLoader.class);

  // Dynamic jars are put under ${hbase.local.dir}/jars/
  private static final String DYNAMIC_JARS_DIR = File.separator
    + "jars" + File.separator;

  private static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";

  private File localDir;

  // FileSystem of the remote path, set only if remoteDir != null
  private FileSystem remoteDirFs;
  private Path remoteDir;

  // Last modified time of local jars
  private HashMap<String, Long> jarModifiedTime;

  /**
   * Creates a DynamicClassLoader that can load classes dynamically
   * from jar files under a specific folder.
   *
   * @param conf the configuration for the cluster.
   * @param parent the parent ClassLoader to set.
   */
  public DynamicClassLoader(
      final Configuration conf, final ClassLoader parent) {
    super(parent);

    jarModifiedTime = new HashMap<String, Long>();
    String localDirPath = conf.get(
      LOCAL_DIR_KEY, DEFAULT_LOCAL_DIR) + DYNAMIC_JARS_DIR;
    localDir = new File(localDirPath);
    if (!localDir.mkdirs() && !localDir.isDirectory()) {
      throw new RuntimeException("Failed to create local dir " + localDir.getPath()
        + ", DynamicClassLoader failed to init");
    }

    String remotePath = conf.get(DYNAMIC_JARS_DIR_KEY);
    if (remotePath == null || remotePath.equals(localDirPath)) {
      remoteDir = null;  // ignore if it is the same as the local path
    } else {
      remoteDir = new Path(remotePath);
      try {
        remoteDirFs = remoteDir.getFileSystem(conf);
      } catch (IOException ioe) {
        LOG.warn("Failed to identify the fs of dir "
          + remoteDir + ", ignored", ioe);
        remoteDir = null;
      }
    }
  }
  public Class<?> loadClass(String name)
      throws ClassNotFoundException {
    try {
      return parent.loadClass(name);
    } catch (ClassNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Class " + name + " not found - using dynamical class loader");
      }

      synchronized (getClassLoadingLock(name)) {
        // Check whether the class has already been loaded:
        Class<?> clasz = findLoadedClass(name);
        if (clasz != null) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Class " + name + " already loaded");
          }
        }
        else {
          try {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Finding class: " + name);
            }
            clasz = findClass(name);
          } catch (ClassNotFoundException cnfe) {
            // Load new jar files if any
            if (LOG.isDebugEnabled()) {
              LOG.debug("Loading new jar files, if any");
            }
            loadNewJars();

            if (LOG.isDebugEnabled()) {
              LOG.debug("Finding class again: " + name);
            }
            clasz = findClass(name);
          }
        }
        return clasz;
      }
    }
  }

  private synchronized void loadNewJars() {
    // Refresh local jar file lists
    for (File file: localDir.listFiles()) {
      String fileName = file.getName();
      if (jarModifiedTime.containsKey(fileName)) {
        continue;
      }
      if (file.isFile() && fileName.endsWith(".jar")) {
        jarModifiedTime.put(fileName, Long.valueOf(file.lastModified()));
        try {
          URL url = file.toURI().toURL();
          addURL(url);
        } catch (MalformedURLException mue) {
          // This should not happen, just log it
          LOG.warn("Failed to load new jar " + fileName, mue);
        }
      }
    }

    // Check remote files
    FileStatus[] statuses = null;
    if (remoteDir != null) {
      try {
        statuses = remoteDirFs.listStatus(remoteDir);
      } catch (IOException ioe) {
        LOG.warn("Failed to check remote dir status " + remoteDir, ioe);
      }
    }
    if (statuses == null || statuses.length == 0) {
      return; // no remote files at all
    }

    for (FileStatus status: statuses) {
      if (status.isDir()) continue; // No recursive lookup
      Path path = status.getPath();
      String fileName = path.getName();
      if (!fileName.endsWith(".jar")) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignored non-jar file " + fileName);
        }
        continue; // Ignore non-jar files
      }
      Long cachedLastModificationTime = jarModifiedTime.get(fileName);
      if (cachedLastModificationTime != null) {
        long lastModified = status.getModificationTime();
        if (lastModified < cachedLastModificationTime.longValue()) {
          // There could be some race, for example, someone uploads
          // a new one right in the middle the old one is copied to
          // local. We can check the size as well. But it is still
          // not guaranteed. This should be rare. Most likely,
          // we already have the latest one.
          // If you are unlucky to hit this race issue, you have
          // to touch the remote jar to update its last modified time
          continue;
        }
      }
      try {
        // Copy it to local
        File dst = new File(localDir, fileName);
        remoteDirFs.copyToLocalFile(path, new Path(dst.getPath()));
        jarModifiedTime.put(fileName, Long.valueOf(dst.lastModified()));
        URL url = dst.toURI().toURL();
        addURL(url);
      } catch (IOException ioe) {
        LOG.warn("Failed to load new jar " + fileName, ioe);
      }
    }
  }
}
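
The decision at the heart of the remote check, which jars need to be copied, can be isolated from the file system calls. Here is a minimal sketch under that assumption: JarRefreshSketch and jarsToCopy are hypothetical names, and plain maps of file name to last-modified time stand in for the local cache and the remote directory listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class JarRefreshSketch {

  // Returns the names of remote jars that should be copied locally.
  // localModified: file name -> last-modified time of the cached local copy.
  // remoteModified: file name -> last-modified time reported by the remote dir.
  static List<String> jarsToCopy(Map<String, Long> localModified,
                                 Map<String, Long> remoteModified) {
    List<String> toCopy = new ArrayList<>();
    for (Map.Entry<String, Long> remote : remoteModified.entrySet()) {
      String fileName = remote.getKey();
      if (!fileName.endsWith(".jar")) {
        continue; // ignore non-jar files
      }
      Long cached = localModified.get(fileName);
      if (cached != null && remote.getValue() < cached) {
        continue; // remote copy is older than what we already have
      }
      toCopy.add(fileName);
    }
    return toCopy;
  }
}
```

As in the original, a jar that is unknown locally is always copied, and a known jar is skipped only when the remote timestamp is strictly older than the cached one.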
Categories: HBase Tags: ,