Friday, September 19, 2014

R meets Spark: Machine Learning in R and Spark through SparkR

The Hadoop ecosystem is evolving rapidly in the incorporation of big data analytical capabilities. One of the latest additions is AMPLab’s Apache Spark. In Spark, MapReduce is only one of the supported data processing paradigms.
Spark has a very rich API, supporting interactive, batch, and near-real-time processing, in your language of choice: Python, Scala, or Java. Moreover, Spark is platform independent. Hadoop YARN is just one of the supported platforms to run Spark applications.
In this post I will talk about another frontend for Spark that is being developed by the guys at AMPLab: SparkR. Its API is (still) not feature-rich as its counterparts in Python, Scala, and Java, but nevertheless it is rich enough to support MapReduce and other processing dataflows using R as your front-end.
We will do that through an example application. Actually, we will port the Logistic Regression code from a previous post. That code already leverages data distribution on HDFS and processing parallelism through Hadoop’s MapReduce. But Spark provides a significant advantage for iterative applications like that one: it has a built-in in-memory distributed cache, allowing for a much more efficient processing.
Before diving in to the code, please notice that the goal of this post is not to show you how to set up the environment to run it. For that purpose, I suggest you follow two guidelines: - Apache Spark Documentation, for hoe to build and/or deploy Spark either as a standalone cluster, or running on top of Hadoop YARN. Actually, at the writing of this post, SparkR is supported only with Spark running as a standalone cluster. - SparkR Documentation
Now let’s dive into the code:
The first step is, of course, load the SparkR library into your R environment:
# load SparkR library into R session
library(SparkR)
Then you have to instantiate a Spark Context. You can do that in two ways:
In local mode, Spark runs in a local thread pool in your JVM. It is a good way to develop and test your application, before deploying it on your cluster. The number 4 between brackets means that Spark will run in 4 local worker threads:
# instantiate a local Spark Context with 4 worker threads
sc <- sparkR.init(master='local[4]')
In cluster mode, you have to pass the Spark URL of the cluster and optionally the amount of memory and CPU you will allocate to your workers:
# instantiate a remote Spark Context with 4 executors, each one running with 1 CPU-core and 1 GB of memory
sc <- sparkR.init(master='spark://localhost:7077',
                  appName='sparkr_logistic_regression',
                  sparkEnvir=list(spark.executor.memory='1g',
                                  spark.num.executors='4',
                                  spark.executor.cores='1'))
After getting a handler to your Spark environment, through the Spark Context, you can access data already in HDFS and create a Spark RDD from that. You can also pass a minimum number of splits for that RDD, otherwise the system will assume an amount equals to the available parallelism. Ideally, you would have at least one split for each Spark worker to operate on:
# reference to a input file on HDFS
input_file <- 'hdfs://localhost/tmp/german.data-numeric'

# RDD on Spark: in SparkR RDDs are abstracted as R lists
# this RDD is constructed from a text file and partitioned in 4 chunks in the Spark cluster
input_rdd <- textFile(sc, input_file, minSplits=4)
Note that input_rdd in the code above is only a pointer to the actual Spark RDD on the cluster. We can load its data into your R session with the collect command. But it would only work as long as you have enough memory to hold it in your R session, which perhaps is not the case when working with very large datasets. For that, you could just sample a small fraction of that RDD. In the code bellow, F means sample with replacement equals false, 3L means to take 3 samples, and 0L means the seed to use for sampling:
# take 3 sample elements from the RDD, without replacement
takeSample(input_rdd, F, 3L, 0L)
[[1]]
[1] "   2   9   4  11   4   5   3   3   4  32   3   2   2   1   1   0   0   1   0   0   0   0   0   1   2 "

[[2]]
[1] "   2  48   4  51   1   3   2   3   3  30   3   1   1   2   1   0   0   1   0   0   1   0   0   0   2 "

