Spark is a great technology for building distributed applications, and its integration in DSS unleashes a huge potential for data scientists.
Being a new technology, it will however sometimes prove challenging to use. This article lists a series of common issues that one might run trying to build a predictive application with Spark, and also some helpful tips.
Also, don't forget to skim through the Spark documentation. Stack Overflow and the Spark mailing lists have a good deal of troubleshooting help too.
Connecting to datasets
Caching and memory
DSS's integration with Spark lets you read and write all datasets using Spark. However, it is very highly recommended that for "real" usage, you only use Spark on HDFS datasets.
When Spark runs on HDFS datasets, the reading of the source datasets will be truly parallel. On other datasets, only a single executor will read the initial data (the rest of the processing will be distributed).
In addition, on HDFS, we recommend that you always use the Parquet format. On some extreme cases, the speedup when switching from CSV to Parquet can reach 100x.
The main reasons are:
By default, when you create a Parquet dataset in HDFS, all identifiers in the Parquet schema are lowercased. This is absolutely required for compatibility with Hive, which does not support mixed-case or upper-case identifiers in Parquet tables.
This can cause some fairly unexpected results with Spark. For example, if your dataset contains column
Id, the Parquet file will say
id, and Spark will actually keep
Id. So, in a Pyspark recipe, you should use:
df.groupby("Id") will fail complaining that
Id does not exist.
Symptom: all HDFS datasets fail to load in Spark, and you see some error messages about files not found, while everything looks OK.
One possible cause is that Spark isn't properly configured, and isn't using your HDFS configuration. You can confirm this by looking closely at the URLs of the files that are supposedly not found. If these URLs start by
file:// instead of
hdfs://, then that's the reason.
Since Spark doesn't know about your HDFS cluster, it falls back to trying to read local files.
You should configure spark-env.sh and/or spark-defaults.conf to point to your Hadoop configuration. Exact method varies by Hadoop distribution, but it at least includes setting the
YARN_CONF_DIR variables in
When using Pyspark, Python code can be executed:
The code that executes in the driver is run in the regular DSS Python environment, and can use all Python packages installed in this environment. (For more info, see this Howto ).
For example, assuming that the tabulate package is installed in the Python environment:
This code gets executed in the driver.
However, when you pass functions (or lambdas) to Spark commands, these functions get distributed. In that case, these functions don't have access to the regular DSS environment.
To solve that, you have to ensure that the required packages are available on every machine of the cluster (using pip or conda for example). You will also need to ensure that the "python27" command in the PATH of the executors points to a Python executable that contains the required packages.
If that's not the case (because you used a virtualenv for example), you can set in your spark-env.sh the PYSPARK_PYTHON environment variable and have it to point it to a proper Python.
As of DSS 2.1, the Spark SQL recipe only supports the basic "Spark SQL" language. It does not support the "hiveql" language.
In the Jupyter notebook, you can create a HiveContext instead of a SQLContext. However, beware that the behavior is not the same: in a HiveContext, you have implicit access to the tables that are referenced in the Hive metastore in addition to the tables that you manually register.
One of the strong points of Spark is that it can "cache" RDDs and DataFrames. Spark can cache using memory and disk (obviously, much better performance is achieved when caching uses memory).
Caching is useful for RDDs/DFs:
It is really important to understand how your RDD/DF wil be used. Blindly enabling Spark caching will actually strongly decrease performance by taking away memory that could be better used elsewhere. It will also very often lead to out of memory situations
For example, do not cache a source dataset that will be used for a single grouping operation:
While Spark is not an "in-memory processing engine", it has deep optimizations to run in memory. Spark eagerly uses memory, and you'll most often need to fine tune the memory settings for your Spark job.
Spark uses memory for two main things:
Both these operations use the Java heap. You will thus need to set the Java heap memory to a high enough value. Note that "high enough" is generally really high. We recommend that you never go below 4G of memory for Spark executors. The official recommendation for Spark is "from 8 to hundreds of gigabytes".
Setting the java heap size is done differently according to the Spark execution mode:
For better tuning of the memory, you'll want to read about the
spark.storage.memoryFraction variables. See the official Spark configuration guide
Spark divides the RDD that it manipulates into "partitions". A strong limit of Spark (as of Spark 1.5) is that at no moment can the data for any partition go above 2 GB.
There are several things that could make a partition grow too much:
These are among the complex topics when dealing with Spark. Several strategies are possible:
This is a known issue in Spark 1.5 with the Tungsten engine. See https://issues.apache.org/jira/browse/SPARK-10309
Tungsten is a new optimized execution engine that aims to make Spark faster. It is however still fairly new!
You can try the following:
spark.buffer.pageSizevariable to a lower value, like 2m
spark.sql.tungsten.enabledvariable to false