Read from hdfs with R. Brief overview of SparkR.

Disclaimer: originally I planned to write a post about R functions/packages that can read data from hdfs (with benchmarks), but in the end it became more of an overview of SparkR capabilities.

Nowadays working with “big data” almost always means working with the Hadoop ecosystem. A few years ago this also meant that you had to be a good Java programmer to work in such an environment: even a simple word count program took several dozen lines of code. But 2-3 years ago things changed, thanks to Apache Spark with its concise (but powerful!) functional-style API. It is written in Scala, but also has Java, Python and, recently, R APIs.

Spark

I started to use Spark more than 2 years ago (and have used it a lot). In most cases I use Scala because:

  • it is JVM native
  • its API is the only fully featured one: RDD-level API, MLlib, GraphX, etc.
  • it has a nice REPL
  • Scala is well suited for data munging, with a good tradeoff between complexity and efficiency.

During this period I tried SparkR several times, but until version 1.6 it had too many rough edges. Starting from 1.6 it became a really useful tool for simple manipulations on Spark data frames. Unfortunately we still do not have R user-defined functions, so SparkR functionality is limited to built-in functions. A common pipeline for a data scientist can be the following:

  1. read data from hdfs
  2. do some data wrangling (join/filter/etc.)
  3. optionally take subset/sample and collect data to local R session for exploratory analysis and fitting models.

Let's have a closer look at these steps.

Reading data from hdfs

Files in hdfs are usually stored in the following formats:

  1. plain txt/csv/json files
  2. sequence files. You can think of them as serialized Java objects. In recent years they have become less popular. They are also not portable (they need custom readers), so I do not find them interesting for this post.
  3. avro (row-based)
  4. parquet (column-based)
  5. orc (column-based)

The good news is that Spark (and SparkR!) can read json, parquet and orc with the built-in read.df function, and csv and avro with read.df plus the spark-csv and spark-avro Spark packages.
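
As a rough sketch of how the formats above map to the source argument of read.df: spark_source_for is a hypothetical helper (not part of SparkR), and the path in the usage comment is made up. The two com.databricks names are the data source names used by the spark-csv and spark-avro packages.

```r
# hypothetical helper: pick the read.df `source` value from a file extension
spark_source_for <- function(path) {
  ext <- tolower(tools::file_ext(path))
  sources <- c(json    = "json",
               parquet = "parquet",
               orc     = "orc",
               csv     = "com.databricks.spark.csv",   # needs the spark-csv package
               avro    = "com.databricks.spark.avro")  # needs the spark-avro package
  if (!ext %in% names(sources)) stop("unsupported format: ", ext)
  unname(sources[ext])
}

# usage with a live SparkR 1.6 session (hypothetical path):
# sdf <- read.df(sqlContext, path = "/user/me/events.parquet",
#                source = spark_source_for("/user/me/events.parquet"))
```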

Data wrangling

SparkR allows you to perform dplyr-style manipulations on Spark data frames. See the official DataFrame and SparkR documents for details. I would also like to highlight that the package provides a quite comprehensive set of methods for manipulating Spark data frames, including functions for:

  • data frames join, filter, group_by, sample
  • date / time manipulations
  • string manipulations, regular expressions
  • general math / statistical functions like sin, cos, mean, etc.

See the full list of functions in the package documentation.
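
To give a feeling for the dplyr-style API, here is a minimal sketch in the spirit of the official SparkR examples. It assumes SparkR 1.6 is attached with library(SparkR) and that sqlContext was created beforehand; wrangle_demo is a hypothetical name, and the built-in faithful dataset only stands in for real data.

```r
# assumes library(SparkR) is attached; nothing here touches Spark until the function is called
wrangle_demo <- function(sqlContext) {
  # turn a local R data frame into a Spark data frame
  sdf <- createDataFrame(sqlContext, datasets::faithful)
  # keep only long waiting times
  long_waits <- filter(sdf, sdf$waiting > 70)
  # count rows and average eruption length per waiting time
  by_waiting <- summarize(groupBy(long_waits, long_waits$waiting),
                          count = n(long_waits$waiting),
                          mean_eruption = avg(long_waits$eruptions))
  # bring only the top of the small aggregated result back to R
  head(arrange(by_waiting, desc(by_waiting$count)))
}
```

All heavy lifting happens on the cluster; only the few aggregated rows returned by head travel back to the R session.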

Collecting data to local R session

However, if you need to perform more complex manipulations or fit a model, you may need to collect data to the local R session (and take a sample if the data is too big). And here you can be unpleasantly surprised: collecting even a small 50 MB data frame can take minutes (see the example below). The current mechanism of serialization / deserialization between R and the JVM was designed primarily for exchanging meta-information (like function calls), not data. See this JIRA ticket for details. Hopefully this issue will be fixed in the next release.
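
Since collect is slow, it makes sense to shrink the data on the cluster first. The sketch below shows one way to do it; fraction_for_budget and collect_sample are hypothetical helpers, and collect_sample assumes library(SparkR) is attached and sdf is a Spark data frame.

```r
# fraction of rows to keep so that the collected sample fits a size budget
fraction_for_budget <- function(total_mb, budget_mb) {
  stopifnot(total_mb > 0, budget_mb > 0)
  min(1, budget_mb / total_mb)
}

# sample on the cluster first, then collect only the sample
# (assumes library(SparkR) is attached; `sample` and `collect` are the SparkR versions)
collect_sample <- function(sdf, fraction, seed = 42L) {
  stopifnot(fraction > 0, fraction <= 1)
  collect(sample(sdf, withReplacement = FALSE, fraction = fraction, seed = seed))
}

# usage with a live session, for a data frame of roughly 500 MB and a 50 MB budget:
# sdf_small <- collect_sample(sdf, fraction_for_budget(500, 50))
```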

Examples and timings

First of all we need several things to be installed:

  1. Hadoop. I have it installed at /opt/hadoop-2.6.0.
  2. Spark and SparkR: just download a prebuilt version and unpack it. /opt/spark-1.6.0-bin-hadoop2.6 in my case.

Setting up SparkR on YARN

At work I have a YARN cluster and a client machine with RStudio Server, from which I usually work. To make SparkR work with RStudio Server you should set up several environment variables: SPARK_HOME, YARN_CONF_DIR, etc. You can follow the official manual, but doing this each time makes me sad. A simpler way is to add these variables to the ~/.Renviron file or, for system-wide options, to {R_HOME}/etc/Renviron.site. Here are my configs:

SPARK_HOME=/opt/spark-1.6.0-bin-hadoop2.6
R_LIBS_SITE=${R_LIBS_SITE}:${SPARK_HOME}/R/lib
YARN_CONF_DIR=/opt/hadoop-2.6.0/etc/hadoop
JAVA_HOME=/usr/java/jdk170_64_45
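
After restarting R, a quick way to confirm that the session actually picked up these variables is sketched below. missing_vars is a hypothetical helper name, not part of SparkR.

```r
# variables that SparkR on YARN needs, as listed in the config above
required_vars <- c("SPARK_HOME", "YARN_CONF_DIR", "JAVA_HOME")

# return the names of variables that are unset or empty in this R session
missing_vars <- function(vars = required_vars) {
  values <- Sys.getenv(vars, unset = "")
  vars[!nzchar(values)]
}

not_set <- missing_vars()
if (length(not_set) > 0) {
  message("Not set in this session: ", paste(not_set, collapse = ", "))
}
```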

Reading from hdfs to local R session

For benchmarks we will generate a small data frame with 1M rows:

N <- 1e6
k <- 1e4
df <- data.frame(V_int = sample(N, N, replace = TRUE),
                 V_num_1 = sample(N, N, replace = TRUE) + 0.1,
                 V_num_2 = sample(N, N, replace = TRUE) + 0.2,
                 V_char_1 = rep(paste0('factor_1_', 1:k), each = N/k),
                 V_char_2 = rep(paste0('factor_2_', 1:k), each = N/k))
format(object.size(df), 'Mb')
# "27.9 Mb"

Now we will save it to disk and copy to hdfs:

write.table(df, 'test_spark.csv', sep = ',', row.names = FALSE, col.names = FALSE)
# command to call hadoop
hadoop_cmd <- "/opt/hadoop-2.6.0/bin/hadoop"
# copy csv from local disk to hdfs
system2(hadoop_cmd, "fs -copyFromLocal test_spark.csv /user/dmitry.selivanov/csv/")

Now let's try to read it with SparkR and collect it to the local R session:

library(SparkR)
spark_env <- list('spark.executor.memory' = '4g',
                  'spark.executor.instances' = '4',
                  'spark.executor.cores' = '4',
                  'spark.driver.memory' = '4g')
# here we use the spark-csv package
# since I don't have direct internet access on my RStudio Server machine, I uploaded the needed jars myself
# note that this is not an assembled "fat" jar, so we also need the commons-csv jar
sc <- sparkR.init(master = "yarn-client", appName = "SparkR", sparkEnvir = spark_env, 
                  sparkJars=c("/home/dmitry.selivanov/packages/spark-csv_2.10-1.3.0.jar", 
                              "/home/dmitry.selivanov/packages/commons-csv-1.2.jar"))
sqlContext <- sparkRHive.init(sc)
sdf <- read.df(sqlContext, path = "/user/dmitry.selivanov/csv/test_spark.csv", 
               source = "com.databricks.spark.csv", inferSchema = "true")
# first we cache
cache(sdf)
# and trigger computation
nrow(sdf)
# now our sdf is materialized and in RAM
# let's collect it to a local data frame
system.time(sdf_local <- collect(sdf))
# 130.927

More than 2 minutes! So at least until the next release we should avoid using collect for any medium to large data frames.

Alternatives

data.table

Here my favourite package comes in: data.table and its fread function. I believe many data.table users don’t know that fread input can be not only a file name, but also a shell command whose output is read through a unix pipe!

library(data.table)
system.time(sdf_local <- fread(paste(hadoop_cmd, "fs -text /user/dmitry.selivanov/csv/test_spark.csv")))
# 4.005

This takes only 4 seconds! Another great thing is that the fs -text command automatically chooses the codec for decompressing files:

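# split the data frame into 16 partitions, so it is written as 16 files
# (repartition returns a new data frame, so the result must be assigned)
sdf <- repartition(sdf, 16)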
# save it with gzip compression
write.df(sdf, 
         path = "/user/dmitry.selivanov/csv/test_spark",
         source = "com.databricks.spark.csv", 
         codec = "org.apache.hadoop.io.compress.GzipCodec")
# read entire directory with gzipped files
system.time(sdf_local <- fread(paste(hadoop_cmd, "fs -text /user/dmitry.selivanov/csv/test_spark/*")))
# 4.784
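
The pipe trick can be tried without any cluster. In the sketch below a temporary local file stands in for a file in hdfs, and cat plays the role of hadoop fs -text (so it assumes a unix-like system). It also assumes a reasonably recent data.table, where the cmd argument is the explicit way to pass a shell command to fread.

```r
library(data.table)

# write a small csv without header to a temporary file (stand-in for a file in hdfs)
tmp <- tempfile(fileext = ".csv")
write.table(data.frame(id = 1:3, value = c("a", "b", "c")), tmp,
            sep = ",", row.names = FALSE, col.names = FALSE, quote = FALSE)

# fread runs the command and parses its standard output
local_dt <- fread(cmd = paste("cat", shQuote(tmp)), header = FALSE)

# with hdfs the command would be built the same way, for example:
# fread(cmd = paste(hadoop_cmd, "fs -text /user/dmitry.selivanov/csv/test_spark.csv"))
```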

dataconnector

One drawback of data.table::fread is that it can parse only flat files. Spark data frames can contain nested columns (like an R data frame with columns of type list). For such (usually rare) cases we can save the data frame in orc format and then read it with the dataconnector::orc2dataframe function.

dataconnector is a new package developed by the HP Vertica Analytics Team (probably initially for working with DistributedR) and unfortunately not well known yet. But it is incredibly useful. It allows you to:

  1. read orc and csv files from the local file system or hdfs. Hopefully we will eventually get parquet support too;
  2. write arbitrary R objects directly to hdfs.

Another nice thing is that it doesn’t require hadoop and java/rJava!

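library(dataconnector)
# assumes the data frame was already saved to hdfs in orc format
# create hdfs connection config (json string)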
conf <- 
'{
  "webhdfsPort": YOUR_webhdfsPort_HERE,
  "hdfsPort": YOUR_hdfsPort_HERE,
  "hdfsHost": "YOUR_HOST_HERE",
  "hdfsUser": "YOUR_USERNAME_HERE"
}'
system.time(sdf_local <- orc2dataframe(
  "hdfs:///user/dmitry.selivanov/csv/test_spark/part-r-00000-1eb57e5d-2a98-489d-b2b0-a5dc44e9538b.orc", 
  hdfsConfigurationStr = conf))
# 6.330

Other options

  • rhdfs and ravro packages by RevolutionAnalytics. Never tried, so can’t say anything.
  • h2o::h2o.importFile, but it can be tricky to set up h2o in hdfs-client mode.

What tools do you use? Please share your experience in the comments.