[[3]]
[1] "   4  18   4  28   1   4   3   2   2  31   1   2   1   1   1   1   0   1   0   0   1   0   0   1   2 "
From the results above, we can see that the R representation for a Spark RDD is always a list. Is this case, for our input file, we have an R list with 1000 elements, one for each line in our original input text file.
The next step is then to parse each list element and transform it in a meaningful representation for our R Logistic Regression algorithm. This is a typical operation in Spark, easily achieved by using the MapPartitions transformation. In SparkR, this is implemented by the lapplyPartition function, which operates in a very similar way to the R lapply function. Again, the returned RDD is just a pointer to the actual RDD object in your Spark cluster:
# parse the text on each RDD element (on each partition in parallel), transforming each element in a
# numeric R vector
dataset_rdd <- lapplyPartition(input_rdd, function(part) {
  part <- lapply(part, function(x) unlist(strsplit(x, '\\s')))
  part <- lapply(part, function(x) as.numeric(x[x != '']))
  part
})
In the code above, we transform each list element into a numeric vector. Note also that this is performed in parallel, for each one of the four RDD partitions. We can inspect the content of the elements in the new RDD, by taking a sample of the RDD and analyzing its structure:
# take 3 sample elements from the RDD and inspect its content through the R str function
str(takeSample(dataset_rdd, F, 3L, 0L))
})
List of 3
 $ : num [1:25] 2 9 4 11 4 5 3 3 4 32 ...
 $ : num [1:25] 2 48 4 51 1 3 2 3 3 30 ...
 $ : num [1:25] 4 18 4 28 1 4 3 2 2 31 ...
})
Now we need to split our dataset_rdd in two: 80% to train our model and 20% to test it. We will do that through two RDD transformations, encapsuleted in one R function that returns a list with the references their results:
split_dataset <- function(rdd, ptest) {
  # create the test RDD with the first 'ptest' percentage of the input elements
  data_test_rdd <- lapplyPartition(rdd, function(part) {
    part_test <- part[1:(length(part)*ptest)]
    part_test
  })
  # create the train RDD with the remaining input elements
  data_train_rdd <- lapplyPartition(rdd, function(part) {
    part_train <- part[((length(part)*ptest)+1):length(part)]
    part_train
  })
  # return a list with the references for the test and train RDDs
  list(data_test_rdd, data_train_rdd)
}
The transformations in the code above are straightforward: we are just indexing the RDD according to the size we want, in terms of a percentage for the test dataset.
Now we have our dataset split into train and test parts, and almost ready to go into the model training. But not quite yet. We still need to transform it into R’s matrix format, which is necessary for the numeric calculations we need when training the model. As before, lapplyPartition takes care of it:
get_matrix_rdd <- function(rdd) {
  # create an RDD whose elements are R matrix chunks
  matrix_rdd <- lapplyPartition(rdd, function(part) {
    m <- Matrix(data=unlist(part, F, F), ncol=25, byrow=T)
    m <- cBind(1, m)
    m[,ncol(m)] <- m[,ncol(m)]-1
    m
  })
  matrix_rdd
}
In the code above, we see that instead of having a list with vector elements, now we have a list with matrix elements. Four matrices in our case, as we are splitting the RDD into 4 partitions. In the code we also prepare the matrix to include the intercept term (x0) and to normalize the output term (y) as 0 or 1.
The next function we need is one to balance the train matrix, so that we get the same number of positive and negative examples. This is necessary, because this dataset has 70% of positive and 30% of negative examples:
balance_matrix_rdd <- function(matrix_rdd) {
  # create an RDD with a balanced number of positive and negative examples
  balanced_matrix_rdd <- lapplyPartition(matrix_rdd, function(part) {
    y <- part[,26]
    index <- sample(which(y==0),length(which(y==1)))
    index <- c(index, which(y==1))
    part <- part[index,]
    part
  })
  balanced_matrix_rdd
}
Now we can call the functions we defined so far to prepare our train and test matrices. Then we can use a very nice feature of Spark, which is to cache in memory the objects we need frequent access to. That is useful for us, because we need to iterate over the data during our Gradient Descent execution:
# split the data into train and test sets
dataset <- split_dataset(dataset_rdd, 0.2)

# create the test data RDD
matrix_test_rdd <- get_matrix_rdd(dataset[[1]])

# create the train data RDD

matrix_train_rdd <- balance_matrix_rdd(get_matrix_rdd(dataset[[2]]))

# cache both train and test RDDs into the Spark cluster distributed memory
cache(matrix_test_rdd)
cache(matrix_train_rdd)
At this point we have our data split into 80% for training and 20% for testing, the training portion balanced in the number of positive and negative examples, and in R matrix format. But notice that this data is not in our local R environment, but cached in memory as RDDs in the Spark cluster, and partitioned according to the number of Spark executors we defined (4 un our case).
Before running the training algorithm, we need to instantiate out parameters vector and define both the hypothesis and cost functions. Those will all be local objects in R:
# initialize parameters vector theta
theta <- as.matrix(rep(0,25))

