Home > HBase > HBase Custom Filter

HBase Custom Filter

HBase offers many different scan filters, but sometimes none of them satisfies your needs and you simply have to build your own. With HBase 0.94.7, you can build a custom filter that HBase loads dynamically, without restarting the cluster.

This lets us package our custom filter as a jar and place it in the following folder:

${hbase.local.dir}/jars/

If you want to understand the mechanics of how HBase accomplishes this, look at the classes listed below, which live in the org.apache.hadoop.hbase.util package.

Basically, custom filters can be dropped into a pre-configured folder (hbase.dynamic.jars.dir), which can be on HDFS. This way, region servers pick them up dynamically, and the cluster does not need to be restarted for the new filters to take effect.

Classes.java, ClassLoaderBase.java, DynamicClassLoader.java

Look at createWritableForName(String className) in Classes.java. It uses a very common way of instantiating a class via reflection.


/**
 * Loads the named filter class through CLASS_LOADER and instantiates it
 * with WritableFactories, handing it a fresh Configuration.
 *
 * @param className the filter class name.
 * @return a new filter instance
 * @throws RuntimeException if the class can not be found; the original
 *           ClassNotFoundException is attached as the cause
 */
@SuppressWarnings("unchecked")
public static Filter createWritableForName(String className) {
  try {
    Class<? extends Filter> clazz =
        (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
    return (Filter) WritableFactories.newInstance(clazz, new Configuration());
  } catch (ClassNotFoundException e) {
    // Chain the cause so the class loader's stack trace is not lost.
    throw new RuntimeException("Can't find class " + className, e);
  }
}

/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.io.WritableFactories;

/**
 * Utilities for class manipulation: resolving class names (including
 * primitive type names), rendering class arrays as strings, and
 * instantiating filters through a dynamic class loader.
 */
public class Classes {

  /**
   * Dynamic class loader to load filter/comparators
   */
  private final static ClassLoader CLASS_LOADER;

  static {
    // The dynamic loader delegates to the loader that loaded this class.
    ClassLoader parent = Classes.class.getClassLoader();
    Configuration conf = HBaseConfiguration.create();
    CLASS_LOADER = new DynamicClassLoader(conf, parent);
  }

  /**
   * Equivalent of {@link Class#forName(String)} which also returns classes for
   * primitives like <code>boolean</code>, etc.
   *
   * @param className
   *          The name of the class to retrieve. Can be either a normal class or
   *          a primitive class.
   * @return The class specified by <code>className</code>
   * @throws ClassNotFoundException
   *           If the requested class can not be found.
   */
  public static Class<?> extendedForName(String className)
      throws ClassNotFoundException {
    Class<?> valueType;
    if (className.equals("boolean")) {
      valueType = boolean.class;
    } else if (className.equals("byte")) {
      valueType = byte.class;
    } else if (className.equals("short")) {
      valueType = short.class;
    } else if (className.equals("int")) {
      valueType = int.class;
    } else if (className.equals("long")) {
      valueType = long.class;
    } else if (className.equals("float")) {
      valueType = float.class;
    } else if (className.equals("double")) {
      valueType = double.class;
    } else if (className.equals("char")) {
      valueType = char.class;
    } else {
      // Not a primitive name: fall back to the regular lookup.
      valueType = Class.forName(className);
    }
    return valueType;
  }

  /**
   * Renders an array of classes as a comma-separated list of class names.
   *
   * @param classes the classes to render, may be null
   * @return the joined class names, the empty string for an empty array,
   *         or the literal "NULL" when <code>classes</code> is null
   */
  @SuppressWarnings("rawtypes")
  public static String stringify(Class[] classes) {
    StringBuilder buf = new StringBuilder();
    if (classes != null) {
      for (Class c : classes) {
        if (buf.length() > 0) {
          buf.append(",");
        }
        buf.append(c.getName());
      }
    } else {
      buf.append("NULL");
    }
    return buf.toString();
  }

  /**
   * Used to dynamically load a filter class, and create a Writable filter.
   * This filter class most likely extends Configurable.
   *
   * @param className the filter class name.
   * @return a filter
   * @throws RuntimeException if the class can not be found; the original
   *           ClassNotFoundException is attached as the cause
   */
  @SuppressWarnings("unchecked")
  public static Filter createWritableForName(String className) {
    try {
      Class<? extends Filter> clazz =
          (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
      return (Filter) WritableFactories.newInstance(clazz, new Configuration());
    } catch (ClassNotFoundException e) {
      // Chain the cause so the class loader's stack trace is not lost.
      throw new RuntimeException("Can't find class " + className, e);
    }
  }

  /**
   * This method is almost the same as #createWritableForName, except
   * that this one doesn't expect the filter class to extend Configurable:
   * the class is instantiated through its no-argument constructor.
   *
   * @param className the filter class name.
   * @return a filter
   * @throws RuntimeException if the class can not be found, instantiated
   *           or accessed; the underlying exception is attached as the cause
   */
  @SuppressWarnings("unchecked")
  public static Filter createForName(String className) {
    try {
      Class<? extends Filter> clazz =
          (Class<? extends Filter>) Class.forName(className, true, CLASS_LOADER);
      return (Filter) clazz.newInstance();
    } catch (ClassNotFoundException e) {
      // Chain the cause, consistent with the other handlers below.
      throw new RuntimeException("Can't find class " + className, e);
    } catch (InstantiationException e) {
      throw new RuntimeException("Couldn't instantiate " + className, e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException("No access to " + className, e);
    }
  }
}

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;

import com.google.common.base.Preconditions;

/**
 * Base class loader that defines a couple of shared constants used
 * by sub-classes. It also defines the method getClassLoadingLock for
 * parallel class loading and JDK 1.6 support. This method
 * (getClassLoadingLock) is similar to the method of the same name in the
 * base class Java ClassLoader introduced in JDK 1.7, but not in JDK 1.6.
 */
@InterfaceAudience.Private
public class ClassLoaderBase extends URLClassLoader {

  protected static final String LOCAL_DIR_KEY = "hbase.local.dir";
  protected static final String DEFAULT_LOCAL_DIR = "/tmp/hbase-local-dir";

  /**
   * Parent class loader.
   */
  protected final ClassLoader parent;

  // One lock object per class name, created on first request
  private final ConcurrentHashMap<String, Object> parallelLockMap
      = new ConcurrentHashMap<String, Object>();

  /**
   * Creates a class loader with an initially empty URL search path that
   * delegates to the given parent.
   *
   * @param parent the parent ClassLoader to set; must not be null.
   */
  public ClassLoaderBase(final ClassLoader parent) {
    super(new URL[0], parent);
    Preconditions.checkNotNull(parent, "No parent classloader!");
    this.parent = parent;
  }

  /**
   * Returns the lock object for class loading operations on the given
   * class name. Every caller asking for the same name gets the same object.
   */
  protected Object getClassLoadingLock(String className) {
    Object existing = parallelLockMap.get(className);
    if (existing == null) {
      // First request for this name (or a race with another thread):
      // putIfAbsent decides which candidate becomes the shared lock.
      Object candidate = new Object();
      Object winner = parallelLockMap.putIfAbsent(className, candidate);
      existing = (winner != null) ? winner : candidate;
    }
    return existing;
  }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* This is a class loader that can load classes dynamically from new
* jar files under a configured folder. The paths to the jar files are
* converted to URLs, and URLClassLoader logic is actually used to load
* classes. This class loader always uses its parent class loader
* to load a class at first. Only if its parent class loader
* can not load a class, we will try to load it using the logic here.
*
* The configured folder can be a HDFS path. In this case, the jar files
* under that folder will be copied to local at first under ${hbase.local.dir}/jars/.
* The local copy will be updated if the remote copy is updated, according to its
* last modified timestamp.
*
* We can't unload a class already loaded. So we will use the existing
* jar files we already know to load any class which can't be loaded
* using the parent class loader. If we still can't load the class from
* the existing jar files, we will check if any new jar file is added,
* if so, we will load the new jar file and try to load the class again.
* If still failed, a class not found exception will be thrown.
*
* Be careful in uploading new jar files and make sure all classes
* are consistent, otherwise, we may not be able to load your
* classes properly.
*/
@InterfaceAudience.Private
public class DynamicClassLoader extends ClassLoaderBase {
private static final Log LOG =
LogFactory.getLog(DynamicClassLoader.class);

// Dynamic jars are put under ${hbase.local.dir}/jars/
private static final String DYNAMIC_JARS_DIR = File.separator
+ "jars" + File.separator;

private static final String DYNAMIC_JARS_DIR_KEY = "hbase.dynamic.jars.dir";

private File localDir;

// FileSystem of the remote path, set only if remoteDir != null
private FileSystem remoteDirFs;
private Path remoteDir;

// Last modified time of local jars
private HashMap<String, Long> jarModifiedTime;

/**
* Creates a DynamicClassLoader that can load classes dynamically
* from jar files under a specific folder.
*
* @param conf the configuration for the cluster.
* @param parent the parent ClassLoader to set.
*/
public DynamicClassLoader(
final Configuration conf, final ClassLoader parent) {
super(parent);

jarModifiedTime = new HashMap<String, Long>();
String localDirPath = conf.get(
LOCAL_DIR_KEY, DEFAULT_LOCAL_DIR) + DYNAMIC_JARS_DIR;
localDir = new File(localDirPath);
if (!localDir.mkdirs() && !localDir.isDirectory()) {
throw new RuntimeException("Failed to create local dir " + localDir.getPath()
+ ", DynamicClassLoader failed to init");
}

String remotePath = conf.get(DYNAMIC_JARS_DIR_KEY);
if (remotePath == null || remotePath.equals(localDirPath)) {
remoteDir = null;  // ignore if it is the same as the local path
} else {
remoteDir = new Path(remotePath);
try {
remoteDirFs = remoteDir.getFileSystem(conf);
} catch (IOException ioe) {
LOG.warn("Failed to identify the fs of dir "
+ remoteDir + ", ignored", ioe);
remoteDir = null;
}
}
}

@Override
public Class<?> loadClass(String name)
throws ClassNotFoundException {
try {
return parent.loadClass(name);
} catch (ClassNotFoundException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Class " + name + " not found - using dynamical class loader");
}

synchronized (getClassLoadingLock(name)) {
// Check whether the class has already been loaded:
Class<?> clasz = findLoadedClass(name);
if (clasz != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Class " + name + " already loaded");
}
}
else {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Finding class: " + name);
}
clasz = findClass(name);
} catch (ClassNotFoundException cnfe) {
// Load new jar files if any
if (LOG.isDebugEnabled()) {
LOG.debug("Loading new jar files, if any");
}
loadNewJars();

if (LOG.isDebugEnabled()) {
LOG.debug("Finding class again: " + name);
}
clasz = findClass(name);
}
}
return clasz;
}
}
}

private synchronized void loadNewJars() {
// Refresh local jar file lists
for (File file: localDir.listFiles()) {
String fileName = file.getName();
if (jarModifiedTime.containsKey(fileName)) {
continue;
}
if (file.isFile() && fileName.endsWith(".jar")) {
jarModifiedTime.put(fileName, Long.valueOf(file.lastModified()));
try {
URL url = file.toURI().toURL();
addURL(url);
} catch (MalformedURLException mue) {
// This should not happen, just log it
LOG.warn("Failed to load new jar " + fileName, mue);
}
}
}

// Check remote files
FileStatus[] statuses = null;
if (remoteDir != null) {
try {
statuses = remoteDirFs.listStatus(remoteDir);
} catch (IOException ioe) {
LOG.warn("Failed to check remote dir status " + remoteDir, ioe);
}
}
if (statuses == null || statuses.length == 0) {
return; // no remote files at all
}

for (FileStatus status: statuses) {
if (status.isDir()) continue; // No recursive lookup
Path path = status.getPath();
String fileName = path.getName();
if (!fileName.endsWith(".jar")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignored non-jar file " + fileName);
}
continue; // Ignore non-jar files
}
Long cachedLastModificationTime = jarModifiedTime.get(fileName);
if (cachedLastModificationTime != null) {
long lastModified = status.getModificationTime();
if (lastModified < cachedLastModificationTime.longValue()) {
// There could be some race, for example, someone uploads
// a new one right in the middle the old one is copied to
// local. We can check the size as well. But it is still
// not guaranteed. This should be rare. Most likely,
// we already have the latest one.
// If you are unlucky to hit this race issue, you have
// to touch the remote jar to update its last modified time
continue;
}
}
try {
// Copy it to local
File dst = new File(localDir, fileName);
remoteDirFs.copyToLocalFile(path, new Path(dst.getPath()));
jarModifiedTime.put(fileName, Long.valueOf(dst.lastModified()));
URL url = dst.toURI().toURL();
addURL(url);
} catch (IOException ioe) {
LOG.warn("Failed to load new jar " + fileName, ioe);
}
}
}
}
Advertisements
Categories: HBase Tags: ,
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: