R lets you write data analysis code quickly. With a bit of care, you can also make that code easy to read, and therefore easy to maintain, and in many cases R runs it fast enough too.
Unfortunately, R generally requires all your data to fit in main memory (RAM) on a single machine, which limits how much data you can analyze with it. There are a few solutions to this problem, and one of them is Apache Spark.
Apache Spark is an open-source, unified analytics engine for large-scale data processing. It is a cluster computing platform: you can spread your data and your computations across multiple machines, letting you analyze far more data than fits on a single machine. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.
Spark also supports a pseudo-distributed local mode, usually used only for development or testing, where distributed storage is not required and the local file system is used instead; in this mode, Spark runs on a single machine with one executor per CPU core.
sparklyr is an R package that lets you write R code to work with data in a Spark cluster. It has a dplyr interface, which means that you can write (more or less) the same dplyr-style R code, whether you are working with data on your machine or on a Spark cluster.
Besides data manipulation with dplyr and SQL, the sparklyr package also gives you access to Spark's distributed machine learning library (MLlib) through the ml_*() family of functions, and to the Spark DataFrame API through the sdf_*() family.
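For example, here is a minimal, hedged sketch of fitting a linear model on the cluster with ml_linear_regression(). It is not run in this post, and it assumes the flights data has already been copied to Spark as flights_tbl, which happens further below.
# Sketch only: fit a linear regression in Spark via MLlib.
# Assumes flights_tbl exists (it is created later in this post).
model <- flights_tbl %>%
  filter(!is.na(arr_delay), !is.na(dep_delay)) %>%
  ml_linear_regression(arr_delay ~ dep_delay + distance)
# Inspect the fitted coefficients
summary(model)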
Working with sparklyr is very much like working with dplyr when you have data inside a database. In fact, sparklyr converts your R code into SQL code before passing it to Spark.
The typical workflow has three steps:
1. Connect to Spark using spark_connect().
2. Do some work.
3. Close the connection to Spark using spark_disconnect().
spark_connect() takes a URL that gives the location of the Spark cluster. For a local cluster (as you are running here), the URL should be "local". For a remote cluster (on another machine, typically a high-performance server), the connection string will be a URL and port to connect on.
One word of warning: connecting to a cluster takes several seconds, so it is impractical to connect and disconnect repeatedly. It is usually best to keep the connection open for the whole time that you want to work with Spark.
# Load sparklyr
library(sparklyr)
# Install a local version of Spark for development purposes (only once!)
# spark_install()
# set Java home to Java 8 (only working with Java 8 at the moment)
Sys.setenv(JAVA_HOME = "/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home")
# Connect to your Spark cluster
sc <- spark_connect("local")
# Print the version of Spark
spark_version(sc)
## [1] '2.3.0'
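If you need more control over the connection, spark_connect() also accepts a config argument built with spark_config(). A hedged sketch follows; the memory setting is illustrative only, not a value used elsewhere in this post.
# Sketch only (not run here): customize the local connection.
conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "4G"  # illustrative: give the driver JVM more memory
sc <- spark_connect(master = "local", config = conf)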
Before you can do any real work with Spark, you need to get your data into it, so it is useful to be able to copy data from R to Spark. This is done with dplyr's copy_to() function. Be warned: copying data is a fundamentally slow process. copy_to() takes two main arguments: a Spark connection (dest) and a data frame (df) to copy over to Spark; you can also supply a name for the resulting Spark table, as done below.
Once you have copied your data into Spark, you might want some reassurance that it has actually worked. You can see a list of all the data frames stored in Spark using src_tbls(), which simply takes a Spark connection argument (x).
# copy data: create a Spark table flights
library(dplyr)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
# show tables
src_tbls(sc)
## [1] "flights"
# look inside flights Spark table
flights_tbl
## # Source: spark<flights> [?? x 19]
## year month day dep_time sched_dep_time dep_delay arr_time
## <int> <int> <int> <int> <int> <dbl> <int>
## 1 2013 1 1 517 515 2 830
## 2 2013 1 1 533 529 4 850
## 3 2013 1 1 542 540 2 923
## 4 2013 1 1 544 545 -1 1004
## 5 2013 1 1 554 600 -6 812
## 6 2013 1 1 554 558 -4 740
## 7 2013 1 1 555 600 -5 913
## 8 2013 1 1 557 600 -3 709
## 9 2013 1 1 557 600 -3 838
## 10 2013 1 1 558 600 -2 753
## # … with more rows, and 12 more variables: sched_arr_time <int>,
## # arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## # origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## # minute <dbl>, time_hour <dttm>
In the previous command, when you copied the data to Spark, copy_to() returned a value. This return value is a special kind of tibble that doesn't contain any data of its own: dplyr allows tibbles to refer to data held in a remote data source, such as a database or, as is the case here, Spark. For remote datasets, the tibble object simply stores a connection to the remote data.
On the Spark side, the data is stored in a structure called a DataFrame, which is a more or less direct equivalent of R's data frame. Since these types are also analogous to database tables, the term table is sometimes used to describe this sort of rectangular data as well.
Calling tbl() with a Spark connection and a string naming the Spark data frame returns the same kind of tibble object that was returned by copy_to().
# Link to the flights table in Spark
flights_tbl <- tbl(sc, "flights")
# which class it belongs to
class(flights_tbl)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
# How big is the dataset? (we don't actually know the number of rows!)
dim(flights_tbl)
## [1] NA 19
# Try viewing it in the data viewer
# View(flights_tbl)
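Because the tibble is lazy, dim() cannot report the number of rows without asking Spark to scan the data. If you do want the real dimensions, or the Spark-side schema, the sdf_*() helpers will compute them on the cluster; a short sketch:
# Ask Spark for the actual dimensions (this triggers a job on the cluster)
sdf_nrow(flights_tbl)
sdf_dim(flights_tbl)
# Column names and Spark data types of the underlying DataFrame
sdf_schema(flights_tbl)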
The easiest way to manipulate data frames stored in Spark is to use dplyr syntax.
flight_delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(count = n(),
            dist = mean(distance, na.rm = TRUE),
            delay = mean(arr_delay, na.rm = TRUE)) %>%
  mutate(delay_by_distance = delay / dist) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  arrange(desc(delay_by_distance))
flight_delay
## # Source: spark<?> [?? x 5]
## # Ordered by: desc(delay_by_distance)
## tailnum count dist delay delay_by_distance
## <chr> <dbl> <dbl> <dbl> <dbl>
## 1 N645MQ 25 480. 51 0.106
## 2 N832AS 163 228. 23.4 0.102
## 3 N8475B 35 326. 32.4 0.0993
## 4 N8683B 42 385. 35.8 0.0930
## 5 N835AS 194 228. 20.1 0.0881
## 6 N828AS 208 228. 20.0 0.0875
## 7 N8646A 38 353. 30.1 0.0852
## 8 N942MQ 44 462. 38.3 0.0830
## 9 N834AS 173 229. 18.7 0.0820
## 10 N908MQ 22 472 38.5 0.0816
## # … with more rows
Bear in mind that sparklyr converts your dplyr code into SQL before passing it to Spark, which means that most, but not all, dplyr queries can be run against a Spark table. For instance, there is no median() function in SQL, so the following query fails:
flights_tbl %>%
  group_by(tailnum) %>%
  summarise(median_dist = median(distance))
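If you are unsure how a pipeline will translate, dplyr's show_query() prints the generated SQL without running anything on Spark; for example, for the flight_delay summary built above:
# Print the SQL translation of the lazy pipeline (no work is done on Spark)
flight_delay %>% show_query()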
There are lots of reasons why you might want to move your data from Spark to R. You have already seen that some data is moved from Spark to R when you print it. You also need to collect() your dataset if you want to plot it, or if you want to use a modeling technique that is not available in Spark. The timings below illustrate why this matters: building a plot from the lazy tibble pulls the data out of Spark each time, whereas the collected tibble is already in R's memory.
library(ggplot2)
ggplot(flight_delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
collected_flight_delay <- flight_delay %>%
  collect()
class(flight_delay)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
class(collected_flight_delay)
## [1] "tbl_df" "tbl" "data.frame"
ggplot(collected_flight_delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
system.time(
  ggplot(flight_delay, aes(dist, delay)) +
    geom_point(aes(size = count), alpha = 1/2) +
    geom_smooth() +
    scale_size_area(max_size = 2)
)
## user system elapsed
## 0.119 0.002 0.811
system.time(
  ggplot(collected_flight_delay, aes(dist, delay)) +
    geom_point(aes(size = count), alpha = 1/2) +
    geom_smooth() +
    scale_size_area(max_size = 2)
)
## user system elapsed
## 0.003 0.000 0.003
Sometimes you can't pipeline the whole analysis and need to store intermediate results. This poses a dilemma: you need to keep the results of intermediate calculations, but you don't want to collect() them because that is slow. The solution is compute(), which runs the calculation but stores the results in a temporary data frame on the Spark side. compute() takes two arguments: a tibble, and a name for the Spark data frame that will store the results.
computed_flight_delay <- flight_delay %>%
  compute("flight_delay")
class(computed_flight_delay)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
class(collected_flight_delay)
## [1] "tbl_df" "tbl" "data.frame"
src_tbls(sc)
## [1] "flight_deleay" "flights"
As previously mentioned, when you use the dplyr interface, sparklyr converts your code into SQL before passing it to Spark. Most of the time, this is what you want. However, you can also write raw SQL to accomplish the same task. This is useful if you want your code to be portable, that is, usable outside of R as well. For example, a fairly common workflow is to use sparklyr to experiment with data processing, then switch to raw SQL in a production environment.
library(DBI)
query = "select month, day, count(*) as count
from flights
group by month, day
having count > 365
order by -count"
# evaluate the query and move all the results to R
dbGetQuery(sc, query) %>% head(10)
## month day count
## 1 11 27 1014
## 2 7 11 1006
## 3 7 10 1004
## 4 12 2 1004
## 5 7 8 1004
## 6 7 18 1003
## 7 7 25 1003
## 8 7 12 1002
## 9 8 8 1001
## 10 7 9 1001
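For comparison, here is a sketch of the same aggregation written with dplyr verbs; it stays lazy on Spark until you print or collect() it:
# dplyr equivalent of the SQL query above
flights_tbl %>%
  group_by(month, day) %>%
  summarise(count = n()) %>%
  filter(count > 365) %>%
  arrange(desc(count))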
When you are done, you can disconnect from Spark:
# Disconnect from Spark
spark_disconnect(sc)
For more information, see the Introduction to Spark in R using sparklyr DataCamp course by Richie Cotton.