makeParallel concepts

This vignette explains the concepts used by makeParallel. If you would like to quickly get started then see the vignette titled “makeParallel Quick Start”, vignette("quickstart", package = "makeParallel").

This project is experimental and ambitious. To write parallel R today, consider using an established package, such as the recommended parallel package, which has been included with R since version 2.14. The CRAN Task View High-Performance and Parallel Computing with R provides a thorough review of available third-party software.

Big Picture

makeParallel makes code parallel. It statically analyzes entire scripts to detect dependencies between statements and implements a modular system for generating new code.

The appeal of this approach is that we don’t have to change our code to make it parallel. We can write higher level code that also performs better. By allowing this system to change our code we can benefit from underlying improvements in the system, and change code in ways that we may never have thought of, or that would be infeasible to do by hand. This is akin to an optimizing compiler. As the compiler becomes smarter, the same programs run faster, with no change to the underlying code. makeParallel creates R code from R code, so in this sense it acts as a transpiler.

Comparison to conventional approach

Most conventional approaches to parallel programming require the user to specify when and where the program should run in parallel. They do this by providing their own programming models and application programming interfaces. The conventional approach has several advantages. First of all, it works. It scales up to huge clusters. Well-defined interfaces simplify understanding and debugging. The approach also has disadvantages. To use it effectively, programmers usually need more knowledge of the underlying system, data, and computations.

In contrast to the conventional approach, makeParallel takes general R code and automatically transforms it into parallel code. Thus the system must determine a correct and reasonably efficient way to execute parts of the program in parallel.

makeParallel builds on existing infrastructure wherever possible. For example, it uses parallel::makeCluster to initialize clusters of workers communicating over sockets. Indeed, the conventional approaches provide the lower level tools.

Basic Model

The following diagram illustrates the computational model for the steps in inferring parallelism as implemented in the function makeParallel.

basic model

In this diagram the rectangles represent base classes defined in this package, and the edges represent generic methods. The diagram depicts the following:

  1. The end user supplies some input code, which could be the name of a file or an actual expression.
  2. inferGraph statically analyzes the code and constructs a dependency graph between the expressions. The graph is currently based on use-definition chains. A future version of the software will allow other types of constraints, such as requiring two expressions to run on the same processor.
  3. schedule assigns each expression to a processor and orders the expressions on each processor, while respecting the constraints of the dependency graph.
  4. generate creates executable code from the schedule.
  5. writeCode simply writes the code to a file and returns the expression containing the code.
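As a rough illustration of step 2, the following base R sketch recovers use-definition edges for a three-line script. It is not the package's implementation; the names defined, edgeList, and edges are hypothetical, and the sketch only handles top-level assignments to plain variable names.

```r
# Parse without evaluating: xfile and yfile need not exist.
code <- parse(text = "
    x = read.csv(xfile)
    y = read.csv(yfile)
    xy = sort(c(x[, 1], y[, 1]))
")

# Hypothetical helper: name defined by a top-level assignment, else NA.
defined <- function(e) {
    isAssign <- is.call(e) &&
        (identical(e[[1]], as.name("=")) || identical(e[[1]], as.name("<-")))
    if (isAssign && is.name(e[[2]])) as.character(e[[2]]) else NA_character_
}

defs <- vapply(code, defined, character(1))

# For each statement, link every variable it uses to the most recent
# earlier statement that defined that variable.
edgeList <- lapply(seq_along(code), function(j) {
    e <- code[[j]]
    used <- all.vars(if (is.na(defs[j])) e else e[[3]])
    from <- vapply(used, function(v) {
        prior <- which(defs[seq_len(j - 1)] == v)
        if (length(prior)) max(prior) else NA_integer_
    }, integer(1))
    from <- from[!is.na(from)]
    if (length(from))
        data.frame(from = unname(from), to = j, var = names(from))
})
edges <- do.call(rbind, Filter(Negate(is.null), edgeList))
edges
```

The two edges say that the third statement must wait for the first (through x) and the second (through y), while the first two statements are independent of each other.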

These steps are all designed to be modular. The most interesting step is the scheduler. In general, scheduling a task graph is NP-hard, and many algorithms exist that find approximate solutions.

Data Parallelism

The default scheduler produces a MapSchedule which represents data parallelism. This captures the common and useful case of “embarrassingly parallel” operations. These come from replacing apply type functions with equivalent parallel versions. The current implementation is basic and does the following:

Future versions may include the following features. If you need these sooner then get in touch.

Task Parallelism

Task parallelism means two or more workers execute two or more different blocks of code simultaneously.

The only way to get a speedup from task parallelism is if the dependency graph allows two or more long-running parts of the program to run simultaneously. This could happen when reading two large files, for example. scheduleTaskList implements a standard list scheduling algorithm. The algorithm needs the execution time for each expression to work properly, so we pass it in explicitly below.

library(makeParallel)

code = parse(text = "
    x = read.csv(xfile)
    y = read.csv(yfile)
    xy = sort(c(x[, 1], y[, 1]))
    ")

pcode = makeParallel(code, scheduler = scheduleTaskList
    , exprTime = c(1.25, 1.24, 0.2))

plot(schedule(pcode))

This plot illustrates the schedule returned from the scheduling algorithm. The completion time of the script is the last time a processor is busy on the graph. More efficient schedules complete earlier. This plot is a useful diagnostic: if all cores are mostly busy, that’s a good sign. If only one core does almost all the work, then either 1) the code can’t be improved through task parallelism, or 2) the default scheduling algorithm chose a poor schedule. Task scheduling is a rich problem with many published algorithms, so we leave hooks in for users to supply their own scheduling algorithm.
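To make the idea of list scheduling concrete, here is a minimal greedy sketch in base R. It is an assumption-laden toy, not the algorithm inside scheduleTaskList: the dependencies in deps are written by hand, data transfer between workers is treated as free, and the variable names are hypothetical.

```r
# Estimated run time of each expression, as in the example above.
exprTime <- c(1.25, 1.24, 0.2)
# deps[[i]] lists the expressions that expression i must wait for.
deps <- list(integer(0), integer(0), c(1L, 2L))
nworkers <- 2L

free <- numeric(nworkers)               # time at which each worker is idle
start <- numeric(length(exprTime))
finish <- numeric(length(exprTime))
worker <- integer(length(exprTime))

# Visit expressions in script order, which is a valid topological order.
for (i in seq_along(exprTime)) {
    ready <- max(0, finish[deps[[i]]])  # all predecessors are done
    begin <- pmax(free, ready)          # earliest start on each worker
    w <- which.min(begin)               # greedy: pick the earliest start
    worker[i] <- w
    start[i] <- begin[w]
    finish[i] <- begin[w] + exprTime[i]
    free[w] <- finish[i]
}

data.frame(expr = seq_along(exprTime), worker, start, finish)
makespan <- max(finish)
```

Under these assumptions the two reads land on different workers and the sort starts once both finish, so the completion time is close to the longer read plus the sort rather than the sum of all three.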

The generated code is a beastly thing that no sane person would ever write by hand. But it does have a few virtues:

writeCode(pcode)
## expression(library(parallel), nworkers = 2, timeout = 600, cls = makeCluster(nworkers, 
##     "PSOCK"), workers = vector(nworkers, mode = "list"), close.NULL = function(...) NULL, 
##     connect = function(server, client, port, timeout, sleep = 0.1, 
##         ...) {
##         if (ID == server) {
##             con = socketConnection(port = port, server = TRUE, 
##                 blocking = TRUE, open = "a+b", timeout = timeout, 
##                 ...)
##             workers[[client]] <<- con
##         }
##         if (ID == client) {
##             Sys.sleep(sleep)
##             con = socketConnection(port = port, server = FALSE, 
##                 blocking = TRUE, open = "a+b", timeout = timeout, 
##                 ...)
##             workers[[server]] <<- con
##         }
##         NULL
##     }, environment(connect) = environment(close.NULL) = .GlobalEnv, 
##     clusterExport(cls, c("workers", "connect", "close.NULL"), 
##         envir = environment()), clusterMap(cls, assign, "ID", 
##         seq(nworkers), MoreArgs = list(envir = .GlobalEnv)), 
##     socket_map = read.csv(text = "\n\"server\",\"client\",\"port\"\n1,2,33000\n"), 
##     by(socket_map, seq(nrow(socket_map)), function(x) {
##         clusterCall(cls, connect, x$server, x$client, x$port, 
##             timeout = timeout)
##     }), worker_code = c("if(ID != 1)\n    stop(sprintf(\"Worker is attempting to execute wrong code.\nThis code is for 1, but manager assigned ID %s\", ID))\n\nx = read.csv(xfile)\ny <- unserialize(workers[[2]])\nxy = sort(c(x[, 1], y[, 1]))", 
##         "if(ID != 2)\n    stop(sprintf(\"Worker is attempting to execute wrong code.\nThis code is for 2, but manager assigned ID %s\", ID))\n\ny = read.csv(yfile)\nserialize(y, workers[[1]], xdr = FALSE)"), 
##     evalg = function(codestring) {
##         code = parse(text = codestring)
##         eval(code, .GlobalEnv)
##         NULL
##     }, parLapply(cls, worker_code, evalg), clusterEvalQ(cls, 
##         lapply(workers, close)), stopCluster(cls))

Customizability

makeParallel allows us to pass in functions that customize the behavior. For example, we may want the generated code to explicitly set the number of parallel workers to N as specified by the option mc.cores in the parallel package. The following function prepends the expression options(mc.cores = N) to the generated code.

coresGenerate <- function(schedule, mc.cores = 2L, ...)
{
    # Rely on the method dispatch for the actual work.
    out <- generate(schedule, ...)

    # Construct an expression containing the desired code.
    setCores <- substitute(options(mc.cores = MC_CORES)
                          , list(MC_CORES = mc.cores))

    # Combine the newly constructed expression with what would have been
    # generated otherwise.
    out@code <- c(setCores, writeCode(out))
    out
}
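The two language-level steps in coresGenerate can be tried in isolation with base R. In this sketch the expression body stands in for whatever generate would produce; it is a made-up placeholder, and as.expression is used here only to make the combination explicit.

```r
# Build the call options(mc.cores = 3L) by filling in a placeholder.
setCores <- substitute(options(mc.cores = MC_CORES), list(MC_CORES = 3L))

# Placeholder for previously generated code (hypothetical).
body <- expression(x <- 1:10, m <- mean(x))

# Prepend the new call so that it becomes the first statement.
combined <- c(as.expression(setCores), body)
combined[[1]]
```

coresGenerate applies the same idea to the code produced by generate.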

We can use this function as follows:

lapplyCode <- parse(text = "
    x <- list(a = 1:10, beta = exp(-3:3), logic = c(TRUE,FALSE,FALSE,TRUE))
    m1 <- lapply(x, mean)
")

transformed <- makeParallel(lapplyCode, generator = coresGenerate,
                            generatorArgs = list(mc.cores = 3L))

When we extract the code from this object with writeCode we see that it sets options(mc.cores = 3L) as the first line.

writeCode(transformed)
## expression(options(mc.cores = 3L), x <- list(a = 1:10, beta = exp(-3:3), 
##     logic = c(TRUE, FALSE, FALSE, TRUE)), m1 <- parallel::mclapply(x, 
##     mean))
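The output above shows lapply replaced by parallel::mclapply. One way such a rewrite could be done on parsed code is a recursive walk over the language objects, sketched below in base R. The function parallelizeLapply is hypothetical and is not how makeParallel is implemented; it only transforms the code and never runs it.

```r
# Hypothetical helper: swap every call to lapply for parallel::mclapply.
parallelizeLapply <- function(e) {
    if (!is.call(e)) return(e)
    if (identical(e[[1]], as.name("lapply")))
        e[[1]] <- quote(parallel::mclapply)
    # Recurse into sub-calls. Indexing directly avoids binding empty
    # arguments, such as the one in x[, 1], to a variable.
    for (i in seq_along(e)) {
        if (is.call(e[[i]])) e[[i]] <- parallelizeLapply(e[[i]])
    }
    e
}

lapplyCode <- parse(text = "
    x <- list(a = 1:10, beta = exp(-3:3), logic = c(TRUE,FALSE,FALSE,TRUE))
    m1 <- lapply(x, mean)
")

newCode <- as.expression(lapply(lapplyCode, parallelizeLapply))
newCode[[2]]
```

A real transformation also has to decide whether the replacement is safe and worthwhile, which this sketch ignores.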

Extensibility

Some schedulers must be tied to their code generators. inferGraph, schedule, and generate are all generic functions, so user-defined classes can extend the system through R’s S4 object-oriented programming system.

Building on the example above, we can define a class WorkerMapSchedule containing MapSchedule that adds a slot for mc.cores.

setClass("WorkerMapSchedule", slots = c(mc.cores = "integer"), contains = "MapSchedule")

Here’s a helper constructor function:

workerMapSchedule = function(graph, mc.cores = 2L, ...)
{
    message(sprintf("User defined scheduler, mc.cores = %s", mc.cores))
    out = mapSchedule(graph, ...)
    new("WorkerMapSchedule", out, mc.cores = mc.cores)
}

Now we need to associate a code generator with WorkerMapSchedule. We can use an existing one or we can define our own. Since we’ve already defined coresGenerate above we’ll just wrap that.

setMethod("generate", "WorkerMapSchedule", function(schedule, ...)
    coresGenerate(as(schedule, "MapSchedule"), mc.cores = schedule@mc.cores, ...)
)
## [1] "generate"

Finally, we can use the code as follows:

transformed <- makeParallel(code, scheduler = workerMapSchedule, mc.cores = 3L)
## User defined scheduler, mc.cores = 3
writeCode(transformed)
## expression(options(mc.cores = 3L), x = read.csv(xfile), y = read.csv(yfile), 
##     xy = sort(c(x[, 1], y[, 1])))

In this section we went beyond the basic customization in the previous section in two ways. First, we extended the existing class hierarchy by defining our own scheduler. Second, we defined methods and relied on method dispatch to control some aspects of the code generation process. We did not have to touch the dependency graph computations.
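The same pattern of extending a class and dispatching on it can be seen in a self-contained toy that does not need makeParallel. Everything named Toy or toy below is hypothetical and only mirrors the shape of MapSchedule, WorkerMapSchedule, and generate.

```r
library(methods)

# A toy base schedule that just carries code.
setClass("ToySchedule", slots = c(code = "expression"))

# The subclass adds one slot, as WorkerMapSchedule does above.
setClass("ToyWorkerSchedule", slots = c(mc.cores = "integer"),
         contains = "ToySchedule")

setGeneric("toyGenerate",
           function(schedule, ...) standardGeneric("toyGenerate"))

# Default method: emit the code unchanged.
setMethod("toyGenerate", "ToySchedule",
          function(schedule, ...) schedule@code)

# Subclass method: reuse the parent method, then prepend a setting.
setMethod("toyGenerate", "ToyWorkerSchedule", function(schedule, ...) {
    base <- toyGenerate(as(schedule, "ToySchedule"), ...)
    setCores <- substitute(options(mc.cores = N),
                           list(N = schedule@mc.cores))
    c(as.expression(setCores), base)
})

s <- new("ToyWorkerSchedule",
         code = expression(m1 <- lapply(x, mean)), mc.cores = 3L)
out <- toyGenerate(s)
out
```

Dispatch picks the subclass method from the class of s alone, so the caller does not need to say which generator to use.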

Summary

Users can specify the scheduling and code generation steps by passing functions or defining methods. Here’s an associated mental model for the possible steps taken by the makeParallel function.

extensible model

The shaded nodes are not yet fully implemented.

In summary, makeParallel can be used in the following ways:

Static Analysis

Code transformation relies on static code analysis. This means we don’t actually run any code until the user specifically asks for it. The CodeDepends package currently provides most of the underlying tools.

As we build more tools that may be useful for general purpose static R code analysis, we’ve been putting them in the CodeAnalysis package. makeParallel will most likely come to depend on CodeAnalysis.

Plans

The following features are planned, but not yet implemented.

Short Term

  1. Handle calls to library()
  2. Generate code for Windows
  3. Use measured information on expression run time and object sizes
  4. Add constraints for processes, e.g. code that performs plotting must happen in one process
  5. Show which code actually changed, perhaps by providing a diff() method
  6. More efficient scheduling algorithms
  7. Detection of objects with reference semantics to handle appropriately

Longer Term

  1. Identify conditional versus unconditional data dependencies and handle appropriately
  2. Handle output dependencies aka Write After Write (WAW) dependencies.
  3. Allow users to pass information about the data, e.g. dimensions and types.
  4. Infer dimensions and run time when possible.
  5. Generate code that uses systems besides R, for example Apache Hive.

Challenges

Some R code is more difficult to parallelize. This package currently doesn’t handle the following cases.

  1. Non-standard evaluation (NSE) complicates static code analysis because it changes the semantics of the language. Familiar R functions with NSE include library() and the ~ syntax for modeling. Some third-party packages, such as data.table and dplyr, use NSE extensively.
  2. Objects with reference semantics can change in method calls, so they require much more synchronization to run in parallel. Environments, reference classes, and R6 classes are examples of such objects.
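Both difficulties can be seen with a few lines of base R. This is only an illustration of why a naive analysis based on variable names goes wrong; the names counter and increment are made up for the example.

```r
# NSE: a name-based analysis reports `stats` as a variable that
# library(stats) uses, although it is a package name.
all.vars(quote(library(stats)))

# Likewise, y and x in a formula look like global variables, but they
# are columns of d.
all.vars(quote(lm(y ~ x, data = d)))

# Reference semantics: this call contains no assignment to `counter`,
# yet the object that `counter` refers to changes.
counter <- new.env()
counter$n <- 0
increment <- function(env) {
    env$n <- env$n + 1
    invisible(NULL)
}
increment(counter)
counter$n
```

A use-definition analysis that looks only for assignments would treat increment(counter) as a read of counter, so it could wrongly run two such calls in parallel.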

Acknowledgements

Thanks to Duncan Temple Lang and Nick Ulle for providing feedback.

Thanks to all those who have let me look at their code, especially Scott Devine, Nistara Randhawa, and Lynna Chu.