Interacting With HBase from PySpark

This post shows multiple examples of how to interact with HBase from Spark in Python. Because the ecosystem around Hadoop and Spark keeps evolving rapidly, it is possible that your specific cluster configuration or software versions are incompatible with some of these strategies, but I hope there’s enough in here to help people with every setup.

Let’s start by creating an HBase table called books and filling it with some data to play around with in the examples.

create 'books', 'info', 'analytics'
put 'books', 'In Search of Lost Time', 'info:author', 'Marcel Proust'
put 'books', 'In Search of Lost Time', 'info:year', '1922'
put 'books', 'In Search of Lost Time', 'analytics:views', '3298'
put 'books', 'Godel, Escher, Bach', 'info:author', 'Douglas Hofstadter'
put 'books', 'Godel, Escher, Bach', 'info:year', '1979'
put 'books', 'Godel, Escher, Bach', 'analytics:views', '820'
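
To check that everything is in place, you can scan the table from the HBase shell:

scan 'books'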

Hive

If you’re operating on HBase from Spark, there’s a good chance that you are on a Hadoop cluster with Apache Hive lying around. Hive implements an HBase Storage Handler, which allows us to create external tables on top of HBase. Thus, one of the most low-friction ways to interact with HBase from Spark is to do it indirectly via Hive. Such a table can be created in the following way:

CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`books_ext` (
    `title` string,
    `author` string,
    `year` int,
    `views` double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
    'hbase.columns.mapping'=':key,info:author,info:year,analytics:views'
)
TBLPROPERTIES (
    'hbase.mapred.output.outputtable'='books',
    'hbase.table.name'='books'
);

You can now query the newly created table from Spark using the SQLContext:

>>> sqlContext.table("default.books_ext").show()
+--------------------+------------------+----+------+
|               title|            author|year| views|
+--------------------+------------------+----+------+
| Godel, Escher, Bach|Douglas Hofstadter|1979| 820.0|
|In Search of Lost...|     Marcel Proust|1922|3298.0|
+--------------------+------------------+----+------+

Some considerations regarding this strategy:

  • When performing scans, tweaking the cache parameters on your session can really help with performance, for example:
sqlContext.sql('SET hbase.scan.cache=10000')
sqlContext.sql('SET hbase.client.scanner.cache=10000')
  • When books_ext is used directly from Hive, we can see with explain that Hive optimizes simple predicates into the correct HBase operations:
    • where title = 'something' becomes an HBase get.
    • where title >= 'something' and title < 'something' is correctly translated to a range scan. This is not the case when using the between operator, for example.
  • While the Hive HBaseStorageHandler understands and correctly translates simple query predicates, the Spark engine is not as smart:
    • Queries are immediately translated to full table scans, with the filtering done afterwards within Spark, so Hive never gets the chance to optimize and push down filters. DataFrame.explain() will show you the Spark physical plan (see the sketch below).
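
As a quick check, here is a minimal sketch (using the books_ext table from above) that asks Spark for its physical plan; with this strategy you will typically see a scan of the whole Hive table followed by a Spark-side Filter, rather than a pushed-down HBase get:

df = sqlContext.table("default.books_ext")
df.where(df['title'] == 'Godel, Escher, Bach').explain()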

The last point means that accessing HBase from Spark through Hive is only a good option when doing operations on the entire table, such as full table scans. Otherwise, keep reading!

Spark-HBase Connector

The Spark-HBase connector comes out of the box with HBase, giving this method the advantage of having no external dependencies. You should be able to get this working in PySpark, in the following way:

export SPARK_CLASSPATH=$(hbase classpath)
pyspark --master yarn
df = sqlContext.read.format('org.apache.hadoop.hbase.spark') \
    .option('hbase.table','books') \
    .option('hbase.columns.mapping', \
            'title STRING :key, \
            author STRING info:author, \
            year STRING info:year, \
            views STRING analytics:views') \
    .option('hbase.use.hbase.context', False) \
    .option('hbase.config.resources', 'file:///etc/hbase/conf/hbase-site.xml') \
    .option('hbase-push.down.column.filter', False) \
    .load()

df.show()
+--------------------+------------------+----+------+
|               title|            author|year| views|
+--------------------+------------------+----+------+
| Godel, Escher, Bach|Douglas Hofstadter|1979| 820.0|
|In Search of Lost...|     Marcel Proust|1922|3298.0|
+--------------------+------------------+----+------+

In all my tests this method worked to read entire HBase tables, but failed when using any sort of predicate pushdown filter on string columns (such as the rowkey), e.g. df.where(df['title'] == 'Godel, Escher, Bach').show() fails with NoClassDefFoundError: scala/collection/immutable/StringOps. This might have to do with the Scala version against which my Spark was compiled (although I tried multiple Spark versions and the issue persisted). There are, however, reports of people for whom this method works to read HBase tables, so I believe it is worth a try in your setup.

SHC

SHC is a well maintained package from Hortonworks to interact with HBase from Spark. It allows querying HBase via Spark-SQL and the DataFrame abstraction, and supports predicate pushdown and data locality optimizations. This library is tailored towards Scala, but you might be able to use SHC with PySpark as described below. This is however a bit of a fragile solution, as it doesn’t work in some version combinations of Spark, HBase and Scala. To try it out, fire up a Spark REPL with the SHC package, making sure to change the hbase-site.xml file path to the one in your own cluster:

pyspark --master local --packages com.hortonworks:shc-core:1.1.1-1.6-s_2.10 --repositories http://repo.hortonworks.com/content/groups/public/ --files /etc/hbase/conf.cloudera.hbase/hbase-site.xml

We can now use SHC to scan the HBase table in the following manner:

catalog = ''.join("""
    {
        "table": {
            "namespace": "default",
            "name": "books"
        },
        "rowkey": "key",
        "columns": {
            "title": {"cf": "rowkey", "col": "key", "type": "string"},
            "author": {"cf": "info", "col": "author", "type": "string"}
        }
    }
""".split())

df = sqlContext \
    .read \
    .options(catalog=catalog) \
    .format('org.apache.spark.sql.execution.datasources.hbase') \
    .load()

df.show()
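
Since SHC supports predicate pushdown, a filter on the row key should, in the setups where the connector works, translate into an efficient HBase get or range scan rather than a full table scan; a quick way to verify is:

df.where(df['title'] == 'Godel, Escher, Bach').explain()
df.where(df['title'] == 'Godel, Escher, Bach').show()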

If this works for you, I recommend you check the documentation and examples on the SHC repo for more complex read and write operations.

Happybase

Happybase is a pure Python library to connect to HBase via its Thrift API. The fact that it works outside of Spark can be an advantage in some scenarios, but it also means that it will run solely on the driver if used naively. However, to (for example) perform a big write in batch, one could explicitly set up a connection to HBase on each executor and write in parallel from there (with something like foreachPartition or mapPartitions), as in the sketch below.
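
As a minimal sketch of that pattern, the snippet below writes the contents of an RDD of (title, views) pairs to the books table from the executors; the Thrift server address hbase-thrift.example.com and the rdd variable are placeholders for your own setup:

import happybase

def write_partition(rows):
    # One connection per partition, created on the executor that holds the data.
    # 'hbase-thrift.example.com' is a placeholder for your HBase Thrift server.
    connection = happybase.Connection('hbase-thrift.example.com')
    table = connection.table('books')
    # Buffer the puts and send them to HBase in batches.
    with table.batch(batch_size=1000) as batch:
        for title, views in rows:
            batch.put(title, {'analytics:views': str(views)})
    connection.close()

# rdd is assumed to be an RDD of (title, views) tuples.
rdd.foreachPartition(write_partition)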

Check out the Happybase documentation, which already contains various examples of the most common HBase operations.

Scala Code

Finally, if nothing else works for you, there is always the possibility of writing a Scala object to interact with HBase and calling it from PySpark. Check out my previous blog post about using Scala code in PySpark.

While this solution is more complex than the above, it is also one that is guaranteed to work with any setup, as accessing HBase from Scala is usually much simpler than from Python. Let’s go through a complete example that uses the Scala-only nerdammer connector and exposes a read method on the books table to PySpark. We’ll perform a range scan to get the titles that start with a letter between A and H:

package org.somepackage

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import it.nerdammer.spark.hbase._

object BookScanner {
  def read(jsc: JavaSparkContext, sqlContext: HiveContext): DataFrame = {
    // Unwrap the JavaSparkContext handed over from PySpark.
    val sc = JavaSparkContext.toSparkContext(jsc)

    // Range scan over the 'books' table: row keys from "A" (inclusive) to "H" (exclusive),
    // reading the author column from the info column family.
    val hBaseRDD = sc.hbaseTable[(String, String)]("books")
      .select("author")
      .inColumnFamily("info")
      .withStartRow("A")
      .withStopRow("H")

    sqlContext.createDataFrame(hBaseRDD).toDF("title", "author")
  }
}

The code above should be compiled with sbt assembly to generate a jar that includes the needed dependency. Put it in src/main/scala/org/somepackage/BookScanner.scala and create a build.sbt in the project root that packages our own class as well as the HBase connector library:

name := "BookScanner"
 
scalaVersion := "2.10.5"
 
// Resolvers
resolvers += "Apache HBase"  at "https://repository.apache.org/content/repositories/releases"
resolvers += "Thrift"        at "http://people.apache.org/~rawson/repo/"
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += "Cloudera Repo" at "https://repository.cloudera.com/content/repositories/releases"
 
// Dependencies
 
// ------------- Spark -----------
libraryDependencies ++= Seq (
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
)
 
// ----------- Hadoop -------------------
libraryDependencies ++= Seq (
  "org.apache.hadoop" % "hadoop-core" % "1.2.0" % "provided"
)
 
// Nerdammer Spark-HBase Connector
 
libraryDependencies ++= Seq (
    "it.nerdammer.bigdata" % "spark-hbase-connector_2.10" % "1.0.3"
)
 
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
 
assemblyMergeStrategy in assembly := {
  case x if x.matches("META-INF/services/.*") => MergeStrategy.filterDistinctLines
  case x if x.matches(".*(xml|dtd|xsd|xml|html|properties|class|thrift)$") => MergeStrategy.first
  case "reference.conf" => MergeStrategy.concat
  case old =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(old)
}
 
assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  cp filter {_.data.getName != "spark-hbase-connector_2.10-1.0.3.jar"}
}

To use sbt assembly, you’ll also need a project/assembly.sbt file with the following content:

resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

We can now run sbt assembly, which will create BookScanner-assembly-0.1-SNAPSHOT.jar in target/scala-2.10/.

Let’s launch a PySpark REPL with this jar. We will also need to define the spark.hbase.host configuration parameter containing the HBase Zookeeper quorum:

pyspark --master yarn --jars BookScanner-assembly-0.1-SNAPSHOT.jar --driver-class-path BookScanner-assembly-0.1-SNAPSHOT.jar --conf spark.hbase.host='<host1:port>,<host2:port>'

We can then call our read method, making sure to unbox the Python sparkContext and sqlContext into their underlying Java objects:

book_scanner = sc._jvm.org.somepackage.BookScanner
book_scanner.read(sc._jsc, sqlContext._ssql_ctx).show()
+-------------------+------------------+
|              title|            author|
+-------------------+------------------+
|Godel, Escher, Bach|Douglas Hofstadter|
+-------------------+------------------+

Conclusion

Clearly, accessing HBase through PySpark can be a bit messy, as most approaches are directed towards Scala. I hope, however, that there’s something in this post that will work regardless of your needs, configuration and software versions:

  • Operating on HBase through the Hive HBaseStorageHandler is a simple option when working on the entire table (or with small datasets).
  • Using the native Spark-HBase connector can also be useful for some use cases, as there are no dependencies to install on reasonably recent versions of HBase and Spark.
  • SHC is worth a try, as it is quite a full-featured library in the environments where it works properly from Python.
  • Happybase is a pure Python option to interact with HBase, but it can only take advantage of the distributed nature of Spark if we explicitly define the parallel computations.
  • Lastly, a slightly more complex but usually much less problematic option is to access HBase through Scala code and call it from PySpark.

Cheers!

Diogo Franco
