R lets you write data analysis code quickly. With a bit of care, you can also make your code easy to read, which means that you can easily maintain your code too. In many cases, R is also fast enough at running your code.

Unfortunately, R requires that all your data be analyzed in main memory (RAM), on a single machine. This limits how much data you can analyze using R. There are a few solutions to this problem, including using Spark.

Apache Spark is a unified analytics engine for large-scale data processing. It is an open source cluster computing platform. That means that you can spread your data and your computations across multiple machines, effectively letting you analyze an unlimited amount of data. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.

Spark also supports a pseudo-distributed local mode, usually used only for development or testing purposes, where distributed storage is not required and the local file system can be used instead; in such a scenario, Spark is run on a single machine with one executor per CPU core.

sparklyr is an R package that lets you write R code to work with data in a Spark cluster. It has a dplyr interface, which means that you can write (more or less) the same dplyr-style R code, whether you are working with data on your machine or on a Spark cluster.

Besides data manipulation with dplyr and SQL, sparklyr package also supports the following libraries:

Connect

Working with sparklyr is very much like working with dplyr when you have data inside a database. In fact, sparklyr converts your R code into SQL code before passing it to Spark.

The typical workflow has three steps:

  1. Connect to Spark using spark_connect().
  2. Do some work.
  3. Close the connection to Spark using spark_disconnect().

spark_connect() takes a URL that gives the location to Spark. For a local cluster (as you are running), the URL should be “local”. For a remote cluster (on another machine, typically a high-performance server), the connection string will be a URL and port to connect on.

One word of warning. Connecting to a cluster takes several seconds, so it is impractical to regularly connect and disconnect. It is usually best to keep the connection open for the whole time that you want to work with Spark.

# Load sparklyr
library(sparklyr)

# # install a local version of Spark for development purposes (only once!)
# spark_install()

# set Java home to Java 8 (only working with Java 8 at the moment)
Sys.setenv(JAVA_HOME = "/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home")

# Connect to your Spark cluster
sc <- spark_connect("local")

# Print the version of Spark
spark_version(sc)
## [1] '2.3.0'

Copy

Before you can do any real work using Spark, you need to get your data into it. It is useful to be able to copy data from R to Spark. This is done with dplyr’s copy_to() function. Be warned: copying data is a fundamentally slow process. copy_to() takes two arguments: a Spark connection (dest), and a data frame (df) to copy over to Spark.

Once you have copied your data into Spark, you might want some reassurance that it has actually worked. You can see a list of all the data frames stored in Spark using src_tbls(), which simply takes a Spark connection argument (x).

# copy data: create a Spark table flights
library(dplyr)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")

# show tables
src_tbls(sc)
## [1] "flights"
# look inside flights Spark table
flights_tbl
## # Source: spark<flights> [?? x 19]
##     year month   day dep_time sched_dep_time dep_delay arr_time
##    <int> <int> <int>    <int>          <int>     <dbl>    <int>
##  1  2013     1     1      517            515         2      830
##  2  2013     1     1      533            529         4      850
##  3  2013     1     1      542            540         2      923
##  4  2013     1     1      544            545        -1     1004
##  5  2013     1     1      554            600        -6      812
##  6  2013     1     1      554            558        -4      740
##  7  2013     1     1      555            600        -5      913
##  8  2013     1     1      557            600        -3      709
##  9  2013     1     1      557            600        -3      838
## 10  2013     1     1      558            600        -2      753
## # … with more rows, and 12 more variables: sched_arr_time <int>,
## #   arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <dttm>

In the previous command, when you copied the data to Spark, copy_to() returned a value. This return value is a special kind of tibble() that doesn’t contain any data of its own. Indeed, dplyr allows them to store data from a remote data source, such as databases, and – as is the case here – Spark. For remote datasets, the tibble object simply stores a connection to the remote data.

On the Spark side, the data is stored in a variable called a DataFrame. This is a more or less direct equivalent of R’s data frame. Since these types are also analogous to database tables, sometimes the term table will also be used to describe this sort of rectangular data.

Calling tbl() with a Spark connection, and a string naming the Spark data frame will return the same tibble object that was returned when you used copy_to().

