Intro to
Apache Spark

Distributed, in-memory computing for data processing

What is it?

  • General-purpose cluster computing* framework and runtime
    A framework is a library. A runtime is a thing that runs your code.
    More on this in a second.
  • Supports multiple languages (Scala, Python, Java, R)
  • Smart enough to do some optimization of your queries/transformations
  • Has higher-level libraries which are more domain-specific (Spark SQL, Spark Streaming, MLlib, GraphX)

Cluster Computing?

  • Cluster computing is a broad term for dividing work between multiple computers
  • Each worker machine solves a piece of the problem; the partial results are then combined on a master machine
  • Spark is "general-purpose". This doesn't mean it does everything well. It means it isn't dedicated to one domain or use case.

How does it work?

Data -> Distributed Data -> Distributed Execution -> Collect

Step 1: Data

Spark supports retrieving data from a variety of sources:

  • Filesystem
  • HDFS
  • Cassandra
  • Amazon S3
  • Relational Databases (JDBC)
  • etc...

Step 2: Distributed Data

With the exception of HDFS, these data sources do not represent distributed data. Dividing data between workers is one of the most important things Spark does.

Let's look at how Spark deals with a relational database

Spark starts by looking at the maximum and minimum primary key and divides that key space into partitions.

Spark then divides the partitions among its worker nodes.

This is a bit of a simplification, since workers can have multiple executor threads. But you get the idea.
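
For the curious, here's roughly what that looks like in code. This is only a sketch: the JDBC URL, table name, and key bounds below are hypothetical, and for JDBC sources you supply the bounds of the key range (and the number of partitions) yourself.

// hypothetical "trips" table keyed by a numeric "id" column
// (the database's JDBC driver must be on the classpath, e.g. via --packages)
val trips = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/taxi") // made-up connection string
  .option("dbtable", "trips")
  .option("partitionColumn", "id") // numeric column used to split the key space
  .option("lowerBound", "1")       // smallest key value
  .option("upperBound", "1000000") // largest key value
  .option("numPartitions", "8")    // number of partitions (and parallel reads)
  .load()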

This process isn't magic!

Spark will not account for gaps in your primary key values, so the partitions might not actually be equally sized!
This is called skew and can lead to some workers doing more work than others (bad).
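
If you suspect skew, one blunt way to check (a sketch; assumes you already have a Dataframe df loaded, which we'll do shortly) is to count how many rows landed in each partition:

// one number per partition: wildly different counts mean skew
// fine for experiments, but it scans everything, so avoid on huge datasets
df.rdd.glom().map(_.length).collect().foreach(println)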

Step 3 & 4: Distributed Execution and Results Collection

Spark executes parallel copies of your code against each data partition...

  • Shuffling data between worker nodes if it needs to
  • Sending results back to the master node at the end of your program (ideally)
  • We'll dive into this more when we start writing code

This mapping between source data and partitions is handled by an RDD. RDDs are a low-level Spark concept.

Most people use Dataframes and Datasets instead, which are more expressive and faster*

*more on this later

When should I use Spark?

(and when is it the worst?)

Myth #1

Spark is faster than an RDBMS or other database

False!
Spark really comes into play when you have so much data that you can't index it.

If you can, an index will almost* always be faster.

*Compute-heavy queries may run faster on Spark, but it's still a toss-up given the lack of an index.

Myth #2

You can do anything with Spark

Spark really only works well when the algorithm you're writing can easily be distributed.

Problems that don't break down into bits which can be solved in parallel are poorly suited for Spark. Code like this will run slower on Spark than it would on a single machine!

Myth #3

Spark's Scala, Java, Python and R APIs are equally fast

This is far less true than the Spark docs claim.

I'll explain why later this evening...

Let's make something!

Sample data

Here's one day of data from the popular NYC Taxi Dataset

$ curl -O http://assets.oculusinfo.com/salt/sample-data/taxi_one_day.csv
Or download that file directly in your browser.

Either way, remember where you put it.
We'll need the path to that file later.

spark-shell

Now, let's fire up a spark-shell so we have somewhere convenient to experiment

$ cd spark-1.6.1-bin-hadoop2.6/bin
$ ./spark-shell \
  --master local[2] \
  --packages com.databricks:spark-csv_2.10:1.4.0

Breakdown:

Give us two threads for executors:
--master local[2]
Download packages from maven central and put them on the classpath:
--packages com.databricks:spark-csv_2.10:1.4.0

Useful Package

spark-csv supports I/O for CSV/TSV/DSV files
com.databricks:spark-csv_2.10:1.4.0

At this point, you should have a spark prompt:

...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/03/28 18:28:21 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/03/28 18:28:21 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/03/28 18:28:28 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/03/28 18:28:29 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/03/28 18:28:32 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/03/28 18:28:32 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala>

Data Ingestion

Definition:

A Dataframe is a wrapper around an RDD which:

  • Represents distributed data as columnar, with schemas and well-defined data types
  • Offers a SQL-like API for interacting with that data

Let's create a Dataframe from taxi_one_day.csv:

// enter paste mode for multi-line stuff
scala> :paste
// modify the path and paste this code into the shell
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true") // Use first line of all files as header
  .option("inferSchema", "true") // Automatically infer data types
  .load("path/to/taxi_one_day.csv") // can also use a directory/*.csv here
// type the EOF character (CTRL+D) to exit paste mode and interpret

Check out the inferred schema:

scala> df.printSchema
root
 |-- hack: string (nullable = true)
 |-- license: string (nullable = true)
 |-- code: string (nullable = true)
 |-- flag: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- passengers: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- pickup_lon: double (nullable = true)
 |-- pickup_lat: double (nullable = true)
 |-- dropoff_lon: string (nullable = true)
 |-- dropoff_lat: string (nullable = true)

spark-csv may not infer the column datatypes correctly. Notice that pickup_lat and pickup_lon are doubles, but their dropoff counterparts are strings.

This is likely due to the presence of a non-numeric character somewhere in the dataset.
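
One way to clean that up (a sketch using the Dataframe API's cast support; a selectExpr flavour of the same idea appears later) is to overwrite the offending columns with properly typed versions:

scala> :paste
// recast the two string columns as doubles; values that can't be parsed become null
val fixed = df
  .withColumn("dropoff_lon", df("dropoff_lon").cast("double"))
  .withColumn("dropoff_lat", df("dropoff_lat").cast("double"))
// type the EOF character (CTRL+D) to run, then check:
scala> fixed.printSchema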

Fundamental: Spark SQL

Dataframes are part of Spark SQL, and have been the primary mechanism for interacting with data in Spark for several versions.

Spark 1.6 introduces a preview of Datasets, which attempt to combine the benefits of Dataframes with those of using RDDs directly. We won't cover them today, since they'll be released in full in Spark 2.0 this summer.

Code written using Dataframes is faster than code which uses RDDs!

This is because Spark can greatly optimize queries against columnar data, and is capable of doing a lot of the work using basic (unboxed) types rather than boxed types.
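
You can peek at this optimizer in action: explain() prints the query plan Spark generates for a Dataframe. A quick sketch to try against our taxi df:

scala> df.select("duration", "distance").where("distance > 15").explain(true)
// prints the parsed, analyzed, optimized and physical plans for that query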

Querying

Let's try some simple queries against our data!

scala> df.first
scala> df.show
scala> df.select($"duration",$"distance").where($"distance" > 15)

Notice the last one returns a Dataframe instead of a result! This is because everything in Spark is lazily evaluated.

Calling transformations like select() or where() always results in a new Dataframe with an optimized query.

Nothing gets returned until you actually execute the query by asking for a result via actions such as first(), take() or collect().
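
A quick way to see the laziness for yourself in the shell:

scala> val longTrips = df.select($"duration", $"distance").where($"distance" > 15) // returns instantly; no data read yet
scala> longTrips.count // an action: now Spark actually runs the query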

Common Actions

scala> df.select($"duration",$"distance").where($"distance" > 15).first
scala> df.select($"duration",$"distance").where($"distance" > 15).take(5)
// you can also use plain strings when you don't need the $ "column syntax"
scala> df.select("duration","distance").where("distance > 15").show
scala> df.select("duration","distance").where("distance > 15").collect
scala> df.select("duration","distance").where("distance > 15").count

Common Transformations

scala> df.select("duration","distance").show
// fix datatype on dropoff_lon
scala> df.selectExpr("cast(dropoff_lon as double) as longitude").show
scala> df.filter($"distance" > 15).show // same as where()
scala> df.limit(100).show
scala> :paste
df.groupBy("hack")
  .agg(max("distance"))
  .orderBy($"max(distance)".desc)
  .show

Side note: this works the same way on parquet, json, etc. Nested objects can be accessed via dot notation: df.select("person.name.first")

Go nuts!

Try answering the following questions:

  • Average location of pickups?
  • Average pickup time?
  • Taxi (hack) which carried the most passengers that day?
  • Maximum and minimum dropoff longitude? What might this mean for our data quality?
  • Maximum and minimum (sane) dropoff longitude?

Intermediate Spark

Since every Dataframe transformation returns a new Dataframe, you can always store it in a variable and query it several times:

scala> :paste
val df2 = df.select("hack", "duration", "distance")
            .where("distance > 15")
scala> :paste
df2.groupBy("hack")
   .agg(max("distance"))
   .orderBy($"max(distance)".desc)
   .show

Caching

Every time you use an action, Spark goes right back to the source data. This is slow!

We can address this by using the cache() transformation on our Dataframe:

scala> val cachedDf = df.cache

This is a hint to Spark that it should start keeping records in memory for future use.
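
Caching is itself lazy: nothing is stored until an action forces Spark to read the data. A rough pattern (sketch):

scala> cachedDf.count       // first action reads the source and fills the cache
scala> cachedDf.count       // later actions are served from memory, typically much faster
scala> cachedDf.unpersist() // release the memory when you're done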

A brief note about RDDs

Dataframes offer several methods such as map(), flatMap(), foreach(), and others which:
  • Operate directly on the underlying RDD
  • Return an RDD

Always use these methods deliberately and conservatively, as there are potential performance penalties. And RDDs have a completely different API :(
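
For example (a sketch; assumes the distance column contains no nulls and is in miles): mapping over our taxi Dataframe hands back an RDD of whatever the function returns, and from that point on you're using the RDD API rather than the Dataframe one.

scala> val distanceKm = df.select("distance").map(row => row.getDouble(0) * 1.60934) // an RDD[Double], not a Dataframe
scala> distanceKm.take(5) // this take() is the RDD version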

Datasets in Spark 2.0 should make this a lot less confusing

Data Export

Let's talk a little bit about formats!

Spark Data Formats

Spark natively supports the following export (and import) formats:

  • JSON (slow)
  • Text
  • Parquet (fastest for queries)
  • ORC (small - mostly a Hive thing)
  • JDBC

It's generally recommended to use Parquet - it's the default.

df.filter("distance > 40")
  .write
  .format("parquet")
  .save("output/taxi_parquet")

You should now have a nested directory output/taxi_parquet, with several partitioned files inside it.
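
Reading it back in later is just as easy (a sketch; the path just has to match wherever you saved it):

scala> val reloaded = sqlContext.read.parquet("output/taxi_parquet") // the schema travels with the data
scala> reloaded.count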

Exercise

a.k.a Beyond Spark SQL

So far I've shown you how you can tackle SQL-like problems with Spark. But Spark is good for a great deal more than that.

Let's look at something a bit less numeric. Here's a sample of some data we've been collecting, tweets from Twitter concerning Donald Trump:

$ curl -O http://assets.oculusinfo.com/salt/sample-data/trump_20160206.json.gz
scala> :paste
val trump = sqlContext.read.format("json").load("trump_20160206.json.gz")

Try the following:

  • Find the most-retweeted tweet
  • Filter for tweets which are retweets
  • Find the most-retweeted tweet which doesn't itself appear in the dataset
  • Other ideas?

How would you tackle this:

  • Find the most frequently-used word in the tweet text, across the entire dataset

This is where advanced Spark topics come in:

  • RDD API
  • Map/reduce
  • Accumulators
  • etc.

But we'll have to save those for another day.

Find the most frequently-used word in the tweet text, across the entire dataset

scala> :paste
trump
  .select("text")
  .limit(1000) // drop this line if you're not in a rush
  .flatMap(t => t.getString(0).trim.split("\\s+")) // split on runs of whitespace (avoids empty tokens)
  .map(w => (w, 1))
  .foldByKey(0)((l, r) => l + r)
  .sortBy(w => w._2, false)
  .take(10)

What is Uncharted doing with Spark?

Sparkpipe

Salt