# hypothesis function
hypot <- function(z) {
  1/(1+exp(-z))
}

# gradient of cost function
gCost <- function(t,X,y) {
  1/nrow(X)*(t(X)%*%(hypot(X%*%t)-y))
}
Now we define our training function. This will perform one step of Gradient Descent in a mini-batch fashion. This means that it will compute partial gradients, one for each partition of our training data, and then sum them up. Computing partial gradients is a map-type transformation easily achieved with the lapplyPartition function, as before. What is new at this point is to use a new function, reduceByKey, to aggregate the partial gradients. As in a typical MapReduce operation, the lapplyPartition function will emit a lists of key-value pairs that will be aggregated by the reduceByKey function:
train <- function(theta, rdd) {
  # runs one step of a batch gradient computation in a subset of the input data defined by the Spark cluster
  # compute partial results
  gradient_rdd <- lapplyPartition(rdd, function(part) {
    X <- part[,1:25]
    y <- part[,26]
    p_gradient <- gCost(theta,X,y)
    list(list(1, p_gradient))
  })
  agg_gradient_rdd <- reduceByKey(gradient_rdd, '+', 1L)
  # aggregated output of one iteration
  collect(agg_gradient_rdd)[[1]][[2]]
}
From the code above, we notice a small difference in the output of the lapplyPartition when compared to the previous cases: now we are generating key-value pairs in the form of list(key, value) elements. We also see that the reduceByKey function takes 3 arguments: the RDD to operate on, an R function to execute, and the number of partitions to generate in the result. Here we are generating only 1 partition and bringing the result into the local R session through the collect function. That is a common pattern, when the data to bring to R is small enough to be loaded into memory. That is usually true for the gradient vector in a Logistic Regression.
A side note about the indexing [[1]][[2]] in the collect function above: its output is a list with one element (because we emitted only a single key); that element is also a list, whose first element is the key, and the second is the value we want to collect. Thus the notation [[1]][[2]].
Finally we get to the main loop of the Gradient Descent, where we call the train function to operate on the train data RDD until our convergence criteria is satisfied:
# cost function optimization through batch gradient descent (training)
# alpha = learning rate
# steps = iterations of gradient descent
# tol = convergence criteria
# convergence is measured by comparing L2 norm of current gradient and previous one
alpha <- 0.05
tol <- 1e-4
step <- 1
while(T) {
  cat("step: ",step,"\n")
  p_gradient <- train(theta, matrix_train_rdd)
  theta <- theta-alpha*p_gradient
  gradient <- train(theta, matrix_train_rdd)
  if(abs(norm(gradient,type="F")-norm(p_gradient,type="F"))<=tol) break
  step <- step+1
}
After convergence, we are ready to apply our model to the test data. Here I opted for bringing the test dataset into R’s local session (again, using the collect function). This would be a common situation in a real usage scenario, where you have your train data very large to have into your local R session, but you have enough memory to hold your test data:
# hypothesis testing
# counts the predictions from the test set classified as 'good' and ' bad' credit and
# compares with the actual values
data_test <- collect(matrix_test_rdd)
data_test <- lapply(data_test, function(x) as.matrix(x))
data_test <- do.call(rbind, data_test)
X_test <- data_test[,1:25]
y_test <- data_test[,26]
y_pred <- as.matrix(hypot(X_test%*%theta))
result <- xor(as.vector(round(y_pred)),as.vector(y_test))
corrects = length(result[result==F])
wrongs = length(result[result==T])
cat("\ncorrects: ",corrects,"\n")
cat("wrongs: ",wrongs,"\n")
cat("accuracy: ",corrects/length(y_pred),"\n")
cat("\nCross Tabulation:\n")
print(table(y_test,round(y_pred), dnn=c('y_test','y_pred')))
Hopefully this example was useful to illustrate how you can use R as another front-end option for your Spark cluster. As of today, the SparkR package is not so feature-rich as its counterparts in Python, Scala, and Java. It is still in its early infancy (alpha release), but it is being rapidly developed. The split-apply-combine processing pattern provided by Spark fits very well in the R environment. Such constructs are familiar for R developers, providing for a very rapid learning curve of the main Spark processing capabilities.

©2014 - Alexandre Vilcek