Using Scala code in PySpark applications

While there are solid reasons to develop Spark applications using the Python API, it is undeniable that Scala is Spark’s native tongue. If you need a feature unsupported by PySpark, or just want to use a Scala library in your Python application, this post will show how to mix the two and get the best of both worlds.

Calling Scala code in PySpark applications

Pyspark sets up a gateway between the interpreter and the JVM - Py4J - which can be used to move java objects around. Let’s code up the simplest of Scala objects:

package com.scalapyspark

object SelfHelp {
    def quoteRandall = println("Open unmarked doors")
}

We then build this and package it as a JAR, by using a tool such as maven or sbt:

$ mvn package
Building jar: .../target/scalapyspark-0.2.0-SNAPSHOT.jar

We are now able to launch the pyspark shell with this JAR on the –driver-class-path. Depending on the code we may also need to submit it in the –jars argument:

$ pyspark --master yarn --deploy-mode client --jars scalapyspark-0.2.0-SNAPSHOT.jar --driver-class-path scalapyspark-0.2.0-SNAPSHOT.jar

We can access our package by accessing the _jvm attribute of spark context (sc):

>>> selfHelper = sc._jvm.com.scalapyspark.SelfHelp
>>> selfHelper.quoteRandall()
Open unmarked doors

Voilà, we called our first Scala method from PySpark!

Real projects are never that simple

In an actual project, a couple things might differ from the simple example above, which introduces a bit of complexity:

Scala code with dependencies on external libraries.

In this case, I couldn’t always succeed by simply packaging my Scala code and submitting the PySpark job with the dependencies in –packages. The foolproof way to do it is to package a fat jar that also contains your Scala dependencies. We can use sbt assembly to accomplish this.

Passing Spark objects around.

Spark objects must be explicitly boxed/unboxed into java objects when passing them between environments. A few common examples are:

  • SparkContext

If your Scala code needs access to the SparkContext (sc), your python code must pass sc._jsc, and your Scala method should receive a JavaSparkContext parameter and unbox it to a Scala SparkContext.

import org.apache.spark.api.java.JavaSparkContext

def method(jsc: JavaSparkContext) = {
    val sc = JavaSparkContext.toSparkContext(jsc)
}
  • SQLContext

The Scala SQLContext can be passed from python by sending sqlContext._ssql_ctx. This will be usable without any transformations on the Scala side.

  • RDDs

You can pass them from Python to Scala via rdd._jrdd. On the Scala side, a JavaRDD (jrdd) can be unboxed by accessing jrdd.rdd. When converting it back to Python, one can do:

from pyspark.rdd import RDD

pythonRDD = RDD(jrdd, sc)
  • DataFrames

To send a DataFrame (df) from python, one must pass the df._jdf attribute. When returning a Scala DataFrame back to python, it can be converted on the python side by:

from pyspark.sql import DataFrame

pythonDf = DataFrame(jdf, sqlContext)

DataFrames can also be moved around by using registerTempTable and accessing them through the sqlContext.

Diogo Franco

Diogo Franco

I love data, distributed systems, machine learning, code and science!