Uncharted Spark Pipeline

Apache Spark is a powerful tool for distributed data processing. Staying productive on this platform means implementing Spark scripts in a modular, testable, and reusable fashion.
The Uncharted Spark Pipeline provides a standard way to express the individual components of a Spark script so that they can be:
- connected in series (or even in a more complex dependency graph of operations)
- unit tested effectively with mock inputs (see the sketch below)
- reused and shared
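Because each stage is an ordinary Scala function, it can be exercised in isolation with a mock input and no SparkContext at all. A minimal sketch, using a hypothetical capitalize op:

// a hypothetical op: just a plain function, testable without Spark
val capitalize: String => String = _.toUpperCase
// exercised directly with a mock input
assert(capitalize("mock input") == "MOCK INPUT")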
Quick Start
Try the pipeline yourself using spark-shell:
$ spark-shell --packages software.uncharted.sparkpipe:sparkpipe-core:1.1.0
scala> import software.uncharted.sparkpipe.Pipe
scala> Pipe("hello").to(_ + " world").run // == "hello world"
Assuming you have a file named people.json, you can read it into a DataFrame and manipulate it:
scala> :paste
import software.uncharted.sparkpipe.Pipe
import software.uncharted.sparkpipe.ops
Pipe(sqlContext)
  .to(ops.core.dataframe.io.read("people.json", "json"))
  .to(ops.core.dataframe.renameColumns(Map("age" -> "personAge")))
  .to(_.filter("personAge > 21").count)
  .run
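Spark's JSON reader expects one JSON object per line, so a hypothetical people.json might look like:

{"name": "Alice", "age": 34}
{"name": "Bob", "age": 19}
{"name": "Carol", "age": 42}

With this data, the pipe above would count the two records whose personAge exceeds 21.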
Advanced Usage
Optional Stages
scala> import software.uncharted.sparkpipe.Pipe
scala> Pipe("hello").maybeTo(None).run // == "hello"
scala> Pipe("hello").maybeTo(Some(a => a+" world")).run // == "hello world"
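maybeTo is handy for configuration-driven pipelines, where a stage should run only when a setting is present. A minimal sketch, assuming a hypothetical GREETING_SUFFIX environment variable:

import software.uncharted.sparkpipe.Pipe

// hypothetical optional setting; the stage is applied only when it is present
val suffix: Option[String] = sys.env.get("GREETING_SUFFIX")
Pipe("hello").maybeTo(suffix.map(s => (in: String) => in + s)).run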
Branching
import software.uncharted.sparkpipe.Pipe
val oneIngest = Pipe("some complex data ingest pipeline")
val transform = oneIngest.to(_.toUpperCase())
val toHdfs = oneIngest.to(in => {
  // convert to parquet and write to HDFS
})
transform.run
toHdfs.run
// or
Pipe(transform, toHdfs).run
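Pipe(transform, toHdfs) merges the two branches into a single pipe, so one run executes both; as the Merging section below shows, the merged pipe's output is a tuple containing each branch's result.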
Merging
import software.uncharted.sparkpipe.Pipe
val oneIngest = Pipe("some complex data ingest pipeline")
val anotherIngest = Pipe("another complex data ingest pipeline")
// you can merge up to 5 pipes this way
val transform = Pipe(oneIngest, anotherIngest).to(in => {
  val oneOutput = in._1
  val anotherOutput = in._2
  oneOutput + " and " + anotherOutput
}).run
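The input to the merged stage is a tuple with one element per upstream pipe, in the order the pipes were passed to Pipe(); merging three pipes would yield a three-element tuple, and so on.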
Caching
import software.uncharted.sparkpipe.Pipe
val oneIngest = Pipe("some complex data ingest pipeline")
val anotherIngest = Pipe("another complex data ingest pipeline")
// merge and run
val transform = Pipe(oneIngest, anotherIngest).to(in => {
  val oneOutput = in._1
  val anotherOutput = in._2
  oneOutput + " and " + anotherOutput
}).run
// at this point, the output of every stage of every Pipe is cached
oneIngest.run // <- this will return a reference to the same String as the one used inside transform!
// this is useful: you can cache and reuse the same RDDs/DataFrames across multiple Pipes
// want to clear the cache?
oneIngest.reset
oneIngest.run // <- this is a new copy of the string "some complex data ingest pipeline"
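This matters most with Spark itself: since every stage's output is cached, several pipes can branch from one expensive ingest without repeating it. A minimal sketch, reusing the hypothetical people.json from the Quick Start:

import software.uncharted.sparkpipe.Pipe
import software.uncharted.sparkpipe.ops

// one shared ingest pipe
val source = Pipe(sqlContext).to(ops.core.dataframe.io.read("people.json", "json"))
// two pipes branching from it
val adults = source.to(_.filter("age > 21").count)
val minors = source.to(_.filter("age <= 21").count)
adults.run // reads people.json and caches each stage's output
minors.run // reuses the cached DataFrame instead of reading the file again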
Included Operations
The Uncharted Spark Pipeline comes bundled with core operations that perform a variety of common tasks and serve as building blocks for more domain-specific operations.
Core operations fall into the following categories:
- RDD Operations
- DataFrame Operations
- DataSet Operations (coming soon!)
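Since an op is just a function, building your own on top of the core set is straightforward. A minimal sketch, using a hypothetical normalizeAges op and the people.json from the Quick Start:

import org.apache.spark.sql.DataFrame
import software.uncharted.sparkpipe.Pipe
import software.uncharted.sparkpipe.ops

// a hypothetical domain-specific op, composed from a bundled core op
def normalizeAges(df: DataFrame): DataFrame =
  ops.core.dataframe.renameColumns(Map("age" -> "personAge"))(df)

Pipe(sqlContext)
  .to(ops.core.dataframe.io.read("people.json", "json"))
  .to(normalizeAges)
  .run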