# Link to the track_metadata table in Spark
flights_tbl <- tbl(sc, "flights")

# which class it belongs to
class(flights_tbl)
## [1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"
# how big the dataset is (we don't know in fact!)
dim(flights_tbl)
## [1] NA 19
# try 
#View(flights_tbl)

Query

The easiest way to manipulate data frames stored in Spark is to use dplyr syntax.

flight_delay <- 
  flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(count = n(), 
            dist = mean(distance, na.rm = TRUE), 
            delay = mean(arr_delay, na.rm = TRUE)) %>%
  mutate(delay_by_distance = delay / dist) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  arrange(desc(delay_by_distance))

flight_delay
## # Source:     spark<?> [?? x 5]
## # Ordered by: desc(delay_by_distance)
##    tailnum count  dist delay delay_by_distance
##    <chr>   <dbl> <dbl> <dbl>             <dbl>
##  1 N645MQ     25  480.  51              0.106 
##  2 N832AS    163  228.  23.4            0.102 
##  3 N8475B     35  326.  32.4            0.0993
##  4 N8683B     42  385.  35.8            0.0930
##  5 N835AS    194  228.  20.1            0.0881
##  6 N828AS    208  228.  20.0            0.0875
##  7 N8646A     38  353.  30.1            0.0852
##  8 N942MQ     44  462.  38.3            0.0830
##  9 N834AS    173  229.  18.7            0.0820
## 10 N908MQ     22  472   38.5            0.0816
## # … with more rows

Mind that sparklyr converts your dplyr code into SQL code before passing it to Spark. That means that most but not all dplyr queries can be submitted to sparklyr. For instance, there is no median function in SQL:

flights_tbl %>% 
  group_by(tailnum) %>%
  summarise(median_dist = median(distance))

There are lots of reasons that you might want to move your data from Spark to R. You’ve already seen how some data is moved from Spark to R when you print it. You also need to collect your dataset if you want to plot it, or if you want to use a modeling technique that is not available in Spark.

library(ggplot2)

ggplot(flight_delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

collected_flight_delay <- flight_delay %>%
  collect()

class(flight_delay)
## [1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"
class(collected_flight_delay)
## [1] "tbl_df"     "tbl"        "data.frame"
ggplot(collected_flight_delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

system.time(
  ggplot(flight_delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
)
##    user  system elapsed 
##   0.119   0.002   0.811
system.time(
  ggplot(collected_flight_delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)
)
##    user  system elapsed 
##   0.003   0.000   0.003

Sometimes you need to store intermediate results, since you can’t pipeline the whole analysis. Then we are facing a dilemma: you need to store the results of intermediate calculations, but you don’t want to collect them because it is slow. The solution is to use ´compute()´ to compute the calculation, but store the results in a temporary data frame on Spark. Compute takes two arguments: a tibble, and a variable name for the Spark data frame that will store the results.

computed_flight_delay <- flight_delay %>%
  compute("flight_deleay")

class(computed_flight_delay)
## [1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"
class(collected_flight_delay)
## [1] "tbl_df"     "tbl"        "data.frame"
src_tbls(sc)
## [1] "flight_deleay" "flights"

As previously mentioned, when you use the dplyr interface, sparklyr converts your code into SQL before passing it to Spark. Most of the time, this is what you want. However, you can also write raw SQL to accomplish the same task. If you want your code to be portable – that is, used outside of R as well – then it may be useful. For example, a fairly common workflow is to use sparklyr to experiment with data processing, then switch to raw SQL in a production environment.

library(DBI)
query = "select month, day, count(*) as count 
from flights
group by month, day
having count > 365
order by -count"

# evaluate the query and move all the results to R
dbGetQuery(sc, query) %>% head(10)
##    month day count
## 1     11  27  1014
## 2      7  11  1006
## 3      7  10  1004
## 4     12   2  1004
## 5      7   8  1004
## 6      7  18  1003
## 7      7  25  1003
## 8      7  12  1002
## 9      8   8  1001
## 10     7   9  1001

When you are done, you can disconnect from Spark:

# Disconnect from Spark
spark_disconnect(sp)

For more information see Introduction to Spark in R using sparklyr DataCamp course by Richie Cotton.