Archive for the ‘Uncategorized’ Category

g2.8xlarge does not support NVIDIA GPUDirect peer to peer transfers

Go to the CUDA simpleP2P sample directory, run make, and then execute the resulting ./simpleP2P binary:


[./simpleP2P] - Starting...
Checking for multiple GPUs...
CUDA-capable device count: 4
> GPU0 = "GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU1 = "GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU2 = "GRID K520" IS capable of Peer-to-Peer (P2P)
> GPU3 = "GRID K520" IS capable of Peer-to-Peer (P2P)

Checking GPU(s) for support of peer to peer memory access...
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU2) : No
> Peer access from GRID K520 (GPU0) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU2) : No
> Peer access from GRID K520 (GPU1) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU2) -> GRID K520 (GPU3) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU0) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU1) : No
> Peer access from GRID K520 (GPU3) -> GRID K520 (GPU2) : No
Two or more GPUs with SM 2.0 or higher capability are required for ./simpleP2P.
Peer to Peer access is not available amongst GPUs in the system, waiving test.

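As you can see, although each GRID K520 reports itself as P2P-capable, peer access is unavailable between every pair of GPUs, so g2.8xlarge does not support NVIDIA GPUDirect peer-to-peer transfers. Only P2 instances have this support.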

Categories: Uncategorized

Hadoop YARN Event design

Hadoop YARN uses many kinds of events. They all extend the abstract class AbstractEvent, which implements the Event interface. Handling is defined by a separate interface, EventHandler: you put the logic for handling a specific kind of event in an implementation of EventHandler.
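To make the type relationships concrete, here is a minimal sketch under stated assumptions. The Event, AbstractEvent and EventHandler types below are simplified stand-ins written for this example, not the YARN classes, and JobEventType, JobEvent and JobEventHandler are hypothetical names. The point is the shape: every event carries an enum type, and one EventHandler holds the logic for a family of events.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for YARN's Event: an event is tagged with an enum type.
interface Event<TYPE extends Enum<TYPE>> {
  TYPE getType();
  long getTimestamp();
}

// Simplified stand-in for AbstractEvent: stores the type and creation time.
abstract class AbstractEvent<TYPE extends Enum<TYPE>> implements Event<TYPE> {
  private final TYPE type;
  private final long timestamp;

  AbstractEvent(TYPE type) {
    this.type = type;
    this.timestamp = System.currentTimeMillis();
  }

  @Override public TYPE getType() { return type; }
  @Override public long getTimestamp() { return timestamp; }
}

// Simplified stand-in for EventHandler: the handling logic lives here.
interface EventHandler<T extends Event<?>> {
  void handle(T event);
}

// Hypothetical event family: each enum constant is one kind of job event.
enum JobEventType { JOB_INIT, JOB_KILL }

class JobEvent extends AbstractEvent<JobEventType> {
  final String jobId;

  JobEvent(JobEventType type, String jobId) {
    super(type);
    this.jobId = jobId;
  }
}

// Hypothetical handler: switches on the event type and records what it did.
class JobEventHandler implements EventHandler<JobEvent> {
  final List<String> log = new ArrayList<>();

  @Override
  public void handle(JobEvent event) {
    switch (event.getType()) {
      case JOB_INIT: log.add("init " + event.jobId); break;
      case JOB_KILL: log.add("kill " + event.jobId); break;
    }
  }
}
```

Tagging events with an enum also gives a dispatcher a cheap routing key: the enum class that declares the event's type.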


Hive ORCFile format

Here is a great presentation about the new Hive ORC (Optimized Row Columnar) file format.

If you check out Hive 0.11, you can find the ORCFile code in this package:


For information about RCFile (Record Columnar File), take a look at the following paper.


Categories: Hive, Uncategorized

Hadoop YARN Event Bus

Hadoop YARN uses an event bus to centralize event dispatching: components post events to the bus, and the bus routes each event to the event handler registered for its type. This means less plumbing code to write and maintain, and it decouples the interacting components, because the routing logic lives in one place instead of being spread across them.

AsyncDispatcher is such an event bus. It uses a LinkedBlockingQueue to queue incoming events. An internal thread, eventHandlingThread, takes events off this queue and processes each one by calling dispatch(event). That method looks up the handler registered for the event's type (the enum class that declares event.getType()) and passes the event to it.


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;

/**
 * Dispatches events in a separate thread. Currently only single thread does
 * that. Potentially there could be multiple channels for each event type
 * class and a thread pool can be used to dispatch the events.
 */
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue<Event> eventQueue;
  private volatile boolean stopped = false;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  public AsyncDispatcher() {
    this(new LinkedBlockingQueue<Event>());
  }

  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
    super("Dispatcher");
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
  }

  Runnable createThread() {
    return new Runnable() {
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
          }
        }
      }
    };
  }

  public synchronized void init(Configuration conf) {
    this.exitOnDispatchException =
        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
    super.init(conf);
  }

  public void start() {
    //start all the components
    super.start();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");
    eventHandlingThread.start();
  }

  public void stop() {
    stopped = true;
    if (eventHandlingThread != null) {
      eventHandlingThread.interrupt();
      try {
        eventHandlingThread.join();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);
      }
    }

    // stop all the components
    super.stop();
  }

  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try {
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>)
    eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

  public EventHandler getEventHandler() {
    return new GenericEventHandler();
  }

  class GenericEventHandler implements EventHandler<Event> {
    public void handle(Event event) {
      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      }
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      }
      try {
        eventQueue.put(event);
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        }
        throw new YarnException(e);
      }
    }
  }

  /**
   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param <T> the type of event these multiple handlers are interested in.
   */
  static class MultiListenerHandler implements EventHandler<Event> {
    List<EventHandler<Event>> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler<Event>>();
    }

    public void handle(Event event) {
      for (EventHandler<Event> handler: listofHandlers) {
        handler.handle(event);
      }
    }

    void addHandler(EventHandler<Event> handler) {
      listofHandlers.add(handler);
    }
  }
}

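Stripped of service lifecycle and logging, the dispatcher pattern above fits in a few dozen lines. The sketch below is a hypothetical MiniDispatcher, not Hadoop code. To stay short, an event is just an enum constant and its routing key is the enum class (AsyncDispatcher routes on event.getType().getDeclaringClass()). Several handlers per type are kept in a list, which plays the role of MultiListenerHandler. One deliberate difference: stop() enqueues a sentinel and waits for the queue to drain, whereas AsyncDispatcher.stop() interrupts the thread, so events still queued there may never be handled.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the AsyncDispatcher pattern (not Hadoop code).
class MiniDispatcher {
  private enum Control { SHUTDOWN } // sentinel used only by stop()

  private final BlockingQueue<Enum<?>> eventQueue = new LinkedBlockingQueue<>();
  private final Map<Class<?>, List<Consumer<Enum<?>>>> dispatchers = new ConcurrentHashMap<>();
  private final Thread eventHandlingThread =
      new Thread(this::loop, "MiniDispatcher event handler");

  // Like register(): a second handler for the same type is appended, not replaced.
  void register(Class<? extends Enum<?>> eventType, Consumer<Enum<?>> handler) {
    dispatchers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(handler);
  }

  // Like GenericEventHandler.handle(): callers only enqueue, never run handlers.
  void post(Enum<?> event) {
    eventQueue.add(event);
  }

  void start() {
    eventHandlingThread.start();
  }

  // Drains everything posted before stop(), then lets the thread exit.
  void stop() {
    eventQueue.add(Control.SHUTDOWN);
    try {
      eventHandlingThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // The eventHandlingThread body: take one event at a time and dispatch it.
  private void loop() {
    while (true) {
      Enum<?> event;
      try {
        event = eventQueue.take();
      } catch (InterruptedException e) {
        return;
      }
      if (event == Control.SHUTDOWN) {
        return;
      }
      dispatch(event);
    }
  }

  // Route by the enum class that declares the event constant.
  private void dispatch(Enum<?> event) {
    List<Consumer<Enum<?>>> handlers = dispatchers.get(event.getDeclaringClass());
    if (handlers == null) {
      System.err.println("No handler registered for " + event.getDeclaringClass());
      return;
    }
    for (Consumer<Enum<?>> handler : handlers) {
      handler.accept(event);
    }
  }
}
```

For example, registering two handlers for java.util.concurrent.TimeUnit and then posting TimeUnit.SECONDS runs both handlers, in registration order, on the dispatcher thread rather than on the caller's thread.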

Hadoop RPC

There are two RPC servers (org.apache.hadoop.ipc.Server) inside the NameNode. One mainly serves clients, while the other handles communication between the NameNode and the DataNodes and BackupNode.

The RPC server has a Listener thread that listens for incoming connections. The Listener creates a server socket in non-blocking mode and uses a Selector to listen for connection-accept events. It also instantiates an array of Reader threads, each with its own read Selector, which allows multiple Reader threads to read incoming RPC method calls in parallel.
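The following is a minimal Java NIO sketch of that setup, using only the JDK: a non-blocking ServerSocketChannel bound to an ephemeral loopback port and registered for OP_ACCEPT on an accept Selector, plus one separate read Selector per reader. ListenerSetup is a hypothetical name; the reader threads themselves are omitted.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch: accept Selector plus one read Selector per reader.
class ListenerSetup implements AutoCloseable {
  final ServerSocketChannel acceptChannel;
  final Selector acceptSelector;
  final SelectionKey acceptKey;
  final Selector[] readSelectors;

  ListenerSetup(int readThreads) throws IOException {
    acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false); // must be non-blocking before register()
    // Port 0 asks the OS for an ephemeral port; 128 is the accept backlog.
    acceptChannel.socket().bind(new InetSocketAddress("127.0.0.1", 0), 128);
    acceptSelector = Selector.open();
    acceptKey = acceptChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
    readSelectors = new Selector[readThreads];
    for (int i = 0; i < readThreads; i++) {
      readSelectors[i] = Selector.open(); // each reader gets its own Selector
    }
  }

  int port() {
    return acceptChannel.socket().getLocalPort();
  }

  @Override
  public void close() throws IOException {
    for (Selector s : readSelectors) {
      s.close();
    }
    acceptSelector.close();
    acceptChannel.close();
  }
}
```

Giving each reader its own Selector means the accept loop and the read loops never contend on a single Selector.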

The Server start method starts the Listener thread, the Responder thread, and an array of Handler threads:

public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
}

The Listener constructor configures a ServerSocketChannel to listen for new incoming connections and initializes an array of Reader threads, each with its own read Selector:

public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
}

In the Listener's run method, whenever select() reports ready keys, the thread iterates over the Selector's selectedKeys and checks whether each key is a connection-accept event; if it is, it calls doAccept.

public void run() {
      LOG.info(getName() + ": starting");
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give 
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;
        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
}

When a new connection arrives, the following doAccept method is called. It selects one Reader in round-robin fashion and registers the new channel with that Reader's read Selector to listen for read events.
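Round-robin selection itself is simple. A sketch, assuming it behaves like a cycling index over the readers array; RoundRobin is a hypothetical helper, and the AtomicInteger is an addition of this sketch so that concurrent callers stay safe:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands out items in a fixed cyclic order.
class RoundRobin<T> {
  private final T[] items;
  private final AtomicInteger next = new AtomicInteger();

  RoundRobin(T[] items) {
    this.items = items;
  }

  T next() {
    // floorMod keeps the index valid even after the counter overflows.
    int i = Math.floorMod(next.getAndIncrement(), items.length);
    return items[i];
  }
}
```

With three readers, four accepted connections go to readers 1, 2, 3 and then 1 again, spreading the read work evenly without any load measurement.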

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);

        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());          
        } finally {
          reader.finishAdd();
        }
      }
}

When a Reader detects a read event on its read Selector, it calls the following method:

void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      if (c == null) {
        return;
      }
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      } catch (Exception e) {
        LOG.info(getName() + ": readAndProcess threw exception " + e +
            ". Count of bytes read: " + count, e);
        count = -1; //so that the (count < 0) block is executed
      }
      if (count < 0) {
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + ": disconnecting client " + 
                    c.getHostAddress() + ". Number of active connections: "+
                    numConnections);
        closeConnection(c);
        c = null;
      } else {
        c.setLastContact(System.currentTimeMillis());
      }
}

The use of multiple Reader threads with separate read Selectors is an improvement introduced in Hadoop 0.21.0. Before 0.21.0, the Selector used to accept connections was also used to read from them, so all connections shared a single Selector object. To learn more, please refer to the ticket

Once a new connection is established, a Reader thread is ready to read data (a method call) from the client. After receiving the input bytes, it decodes them to find the method name and the required parameters. It then creates a Call object encapsulating the method name and parameters and enqueues it into the call queue.

Multiple Handler threads wait on this call queue (BlockingQueue<Call> callQueue) and take Calls from it for processing.
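The hand-off between Readers and Handlers is a classic producer-consumer queue. The sketch below is hypothetical (CallQueueSketch, its simplified Call and the poison-pill shutdown are not Hadoop code): the calling thread plays the Reader and puts decoded calls on a bounded BlockingQueue, while several Handler threads block in take() and process whatever arrives.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the Reader -> callQueue -> Handler hand-off.
class CallQueueSketch {
  // A decoded request: call id, method name and parameters.
  static final class Call {
    final int id;
    final String method;
    final Object[] params;

    Call(int id, String method, Object... params) {
      this.id = id;
      this.method = method;
      this.params = params;
    }
  }

  private static final Call POISON = new Call(-1, "<stop>"); // tells a Handler to exit

  static Map<Integer, String> process(List<Call> calls, int handlerCount)
      throws InterruptedException {
    BlockingQueue<Call> callQueue = new ArrayBlockingQueue<>(16); // bounded queue
    Map<Integer, String> responses = new ConcurrentHashMap<>();
    Thread[] handlers = new Thread[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Thread(() -> {
        try {
          while (true) {
            Call call = callQueue.take(); // blocks until a Call is available
            if (call == POISON) {
              return;
            }
            // "Execute" the call: here we just format method name and params.
            responses.put(call.id, call.method + Arrays.toString(call.params));
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, "Handler " + i);
      handlers[i].start();
    }
    for (Call c : calls) {
      callQueue.put(c); // what a Reader does after decoding a request
    }
    for (int i = 0; i < handlerCount; i++) {
      callQueue.put(POISON); // one pill per Handler
    }
    for (Thread h : handlers) {
      h.join();
    }
    return responses;
  }
}
```

A bounded queue means that when all Handlers are busy, put() blocks the Reader, which applies back-pressure instead of letting queued calls grow without limit.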

The next question is: can we do even better?

HBase RPC is essentially a copy of Hadoop RPC and retains most of the Hadoop RPC code.

Categories: Hadoop, Uncategorized

IPC HBaseClient

HBaseClient (org.apache.hadoop.hbase.ipc.HBaseClient)
In the previous post, we looked at how the HBaseRPC client proxy intercepts method calls via Invoker (an InvocationHandler implementation) and marshals the method call and its arguments via HBaseClient.

Now, let’s dive into HBaseClient source code.

Whenever the client proxy makes a method call, the Invoker, which implements InvocationHandler, intercepts the call and delegates it to HBaseClient. HBaseClient gets the corresponding Connection thread from a Hashtable that maps each ConnectionId to its Connection.
The connection then sends the data to the remote server via a socket DataOutputStream. At the same time, it stores the Call in a ConcurrentSkipListMap. The Connection thread follows a producer-consumer pattern and waits until Calls are available in the ConcurrentSkipListMap. Once it receives a response from the socket DataInputStream, it removes the corresponding Call from the ConcurrentSkipListMap and sets the Call's done variable to true, waking up the caller waiting on that Call.

The Invoker's invoke method calls the following HBaseClient method. It first gets the corresponding Connection thread and sends the Call, then waits until the Call is completed (that is, until the Call's done variable becomes true).
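The done-flag handshake can be sketched on its own. In this hypothetical PendingCalls class (not HBase code), send() registers a Call in a ConcurrentSkipListMap keyed by a call id, the connection thread's onResponse() removes the Call and completes it, and await() mirrors the wait loop in call(): if interrupted it keeps waiting and restores the interrupt flag afterwards. Sockets and serialization are left out.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of call completion via a done flag and wait/notify.
class PendingCalls {
  static final class Call {
    final int id;
    Object value;
    boolean done; // guarded by the Call's monitor

    Call(int id) {
      this.id = id;
    }

    // Connection-thread side: store the result and wake the waiting caller.
    synchronized void complete(Object v) {
      value = v;
      done = true;
      notifyAll();
    }

    // Caller side: same shape as the while (!call.done) call.wait() loop.
    synchronized Object await() {
      boolean interrupted = false;
      while (!done) {
        try {
          wait();
        } catch (InterruptedException e) {
          interrupted = true; // remember it, keep waiting
        }
      }
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the flag once done
      }
      return value;
    }
  }

  private final AtomicInteger counter = new AtomicInteger();
  private final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<>();

  // Register the Call before its request would be written to the socket.
  Call send() {
    Call c = new Call(counter.getAndIncrement());
    calls.put(c.id, c);
    return c;
  }

  // Invoked when a response for this id has been read from the socket.
  void onResponse(int id, Object value) {
    Call c = calls.remove(id);
    if (c != null) {
      c.complete(value);
    }
  }
}
```

Checking done in a loop guards against spurious wakeups and also covers the case where the response arrives before the caller starts waiting.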

public Writable call(Writable param, InetSocketAddress addr,
                       Class<? extends VersionedProtocol> protocol,
                       User ticket, int rpcTimeout)
      throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ignored) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          throw call.error;
        }
        // local exception
        throw wrapException(addr, call.error);
      }
      return call.value;
    }
}

Categories: Hadoop, Uncategorized

Dive into HTable internals

HTable is the class used to interact with an HBase table. Internally, it holds an HConnection instance, which is obtained by calling

HConnectionManager.getConnection(Configuration conf);

HConnectionManager is a non-instantiable class, exposing only static methods, that manages HConnections. Inside HConnectionManager there is a LinkedHashMap<Configuration, HConnectionImplementation> called HBASE_INSTANCES, which maps each Configuration to an HConnection. This ensures that the same Configuration object always results in the same HConnection being returned by HConnectionManager, so different HTable clients using the same Configuration end up sharing the same HConnection. This allows them all to share the same cache of region location information and avoid repeated region location discovery. The same ZooKeeper watchers/trackers can be reused as well.
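A minimal sketch of the caching idea, assuming a generic key in place of Configuration: ConnectionCache is a hypothetical class, and its factory function stands in for creating an HConnectionImplementation. If the key type does not override equals(), as is assumed here for Configuration, a cache like this effectively matches by object identity, so only clients passing the same Configuration object share a connection. The LRU bound via an access-ordered LinkedHashMap is an assumption added for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one shared connection per configuration key.
final class ConnectionCache<K, C> {
  private final Function<K, C> factory;
  private final Map<K, C> instances;

  ConnectionCache(int maxCached, Function<K, C> factory) {
    this.factory = factory;
    // accessOrder = true: iteration order is least recently used first,
    // so removeEldestEntry evicts the LRU entry once the bound is exceeded.
    this.instances = new LinkedHashMap<K, C>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, C> eldest) {
        return size() > maxCached;
      }
    };
  }

  // Same key -> same connection object (and therefore the same location cache).
  synchronized C getConnection(K conf) {
    C conn = instances.get(conf);
    if (conn == null) {
      conn = factory.apply(conf);
      instances.put(conf, conn);
    }
    return conn;
  }
}
```

A real implementation would also have to close a connection when it is evicted; the sketch leaves that out.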

The implementation of HConnection is called HConnectionImplementation. When an HConnection is instantiated, it sets up ZooKeeper trackers/watchers on the HBase master address and the -ROOT- region location.

HConnectionImplementation provides RPC connections to the HMaster and to the servers hosting -ROOT- and .META.. In Hadoop and HBase, RPC communication is implemented with Java dynamic proxies. For example, HMasterInterface acts as the proxy for calling remote methods on the HBase master server.
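The dynamic proxy mechanism can be shown with the JDK alone. In this hypothetical sketch, PingProtocol stands in for an RPC interface such as HMasterInterface, and RecordingInvoker stands in for the client-side InvocationHandler. Where a real RPC client would serialize the method name and arguments and send them to the server, this one only records them and returns a canned reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical protocol interface standing in for an RPC interface.
interface PingProtocol {
  String ping(String msg);
}

// Hypothetical client-side handler: every proxy call lands in invoke().
class RecordingInvoker implements InvocationHandler {
  final List<String> sent = new ArrayList<>();

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
    if (method.getDeclaringClass() == Object.class) {
      return method.invoke(this, args); // toString/equals/hashCode handled locally
    }
    sent.add(method.getName() + Arrays.toString(args)); // what would be marshalled
    return "reply to " + args[0];                       // what the server would return
  }

  // Create a proxy object that implements the given protocol interface.
  static <T> T proxyFor(Class<T> protocol, InvocationHandler handler) {
    return protocol.cast(Proxy.newProxyInstance(
        protocol.getClassLoader(), new Class<?>[] {protocol}, handler));
  }
}
```

The caller sees an ordinary interface call; everything RPC-specific lives in invoke(), which is why the same handler class can serve any protocol interface.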

To understand the underlying mechanism of RPC invocation involved in HTable, take a look at the put(Put put) method.

public void put(final Put put) throws IOException { doPut(Arrays.asList(put)); }

This in turn calls the processBatchCallback method in HConnectionImplementation:

public <R> void processBatchCallback(
      List<? extends Row> list,
        byte[] tableName,
        ExecutorService pool,
        Object[] results,
        Batch.Callback<R> callback)
    throws IOException, InterruptedException

Given a list of Put operations, this method groups them by region server: all Puts destined for the same region server are grouped together so that they can be dispatched to that server in one go. It uses the following HashMap to maintain the groupings:

Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();

It groups all the actions/operations by HServerAddress. HServerAddress is a “label” for an HBase server, made up of its host and port number.

It then iterates through the map and fires off the requests to the corresponding region servers, creating a Callable for each server and submitting it to the executor thread pool:

for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
 futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}
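The grouping and fan-out step can be sketched with plain JDK types. Here Op is a hypothetical stand-in for a Put whose hosting server is already known, a "host:port" string stands in for HServerAddress, and each per-server Callable just returns its batch size where HBase would issue one multi() RPC.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch: group operations by server, then one task per server.
class BatchByServer {
  // Simplified "Put": row key plus the server hosting its region (assumed known).
  static final class Op {
    final String row;
    final String server; // "host:port", standing in for HServerAddress

    Op(String row, String server) {
      this.row = row;
      this.server = server;
    }
  }

  static Map<String, Integer> submitGrouped(List<Op> ops, ExecutorService pool)
      throws InterruptedException, ExecutionException {
    // 1. Group: every op for the same server goes into the same batch.
    Map<String, List<Op>> actionsByServer = new HashMap<>();
    for (Op op : ops) {
      actionsByServer.computeIfAbsent(op.server, k -> new ArrayList<>()).add(op);
    }
    // 2. Fan out: one Callable per server, run concurrently on the pool.
    Map<String, Future<Integer>> futures = new HashMap<>();
    for (Map.Entry<String, List<Op>> e : actionsByServer.entrySet()) {
      List<Op> batch = e.getValue();
      Callable<Integer> call = () -> batch.size(); // HBase: server.multi(batch)
      futures.put(e.getKey(), pool.submit(call));
    }
    // 3. Collect: wait for every per-server result.
    Map<String, Integer> sentPerServer = new HashMap<>();
    for (Map.Entry<String, Future<Integer>> f : futures.entrySet()) {
      sentPerServer.put(f.getKey(), f.getValue().get());
    }
    return sentPerServer;
  }
}
```

For three Puts where two land on the same region server, this issues two requests instead of three, and the two servers are contacted in parallel.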

Let’s look at the createCallable method

private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
        final MultiAction<R> multi, final byte [] tableName) {
      final HConnection connection = this;
      return new Callable<MultiResponse>() {
       public MultiResponse call() throws IOException {
         return getRegionServerWithoutRetries(
             new ServerCallable<MultiResponse>(connection, tableName, null) {
               public MultiResponse call() throws IOException {
                 return server.multi(multi);
               }
               public void connect(boolean reload) throws IOException {
                 server =
                   connection.getHRegionConnection(loc.getHostname(), loc.getPort());
               }
             });
       }
     };
}

The method returns an anonymous Callable whose call() method is executed on the executor thread pool. Inside it, the RPC client proxy server (an HRegionInterface) makes the remote RPC call server.multi(multi), marshalling the method and its parameters to the remote region server. The connect(boolean reload) method establishes the RPC connection to the region server:

server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());

The getHRegionConnection method uses HBaseRPC to create a dynamic proxy to the region server:

server = (HRegionInterface) HBaseRPC.waitForProxy(
                  serverInterfaceClass, HRegionInterface.VERSION,
                  address, this.conf,
                  this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);

Stay tuned for my next post about HBaseRPC internals, where we will dive into the mechanics of RPC call implementation in HBase.