Distributed parallel random numbers

This vignette assumes some familiarity with the dust approach to random numbers, though we provide pointers below to the relevant documentation for further discussion.

The random number generator in dust is well suited to parallel calculations. On a single node we can set up a generator with many streams and then parallelise the calculation using OpenMP. The number of streams is either one per core or, typically better, one per logical unit of calculation (for example, one stream per loop iteration), which may exceed the number of threads. This is the approach taken by dust objects (vignette("dust")) and described for standalone use of the generators (vignette("rng_package")).
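For a flavour of what this looks like, a generator with several streams returns one column of numbers per stream; a minimal sketch (the seed and sizes here are arbitrary):

r_demo <- dust::dust_rng$new(seed = 42, n_streams = 4)
r_demo$random_real(5) # a 5 x 4 matrix, one column per independent stream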

When parallelising, you may also have separate ‘nodes’, which could be separate processors, physical machines, or just processes on the same machine. The difference from threads is that nodes do not share any memory with each other during computation.

Some additional care is required when using multiple nodes for the calculation so that their streams do not overlap (“distributed parallel computation”); this vignette explains the problems and solutions. It applies both to use with dust objects and to standalone use of the random number generators.

A note on seeding

To seed multiple streams we begin by seeding the first generator. How this happens in practice can be ignored for now.

r <- dust::dust_rng$new(seed = 1)
r$state()
#>  [1] c1 5c 02 89 ec 2d 0a 91 1e 61 39 74 08 ab 41 5e c3 86 8d 6d f4 02 8a b1 56
#> [26] 49 ee d9 dd 95 81 e2

To seed the next stream we “jump ahead” in the stream. The generators provide pre-computed coefficients that jump the state of the generator to a state equal to drawing a very large number of random numbers (or, equivalently, taking that many steps of the underlying Markov chain). For the default generator (xoshiro256+) this is 2^128 draws[^1], the square root of the period of the generator (2^256 - 1).

r$jump()
r$state()
#>  [1] 15 6e 10 9b 13 4b 6b 88 3a 50 b0 f1 ef 99 11 5c 6f 68 1c 0f e8 97 17 40 da
#> [26] c7 c6 2e 27 b7 b7 5d

Constructing a generator with more than one stream does this automatically:

r2 <- dust::dust_rng$new(seed = 1, n_streams = 4)
matrix(r2$state(), 32)
#>       [,1] [,2] [,3] [,4]
#>  [1,]   c1   15   56   f9
#>  [2,]   5c   6e   fc   32
#>  [3,]   02   10   6d   84
#>  [4,]   89   9b   df   e6
#>  [5,]   ec   13   7b   30
#>  [6,]   2d   4b   10   34
#>  [7,]   0a   6b   6e   d2
#>  [8,]   91   88   ed   38
#>  [9,]   1e   3a   cf   f3
#> [10,]   61   50   0f   f2
#> [11,]   39   b0   57   89
#> [12,]   74   f1   0e   21
#> [13,]   08   ef   3d   72
#> [14,]   ab   99   3b   fd
#> [15,]   41   11   a2   53
#> [16,]   5e   5c   45   2b
#> [17,]   c3   6f   87   b5
#> [18,]   86   68   5d   92
#> [19,]   8d   1c   4b   2c
#> [20,]   6d   0f   86   8f
#> [21,]   f4   e8   44   25
#> [22,]   02   97   9a   9f
#> [23,]   8a   17   7d   d8
#> [24,]   b1   40   18   ae
#> [25,]   56   da   f8   c6
#> [26,]   49   c7   d0   89
#> [27,]   ee   c6   ce   79
#> [28,]   d9   2e   93   ef
#> [29,]   dd   27   a8   b0
#> [30,]   95   b7   63   42
#> [31,]   81   b7   dd   ec
#> [32,]   e2   5d   41   9f

Note here how the first 32 bytes (the first column) are the same as the initial state of the generator r, while the second column equals the state after jumping once. The third column is 2 * 2^128 steps ahead and the fourth is 3 * 2^128 steps ahead.
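We can check this for the second column directly, since r has already jumped once:

identical(matrix(r2$state(), 32)[, 2], r$state())
#> [1] TRUE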

We also have coefficients for a “long jump”, which is equivalent to 2^192 steps of the generator (the generator period^(3/4)). The point of the ordinary jump is that individual draws of random numbers will never overlap; the point of the long jump is that each long jump is so far ahead that we will never reach the next one by ordinary jumping (there are 2^64 ~= 10^19 jumps within a single long jump).
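The generator objects expose the long jump as a method too; continuing with r from above, the following advances every stream by 2^192 draws:

r$long_jump()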

Distributed seeding

The basic idea using the above primitives is that we could take a set of “long jumps” to create a set of initial seeds, pass those to each node that will do calculation, then on each node take a set of jumps to create a locally parallel random number generator.

The function dust::dust_rng_distributed_state creates a list of random number states that are suitable for use on a series of nodes:

dust::dust_rng_distributed_state(seed = 1, n_streams = 1, n_nodes = 2L)
#> [[1]]
#>  [1] c1 5c 02 89 ec 2d 0a 91 1e 61 39 74 08 ab 41 5e c3 86 8d 6d f4 02 8a b1 56
#> [26] 49 ee d9 dd 95 81 e2
#>
#> [[2]]
#>  [1] 77 69 c2 44 65 7b dc 93 49 c7 93 ef dd 06 fc 47 cf af f3 d0 80 97 01 68 45
#> [26] a2 90 f4 38 dd 4d 0a

Each element of this list can be passed through to dust::dust_rng_pointer$new, or used to create dust::dust_generator objects. How you do this will depend on your application, of course.
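For example, a worker node can pass its element of the list directly as the seed argument; a minimal sketch reusing the states generated above:

states <- dust::dust_rng_distributed_state(seed = 1, n_streams = 1, n_nodes = 2L)
p <- dust::dust_rng_pointer$new(seed = states[[2]], n_streams = 1)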

We also include the convenience function dust::dust_rng_distributed_pointer, which returns a list of pointer objects:

dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1, n_nodes = 2L)
#> [[1]]
#> <dust_rng_pointer>
#>   Public:
#>     algorithm: xoshiro256plus
#>     initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#>     is_current: function ()
#>     n_streams: 1
#>     state: function ()
#>     sync: function ()
#>   Private:
#>     is_current_: TRUE
#>     ptr_: externalptr
#>     state_: c1 5c 02 89 ec 2d 0a 91 1e 61 39 74 08 ab 41 5e c3 86 8d ...
#>
#> [[2]]
#> <dust_rng_pointer>
#>   Public:
#>     algorithm: xoshiro256plus
#>     initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#>     is_current: function ()
#>     n_streams: 1
#>     state: function ()
#>     sync: function ()
#>   Private:
#>     is_current_: TRUE
#>     ptr_: externalptr
#>     state_: 77 69 c2 44 65 7b dc 93 49 c7 93 ef dd 06 fc 47 cf af f3 ...

We will illustrate the idea first with the pi estimation algorithm from vignette("rng_package"); to start, we will ignore the within-node parallelism (imagine we’re trying to distribute the calculation over a set of single-core jobs). Because we need our code available on multiple nodes we can’t use cpp11::cpp_source and will have to create a package, which will be called piparallel as before.

As a reminder, to use our pi estimation code on a single node we first create a dust_rng_pointer object:

rng <- dust::dust_rng_pointer$new()

We can call the function like so:

piparallel::pi_dust_parallel(1e6, rng, 1)
#> [1] 3.142492

To do this in parallel, we use the above functions to create a list of pointers:

ptrs <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1,
                                           n_nodes = 4L)

Next, we need a parallel cluster; we’ll use the parallel package and create a cluster of two nodes (this could be increased, of course). Alternatively, this might be done via a cluster scheduler or the more sophisticated tools in the future package.

cl <- parallel::makeCluster(2, "PSOCK")

We’ve installed our package in lib, so we can make sure that’s available on each node:

parallel::clusterCall(cl, loadNamespace, "piparallel", lib)
#> [[1]]
#> <environment: namespace:piparallel>
#>
#> [[2]]
#> <environment: namespace:piparallel>

Then we can call our pi estimation function, taking as arguments the number of iterations, a pointer to the random number state, and the number of threads to use.

ans <- parallel::clusterApply(cl, ptrs, function(r)
                              piparallel::pi_dust_parallel(1e6, r, 1))
ans
#> [[1]]
#> [1] 3.144028
#>
#> [[2]]
#> [1] 3.141628
#>
#> [[3]]
#> [1] 3.141232
#>
#> [[4]]
#> [1] 3.141676

We now have four estimates of pi which are independent and can be safely averaged:

mean(unlist(ans))
#> [1] 3.142141

These are the same numbers we would have found had we run things in series locally:

ptrs_local <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1,
                                                 n_nodes = 4L)
ans_local <- lapply(ptrs_local, function(r)
                    piparallel::pi_dust_parallel(1e6, r, 1))
ans_local
#> [[1]]
#> [1] 3.144028
#>
#> [[2]]
#> [1] 3.141628
#>
#> [[3]]
#> [1] 3.141232
#>
#> [[4]]
#> [1] 3.141676

If we’d run the clusterApply over a different number of nodes, the calculation would also be unchanged.

The same approach works where we want each node to run in parallel: we might want to distribute calculations over 4 nodes, each of which has 8 cores, say. Then we’d configure our pointers to have more streams (here using 32 streams, as that might represent our upper bound of per-node compute capacity):

ptrs <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 32,
                                           n_nodes = 4L)

The state vector for each pointer is now considerably longer:

length(ptrs[[1]]$state())
#> [1] 1024
ans <- parallel::clusterApply(cl, ptrs, function(r)
              piparallel::pi_dust_parallel(1e6, r, 8))
ans
#> [[1]]
#> [1] 3.142302
#>
#> [[2]]
#> [1] 3.141256
#>
#> [[3]]
#> [1] 3.141828
#>
#> [[4]]
#> [1] 3.140525

As before, this gives us four answers (two per node on our two-node cluster), but each node ran 2 * 32 * 1e6 random draws, spread over 8 threads within each of its two serial jobs.

With this set-up we can change the number of nodes and number of threads without affecting the calculation, but spread the work out over the compute that we have available to us.

Continuing the streams

The above set-up assumes that we want to establish our streams, do some work with them, then throw them away; that is, we never want to continue the calculation with those streams again. To continue sensibly we have several choices:

The first option is to take another set of long jumps to skip over all the partially-used streams and continue from there. This is possibly the simplest solution: if we have set up initial states for n nodes from our initial seed, we can simply take n long jumps to move past the entire first sequence, then configure seeds using the next n states. This can be repeated without practical limit, and has the advantage that no reverse communication about the random number state needs to happen - that is, the worker nodes never need to tell our main node where they got to in the sequence.
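As a sketch of this first option, we can use the long_jump argument visible in the dust_rng_pointer constructor output above: if round k (counting from zero) of work on n_nodes nodes always starts its seeds at long jump k * n_nodes, each round’s streams lie beyond everything used previously. The helper function here is an illustrative assumption, not part of dust:

seeds_for_round <- function(seed, n_nodes, round) {
  ## Node i of round k sits round * n_nodes + i long jumps from the seed
  lapply(seq_len(n_nodes) - 1L, function(i)
    dust::dust_rng_pointer$new(seed = seed,
                               long_jump = round * n_nodes + i)$state())
}
seeds_for_round(1, 4, round = 1) # seeds for a second round of work on 4 nodes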

The second option is to report back the final state reached by each node. This is more involved, but possibly more satisfying: each node sends back its final random number state at the end of sampling (along with whatever calculations were being performed), and we pass this state back to the nodes when continuing, so that all calculations are drawn from an unbroken set of streams.

pi_estimate_continue <- function(n, ptr, n_threads) {
  value <- piparallel::pi_dust_parallel(n, ptr, n_threads)
  ptr$sync()
  list(value = value,
       ptr = ptr)
}
parallel::clusterExport(cl, "pi_estimate_continue")
ptrs <- dust::dust_rng_distributed_pointer(seed = 1, n_streams = 1,
                                           n_nodes = 4L)
ans <- parallel::clusterApply(cl, ptrs, function(r)
                              pi_estimate_continue(1e6, r, 1))
ans
#> [[1]]
#> [[1]]$value
#> [1] 3.144028
#>
#> [[1]]$ptr
#> <dust_rng_pointer>
#>   Public:
#>     algorithm: xoshiro256plus
#>     initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#>     is_current: function ()
#>     n_streams: 1
#>     state: function ()
#>     sync: function ()
#>   Private:
#>     is_current_: TRUE
#>     ptr_: externalptr
#>     state_: 3c 9a e0 2e ac 65 52 0d b4 be 20 c8 cd c5 9e 27 82 14 7e ...
#>
#>
#> [[2]]
#> [[2]]$value
#> [1] 3.141628
#>
#> [[2]]$ptr
#> <dust_rng_pointer>
#>   Public:
#>     algorithm: xoshiro256plus
#>     initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#>     is_current: function ()
#>     n_streams: 1
#>     state: function ()
#>     sync: function ()
#>   Private:
#>     is_current_: TRUE
#>     ptr_: externalptr
#>     state_: 2e 3d 0e 9c fb a7 d3 08 f3 32 89 25 c1 38 b8 aa 95 0b 65 ...
#>
#>
#> [[3]]
#> [[3]]$value
#> [1] 3.141232
#>
#> [[3]]$ptr
#> <dust_rng_pointer>
#>   Public:
#>     algorithm: xoshiro256plus
#>     initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#>     is_current: function ()
#>     n_streams: 1
#>     state: function ()
#>     sync: function ()
#>   Private:
#>     is_current_: TRUE
#>     ptr_: externalptr
#>     state_: 7f b4 7d 26 a0 13 11 24 20 95 93 02 4b 3d 16 9d ba cf a2 ...
#>
#>
#> [[4]]
#> [[4]]$value
#> [1] 3.141676
#>
#> [[4]]$ptr
#> <dust_rng_pointer>
#>   Public:
#>     algorithm: xoshiro256plus
#>     initialize: function (seed = NULL, n_streams = 1L, long_jump = 0L, algorithm = "xoshiro256plus")
#>     is_current: function ()
#>     n_streams: 1
#>     state: function ()
#>     sync: function ()
#>   Private:
#>     is_current_: TRUE
#>     ptr_: externalptr
#>     state_: a0 46 22 8e b9 e5 e8 fe 37 99 06 e4 18 2d 13 f3 25 33 fb ...

We can again assemble our answer, which agrees with the estimate above:

mean(vapply(ans, "[[", numeric(1), "value"))
#> [1] 3.142141

The state of these pointers also agrees with the local serial version above:

ptrs_local[[1]]$state()
#>  [1] 3c 9a e0 2e ac 65 52 0d b4 be 20 c8 cd c5 9e 27 82 14 7e 79 c2 8f fb 47 4c
#> [26] 00 da 0f 97 50 f7 f6
ans[[1]]$ptr$state()
#>  [1] 3c 9a e0 2e ac 65 52 0d b4 be 20 c8 cd c5 9e 27 82 14 7e 79 c2 8f fb 47 4c
#> [26] 00 da 0f 97 50 f7 f6

This approach could be useful where the calculation is to be continued (for example, iterating until some convergence criterion is met). The additional work of synchronising and returning the pointer adds complexity, though.

Considerations

Ideally the calculations will not depend on the number of nodes used, so you should take the same approach as described in vignette("rng_package"): try to identify the “parallelisable” component (which might be larger than the number of nodes) and parallelise based on that.

For example, suppose we want to run a multi-chain MCMC simulation. Parallelising across chains is an obvious between-node target. We could then send n chains over m nodes (m <= n), and we’d want to arrange our seeds so that we long-jump for each chain, not for each node; that way, no matter how many nodes we had available, we’d get the same results.
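A minimal sketch of this arrangement (the chain and node counts are arbitrary): take one seed per chain, then split the chains over however many nodes are available:

n_chains <- 8
n_nodes <- 2
seeds <- dust::dust_rng_distributed_state(seed = 1, n_nodes = n_chains)
## Each node receives a list of per-chain seeds; a chain's results
## depend only on its seed, not on which node happens to run it
per_node <- split(seeds, rep_len(seq_len(n_nodes), n_chains))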

Use cases

Here we show a more complete, and less contrived, use case with dust’s “volatility” model. This example picks up from vignette("data") and the reader is directed there for a fuller explanation.

volatility <- dust::dust_example("volatility")

The model was written to fit to a time series (plotted in vignette("data")).
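The data object used below is constructed in vignette("data"); as a self-contained stand-in we could simulate a compatible series from the model itself (the parameter values and observation noise here are illustrative assumptions, not the original construction):

sim <- volatility$new(list(alpha = 0.91, sigma = 1), 0, 1L, seed = 42L)
times <- seq(1, 100)
## One state variable and one particle, so drop() leaves a vector
observed <- drop(sim$simulate(times)) + rnorm(length(times))
data <- data.frame(time = times, observed = observed)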

The model can run in parallel on a single node to produce a likelihood estimate given parameters; we’ll write a small function to do this:

run_filter <- function(pars, mod) {
  if (any(pars < 0)) {
    return(-Inf)
  }
  pars <- list(alpha = pars[[1]], sigma = pars[[2]])
  mod$update_state(pars = pars, time = 0)
  mod$filter()$log_likelihood
}

We assume that our parameter vector here is a length-2 numeric vector containing alpha and sigma, and we ensure that these are positive by returning -Inf for invalid values.

We can create an instance of the model, set the data, and run the filter (for details see vignette("data"))

n_particles <- 128
n_threads <- 4
mod <- volatility$new(list(), 0, n_particles, n_threads = n_threads)
mod$set_data(dust::dust_data(data))
#> NULL
run_filter(c(0.91, 1), mod)
#> [1] -179.0706

We could also write a very simple MCMC using the Metropolis-Hastings algorithm:

mcmc <- function(mod, p, n_times, proposal_sd = 0.02) {
  ll <- run_filter(p, mod)

  ret <- matrix(NA_real_, n_times + 1, length(p) + 1)
  ret[1, ] <- c(p, ll)

  for (i in seq_len(n_times)) {
    ## Propose new parameters and compute their likelihood
    p_new <- rnorm(length(p), p, proposal_sd)
    ll_new <- run_filter(p_new, mod)
    ## Metropolis-Hastings acceptance test
    if (ll_new > ll || runif(1) < exp(ll_new - ll)) {
      ll <- ll_new
      p <- p_new
    }
    ret[i + 1, ] <- c(p, ll)
  }
  ret
}

We can run this for a number of steps and collect up the sampled parameters and their likelihoods:

ans <- mcmc(mod, c(0.91, 1), 20)

(We are cutting a lot of corners with the inference here: we have not specified any priors and are assuming that the distribution can be integrated over the parameters, and our proposal mechanism is extremely simple, supporting only independent proposals for the two parameters, which are certainly correlated. However, the basic features are shared with any more sophisticated approach.)

Our aim now is to carry this out in parallel, running chains distributed over the nodes. To do this we follow the same basic steps as above. If we want to run 8 chains we’ll need 8 seeds, even if we run on fewer nodes:

seed <- dust::dust_rng_distributed_state(n_nodes = 8L, algorithm = volatility)

It will also be useful to write a little wrapper function that repeats the setup from above, then runs the MCMC:

run_mcmc <- function(seed, data, ...) {
  volatility <- dust::dust_example("volatility")
  mod <- volatility$new(list(), 0, 128, seed = seed, n_threads = 2)
  mod$set_data(dust::dust_data(data))
  mcmc(mod, ...)
}
parallel::clusterExport(cl, c("mcmc", "run_mcmc", "run_filter"))
ans <- parallel::clusterApply(cl, seed, run_mcmc, data, c(0.91, 1), 20)

This produces a list of samples from each of the 8 chains, run over 2 nodes, each of which ran using 2 threads. We can scale any of this parallelism (increasing threads until we hit the total number of particles, increasing nodes until we hit the number of chains) and the results will be deterministic, except for the rnorm and runif calls used for the proposal and acceptance test within the MCMC, which use R’s own generator.
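If complete determinism is needed, one could additionally seed R’s own generator on each node from the dust seed so that the proposal and acceptance draws are reproducible too; the mapping from raw seed to integer here is an arbitrary assumption:

run_mcmc_reproducible <- function(seed, data, ...) {
  ## Derive an integer from the raw dust seed for R's generator
  ## (illustrative; any stable mapping would do)
  set.seed(sum(as.integer(seed)))
  run_mcmc(seed, data, ...)
}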

Note that in creating the seed above we left n_streams as the default (1) because the dust model constructor will take care of expanding out per-particle seeds via a series of jumps.

You should be careful with these approaches not to exceed the compute available to you, especially when using a multi-user system such as an HPC.

Summary

  • Simplest: create a set of suitable rng seeds on the controlling node and send them to the worker nodes; don’t try to continue the calculations afterwards.
  • Intermediate: create a set of pointer objects which can be used on the controlling process first, then sent to the worker nodes.
  • Hardest: create a set of pointer objects and send them to the worker nodes, then at the end of the calculation synchronise the state and return it to the controlling node.

[^1]: It’s hard to get a sense of these numbers, but 2^128 is about 3 x 10^38 draws. If we draw a billion numbers (10^9) a second (1ns per draw is the rate on a reasonable CPU), we can draw about 3 x 10^16 numbers a year, so at this rate it would take around 10^22 years to reach the next stream. In contrast, the universe is about 14 billion years old (1.4 x 10^10 years).

