Distributed, in-memory computing for data processing
Data -> Distributed Data -> Distributed Execution -> Collect
Spark supports retrieving data from a variety of sources:
With the exception of HDFS, these data sources do not represent distributed data. Dividing data between workers is one of the most important things Spark does.
Let's look at how Spark deals with a relational database
Spark starts by looking at the maximum and minimum primary key and divides that key space into partitions.
Spark then divides the partitions among its worker nodes.
This is a bit of a simplification, since workers can have multiple executor threads. But you get the idea.
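For example, here's a minimal sketch of a partitioned JDBC read. The connection URL, table name, and key bounds are hypothetical, and with this API you supply the key range and partition count yourself:

// Sketch only: the URL, table, and bounds are made up for illustration.
// Spark turns this into numPartitions range queries over the primary key,
// one per partition. (The JDBC driver jar must be on the shell's classpath.)
val trips = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/taxi") // hypothetical database
  .option("dbtable", "trips")
  .option("partitionColumn", "id")   // numeric primary key
  .option("lowerBound", "1")         // min(id)
  .option("upperBound", "1000000")   // max(id)
  .option("numPartitions", "8")      // 8 roughly equal key ranges
  .load()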
This process isn't magic!
Spark will not account for gaps in your primary key values, so the partitions might not actually be equally sized!
This is called skew and can lead to some workers doing more work than others (bad).
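A quick way to see whether you've been hit by skew is to count the rows in each partition of the underlying RDD. This is a sketch, continuing the hypothetical trips Dataframe from above:

// One count per partition; large differences between the numbers mean skew.
val partitionSizes = trips.rdd
  .mapPartitions(rows => Iterator(rows.size))
  .collect()
partitionSizes.foreach(println)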
Spark executes parallel copies of your code against each data partition...
This mapping between source data and partitions is handled by an RDD. RDDs are a low-level Spark concept. Most people use Dataframes and Datasets instead, which are more expressive and faster*.
So when is Spark the best choice? (and when is it the worst?)
Is Spark always faster than an indexed database? False!
Spark really comes into play when you have so much data that you can't index it.
If you can, an index will almost* always be faster.
*Compute-heavy queries may run faster on Spark, but it's still a toss-up given the lack of an index.
Spark really only works well when the algorithm you're writing can easily be distributed.
Problems that don't break down into bits which can be solved in parallel are poorly suited for Spark. Code like this will run slower on Spark than it would on a single machine!
This is far less true than the Spark docs claim.
I'll explain why later this evening...
Here's one day of data from the popular NYC Taxi Dataset
$ curl -O http://assets.oculusinfo.com/salt/sample-data/taxi_one_day.csv
Or download it directly in your browser.
Either way, remember where you put it.
We'll need the path to that file later.
Now, let's fire up a spark-shell so we have somewhere convenient to experiment
$ cd spark-1.6.1-bin-hadoop2.6/bin
$ ./spark-shell \
--master local[2] \
--packages com.databricks:spark-csv_2.10:1.4.0
--master local[2] tells Spark to run locally, using two worker threads.
--packages com.databricks:spark-csv_2.10:1.4.0 tells Spark to fetch the spark-csv package (and its dependencies) from Maven so we can read CSV files.
At this point, you should have a spark prompt:
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/03/28 18:28:21 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/03/28 18:28:28 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/03/28 18:28:29 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/03/28 18:28:32 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala>
A Dataframe is a wrapper around an RDD which:
Let's create a Dataframe from taxi_one_day.csv:
// enter paste mode for multi-line stuff
scala> :paste
// modify the path and paste this code into the shell
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("path/to/taxi_one_day.csv") // can also use a directory/*.csv here
// type the EOF character (CTRL+D) to exit paste mode and interpret
Check out the inferred schema:
scala> df.printSchema
root
|-- hack: string (nullable = true)
|-- license: string (nullable = true)
|-- code: string (nullable = true)
|-- flag: integer (nullable = true)
|-- type: string (nullable = true)
|-- pickup_time: timestamp (nullable = true)
|-- dropoff_time: timestamp (nullable = true)
|-- passengers: integer (nullable = true)
|-- duration: integer (nullable = true)
|-- distance: double (nullable = true)
|-- pickup_lon: double (nullable = true)
|-- pickup_lat: double (nullable = true)
|-- dropoff_lon: string (nullable = true)
|-- dropoff_lat: string (nullable = true)
spark-csv may not infer the column datatypes correctly. Notice that pickup_lat and pickup_lon are doubles, but their dropoff counterparts are strings.
This is likely due to the presence of a non-numeric character somewhere in the dataset.
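One hedged way to hunt for the offending values: cast the column to double and look for rows where the cast fails (a failed cast comes back as null).

scala> :paste
// Rows where dropoff_lon is present but doesn't parse as a double.
df.where($"dropoff_lon".isNotNull && $"dropoff_lon".cast("double").isNull)
  .select("dropoff_lon", "dropoff_lat")
  .show()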
Dataframes are part of Spark SQL, and have been the primary mechanism for interacting with data in Spark for several versions.
Spark 1.6 introduces a preview of Datasets, which attempt to combine the benefits of Dataframes and directly using RDDs. We won't cover them today, since they'll be released in full in Spark 2.0 this summer.
Code written using Dataframes is faster than code which uses RDDs!
This is because Spark can greatly optimize queries against columnar data, and is capable of doing a lot of the work using basic (unboxed) types rather than boxed types.
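You can watch the optimizer at work with explain(); the exact plan output varies by version, but for example:

scala> :paste
// Print the logical and physical plans Catalyst generates for a simple query.
df.select("duration", "distance")
  .where("distance > 15")
  .explain(true) // true = include the analyzed and optimized logical plans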
Let's try some simple queries against our data!
scala> df.first
scala> df.show
scala> df.select($"duration",$"distance").where($"distance" > 15)
Notice the last one returns a Dataframe instead of a result! This is because everything in Spark is lazily evaluated.
Calling transformations like select() or where() always results in a new Dataframe with an optimized query. Nothing gets returned until you actually execute the query by asking for a result via actions such as first(), take(), or collect().
scala> df.select($"duration",$"distance").where($"distance" > 15).first
scala> df.select($"duration",$"distance").where($"distance" > 15).take(5)
// you can also use plain strings when you don't need the $"column" syntax
scala> df.select("duration","distance").where("distance > 15").show
scala> df.select("duration","distance").where("distance > 15").collect
scala> df.select("duration","distance").where("distance > 15").count
scala> df.select("duration","distance").show
// fix datatype on dropoff_lon
scala> df.selectExpr("cast(dropoff_lon as double) as longitude").show
scala> df.filter($"distance" > 15).show // same as where()
scala> df.limit(100).show
scala> :paste
df.groupBy("hack")
.agg(max("distance"))
.orderBy($"max(distance)".desc)
.show
Side note: this works the same way on parquet, json, etc. Nested objects can be accessed via dot notation: df.select("person.name.first")
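For instance, given a hypothetical people.json with nested records (the file and field names here are made up), the dot notation looks like this:

scala> :paste
// e.g. {"person": {"name": {"first": "Ada"}, "age": 36}}
val people = sqlContext.read.format("json").load("path/to/people.json")
people.select("person.name.first", "person.age").show()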
Try answering the following questions:
Since every Dataframe transformation returns a new Dataframe, you can always store it in a variable and query it several times:
scala> :paste
val df2 = df.select("hack", "duration", "distance")
.where("distance > 15")
scala> :paste
df2.groupBy("hack")
.agg(max("distance"))
.orderBy($"max(distance)".desc)
.show
Every time you use an action, Spark goes right back to the source data. This is slow!
We can address this by using the cache() transformation on our Dataframe:
scala> val cachedDf = df.cache
This is a hint to Spark that it should start keeping records in memory for future use.
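Note that cache() is itself lazy; nothing is actually stored until an action runs. A quick sketch:

scala> cachedDf.count // first action reads the CSV and fills the cache
scala> cachedDf.where("distance > 15").count // now served from memory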
Dataframes also expose RDD-style methods like map(), flatMap(), foreach(), and others, which:
Always use these methods deliberately and conservatively, as there are potential performance penalties. And RDDs have a completely different API :(
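If you do need to drop down, the underlying RDD[Row] is one .rdd call away. A minimal sketch (the miles-to-km conversion is just for illustration):

scala> :paste
// Work on raw Rows; getAs[Double] assumes the column really is a double.
val distancesKm = df.rdd
  .map(row => row.getAs[Double]("distance") * 1.609344) // illustration only
distancesKm.take(5).foreach(println)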
Let's talk a little bit about formats!
Spark natively supports the following export (and import) formats:
It's generally recommended to use Parquet - it's the default.
df.filter("distance > 40")
.write
.format("parquet")
.save("output/taxi_parquet")
You should now have a nested directory, output/taxi_parquet, with several partitioned files inside it.
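Reading it back in is symmetric, and the schema comes along for free. A quick sketch:

scala> :paste
val longTrips = sqlContext.read.format("parquet").load("output/taxi_parquet")
longTrips.printSchema
longTrips.count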
So far I've shown you how you can tackle SQL-like problems with Spark. But Spark is good for a great deal more than that.
Let's look at something a bit less numeric. Here's a sample of some data we've been collecting; tweets from Twitter concerning Donald Trump:
$ curl -O http://assets.oculusinfo.com/salt/sample-data/trump_20160206.json.gz
scala> :paste
val trump = sqlContext.read.format("json").load("trump_20160206.json.gz")
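It's worth printing the schema before diving in; tweets are heavily nested JSON. The user.screen_name field below is an assumption about this particular dump, so adjust to whatever printSchema shows:

scala> trump.printSchema
scala> trump.select("text", "user.screen_name").show(5) // field names assumed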
Try the following:
How would you tackle this:
This is where advanced Spark topics come in:
But we'll have to save those for another day.
Find the most frequently-used word in the tweet text, across the entire dataset
scala> :paste
trump
.select("text")
.limit(1000) // drop this line if you're not in a rush
.flatMap(t => t.getString(0).trim.split("\\s+")) // split each tweet's text into words
.map(w => (w, 1)) // pair each word with a count of 1
.foldByKey(0)((l, r) => l + r) // sum the counts per word
.sortBy(w => w._2, false) // sort by count, descending
.take(10)