Dmitriy Selivanov — written Feb 20, 2016 — source
Disclaimer: originally I planned to write a post about R functions/packages that allow reading data from HDFS (with benchmarks), but in the end it became more of an overview of SparkR capabilities.
Nowadays working with “big data” almost always means working with the Hadoop ecosystem. A few years ago this also meant that you had to be a good Java programmer to work in such an environment – even a simple word-count program took several dozen lines of code. But 2-3 years ago things changed, thanks to Apache Spark and its concise (but powerful!) functional-style API. It is written in Scala, but also has Java, Python and, more recently, R APIs.
I started to use Spark more than 2 years ago (and have used it a lot); in most cases I use the Scala API.
During this period I tried SparkR several times, but until version 1.6 it had too many rough edges. Starting from 1.6 it became a really useful tool for simple manipulations on Spark DataFrames. Unfortunately we still do not have R user-defined functions, so SparkR functionality is limited to its built-in functions. A common pipeline for a data scientist can be the following: read data from HDFS, filter and aggregate it with SparkR, then collect (a sample of) the result into a local R session for further analysis and modelling. Let's have a closer look at these steps.
Files in HDFS are usually stored in a handful of formats: csv, json, avro, parquet or orc. The good news is that Spark (and SparkR!) can read json, parquet and orc with the built-in read.df function, and csv and avro with read.df plus the spark-csv and spark-avro Spark packages.
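For illustration, reading a few of these formats with SparkR 1.6 might look roughly like the sketch below; the paths and the spark-csv package version are placeholders of mine, not from the original post.

```r
library(SparkR)

# spark-csv is an external Spark package; the version string is illustrative
sc <- sparkR.init(master = "yarn-client",
                  sparkPackages = "com.databricks:spark-csv_2.10:1.3.0")
sqlContext <- sparkRSQL.init(sc)

df_json    <- read.df(sqlContext, "/data/events.json",    source = "json")
df_parquet <- read.df(sqlContext, "/data/events.parquet", source = "parquet")
df_csv     <- read.df(sqlContext, "/data/events.csv",
                      source = "com.databricks.spark.csv", header = "true")
```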
SparkR allows you to perform dplyr-style manipulations on Spark DataFrames. See the official DataFrame and SparkR documentation for details. I would also like to highlight that the package provides a quite comprehensive set of methods for working with Spark DataFrames, including functions for data manipulation (join, filter, group_by, sample) and column-level math (sin, cos, mean, etc.). See the full list of functions in the package documentation.
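A small sketch of this syntax, assuming a Spark DataFrame sdf with illustrative columns group and value (not taken from the post):

```r
# assuming `sqlContext` from the setup sketch above and a Spark DataFrame `sdf`
positive <- filter(sdf, sdf$value > 0)            # dplyr-style row filtering
by_group <- groupBy(positive, positive$group)     # grouping
stats    <- agg(by_group,
                cnt = n(positive$value),          # rows per group
                avg = avg(positive$value))        # mean per group
head(arrange(stats, desc(stats$avg)))
```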
However, if you need to perform more complex manipulations to fit some model, you may need to collect data into the local R session (and take a sample if the size is too big). And here you can be unpleasantly surprised – collecting even a small ~50 MB data frame can take minutes (see the example below). The current serialization/deserialization mechanism between R and the JVM was designed primarily for exchanging meta-information (like function calls), not data. See this JIRA ticket for details. Hopefully this issue will be fixed in the next release.
First of all, we need several things installed: Hadoop (/opt/hadoop-2.6.0 in my case) and Spark (/opt/spark-1.6.0-bin-hadoop2.6 in my case). At work I have a YARN cluster and a client machine with RStudio Server, from which I usually work. To make SparkR work with RStudio Server you should set up several system variables – SPARK_HOME, YARN_CONF_DIR, etc. You can follow the official manual, but doing this each time makes me sad. The simpler way is to add these variables to the ~/.Renviron or {R_HOME}/etc/Renviron.site (for system-wide options) file. Here are my configs:
SPARK_HOME=/opt/spark-1.6.0-bin-hadoop2.6
R_LIBS_SITE=${R_LIBS_SITE}:${SPARK_HOME}/R/lib
YARN_CONF_DIR=/opt/hadoop-2.6.0/etc/hadoop
JAVA_HOME=/usr/java/jdk170_64_45
For the benchmark we will generate a small data frame with 1M rows:
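The original benchmark code is not reproduced here; a minimal sketch with illustrative column names could be:

```r
library(data.table)
set.seed(42)
N <- 1e6
dt <- data.table(id    = seq_len(N),
                 value = rnorm(N),
                 group = sample(letters, N, replace = TRUE))
print(object.size(dt), units = "Mb")  # a few dozen Mb in memory
```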
Now we will save it to disk and copy it to HDFS:
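Something along these lines (local and HDFS paths are illustrative):

```r
# write a flat csv locally, then push it to HDFS
write.table(dt, "/tmp/dt.csv", sep = ",", row.names = FALSE, quote = FALSE)
system("/opt/hadoop-2.6.0/bin/hadoop fs -mkdir -p /tmp/benchmark")
system("/opt/hadoop-2.6.0/bin/hadoop fs -put -f /tmp/dt.csv /tmp/benchmark/")
```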
Now let's try to read it with SparkR and collect it into the local R session:
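A sketch of the read-and-collect step, re-using sc and sqlContext from the setup sketch above (paths are again placeholders):

```r
sdf <- read.df(sqlContext, "/tmp/benchmark/dt.csv",
               source = "com.databricks.spark.csv",
               header = "true", inferSchema = "true")
system.time(df_local <- collect(sdf))   # pull the data into local R
```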
More than 2 minutes! So, at least until the next release, we should avoid using collect on any medium-to-large data frames.
Here my favourite package comes in – data.table and its fread function. I believe many data.table users don't know that fread's input can be not only a file name, but also a shell command / unix pipe!
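In practice it looks roughly like this (the hadoop binary location and HDFS path are illustrative):

```r
library(data.table)
# if the input string is not an existing file, fread runs it as a shell command
# and parses its stdout - here `hadoop fs -text` streams the file from HDFS
system.time(
  dt_local <- fread("/opt/hadoop-2.6.0/bin/hadoop fs -text /tmp/benchmark/dt.csv")
)
```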
This takes only 4 seconds! Another great thing is that the fs -text command automatically picks the right codec for decompressing files:
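For example, the same call works for a compressed copy of the file (file name illustrative):

```r
# `fs -text` detects the codec from the file and decompresses on the fly
dt_gz <- fread("/opt/hadoop-2.6.0/bin/hadoop fs -text /tmp/benchmark/dt.csv.gz")
```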
One drawback of data.table::fread is that it can only parse flat files. Spark DataFrames can contain nested columns (like an R data frame with columns of type list). For such (usually rare) cases we can save the data frame in orc format and then read it with the dataconnector::orc2dataframe function.
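A hedged sketch of that workflow; note that orc in Spark 1.6 goes through a HiveContext, and I'm assuming orc2dataframe takes the file URL as its first argument (check the package documentation for the exact signature):

```r
# orc in Spark 1.6 requires Hive support, so create the DataFrame via a HiveContext
hiveContext <- sparkRHive.init(sc)
sdf_hive <- read.df(hiveContext, "/tmp/benchmark/dt.csv",
                    source = "com.databricks.spark.csv", header = "true")
write.df(sdf_hive, "/tmp/benchmark/dt_orc", source = "orc", mode = "overwrite")

# read one of the part files back into a plain R data frame, without Spark
library(dataconnector)
df_nested <- orc2dataframe("hdfs:///tmp/benchmark/dt_orc/part-r-00000")  # part-file name is illustrative
```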
dataconnector is a new package developed by the HP Vertica Analytics Team (probably initially for working with DistributedR) and unfortunately it is not well known yet. But it is incredibly useful – it allows you to read orc and csv files from the local file system or HDFS (I hope we will eventually get parquet support too). Another nice thing is that it doesn't require Hadoop or Java/rJava!
A similar thing can be done with h2o::h2o.importFile, but it can be tricky to set up h2o in hdfs-client mode.
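A rough sketch, assuming an h2o instance that can already reach HDFS (namenode address and path are placeholders):

```r
library(h2o)
h2o.init()  # assumes h2o is configured to see the cluster's HDFS
hf <- h2o.importFile("hdfs://namenode:8020/tmp/benchmark/dt.csv")
df_local <- as.data.frame(hf)   # bring the frame into local R
```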
What tools do you use? Please share your experience in the comments.