Posts Tagged ‘Hadoop’

Sqoop MySQL Data Import

In Sqoop, we can import data from MySQL and write it to HDFS. Sqoop uses a mapper that opens a pipe to mysqldump and pulls the data directly. If you look at the MySQLDumpMapper source below, it creates a CopyingAsyncSink to process the mysqldump stream via a reader thread, CopyingStreamThread. This thread reads the data from mysqldump and strips any extra header and footer characters before writing the records to HDFS.

To export data from HDFS back to MySQL, Sqoop uses MySQLExportMapper, which starts a mysqlimport process and uses it to push rows from HDFS into a MySQL database at high speed. It creates a FIFO, a named pipe (created with mknod), to interface with mysqlimport behind the scenes. A BufferedOutputStream is used to write bytes straight to the mysqlimport process via that named pipe, which can be treated just like an ordinary file.
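
Below is a minimal sketch of that export pattern, assuming a hypothetical FIFO path, database name, and row data; the real MySQLExportMapper handles credentials, delimiters, and cleanup far more carefully. After the sketch, we return to the import side with the full MySQLDumpMapper source.

// Minimal sketch (not the actual Sqoop class): create a named pipe and
// stream bytes to a mysqlimport process that reads from it.
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class FifoExportSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    File fifo = new File("/tmp/sqoop-export.fifo");   // hypothetical path

    // Create the FIFO; the trailing "p" asks mknod for a named pipe.
    Runtime.getRuntime()
        .exec(new String[] {"mknod", fifo.getAbsolutePath(), "p"})
        .waitFor();

    // Start mysqlimport reading from the FIFO (arguments are illustrative only).
    Process mysqlimport = Runtime.getRuntime().exec(new String[] {
        "mysqlimport", "--local", "mydb", fifo.getAbsolutePath()});

    // Write rows straight into the pipe; mysqlimport consumes them as we write.
    try (BufferedOutputStream out =
             new BufferedOutputStream(new FileOutputStream(fifo))) {
      out.write("1,alice\n2,bob\n".getBytes("UTF-8"));
    }

    System.out.println("mysqlimport exited with " + mysqlimport.waitFor());
    fifo.delete();
  }
}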


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.util.AsyncSink;
import org.apache.sqoop.util.JdbcUrl;
import org.apache.sqoop.util.PerfCounters;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.manager.MySQLUtils;
import com.cloudera.sqoop.util.ErrorableAsyncSink;
import com.cloudera.sqoop.util.ErrorableThread;
import com.cloudera.sqoop.util.LoggingAsyncSink;

/**
 * Mapper that opens up a pipe to mysqldump and pulls data directly.
 */
public class MySQLDumpMapper
    extends SqoopMapper<String, NullWritable, String, NullWritable> {

  public static final Log LOG = LogFactory.getLog(
      MySQLDumpMapper.class.getName());

  private Configuration conf;

  // AsyncSinks used to import data from mysqldump directly into HDFS.

  /**
   * Copies data directly from mysqldump into HDFS, after stripping some
   * header and footer characters that are attached to each line in mysqldump.
   */
  public static class CopyingAsyncSink extends ErrorableAsyncSink {
    private final MySQLDumpMapper.Context context;
    private final PerfCounters counters;

    protected CopyingAsyncSink(final MySQLDumpMapper.Context context,
        final PerfCounters ctrs) {
      this.context = context;
      this.counters = ctrs;
    }

    public void processStream(InputStream is) {
      child = new CopyingStreamThread(is, context, counters);
      child.start();
    }

    private static class CopyingStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(
          CopyingStreamThread.class.getName());

      private final MySQLDumpMapper.Context context;
      private final InputStream stream;
      private final PerfCounters counters;

      CopyingStreamThread(final InputStream is,
          final Context c, final PerfCounters ctrs) {
        this.context = c;
        this.stream = is;
        this.counters = ctrs;
      }

      public void run() {
        BufferedReader r = null;

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // Actually do the read/write transfer loop here.
          int preambleLen = -1; // set to this for "undefined"
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF.
            }

            if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
              continue; // comments and empty lines are ignored
            }

            // this line is of the form "INSERT .. VALUES ( actual value text
            // );" strip the leading preamble up to the '(' and the trailing
            // ');'.
            if (preambleLen == -1) {
              // we haven't determined how long the preamble is. It's constant
              // across all lines, so just figure this out once.
              String recordStartMark = "VALUES (";
              preambleLen = inLine.indexOf(recordStartMark)
                  + recordStartMark.length();
            }

            // chop off the leading and trailing text as we write the
            // output to HDFS.
            int len = inLine.length() - 2 - preambleLen;
            context.write(inLine.substring(preambleLen, inLine.length() - 2)
                + "\n", null);
            counters.addBytes(1 + len);
          }
        } catch (IOException ioe) {
          LOG.error("IOException reading from mysqldump: " + ioe.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } catch (InterruptedException ie) {
          LOG.error("InterruptedException reading from mysqldump: "
              + ie.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } finally {
          if (null != r) {
            try {
              r.close();
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());
            }
          }
        }
      }
    }
  }


  /**
   * The ReparsingAsyncSink will instantiate a RecordParser to read mysqldump's
   * output, and re-emit the text in the user's specified output format.
   */
  public static class ReparsingAsyncSink extends ErrorableAsyncSink {
    private final MySQLDumpMapper.Context context;
    private final Configuration conf;
    private final PerfCounters counters;

    protected ReparsingAsyncSink(final MySQLDumpMapper.Context c,
        final Configuration conf, final PerfCounters ctrs) {
      this.context = c;
      this.conf = conf;
      this.counters = ctrs;
    }

    public void processStream(InputStream is) {
      child = new ReparsingStreamThread(is, context, conf, counters);
      child.start();
    }

    private static class ReparsingStreamThread extends ErrorableThread {
      public static final Log LOG = LogFactory.getLog(
          ReparsingStreamThread.class.getName());

      private final MySQLDumpMapper.Context context;
      private final Configuration conf;
      private final InputStream stream;
      private final PerfCounters counters;

      ReparsingStreamThread(final InputStream is,
          final MySQLDumpMapper.Context c, Configuration conf,
          final PerfCounters ctrs) {
        this.context = c;
        this.conf = conf;
        this.stream = is;
        this.counters = ctrs;
      }

      private static final char MYSQL_FIELD_DELIM = ',';
      private static final char MYSQL_RECORD_DELIM = '\n';
      private static final char MYSQL_ENCLOSE_CHAR = '\'';
      private static final char MYSQL_ESCAPE_CHAR = '\\';
      private static final boolean MYSQL_ENCLOSE_REQUIRED = false;

      private static final RecordParser MYSQLDUMP_PARSER;

      static {
        // build a record parser for mysqldump's format
        MYSQLDUMP_PARSER = new RecordParser(DelimiterSet.MYSQL_DELIMITERS);
      }

      public void run() {
        BufferedReader r = null;

        try {
          r = new BufferedReader(new InputStreamReader(this.stream));

          // Configure the output with the user's delimiters.
          char outputFieldDelim = (char) conf.getInt(
              MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
              DelimiterSet.NULL_CHAR);
          String outputFieldDelimStr = "" + outputFieldDelim;
          char outputRecordDelim = (char) conf.getInt(
              MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
              DelimiterSet.NULL_CHAR);
          String outputRecordDelimStr = "" + outputRecordDelim;
          char outputEnclose = (char) conf.getInt(
              MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
              DelimiterSet.NULL_CHAR);
          char outputEscape = (char) conf.getInt(
              MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
              DelimiterSet.NULL_CHAR);
          boolean outputEncloseRequired = conf.getBoolean(
              MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY, false);

          DelimiterSet delimiters = new DelimiterSet(
             outputFieldDelim,
             outputRecordDelim,
             outputEnclose,
             outputEscape,
             outputEncloseRequired);

          // Actually do the read/write transfer loop here.
          int preambleLen = -1; // set to this for "undefined"
          while (true) {
            String inLine = r.readLine();
            if (null == inLine) {
              break; // EOF.
            }

            if (inLine.trim().length() == 0 || inLine.startsWith("--")) {
              continue; // comments and empty lines are ignored
            }

            // this line is of the form "INSERT .. VALUES ( actual value text
            // );" strip the leading preamble up to the '(' and the trailing
            // ');'.
            if (preambleLen == -1) {
              // we haven't determined how long the preamble is. It's constant
              // across all lines, so just figure this out once.
              String recordStartMark = "VALUES (";
              preambleLen = inLine.indexOf(recordStartMark)
                  + recordStartMark.length();
            }

            // Wrap the input string in a char buffer that ignores the leading
            // and trailing text.
            CharBuffer charbuf = CharBuffer.wrap(inLine, preambleLen,
                inLine.length() - 2);

            // Pass this along to the parser
            List<String> fields = null;
            try {
              fields = MYSQLDUMP_PARSER.parseRecord(charbuf);
            } catch (RecordParser.ParseError pe) {
              LOG.warn("ParseError reading from mysqldump: "
                  + pe.toString() + "; record skipped");
              continue; // Skip emitting this row.
            }

            // For all of the output fields, emit them using the delimiters
            // the user chooses.
            boolean first = true;
            StringBuilder sb = new StringBuilder();
            int recordLen = 1; // for the delimiter.
            for (String field : fields) {
              if (!first) {
                sb.append(outputFieldDelimStr);
              } else {
                first = false;
              }

              String fieldStr = FieldFormatter.escapeAndEnclose(field,
                  delimiters);
              sb.append(fieldStr);
              recordLen += fieldStr.length();
            }

            sb.append(outputRecordDelimStr);
            context.write(sb.toString(), null);
            counters.addBytes(recordLen);
          }
        } catch (IOException ioe) {
          LOG.error("IOException reading from mysqldump: " + ioe.toString());
          // flag this error so the parent can handle it appropriately.
          setError();
        } catch (InterruptedException ie) {
          LOG.error("InterruptedException reading from mysqldump: "
              + ie.toString());
          // flag this error so we get an error status back in the caller.
          setError();
        } finally {
          if (null != r) {
            try {
              r.close();
            } catch (IOException ioe) {
              LOG.info("Error closing FIFO stream: " + ioe.toString());
            }
          }
        }
      }
    }
  }

  // TODO(aaron): Refactor this method to be much shorter.
  // CHECKSTYLE:OFF
  /**
   * Import the table into HDFS by using mysqldump to pull out the data from
   * the database and upload the files directly to HDFS.
   */
  public void map(String splitConditions, NullWritable val, Context context)
      throws IOException, InterruptedException {

    LOG.info("Beginning mysqldump fast path import");

    ArrayList<String> args = new ArrayList<String>();
    String tableName = conf.get(MySQLUtils.TABLE_NAME_KEY);

    // We need to parse the connect string URI to determine the database name.
    // Using java.net.URL directly on the connect string will fail because
    // Java doesn't respect arbitrary JDBC-based schemes. So we chop off the
    // scheme (everything before '://') and replace it with 'http', which we
    // know will work.
    String connectString = conf.get(MySQLUtils.CONNECT_STRING_KEY);
    String databaseName = JdbcUrl.getDatabaseName(connectString);
    String hostname = JdbcUrl.getHostName(connectString);
    int port = JdbcUrl.getPort(connectString);

    if (null == databaseName) {
      throw new IOException("Could not determine database name");
    }

    LOG.info("Performing import of table " + tableName + " from database "
        + databaseName);

    args.add(MySQLUtils.MYSQL_DUMP_CMD); // requires that this is on the path.

    String password = conf.get(MySQLUtils.PASSWORD_KEY);
    String passwordFile = null;

    Process p = null;
    AsyncSink sink = null;
    AsyncSink errSink = null;
    PerfCounters counters = new PerfCounters();
    try {
      // --defaults-file must be the first argument.
      if (null != password && password.length() > 0) {
        passwordFile = MySQLUtils.writePasswordFile(conf);
        args.add("--defaults-file=" + passwordFile);
      }

      // Don't use the --where="" version because spaces in it can
      // confuse Java, and adding in surrounding quotes confuses Java as well.
      String whereClause = conf.get(MySQLUtils.WHERE_CLAUSE_KEY, "(1=1)")
          + " AND (" + splitConditions + ")";
      args.add("-w");
      args.add(whereClause);

      args.add("--host=" + hostname);
      if (-1 != port) {
        args.add("--port=" + Integer.toString(port));
      }
      args.add("--skip-opt");
      args.add("--compact");
      args.add("--no-create-db");
      args.add("--no-create-info");
      args.add("--quick"); // no buffering
      args.add("--single-transaction");

      String username = conf.get(MySQLUtils.USERNAME_KEY);
      if (null != username) {
        args.add("--user=" + username);
      }

      // If the user supplied extra args, add them here.
      String [] extra = conf.getStrings(MySQLUtils.EXTRA_ARGS_KEY);
      if (null != extra) {
        for (String arg : extra) {
          args.add(arg);
        }
      }

      args.add(databaseName);
      args.add(tableName);

      // begin the import in an external process.
      LOG.debug("Starting mysqldump with arguments:");
      for (String arg : args) {
        LOG.debug("  " + arg);
      }

      // Actually start the mysqldump.
      p = Runtime.getRuntime().exec(args.toArray(new String[0]));

      // read from the stdout pipe into the HDFS writer.
      InputStream is = p.getInputStream();

      if (MySQLUtils.outputDelimsAreMySQL(conf)) {
        LOG.debug("Output delimiters conform to mysqldump; "
            + "using straight copy");
        sink = new CopyingAsyncSink(context, counters);
      } else {
        LOG.debug("User-specified delimiters; using reparsing import");
        LOG.info("Converting data to use specified delimiters.");
        LOG.info("(For the fastest possible import, use");
        LOG.info("--mysql-delimiters to specify the same field");
        LOG.info("delimiters as are used by mysqldump.)");
        sink = new ReparsingAsyncSink(context, conf, counters);
      }

      // Start an async thread to read and upload the whole stream.
      counters.startClock();
      sink.processStream(is);

      // Start an async thread to send stderr to log4j.
      errSink = new LoggingAsyncSink(LOG);
      errSink.processStream(p.getErrorStream());
    } finally {

      // block until the process is done.
      int result = 0;
      if (null != p) {
        while (true) {
          try {
            result = p.waitFor();
          } catch (InterruptedException ie) {
            // interrupted; loop around.
            continue;
          }

          break;
        }
      }

      // Remove the password file.
      if (null != passwordFile) {
        if (!new File(passwordFile).delete()) {
          LOG.error("Could not remove mysql password file " + passwordFile);
          LOG.error("You should remove this file to protect your credentials.");
        }
      }

      // block until the stream sink is done too.
      int streamResult = 0;
      if (null != sink) {
        while (true) {
          try {
            streamResult = sink.join();
          } catch (InterruptedException ie) {
            // interrupted; loop around.
            continue;
          }

          break;
        }
      }

      // Try to wait for stderr to finish, but regard any errors as advisory.
      if (null != errSink) {
        try {
          if (0 != errSink.join()) {
            LOG.info("Encountered exception reading stderr stream");
          }
        } catch (InterruptedException ie) {
          LOG.info("Thread interrupted waiting for stderr to complete: "
              + ie.toString());
        }
      }

      LOG.info("Transfer loop complete.");

      if (0 != result) {
        throw new IOException("mysqldump terminated with status "
            + Integer.toString(result));
      }

      if (0 != streamResult) {
        throw new IOException("Encountered exception in stream sink");
      }

      counters.stopClock();
      LOG.info("Transferred " + counters.toString());
    }
  }
  // CHECKSTYLE:ON

  @Override
  protected void setup(Context context)
    throws IOException, InterruptedException {
    super.setup(context);
    this.conf = context.getConfiguration();
  }
}


Interesting paper on accelerating Hadoop via Network Levitated Merge


Hadoop YARN Event design

Hadoop YARN defines many different event types. They all extend the abstract class AbstractEvent, which implements the Event interface. There is also an EventHandler interface; you implement it to build the handling logic for a specific type of event, as in the sketch below.

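As a minimal sketch of that pattern, the following defines a hypothetical JobEventType enum, a JobEvent extending AbstractEvent, and a JobEventHandler implementing EventHandler; none of these are actual YARN classes.

// A minimal sketch of a custom YARN event and handler; JobEventType,
// JobEvent and JobEventHandler are hypothetical names, not YARN classes.
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;

// The enum that identifies the kinds of events we care about.
enum JobEventType {
  JOB_STARTED,
  JOB_COMPLETED
}

// A concrete event carries its type (and any payload we want to attach).
class JobEvent extends AbstractEvent<JobEventType> {
  private final String jobId;

  public JobEvent(String jobId, JobEventType type) {
    super(type);
    this.jobId = jobId;
  }

  public String getJobId() {
    return jobId;
  }
}

// The handler holds the logic for reacting to JobEvents.
class JobEventHandler implements EventHandler<JobEvent> {
  @Override
  public void handle(JobEvent event) {
    System.out.println("Job " + event.getJobId() + " -> " + event.getType());
  }
}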

Hadoop YARN Event Bus

Hadoop YARN uses an event bus to centralize the dispatching of events to the handlers registered for them. This means less plumbing code to write and maintain, and it decouples the event-handling logic of the different interacting components by bringing the dispatching into one place.

AsyncDispatcher is that event bus. It uses a LinkedBlockingQueue to queue up incoming events, and it has an internal thread, eventHandlingThread, that listens on the queue, takes events off it, and processes each one by calling the dispatch(event) method. That method looks up the event handler registered for the event's type and lets it handle the event.

(AsyncDispatcher UML class diagram)
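
Before looking at the AsyncDispatcher source, here is a minimal usage sketch that reuses the hypothetical JobEvent classes from the previous sketch: register a handler for an event type, then enqueue events through the generic handler returned by getEventHandler().

// A minimal usage sketch of AsyncDispatcher with the hypothetical
// JobEvent/JobEventType/JobEventHandler classes defined earlier.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;

public class DispatcherDemo {
  public static void main(String[] args) throws Exception {
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(new Configuration());
    dispatcher.start();                       // spawns the event handling thread

    // Tell the dispatcher which handler owns JobEventType events.
    dispatcher.register(JobEventType.class, new JobEventHandler());

    // Producers enqueue events through the generic handler; the internal
    // thread dequeues them and routes each one to the registered handler.
    dispatcher.getEventHandler().handle(
        new JobEvent("job_0001", JobEventType.JOB_STARTED));

    Thread.sleep(500);                        // let the queue drain before stopping
    dispatcher.stop();
  }
}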

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;

/**
 * Dispatches events in a separate thread. Currently only single thread does
 * that. Potentially there could be multiple channels for each event type
 * class and a thread pool can be used to dispatch the events.
 */
@SuppressWarnings("rawtypes")
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue<Event> eventQueue;
  private volatile boolean stopped = false;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  public AsyncDispatcher() {
    this(new LinkedBlockingQueue<Event>());
  }

  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  @Override
  public synchronized void init(Configuration conf) {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.init(conf);
  }

  @Override
  public void start() {
    //start all the components
    super.start();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  @Override
  public void stop() {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.stop();
  }

  @SuppressWarnings("unchecked")
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler registeredHandler = (EventHandler)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  @Override
  public EventHandler getEventHandler() {
    return new GenericEventHandler();
  }

  class GenericEventHandler implements EventHandler {
    public void handle(Event event) {
      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new YarnException(e);
      }
    };
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param <T> the type of event these multiple handlers are interested in.
   */
  static class MultiListenerHandler implements EventHandler {
    List<EventHandler> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler>();
    }

    @Override
    public void handle(Event event) {
      for (EventHandler handler: listofHandlers) {
        handler.handle(event);
      }
    }

    void addHandler(EventHandler handler) {
      listofHandlers.add(handler);
    }

  }
}

Hadoop YARN RPC (part I)

I have spent some time digging into the YARN RPC source code. Personally, I like the use of the factory pattern to inject different RPC client proxy and server implementations into the framework. It looks much cleaner than the older versions of Hadoop.

For example, RpcServerFactoryPBImpl is the implementation of the RpcServerFactory interface that creates a Protobuf RPC server. It delegates the actual creation to the standard Hadoop RPC class.

Look at the following snippet from RpcServerFactoryPBImpl:

private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, 
      SecretManager<? extends TokenIdentifier> secretManager, int numHandlers, 
      BlockingService blockingService, String portRangeConfig) throws IOException {
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService, 
        addr.getHostName(), addr.getPort(), numHandlers, false, conf, 
        secretManager, portRangeConfig);
    LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }

We could create a new type of RPC server factory, say RpcServerFactoryMyOwnImpl, whose createServer method returns our own RPC server, as sketched below.
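
As a rough sketch of that idea, the hypothetical factory below mirrors the createServer method shown above but applies its own tuning before building the server; the real RpcServerFactory interface also has other methods that a complete implementation would need to provide.

// A hedged sketch: RpcServerFactoryMyOwnImpl is a hypothetical class, and the
// configuration knob below is only illustrative of where custom logic could go.
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;

import com.google.protobuf.BlockingService;

public class RpcServerFactoryMyOwnImpl {

  private Server createServer(Class<?> pbProtocol, InetSocketAddress addr,
      Configuration conf, SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, BlockingService blockingService, String portRangeConfig)
      throws IOException {
    // Our own twist: tweak the configuration before the server is built.
    conf.setInt("ipc.server.read.threadpool.size", 4);   // illustrative knob
    RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
    RPC.Server server = RPC.getServer(pbProtocol, blockingService,
        addr.getHostName(), addr.getPort(), numHandlers, false, conf,
        secretManager, portRangeConfig);
    server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
    return server;
  }
}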

In the YARN framework, HadoopYarnProtoRPC is the class that uses these factories.
It calls RpcFactoryProvider.getServerFactory(conf) to obtain the right RpcServerFactory implementation.


public class HadoopYarnProtoRPC extends YarnRPC {

  private static final Log LOG = LogFactory.getLog(HadoopYarnProtoRPC.class);

  @Override
  public Object getProxy(Class protocol, InetSocketAddress addr,
      Configuration conf) {
    LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol " + protocol);
    return RpcFactoryProvider.getClientFactory(conf).getClient(protocol, 1,
        addr, conf);
  }

  @Override
  public void stopProxy(Object proxy, Configuration conf) {
    RpcFactoryProvider.getClientFactory(conf).stopClient(proxy);
  }

  @Override
  public Server getServer(Class protocol, Object instance,
      InetSocketAddress addr, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      int numHandlers, String portRangeConfig) {
    LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol + 
        " with " + numHandlers + " handlers");
    
    return RpcFactoryProvider.getServerFactory(conf).getServer(protocol, 
        instance, addr, conf, secretManager, numHandlers, portRangeConfig);

  }

}

To be continued…


Dive into HTable internals

HTable is the class we use to interact with an HBase table. Internally, it holds an instance of HConnection, which is obtained by calling

HConnectionManager.getConnection(Configuration conf);

HConnectionManager is a non-instantiable singleton class that manages HConnections. If you look inside HConnectionManager, there is a LinkedHashMap<Configuration, HConnectionImplementation> called HBASE_INSTANCES that maps a Configuration to an HConnection. This ensures that the same Configuration object always results in the same HConnection being returned by HConnectionManager: different HTable clients that use the same Configuration end up sharing one HConnection. This lets them all share the same cache of region location information, avoiding repeated region location discovery, and reuse the same ZooKeeper watchers/trackers.
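
A minimal sketch of this sharing, using the HBase client API of that era and hypothetical table names: both HTable instances below are constructed from the same Configuration, so they end up on the same underlying HConnection.

// Sketch only: two HTable instances built from one Configuration share an HConnection.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SharedConnectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Both tables are keyed by the same Configuration object, so
    // HConnectionManager hands them the same HConnection and they share
    // region location caches and ZooKeeper trackers.
    HTable users  = new HTable(conf, "users");    // hypothetical table names
    HTable orders = new HTable(conf, "orders");

    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes("alice"));
    users.put(put);

    users.close();
    orders.close();
  }
}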

The implementation of HConnection is HConnectionImplementation. When it is instantiated, it sets up ZooKeeper trackers/watchers on the HBase master address and on the -ROOT- region location.

HConnectionImplementation provides RPC connections to the HMaster and to the servers hosting -ROOT- and .META.. In Hadoop/HBase, RPC communication is implemented with the Java dynamic proxy pattern; for example, HMasterInterface acts as the proxy used to call remote methods on the HBase master server.
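
As a toy illustration of the dynamic proxy pattern itself (plain Java, not HBase code), the InvocationHandler below is where a real RPC layer would marshal the method name and arguments to the remote server.

// Toy example of java.lang.reflect.Proxy; GreetingService is a made-up interface.
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface GreetingService {
  String greet(String name);
}

public class DynamicProxySketch {
  public static void main(String[] args) {
    InvocationHandler handler = new InvocationHandler() {
      @Override
      public Object invoke(Object proxy, Method method, Object[] methodArgs) {
        // A real RPC proxy would serialize method + args, send them over the
        // wire, block for the response, and deserialize the return value.
        System.out.println("Would send RPC: " + method.getName());
        return "hello " + methodArgs[0];
      }
    };

    GreetingService client = (GreetingService) Proxy.newProxyInstance(
        GreetingService.class.getClassLoader(),
        new Class<?>[] {GreetingService.class},
        handler);

    System.out.println(client.greet("hbase"));   // routed through the handler
  }
}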

To understand the RPC invocation mechanism behind HTable, take a look at the put(Put put) method:

public void put(final Put put) throws IOException { doPut(Arrays.asList(put)); }

This in turn calls the processBatchCallback method in HConnectionImplementation:

public <R> void processBatchCallback(
      List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException

Given a list of Put operations, this method groups them by region server: all the Puts destined for the same region server are batched together so they can be dispatched in a single request. It uses the following map to maintain the groupings.

Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();

It groups all the actions/operations by HServerAddress, which is a "label" for an HBase server made up of a hostname and port number.

Iterating through the map, it then fires off the requests to the corresponding region servers by creating a Callable for each entry and submitting it to the executor thread pool:

for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
  futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}

Let’s look at the createCallable method:

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
        final MultiAction<R> multi, final byte [] tableName) {
      final HConnection connection = this;
      return new Callable<MultiResponse>() {
       public MultiResponse call() throws IOException {
         return getRegionServerWithoutRetries(
             new ServerCallable<MultiResponse>(connection, tableName, null) {
               public MultiResponse call() throws IOException {
                 return server.multi(multi);
               }
               @Override
               public void connect(boolean reload) throws IOException {
                 server =
                   connection.getHRegionConnection(loc.getHostname(), loc.getPort());
               }
             }
         );
       }
     };
   }

The method above returns an anonymous Callable whose call() method is executed in the executor thread pool. Inside call(), the RPC client proxy (the HRegionInterface field named server) makes the remote RPC call, server.multi(multi), marshalling the method and parameters to the remote server. The connect(boolean reload) method establishes the RPC connection to the region server:

server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());

Inside getHRegionConnection, HBaseRPC is used to create a dynamic proxy to the region server:

server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);

Stay tuned for my next post on HBaseRPC internals, where we will dive into the mechanics of the RPC call implementation in HBase.

TaskTracker

TaskTracker is essentially a thread running on a worker node. Internally, it launches two inner TaskLauncher threads, mapLauncher and reduceLauncher. Each TaskLauncher thread uses a list to keep track of its assigned tasks and starts each task in a separate TaskRunner thread. The TaskRunner thread then launches a child JVM process for the task.

If we dig deeper into the code, we find that TaskRunner contains a JvmManager instance. The JvmManager holds two JvmManagerForType instances, one for map tasks (mapJvmManager) and another for reduce tasks (reduceJvmManager). Each is responsible for spawning a JvmRunner thread, which actually launches the JVM process.
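
A highly simplified model of that thread structure is sketched below; the class and method names are hypothetical, and it glosses over the JvmManager details, but it shows the launcher-per-task-type and runner-per-task relationship.

// Simplified, hypothetical model of the TaskTracker threading described above;
// this is not the Hadoop source.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SimplifiedTaskTracker {

  // One launcher per task type, as with mapLauncher and reduceLauncher.
  private final TaskLauncher mapLauncher = new TaskLauncher("map");
  private final TaskLauncher reduceLauncher = new TaskLauncher("reduce");

  void start() {
    mapLauncher.start();
    reduceLauncher.start();
  }

  void assignTask(String taskId, boolean isMapTask) {
    (isMapTask ? mapLauncher : reduceLauncher).addTask(taskId);
  }

  /** Waits for assigned tasks and starts each one in its own TaskRunner thread. */
  static class TaskLauncher extends Thread {
    private final BlockingQueue<String> tasksToLaunch =
        new LinkedBlockingQueue<String>();

    TaskLauncher(String type) {
      super(type + "Launcher");
      setDaemon(true);
    }

    void addTask(String taskId) {
      tasksToLaunch.add(taskId);
    }

    @Override
    public void run() {
      try {
        while (true) {
          new TaskRunner(tasksToLaunch.take()).start();   // one runner per task
        }
      } catch (InterruptedException ie) {
        // launcher asked to shut down
      }
    }
  }

  /** Stands in for the thread that would ask the JvmManager to fork a child JVM. */
  static class TaskRunner extends Thread {
    private final String taskId;

    TaskRunner(String taskId) {
      this.taskId = taskId;
    }

    @Override
    public void run() {
      System.out.println("Would launch a child JVM for task " + taskId);
    }
  }
}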