Spark tips and troubleshooting

October 25, 2015

Spark is a great technology for building distributed applications, and its integration in DSS unleashes a huge potential for data scientists.

Being a new technology, it will however sometimes prove challenging to use. This article lists a series of common issues that one might run into while trying to build a predictive application with Spark, along with some helpful tips.

Also, don't forget to skim through the Spark documentation. Stack Overflow and the Spark mailing lists have a good deal of troubleshooting help too.


Connecting to datasets

What formats and datasets should I use?

DSS's integration with Spark lets you read and write all datasets using Spark. However, for "real" usage, it is strongly recommended that you only use Spark on HDFS datasets.

When Spark runs on HDFS datasets, the reading of the source datasets will be truly parallel. On other datasets, only a single executor will read the initial data (the rest of the processing will be distributed).

In addition, on HDFS, we recommend that you always use the Parquet format. In some extreme cases, the speedup when switching from CSV to Parquet can reach 100x.

The main reasons are:

  • Parquet is a binary format, much faster to parse than text-oriented formats like CSV
  • It uses faster compression algorithms
  • Its column-oriented nature makes aggregations faster (cache-aware algorithms)
  • Most importantly, its column-oriented nature is deeply integrated in Spark, and Spark will only read the columns that it has determined will actually be used for processing. On very wide datasets, this can lead to reading only a few percent of the data (as illustrated by the snippet after this list).
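As an illustration, a Pyspark snippet like the following one (the path and column names are hypothetical, and sql_context is the one available in a Pyspark recipe or notebook) only needs to read the two referenced columns from disk when the dataset is stored as Parquet:

# Only the customer_id and amount columns are actually read from the Parquet files
df = sql_context.read.parquet("hdfs:///user/dataiku/mydataset")
result = df.groupBy("customer_id").sum("amount")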

Parquet case sensitivity

By default, when you create a Parquet dataset in HDFS, all identifiers in the Parquet schema are lowercased. This is absolutely required for compatibility with Hive, which does not support mixed-case or upper-case identifiers in Parquet tables.

This can cause some fairly unexpected results with Spark. For example, if your dataset contains a column Id, the Parquet file will say id, and Spark will actually keep id, not Id. So, in a Pyspark recipe, you should use: df.groupBy("id"). Using df.groupBy("Id") will fail, complaining that Id does not exist.
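For instance, the following sketch (the dataset name is hypothetical, and sql_context is the one available in a Pyspark recipe or notebook) shows the lowercased identifier in action:

import dataiku
import dataiku.spark as dkuspark

df = dkuspark.get_dataframe(sql_context, dataiku.Dataset("customers"))
counts = df.groupBy("id").count()    # works: the Parquet schema says "id"
# counts = df.groupBy("Id").count()  # would fail, complaining that Id does not exist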

Spark can't read my HDFS datasets

Symptom: all HDFS datasets fail to load in Spark, and you see some error messages about files not found, while everything looks OK.

One possible cause is that Spark isn't properly configured, and isn't using your HDFS configuration. You can confirm this by looking closely at the URLs of the files that are supposedly not found. If these URLs start with file:// instead of hdfs://, then that's the reason.

Since Spark doesn't know about your HDFS cluster, it falls back to trying to read local files.

You should configure spark-env.sh and/or spark-defaults.conf to point to your Hadoop configuration. The exact method varies by Hadoop distribution, but it at least includes setting the HADOOP_CONF_DIR and/or HDFS_CONF_DIR and YARN_CONF_DIR environment variables in spark-env.sh.
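One quick way to check what Spark actually sees, from a Pyspark notebook (this goes through the private _jsc handle, so treat it as a debugging trick rather than a supported API):

# Should print something like hdfs://namenode:8020
# If it prints file:///, Spark is not picking up your Hadoop configuration
print(sc._jsc.hadoopConfiguration().get("fs.defaultFS"))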


Using Python libraries like Numpy

When using Pyspark, Python code can be executed:

  • In the Spark driver (ie, the "client" program that you actually write in a notebook or Pyspark recipe)
  • In the Spark executors (ie, the processes that run on every machine of your Spark cluster)

The code that executes in the driver is run in the regular DSS Python environment, and can use all Python packages installed in this environment. (For more info, see this Howto.)

For example, assuming that the tabulate package is installed in the Python environment:


import dataiku.spark as dkuspark
import tabulate

df = dkuspark.get_dataframe(sql_context, dataset)
count_by_age = df.groupBy("age").count()

# collect() brings the results back to the driver, where tabulate can format them
print(tabulate.tabulate(count_by_age.collect()))

This code gets executed in the driver.

However, when you pass functions (or lambdas) to Spark commands, these functions get distributed. In that case, these functions don't have access to the regular DSS environment.
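For example, in the following sketch (the data is purely illustrative), the lambda passed to map() is serialized and shipped to the executors, so numpy must be importable on every machine of the cluster:

import numpy as np

rdd = sc.parallelize([[1.0, 2.0], [3.0, 4.0]])
# The lambda below runs on the executors, not in the driver
norms = rdd.map(lambda row: float(np.linalg.norm(row))).collect()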

To solve that, you have to ensure that the required packages are available on every machine of the cluster (using pip or conda, for example). You will also need to ensure that the "python2.7" command in the PATH of the executors points to a Python executable that contains the required packages.

If that's not the case (because you used a virtualenv, for example), you can set the PYSPARK_PYTHON environment variable (in spark-env.sh, for instance) and have it point to a proper Python.

For example:

export PYSPARK_PYTHON=/opt/datalab/python/virtualenv-with-packages/bin/python


Cannot use Hive language

As of DSS 2.1, the Spark SQL recipe only supports the basic "Spark SQL" language. It does not support the "hiveql" language.

In the Jupyter notebook, you can create a HiveContext instead of a SQLContext. However, beware that the behavior is not the same: in a HiveContext, you have implicit access to the tables that are referenced in the Hive metastore in addition to the tables that you manually register.
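For example, in a Pyspark notebook, something like the following sketch gives you access to the metastore tables (the table name is hypothetical):

from pyspark.sql import HiveContext

hive_context = HiveContext(sc)   # sc is the SparkContext already defined in the notebook
df = hive_context.sql("SELECT * FROM some_hive_table")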

Caching and memory

What and how should I cache?

One of the strong points of Spark is that it can "cache" RDDs and DataFrames. Spark can cache using memory and disk (obviously, much better performance is achieved when caching uses memory).

Caching is useful for RDDs/DFs:

  • on which expensive computations have already been done
  • that will be reused several times as basis for further processing.

It is really important to understand how your RDD/DF will be used. Blindly enabling Spark caching will actually strongly decrease performance by taking away memory that could be better used elsewhere. It will also very often lead to out-of-memory situations.

For example, do not cache a source dataset that will be used for a single grouping operation:

  • No computation has yet been done, so reloading it would be almost as fast, especially if you use Parquet.
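Conversely, here is a sketch of a situation where caching does pay off, because an already filtered and joined DataFrame is reused several times (all names are illustrative):

from pyspark import StorageLevel

enriched = customers_df.filter(customers_df.age > 20).join(orders_df, "customer_id")
enriched.persist(StorageLevel.MEMORY_AND_DISK)   # cached because it is reused twice below
by_age = enriched.groupBy("age").count()
by_city = enriched.groupBy("city").count()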

Error: OutOfMemoryError

While Spark is not an "in-memory processing engine", it has deep optimizations to run in memory. Spark eagerly uses memory, and you'll most often need to fine tune the memory settings for your Spark job.

Spark uses memory for two main things:

  • Caching parts of the RDDs and DataFrames. This is mostly useful for RDDs for which expensive computations have already been done.
  • Shuffling data. Unlike Hadoop MapReduce, Spark will aggressively try to shuffle data between executors using memory, only falling back to disk when not enough memory is available.

Both these operations use the Java heap. You will thus need to set the Java heap memory to a high enough value. Note that "high enough" is generally really high. We recommend that you never go below 4G of memory for Spark executors. The official recommendation for Spark is "from 8 to hundreds of gigabytes".

Setting the java heap size is done differently according to the Spark execution mode:

  • In local mode (not recommended), set the spark.driver.memory variable (to 8g, for example)
  • When running on YARN or against a Spark standalone master, set spark.executor.memory

For better tuning of the memory, you'll want to read about the spark.storage.memoryFraction and spark.shuffle.memoryFraction variables. See the official Spark configuration guide.
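In a standalone Pyspark program, these keys can be passed through a SparkConf; in DSS, the same keys normally go in the Spark configuration settings rather than in code. A minimal sketch (the values are illustrative, tune them for your cluster):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "8g")
        .set("spark.storage.memoryFraction", "0.4")
        .set("spark.shuffle.memoryFraction", "0.4"))
sc = SparkContext(conf=conf)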


Error: Size exceeds Integer.MAX_VALUE

Spark divides the RDD that it manipulates into "partitions". A strong limit of Spark (as of Spark 1.5) is that at no moment can the data for any partition go above 2 GB.

  • When you read from HDFS, Spark automatically creates partitions of reasonable size
  • When you read from other datasets, DSS repartitions the dataset into 100 partitions by default (you can always override this in the recipe settings, or call "repartition()" yourself)

There are several things that could make a partition grow too much:

  • If a map operation strongly increases the size of its input. For example, a map that creates a dense vector
  • If a groupByKey operation does not actually reduce the size of its input but merely concatenates it, and the keys are skewed (ie, one key is highly over-represented).

These are among the more complex topics when dealing with Spark. Several strategies are possible (see the sketch after this list):

  • Decrease the size of input partitions of the operation
  • Make sure that you repartition just after the operation that created too large partitions
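A sketch of the first strategy, for a map that strongly inflates each record (the path and sizes are illustrative):

import numpy as np

lines = sc.textFile("hdfs:///user/dataiku/input")   # hypothetical input
# Repartition before the inflating map, so that each output partition stays well under 2 GB
vectors = lines.repartition(1000).map(lambda line: np.zeros(100000))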

Error: Unable to acquire 1048576 bytes of memory

This is a known issue in Spark 1.5 with the Tungsten engine (see the corresponding issue in the Spark bug tracker).

Tungsten is a new optimized execution engine that aims to make Spark faster. It is however still fairly new!

You can try the following (a configuration sketch follows this list):

  • Set the spark.buffer.pageSize variable to a lower value, like 2m
  • If that still does not work, disable the new Tungsten engine: Set the spark.sql.tungsten.enabled variable to false
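As with the memory settings above, these keys can be set through a SparkConf in a standalone Pyspark program, or in the DSS Spark configuration. A minimal sketch:

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.buffer.pageSize", "2m")
        .set("spark.sql.tungsten.enabled", "false"))   # only if lowering the page size is not enough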