Hadoop YARN Event Bus

Hadoop YARN uses an event bus to centralize the dispatching of various events and handling of events by different declared event handlers. This means less plumbing codes to write and maintain. Also, it decouples the event handling logics from different interacting components into one place.

AsyncDispatcher is such event bus. It uses a linkedBlockingQueue to queue up incoming events. Also, it has an internal thread, called eventHandlingThread to listen on the above queue and take event out for processing by calling the dispatch(event) method. In this method, it basically retrieves the corresponding event handler to handle the event accordingly.


package org.apache.hadoop.yarn.event;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;

 * Dispatches events in a separate thread. Currently only single thread does
 * that. Potentially there could be multiple channels for each event type
 * class and a thread pool can be used to dispatch the events.
public class AsyncDispatcher extends AbstractService implements Dispatcher {

  private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class);

  private final BlockingQueue eventQueue;
  private volatile boolean stopped = false;

  private Thread eventHandlingThread;
  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
  private boolean exitOnDispatchException;

  public AsyncDispatcher() {
    this(new LinkedBlockingQueue());

  public AsyncDispatcher(BlockingQueue eventQueue) {
    this.eventQueue = eventQueue;
    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();

  Runnable createThread() {
    return new Runnable() {
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
          if (event != null) {

  public synchronized void init(Configuration conf) {
    this.exitOnDispatchException =

  public void start() {
    //start all the components
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName("AsyncDispatcher event handler");

  public void stop() {
    stopped = true;
    if (eventHandlingThread != null) {
      try {
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted Exception while stopping", ie);

    // stop all the components

  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());

    Class<? extends Enum> type = event.getType().getDeclaringClass();

      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
      } else {
        throw new Exception("No handler for registered for " + type);
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");

  public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler registeredHandler = (EventHandler)
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;

  public EventHandler getEventHandler() {
    return new GenericEventHandler();

  class GenericEventHandler implements EventHandler {
    public void handle(Event event) {
      /* all this method does is enqueue all the events onto the queue */
      int qSize = eventQueue.size();
      if (qSize !=0 && qSize %1000 == 0) {
        LOG.info("Size of event-queue is " + qSize);
      int remCapacity = eventQueue.remainingCapacity();
      if (remCapacity < 1000) {
        LOG.warn("Very low remaining capacity in the event-queue: "
            + remCapacity);
      try {
      } catch (InterruptedException e) {
        if (!stopped) {
          LOG.warn("AsyncDispatcher thread interrupted", e);
        throw new YarnException(e);

   * Multiplexing an event. Sending it to different handlers that
   * are interested in the event.
   * @param  the type of event these multiple handlers are interested in.
  static class MultiListenerHandler implements EventHandler {
    List<EventHandler> listofHandlers;

    public MultiListenerHandler() {
      listofHandlers = new ArrayList<EventHandler>();

    public void handle(Event event) {
      for (EventHandler handler: listofHandlers) {

    void addHandler(EventHandler handler) {

