Spark Distributed Analytic Framework

Description and Overview

Apache Spark is a fast and general engine for large-scale data processing.

How to Use Spark

Because of its high memory and I/O bandwidth requirements, we recommend you run your Spark jobs on Perlmutter. We also recommend that you run Spark inside Shifter. This improves performance and usability by using Shifter's per-node file cache for shuffle files and temporary data files. Without this functionality, these files are written either to your scratch directory (which is not optimized for repeated accesses of many small files) or to the RAM file system at /tmp (which takes memory away from the node doing the calculation and can lead to the node crashing from lack of memory).

Follow the steps below to use Spark. Note that the order of the commands matters: DO NOT load the spark module until you are inside a batch job.

Interactive mode

Submit an interactive batch job with at least 2 nodes. Our setup for Spark puts the master on one node and the workers on the other nodes.

salloc -N 2 -t 30 -C cpu -q interactive --image=nersc/spark-2.3.0:v2 --volume="/global/pscratch/sd/${USER:0:1}/$USER/tmpfiles:/tmp:perNodeCache=size=200G"

This will request a job with the Spark 2.3.0 Shifter image (if you wish to run with an earlier version of Spark, replace the version numbers in the above and following commands). It also sets up an xfs file on each node as a per-node cache, which will be accessed inside the Shifter image via /tmp. By default, Spark will use this directory to cache temporary files. Also by default, Spark will put event logs in $SCRATCH/spark/spark_event_logs; you will need to create this directory the first time you start up Spark (for example, with mkdir -p $SCRATCH/spark/spark_event_logs).

Wait for the job to start. Once it does you will be on a compute node and you will need to load the Spark module:

export EXEC_CLASSPATH=path_to_any_extra_needed_jars #Only required if you're using external libraries or jarfiles
module load spark/2.3.0

You can start Spark with this command:

start-all.sh

To connect to the Python Spark Shell, do:

shifter pyspark

To connect to the Scala Spark Shell, do:

shifter spark-shell

To shut down the Spark cluster, do:

stop-all.sh

Batch mode

Below is an example batch script for Perlmutter. You can change the number of nodes, the time limit, and the QOS as needed (so long as the number of nodes is greater than 1). On Perlmutter you can use the debug QOS for short debugging jobs and the regular QOS for long jobs.

Here's an example script for Perlmutter called run.sl:

#!/bin/bash

#SBATCH -q regular
#SBATCH -N 2
#SBATCH -C cpu
#SBATCH -t 00:30:00
#SBATCH -e mysparkjob_%j.err
#SBATCH -o mysparkjob_%j.out
#SBATCH --image=nersc/spark-2.3.0:v2
#SBATCH --volume="/global/pscratch/sd/${USER:0:1}/$USER/tmpfiles:/tmp:perNodeCache=size=200G"

export EXEC_CLASSPATH=path_to_any_extra_needed_jars #Only required if you're using external libraries or jarfiles
module load spark/2.3.0

start-all.sh

shifter spark-submit $SPARK_EXAMPLES/python/pi.py

stop-all.sh

To submit the job:

sbatch run.sl

Example PySpark Script

Here's an example PySpark script that calculates pi.

from time import time
import numpy as np
from random import random
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('CalculatePi').getOrCreate()
sc = spark.sparkContext

n = 10000000

def is_point_inside_unit_circle(p):
    # p is unused; each call just draws one random point in the unit square
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

t_0 = time()

# parallelize creates a Spark Resilient Distributed Dataset (RDD);
# its values are unused in this case, but it lets us distribute
# the calculation (inside the mapped function) across the workers
count = sc.parallelize(range(0, n)) \
             .map(is_point_inside_unit_circle).reduce(add)
print(np.round(time()-t_0, 3), "seconds elapsed for spark approach and n=", n)
print("Pi is roughly %f" % (4.0 * count / n))

You can run this script in an interactive or batch session (see above for how to get one of those) with:

shifter spark-submit --master $SPARKURL <path_to_python_script>
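
To sanity-check the Spark result on a small n, the same Monte Carlo estimate can be computed serially in plain Python. The sketch below is not part of the original script; the function name estimate_pi_serial and its seed parameter are hypothetical, chosen only for illustration.

```python
from random import Random


def estimate_pi_serial(n, seed=0):
    """Estimate pi by sampling n points in the unit square (no Spark needed)."""
    rng = Random(seed)  # seeded generator so repeated runs give the same estimate
    inside = 0
    for _ in range(n):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # same test as is_point_inside_unit_circle
            inside += 1
    return 4.0 * inside / n


if __name__ == "__main__":
    print("Pi is roughly %f" % estimate_pi_serial(1000000))
```

If the serial and Spark estimates agree to a couple of decimal places, the distributed version is most likely counting points correctly.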

Guide for Optimizing Your Spark Code

Scala or Python

When writing code for Spark, Scala has historically outperformed Python (via PySpark). However, as of Spark 2.0, the performance of Python code using DataFrames has become roughly equivalent for most operations. We therefore recommend that you write your code in Python using DataFrames. You will get roughly the same performance and also benefit from working in a familiar language with a pandas-like DataFrame interface.
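
As an illustration of the DataFrame style recommended above, here is a sketch of the pi estimate written with DataFrame operations instead of an RDD map. It assumes an existing SparkSession is passed in as spark. The function names are hypothetical, and pyspark is imported inside the function only so that the file can be loaded on a machine without Spark.

```python
def pi_from_counts(inside, n):
    """Convert the number of points inside the quarter circle into a pi estimate."""
    return 4.0 * inside / n


def estimate_pi_dataframe(spark, n=10000000):
    """Estimate pi with DataFrame operations; `spark` is an existing SparkSession."""
    # Lazy import (an illustrative choice) so this file loads without pyspark installed.
    from pyspark.sql import functions as F

    # One row per sample, with two independently seeded uniform columns in [0, 1).
    points = spark.range(n).select(F.rand().alias("x"), F.rand().alias("y"))
    # The filter and the count both run on the executors.
    inside = points.filter(
        F.col("x") * F.col("x") + F.col("y") * F.col("y") < 1
    ).count()
    return pi_from_counts(inside, n)
```

In the pyspark shell, where spark is already defined, this could be called as estimate_pi_dataframe(spark).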

Memory Management and Input Data

Choose enough nodes that your input data can comfortably fit in the aggregate memory of all the nodes. If you will be working with the same dataframe over and over again, you can "cache" the dataframe to make sure it stays in memory. This avoids recomputing it each time it is used. Just be sure to unpersist it when you're done using it. Whenever possible, store your input data in your $SCRATCH directory.
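
The sizing and caching advice above can be sketched as follows. This is a rough illustration, not a tuned recipe: the function names are hypothetical, the 512 GB per node and the 50% usable fraction are assumed values you should adjust for the nodes you run on, and the extra node reflects the setup described earlier in which the master occupies one node.

```python
import math


def nodes_to_request(input_gb, mem_per_node_gb=512.0, usable_fraction=0.5):
    """Rough node count so the input fits in the workers' aggregate memory.

    mem_per_node_gb and usable_fraction are assumptions, not measured values.
    """
    workers = max(1, math.ceil(input_gb / (mem_per_node_gb * usable_fraction)))
    return workers + 1  # one extra node hosts the Spark master


def count_total_and_positive(df, column="value"):
    """Run two actions on the same DataFrame, caching it in between."""
    df.cache()  # marks df for in-memory storage; the first action fills the cache
    try:
        total = df.count()  # first action: computes df and caches it
        positive = df.filter("{} > 0".format(column)).count()  # reuses cached data
    finally:
        df.unpersist()  # release executor memory once we are done
    return total, positive
```

For example, with these assumed defaults, nodes_to_request(1000) suggests 5 nodes for roughly 1 TB of input (4 workers plus the master). Wrapping the work in try/finally ensures the dataframe is unpersisted even if one of the actions fails.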