Spark is a great technology for building distributed applications, and its integration in DSS unleashes a huge potential for data scientists.
Being a new technology, it will however sometimes prove challenging to use. This article lists a series of common issues that you might run into when trying to build a predictive application with Spark, along with some helpful tips.
Also, don't forget to skim through the Spark documentation. Stack Overflow and the Spark mailing lists have a good deal of troubleshooting help too.
Connecting to datasets
Caching and memory
Connecting to datasets
DSS's integration with Spark lets you read and write all datasets using Spark. However, for "real" usage, we strongly recommend that you only use Spark on HDFS datasets.
When Spark runs on HDFS datasets, the reading of the source datasets will be truly parallel. On other datasets, only a single executor will read the initial data (the rest of the processing will be distributed).
In addition, on HDFS, we recommend that you always use the Parquet format. In some extreme cases, the speedup when switching from CSV to Parquet can reach 100x.
The main reasons are:
- Parquet is a binary format, much faster to parse than text-oriented formats like CSV
- It uses faster compression algorithms
- Its column-oriented nature makes aggregations faster (cache-aware algorithms)
- Most importantly, its column-oriented nature is deeply integrated in Spark, and Spark will only read the columns that it has determined will actually be used for processing. On very wide datasets, this can lead to reading only a few percent of the data.
By default, when you create a Parquet dataset in HDFS, all identifiers in the Parquet schema are lowercased. This is absolutely required for compatibility with Hive, which does not support mixed-case or upper-case identifiers in Parquet tables.
This can cause some fairly unexpected results with Spark. For example, if your dataset contains a column Id, the Parquet file will say id, and Spark will actually keep id. So, in a Pyspark recipe, you should use df.groupby("id"). Calling df.groupby("Id") will fail, complaining that Id does not exist.
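One way to avoid depending on the exact case is to look the name up in the columns that Spark actually reports. The following is a minimal sketch, not part of DSS: resolve_column is a hypothetical helper, and the column names are made up for illustration.

```python
def resolve_column(columns, name):
    """Return the entry of `columns` that matches `name`, ignoring case.

    `columns` is any list of column names, for example `df.columns`.
    """
    matches = [c for c in columns if c.lower() == name.lower()]
    if not matches:
        raise KeyError("no column matching %r in %r" % (name, list(columns)))
    if len(matches) > 1:
        raise ValueError("ambiguous column %r: %r" % (name, matches))
    return matches[0]


# Columns as Spark would report them for a Parquet dataset whose
# schema was lowercased on write (illustrative names).
parquet_columns = ["id", "amount"]
print(resolve_column(parquet_columns, "Id"))
```

In a Pyspark recipe, this could be used as df.groupby(resolve_column(df.columns, "Id")), so the same code works whether or not the identifiers were lowercased.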
Symptom: all HDFS datasets fail to load in Spark, and you see some error messages about files not found, while everything looks OK.
One possible cause is that Spark isn't properly configured, and isn't using your HDFS configuration. You can confirm this by looking closely at the URLs of the files that are supposedly not found. If these URLs start with file:// instead of hdfs://, then that's the reason.
Since Spark doesn't know about your HDFS cluster, it falls back to trying to read local files.
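When the error log is long, the check on the URL scheme can be automated. This is a small sketch using only the Python standard library; the function name and the example URLs (host, port and paths) are hypothetical.

```python
from urllib.parse import urlparse


def local_fallback_urls(urls):
    """Return the URLs that point to the local filesystem (file://)."""
    return [u for u in urls if urlparse(u).scheme == "file"]


# Hypothetical URLs, as they might appear in "file not found" errors.
reported = [
    "file:///user/dss/managed/orders/part-00000",
    "hdfs://namenode:8020/user/dss/managed/orders/part-00001",
]
print(local_fallback_urls(reported))
```

Any URL returned by this function suggests that Spark resolved the path without your HDFS configuration.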
You should configure spark-env.sh and/or spark-defaults.conf to point to your Hadoop configuration. Exact method varies by Hadoop distribution, but it at least includes setting the
YARN_CONF_DIR variables in
When using Pyspark, Python code can be executed:
- In the Spark driver (i.e., the "client" program that you actually write in a notebook or Pyspark recipe)
- In the Spark executors (i.e., the processes that run on every machine of your Spark cluster)
The code that executes in the driver is run in the regular DSS Python environment, and can use all Python packages installed in this environment. (For more info, see this Howto).
For example, assuming that the tabulate package is installed in the Python environment:
This code gets executed in the driver.
However, when you pass functions (or lambdas) to Spark commands, these functions get distributed. In that case, these functions don't have access to the regular DSS environment.
To solve that, you have to ensure that the required packages are available on every machine of the cluster (using pip or conda for example). You will also need to ensure that the "python27" command in the PATH of the executors points to a Python executable that contains the required packages.
If that's not the case (because you used a virtualenv for example), you can set the PYSPARK_PYTHON environment variable in your spark-env.sh and have it point to a suitable Python executable.
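To see whether a package is really importable where your functions run, you can execute the same import check in the driver and inside Spark tasks. The sketch below is an illustration under assumptions: missing_packages and check_executors are hypothetical helpers, sc is assumed to be an existing SparkContext, and the number of probe tasks is arbitrary.

```python
import importlib
import socket


def missing_packages(names):
    """Return the names from `names` that cannot be imported here."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing


def check_executors(sc, names, num_tasks=50):
    """Run the import check inside Spark tasks and report per host.

    `sc` is an existing SparkContext. Tasks are not guaranteed to land on
    every executor, so an empty report is a good sign, not a proof.
    """
    names = list(names)

    def probe(_):
        # This function is shipped to the executors and runs there.
        return (socket.gethostname(), tuple(missing_packages(names)))

    reports = (sc.parallelize(range(num_tasks), num_tasks)
                 .map(probe)
                 .distinct()
                 .collect())
    return [(host, missing) for host, missing in reports if missing]


# Driver-side check: runs in the regular Python environment.
print(missing_packages(["json", "tabulate"]))
```

Calling check_executors(sc, ["tabulate"]) from a notebook would then list the hosts on which the import failed.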
As of DSS 2.1, the Spark SQL recipe only supports the basic "Spark SQL" language. It does not support the "hiveql" language.
In the Jupyter notebook, you can create a HiveContext instead of a SQLContext. However, beware that the behavior is not the same: in a HiveContext, you have implicit access to the tables that are referenced in the Hive metastore in addition to the tables that you manually register.
Caching and memory
One of the strong points of Spark is that it can "cache" RDDs and DataFrames. Spark can cache using memory and disk (obviously, much better performance is achieved when caching uses memory).
Caching is useful for RDDs/DFs:
- on which expensive computations have already been done
- that will be reused several times as a basis for further processing.
It is really important to understand how your RDD/DF will be used. Blindly enabling Spark caching can strongly decrease performance by taking away memory that could be better used elsewhere. It will also very often lead to out-of-memory situations.
For example, do not cache a source dataset that will be used for a single grouping operation:
- No computation has yet been done, so reloading it would be almost as fast, especially if you use Parquet.
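By contrast, here is a sketch of a case where caching is likely to pay off: a DataFrame that has already gone through expensive preparation and is then aggregated twice. The function name is hypothetical, and prepared_df is assumed to be a Spark DataFrame produced by earlier, costly steps.

```python
def count_by_two_keys(prepared_df, first_key, second_key):
    """Aggregate an expensive, already-prepared DataFrame twice.

    The DataFrame is cached because it is reused, and released afterwards
    so that the memory becomes available again for other work.
    """
    prepared_df.cache()
    try:
        first = prepared_df.groupBy(first_key).count().collect()
        second = prepared_df.groupBy(second_key).count().collect()
    finally:
        prepared_df.unpersist()
    return first, second
```

Without the cache() call, the preparation steps would be recomputed for each of the two aggregations. With a single aggregation, the cache would only cost memory.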
While Spark is not an "in-memory processing engine", it has deep optimizations to run in memory. Spark eagerly uses memory, and you'll most often need to fine-tune the memory settings for your Spark job.
Spark uses memory for two main things:
- Caching parts of the RDDs and DataFrames. This is mostly useful for RDDs for which expensive computations have already been done.
- Shuffling data. Unlike Hadoop, Spark will aggressively try to shuffle data between executors using memory, only falling back to disk if not enough memory is available.
Both these operations use the Java heap. You will thus need to set the Java heap memory to a high enough value. Note that "high enough" is generally really high. We recommend that you never go below 4G of memory for Spark executors. The official recommendation for Spark is "from 8 to hundreds of gigabytes".
Setting the Java heap size is done differently according to the Spark execution mode:
- In local mode (not recommended), set the
- In YARN or "master" mode, set
For better tuning of the memory, you'll want to read about the spark.storage.memoryFraction variables. See the official Spark configuration guide.
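As an illustration, the sketch below checks a set of Spark properties against the 4G floor recommended earlier in this section before they are applied. It rests on assumptions: spark.executor.memory is used here as the executor heap property, the values are examples rather than recommendations, and the helper names are hypothetical.

```python
MIN_EXECUTOR_HEAP_GB = 4  # floor recommended earlier in this section

_UNITS_IN_GB = {"m": 1.0 / 1024, "g": 1.0, "t": 1024.0}


def heap_size_gb(value):
    """Convert a JVM-style size such as '8g' or '4096m' to gigabytes."""
    value = value.strip().lower()
    return float(value[:-1]) * _UNITS_IN_GB[value[-1]]


def check_executor_memory(settings):
    """Raise if the executor heap is below the recommended floor."""
    heap = heap_size_gb(settings["spark.executor.memory"])
    if heap < MIN_EXECUTOR_HEAP_GB:
        raise ValueError("executor heap of %.1f GB is below %d GB"
                         % (heap, MIN_EXECUTOR_HEAP_GB))
    return settings


# Example values only (assumptions, not recommendations).
settings = check_executor_memory({
    "spark.executor.memory": "8g",
    "spark.storage.memoryFraction": "0.5",
})
print(sorted(settings.items()))
```

The checked settings could then be entered in the Spark configuration of the recipe, or passed to SparkConf().setAll(list(settings.items())) when you create the context yourself.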
Spark divides the RDDs that it manipulates into "partitions". A hard limit of Spark (as of Spark 1.5) is that the data for any single partition can never exceed 2 GB.
- When you read from HDFS, Spark automatically creates partitions of reasonable size
- When you read from other datasets, DSS repartitions the dataset into 100 partitions by default (you can always override this in the recipe settings, or call "repartition()" yourself)
There are several things that could make a partition grow too much:
- If a map operation strongly increases the size of its input. For example, a map that creates a dense vector.
- If a groupByKey operation actually does not reduce the size of the input but concatenates, and has skewed keys (i.e., a key is highly over-represented).
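Key skew can be spotted before running the grouping by comparing the heaviest keys to the average. The following is a rough sketch: skew_report is a hypothetical helper, and the counts are made-up numbers standing in for the result of countByKey() on a pair RDD.

```python
def skew_report(key_counts, top=3):
    """Return (key, count, count / mean count) for the heaviest keys."""
    if not key_counts:
        return []
    mean = sum(key_counts.values()) / float(len(key_counts))
    heaviest = sorted(key_counts.items(), key=lambda kv: kv[1], reverse=True)
    return [(key, count, count / mean) for key, count in heaviest[:top]]


# Made-up record counts per key.
counts = {"FR": 9700, "DE": 120, "IT": 100, "ES": 80}
for key, count, ratio in skew_report(counts):
    print(key, count, round(ratio, 2))
```

A key whose ratio is far above 1 will concentrate its records in a single partition after a groupByKey, which is where the partition can grow too large.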
These are among the complex topics when dealing with Spark. Several strategies are possible:
- Decrease the size of input partitions of the operation
- Make sure that you repartition just after the operation that created the overly large partitions
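To choose a partition count for either strategy, a back-of-the-envelope estimate is often enough. This sketch assumes you can estimate the total data size and how much the operation inflates it; the function name, the expansion factor and the 256 MB target are assumptions chosen for illustration.

```python
import math


def partitions_needed(total_bytes, expansion_factor=1.0,
                      target_bytes=256 * 1024 ** 2):
    """Estimate how many partitions keep each one near `target_bytes`.

    `expansion_factor` is your own estimate of how much the operation
    grows its input (for example, 8.0 for a map that produces dense
    vectors about eight times larger than the rows it reads).
    """
    estimated = total_bytes * expansion_factor
    return max(1, int(math.ceil(estimated / float(target_bytes))))


# A 10 GB input that is expected to grow eightfold.
print(partitions_needed(10 * 1024 ** 3, expansion_factor=8.0))
```

The result can be passed to repartition() before the expensive operation, so that each input partition stays well below the per-partition limit even after it grows.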
This is a known issue in Spark 1.5 with the Tungsten engine. See https://issues.apache.org/jira/browse/SPARK-10309
Tungsten is a new optimized execution engine that aims to make Spark faster. It is however still fairly new!
You can try the following:
- Set the spark.buffer.pageSize variable to a lower value, like 2m
- If that still does not work, disable the new Tungsten engine: set the spark.sql.tungsten.enabled variable to false