This document assumes that you have a Redis server running. If not, see the bottom of the document for options for installing one on your own system. You can test whether your Redis server is behaving as expected by sending it a PING command from R.
If you get an error like “Connection refused” then check your installation.
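A minimal sketch of such a check, assuming the redux package is installed and Redis is listening on its default local address (127.0.0.1:6379):

```r
# Open a connection using redux's defaults and send a PING.
# A healthy server answers; a missing one raises a connection error.
con <- redux::hiredis()
pong <- con$PING()
print(pong)
```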
The package is designed to be easy to get started with, and has features that you might like to use later. If you run the “Hello world” section you probably have 90% of what you need - the rest of the document will show features that will help you bend that around your specific needs.
Without any great explanation, here is the basic approach to using rrq to run a task on another R process, asynchronously. First, we create a “controller” object which you can use for queuing tasks.
This controller uses an “identifier” (here, id is rrq:babc4f67) which can be anything you want but acts like a folder within the Redis server, distinguishing your queue from any others hosted on the same server.
We will set this as the default controller to use, which means we can avoid passing in controller = obj to all the calls below:
Submit a task to the queue with rrq_task_create_expr(), returning a key for that task
We’ll also need some worker processes to carry out our tasks. Here, we’ll spawn two for now (see the section below on alternatives to this)
Wait for the task to complete:
then retrieve the result
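Putting the steps above together, the whole flow might look like the following sketch. It assumes a Redis server on the default local port; the queue identifier is random and the expression runif(10) is only an illustrative choice of task.

```r
library(rrq)

# Create a controller with a random queue identifier and make it the default
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)

# Queue a task (illustrative expression), then start two local workers
t <- rrq_task_create_expr(runif(10))
w <- rrq_worker_spawn(2)

# Block until the task has finished, then fetch its value
ok <- rrq_task_wait(t)
result <- rrq_task_result(t)
```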
Things to note here:
For years, the parallel package has provided users with the ability to run tasks in parallel with very little setup. Given a list of data x and some function f, one can change from the serial code lapply(x, f) to parallel::parLapply(cl, x, f) to run in parallel given a cluster object cl (the even simpler parallel::mclapply(x, f) can be used on platforms other than Windows with reasonable success). Nice as this is, it suffers some drawbacks, most of which follow from the simple blocking interface:
- [...] parLapply has been called, and is restricted to a single node without considerable effort. One cannot add workers to the cluster while it runs, or remove unneeded ones as tasks finish.
- [...] (parallel::parLapplyLB) is quite slow

As such it is hard to build interfaces like queues or work through dependency graphs (though see heroic work in future and targets). Attempts at doing this run into the issue of where to store the data and the queue so that multiple worker processes can safely interact with the queue without corrupting the database or hitting race conditions. Approaches like liteq may not work on network file systems, and therefore become limited to a single node.
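For reference, the switch from serial to parallel code with the parallel package can be sketched as follows. The data x, the function f and the cluster size of 2 are illustrative assumptions; note that parLapply() blocks until every element has been processed.

```r
library(parallel)

x <- 1:8
f <- function(i) i^2

# Serial version
res_serial <- lapply(x, f)

# Parallel version: the same call shape, plus a cluster object
cl <- makeCluster(2)
res_parallel <- parLapply(cl, x, f)
stopCluster(cl)

# Both approaches produce the same list of results
same <- identical(res_serial, res_parallel)
```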
At the other end of the scale, HPC systems with their schedulers can avoid all these issues, but with byzantine interfaces and slow per-task submission.
Notable features of rrq which motivate its development within this landscape:
lapply
Running tasks is a little different to many R-parallel backends, because we do not directly allocate tasks to our workers, but simply place them on a first-in-first-out task queue. A pool of workers will then poll for work from this queue. See vignette("design") for more on this.
Consider enqueuing this expression
This has created a task that will sleep for 2 seconds then return a random number
Initially the task has status RUNNING (it will be PENDING very briefly):
Then after a couple of seconds it will complete (we pad this out here so that it will complete even on slow systems)
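The sequence just described might be sketched like this. It assumes a local Redis server and sets up its own queue and worker so that it can run on its own; the sleep durations are padding chosen for illustration.

```r
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
w <- rrq_worker_spawn(1)

# A task that sleeps for 2 seconds and then returns a random number
t <- rrq_task_create_expr({
  Sys.sleep(2)
  runif(1)
})

# Shortly after submission the worker should have picked the task up
Sys.sleep(0.5)
status_early <- rrq_task_status(t)

# Pad the wait so that the task finishes even on slow systems
Sys.sleep(3)
status_late <- rrq_task_status(t)
```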
The basic task lifecycle is this: PENDING -> RUNNING -> COMPLETE or ERROR
In addition there are rarer ways a task can end (CANCELLED, DIED, TIMEOUT) or fail to start due to dependencies between tasks (DEFERRED or IMPOSSIBLE; see below).
A task in any terminal state (except IMPOSSIBLE, so COMPLETE, ERROR, CANCELLED, DIED or TIMEOUT) can be retried, at which point the status is MOVED and the task will “point” somewhere else (that task will move through the usual PENDING -> RUNNING -> (terminal state) flow). See vignette("fault-tolerance") for details.
Above, we slept for a few seconds in order for the task to become finished. However, this is an extremely common operation, so rrq provides a function rrq_task_wait() which will wait until a task finishes, then return a logical indicating if the task succeeded or not, after which you can fetch the result with rrq_task_result().
t <- rrq_task_create_expr({
  Sys.sleep(2)
  runif(1)
})
rrq_task_wait(t)
#> [1] TRUE
rrq_task_result(t)
#> [1] 0.6660437
The polling interval here is 1 second by default, but if the task completes within that period it will still be returned as soon as it is complete (the interval is just the time between progress bar updates and the period where an interrupt would be caught to cancel the wait).
It is rare that we want our workers to run in completely empty R environments (no extra loaded packages, no custom functions available). Quite often you will want to run something to configure the workers before they accept tasks.
In order to do this, first define a function that will accept one argument which will be the environment that the worker will use, and then set that environment up.
For the most common case, where you have script files that contain function definitions and you have a set of packages to load, rrq has a helper function rrq_envir().
So, for example, suppose we want to source a file “myfuns.R” which contains some code
We might write:
The next step is to register this function for your queue:
By default, this will notify all running workers to update their environment. Note that if your function errors in any way, your workers will exit!
rrq_worker_log_tail(n = 4)
#> worker_id child time command message
#> 1 pokeable_auklet_1 NA 1723025925 MESSAGE REFRESH
#> 2 pokeable_auklet_2 NA 1723025925 MESSAGE REFRESH
#> 3 pokeable_auklet_1 NA 1723025925 ENVIR new
#> 4 pokeable_auklet_2 NA 1723025925 ENVIR new
#> 5 pokeable_auklet_1 NA 1723025925 ENVIR create
#> 6 pokeable_auklet_2 NA 1723025925 ENVIR create
#> 7 pokeable_auklet_1 NA 1723025925 RESPONSE REFRESH
#> 8 pokeable_auklet_2 NA 1723025925 RESPONSE REFRESH
Now our workers have picked up our functions we can start using them:
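As a sketch of the whole workflow, assuming a local Redis server: the contents of myfuns.R below, including the function name slowdouble, are hypothetical and written out only so that the example is self-contained.

```r
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)

# Hypothetical contents for myfuns.R
writeLines(c(
  "slowdouble <- function(x) {",
  "  Sys.sleep(x)",
  "  x * 2",
  "}"), "myfuns.R")

# Register an environment that sources the file, then start a worker
rrq_worker_envir_set(rrq_envir(sources = "myfuns.R"))
w <- rrq_worker_spawn(1)

# The worker can now evaluate calls to functions defined in myfuns.R
t <- rrq_task_create_expr(slowdouble(1))
ok <- rrq_task_wait(t)
result <- rrq_task_result(t)
```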
If you need more control you can write your own function. We could have written create as
This approach would also allow you to do something like read an rds or Rdata file containing a large object that you want every worker to have a copy of.
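A hand-written environment function could look like the sketch below. The file myfuns.R and the function double_it are hypothetical; the function is exercised locally here so that the example runs without a queue, and it would be registered with rrq_worker_envir_set(create).

```r
# Hypothetical file of function definitions
writeLines("double_it <- function(x) x * 2", "myfuns.R")

# The function takes one argument, the environment the worker will use,
# and sets that environment up
create <- function(envir) {
  sys.source("myfuns.R", envir)
}

# Local check that the function populates an environment as intended
e <- new.env()
create(e)
defined <- exists("double_it", envir = e, inherits = FALSE)
```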
The rrq package does not aspire to be a fully fledged scheduler, but sometimes a little more control than first-in-first-out is required. There are a few options available that allow the user to control how tasks are run when needed. These involve:
We support a simple system for allowing tasks to depend on other tasks. An example of this might be where you need to download a file, then run a series of analyses on it. Or where you want to run an analysis over a set of parameters, and then aggregate once they’re all done. How the output of one task feeds into the others is up to you, but practically this will require one of the following options:
When queueing a task, you can provide a vector of task identifiers as the depends_on argument. These identifiers must all be known to rrq and the task will not be started until all these prerequisites are completed. The task lifecycle will look different to the above; rather than starting as PENDING the task begins as DEFERRED.
Once all prerequisites are complete, a task becomes possible and it moves from DEFERRED to PENDING. It will be placed at the front of the queue.
If a prerequisite task fails for any reason (it errors, is cancelled, or its worker dies) then the task will become IMPOSSIBLE.
For example, suppose that we have code:
create <- function(n) {
  saveRDS(runif(n), "numbers.rds")
}
use <- function(i) {
  d <- readRDS("numbers.rds")
  d[[i]]
}
Here we have some function create that we want to run first, doing some setup, then another function use that we want to run afterwards, which will read the result of running create and do some analysis on it.
Create an rrq_controller object and tell workers to read the deps.R file which contains these function definitions
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "deps.R"))
source("deps.R")
We can then enqueue our first task:
Then use this id
The status of the first task will be PENDING, per usual:
However, the second task will be DEFERRED because it is not yet in the queue:
rrq_task_status(id_use)
#> [1] "DEFERRED"
rrq_queue_list()
#> [1] "e8f2f868119f74eb2fced249e886aed4"
Once the first task is processed by a worker, the status changes:
rrq_task_status(id)
#> [1] "COMPLETE"
rrq_task_status(id_use)
#> [1] "PENDING"
rrq_queue_list()
#> [1] "4c44b980e6dd43b9cb0a3130ff781fd6"
At this point the second task will proceed through the queue as usual.
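The dependency example can be sketched end to end as follows, assuming a local Redis server. The arguments 5 and 2 are illustrative, and deps.R is written out here with the function definitions shown earlier so that the example stands alone.

```r
library(rrq)

# Write out deps.R with the two functions from the text
writeLines(c(
  'create <- function(n) {',
  '  saveRDS(runif(n), "numbers.rds")',
  '}',
  'use <- function(i) {',
  '  d <- readRDS("numbers.rds")',
  '  d[[i]]',
  '}'), "deps.R")

obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "deps.R"))
source("deps.R")

# The second task names the first as a prerequisite
id <- rrq_task_create_expr(create(5))
id_use <- rrq_task_create_expr(use(2), depends_on = id)

# With no workers running yet, neither task has started
status_before <- rrq_task_status(c(id, id_use))

# Start a worker; the prerequisite runs first, then the dependent task
w <- rrq_worker_spawn(1)
rrq_task_wait(id)
rrq_task_wait(id_use)
value <- rrq_task_result(id_use)
```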
Points to note here:
Sometimes it is useful to have different workers listen on different queues. For example, you may have workers on different machines with different capabilities (e.g., a machine with a GPU or high memory). You may have tasks that are expected to take quite a long time but want some workers to monitor a fast queue with short lived tasks.
Every worker listens to the default queue, but when starting a worker, you can add additional queues and control the priority order of these queues for that worker. When submitting tasks you then specify the queue that the task sits in.
The easiest way to configure this is to save a worker configuration:
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
rrq_worker_config_save(
"short",
rrq_worker_config(queue = "short"))
rrq_worker_config_save(
"all",
rrq_worker_config(queue = c("short", "long")))
rrq_worker_config_list()
#> [1] "short" "all" "localhost"
Above, we create two configurations: “short”, which just listens on the queue short, and “all”, which listens on both the short and long task queues (note that both these workers will also listen on the default queue).
w_short <- rrq_worker_spawn(name_config = "short")
#> ℹ Spawning 1 worker with prefix 'astronomical_izuthrush'
w_all <- rrq_worker_spawn(name_config = "all")
#> ℹ Spawning 1 worker with prefix 'damaging_platypus'
We can then submit two long tasks to the long queue:
id_long1 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long")
id_long2 <- rrq_task_create_expr(Sys.sleep(3600), queue = "long")
After the workers have had the chance to pick up work, our “short” worker is still available:
So we can submit tasks to this short queue and have them processed
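A reduced, self-contained sketch of submitting to a named queue, assuming a local Redis server. Only the “short” configuration is created here and the task expression runif(5) is illustrative.

```r
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)

# One worker that listens on the "short" queue (and the default queue)
rrq_worker_config_save("short", rrq_worker_config(queue = "short"))
w_short <- rrq_worker_spawn(name_config = "short")

# Show the current state of each worker
rrq_worker_status()

# Submit to the short queue and collect the result
id_short <- rrq_task_create_expr(runif(5), queue = "short")
ok <- rrq_task_wait(id_short)
result <- rrq_task_result(id_short)
```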
Note that there is no validation to check that any worker is listening on any queue when you submit a task. Indeed there can’t be as new workers can be added at any time (so at the point of submission perhaps there were no workers).
Running a task in a separate process offers some additional features at a cost of a little more overhead per task.
The cost is that we have to launch an additional process for every task run. We use callr for this to smooth over a number of rough edges, but this does impose a minimum overhead of about 0.1s per task, plus the cost of loading any packages that your task might need (if you use packages that make heavy use of things like S4 classes this can easily extend to a few seconds).
The additional features that it provides are:
rrq_task_log()
The sorts of tasks that benefit from this sort of approach are typically long-running (expected running times in the tens of seconds or more) so that the overhead is low, but also the features of cancellation and timeouts become more useful. We have also seen this used to good effect where the task may leak memory, or cache results aggressively - over time this would cause the worker process to consume more memory until the worker process was killed by the operating system.
To use a separate process, add separate_process = TRUE to calls to rrq_task_create_expr(). This will then enable the argument timeout to have an effect, as well as rrq_task_cancel().
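A sketch of cancelling a task that runs in a separate process, assuming a local Redis server. The hour-long sleep is a stand-in for a long-running calculation, and the short pause before cancelling is only there to let the worker start the task.

```r
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
w <- rrq_worker_spawn(1)

# Run the task in its own process so that it can be interrupted
t <- rrq_task_create_expr(Sys.sleep(3600), separate_process = TRUE)
Sys.sleep(2)

# Cancel the task and look at its final status
rrq_task_cancel(t)
status <- rrq_task_status(t)
```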
The data for each task, and the task result itself, is saved in Redis. This is alongside the typically much smaller metadata required to run rrq. Because Redis is an in memory database, this means that some things will not be a great idea; for example sending off 1000 tasks that will each write back 100 MB of simulation output would try and write 100 GB of data into the Redis database which may cause issues for your server!
To allow for this workflow, rrq supports configuring its object store (rrq::object_store) so that objects above a certain size are written out elsewhere. Currently, the only “elsewhere” supported is to disk with the assumption that the controller and all workers share a filesystem. The approach used is safe for multiple concurrent processes, including over network mounted filesystems.
To configure this you must use rrq_configure() before attaching either a controller or a worker to the queue. The configuration interface here will change in future, but we will maintain backward compatibility with the current options.
To configure storage so that every object that is greater than 1KB is saved to disk, you could write:
id <- paste0("rrq:", ids::random_id(bytes = 4))
path <- tempfile()
rrq_configure(id, store_max_size = 1000, offload_path = path)
then we create the queue object as normal (and spawn a worker so that we can use it)
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
w <- rrq_worker_spawn(1)
#> ℹ Spawning 1 worker with prefix 'antipacifistic_germanshorthairedpointer'
It’s not hard at all to get to 1KB of data; we can do that by simulating a big pile of random numbers:
Once the task has finished, data will be stored on disk below the path given above:
This keeps the larger objects out of the database.
This vignette uses the very basic rrq_worker_spawn() method to create workers on your local machine. This is intended primarily for development only, though it may be useful in some situations. There are other options available, depending on how you want to use rrq.
The simplest way of getting started with rrq is to use rrq_worker_spawn(), as above. This approach has several nice features; it uses callr, so no extra work is required to make the worker R session behave like the controller session (it will find your environment variables, library, and working directory), and it behaves the same way on all platforms (compare below). However, the workers will disappear when the controlling session completes (this is either a good or a bad thing) and you will be limited to a single node.
There are two issues here; one is the technical details of launching your rrq workers on the cluster, and the other is the details around whether your HPC admins would like you to (and the security implications of doing so).
If you are using rrq with an HPC system, then you will want to schedule workers onto the system. The details here will change
The basic approach is to write out a launcher script somewhere:
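A sketch of writing the launcher, on the assumption that your installed version of rrq exports rrq_worker_script() for this purpose and that the script it writes is named rrq_worker; the temporary directory is an illustrative destination.

```r
# Choose a directory for the launcher (a temporary one here)
path <- tempfile()
dir.create(path)

# Write the launcher script into that directory
rrq::rrq_worker_script(path)

# The directory should now contain the launcher
list.files(path)
```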
This can be called from the command line:
$ ./rrq_worker --help
Usage:
rrq_worker [options] <id>
Options:
--config=NAME Name of a worker configuration [default: localhost]
--name=NAME Name of the worker (optional)
This is a bash script that can then be called from whatever cluster job scheduler you use. The important things to pass through are:
- id: the only positional argument, which is the queue id
- --config=NAME: allows controlling of the named worker config (set via rrq_worker_config_save(), and allowing changing of timeout, verbosity and queues)

In addition, you may need to change the configuration type. If you need to control redis access you should set the REDIS_URL environment variable to point at your Redis server.
We provide a docker image that you can use (mrcide/rrq), though typically you would want to extend this image to include your own packages. Alternatively, create your own docker image (see the main dockerfile, replacing COPY . /src with an installation of rrq, and the image that sets the entrypoint to call rrq_worker).
We use rrq to orchestrate workers in web applications where a number of workers carry out long running calculations for an HTTP API written using plumber and porcelain.
If you have submitted workers via a task scheduler, you might want to block and wait for them to become available. You can do this using the rrq_worker_wait() function.
We first create a vector of names for the new workers, and then tell rrq that we’re going to produce these workers.
How you then start the workers is up to you; you might start them at the command line with
rrq_worker --worker-id cluster_1 rrq:46f706a3
rrq_worker --worker-id cluster_2 rrq:46f706a3
(into separate terminals). Or you might queue these jobs with a cluster scheduler such as Slurm or PBS (with appropriate care over the working directory). But you can then immediately, in your R console write:
and your session will block and wait for the workers to appear, erroring if they do not appear in time.
If you use many workers, particularly on different machines, you may not notice if some disappear. Possible causes of this include:
By default, if this happens when your worker is running a task, that task status will forever be stuck in RUNNING.
rrq provides a simple heartbeat process, if requested, to detect when a worker has disappeared. To do this, we run a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a dead man’s switch - see rrq::rrq_heartbeat for details.
To enable the heartbeat, save a worker configuration with the heartbeat_period set to some number of seconds. Below we use 2 seconds so that this example runs reasonably quickly, but in practice something like 60 might be slightly less load on your Redis server.
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
res <- rrq_worker_config_save(
"localhost",
rrq_worker_config(heartbeat_period = 2))
Then, launch a worker
Our worker will print information indicating that the heartbeat is enabled (use rrq_worker_process_log())
#> [2024-08-07 11:18:50.341996] HEARTBEAT rrq:b5c90261:worker:taxidermic_americangoldfinch_1:heartbeat
#> [2024-08-07 11:18:50.581563] HEARTBEAT OK
#> [2024-08-07 11:18:50.596252] ALIVE
#> [2024-08-07 11:18:50.596533] ENVIR new
#> [2024-08-07 11:18:50.59684] QUEUE default
#> __
#> ______________ _/ /
#> ______ / ___/ ___/ __ `/ /_____
#> /_____/ / / / / / /_/ /_/_____/
#> ______ /_/ /_/ \__, (_) ______
#> /_____/ /_/ /_____/
#> worker: taxidermic_americangoldfinch_1
#> config: localhost
#> rrq_version: 0.7.17
#> platform: x86_64-pc-linux-gnu
#> running: Ubuntu 20.04.6 LTS
#> hostname: wpia-dide300
#> username: rfitzjoh
#> queue: rrq:b5c90261:queue:default
#> wd: /home/rfitzjoh/Documents/src/rrq/vignettes_src
#> pid: 2720190
#> redis_host: 127.0.0.1
#> redis_port: 6379
#> heartbeat_key: rrq:b5c90261:worker:taxidermic_americangoldfinch_1:heartbeat
We also have a heartbeat key here that we can inspect:
info <- rrq_worker_info()[[1]]
obj$con$EXISTS(info$heartbeat_key)
#> [1] 1
obj$con$PTTL(info$heartbeat_key) # in milliseconds
#> [1] 5829
We queue some slow job onto the worker:
Then we kill the worker:
Of course, immediately our key still exists:
but eventually it will expire:
So far as rrq is concerned, at this point your task is still running.
Handling this situation is still completely manual. You can detect lost workers, and the tasks they were running, with:
rrq_worker_detect_exited()
#> Lost 1 worker:
#> - taxidermic_americangoldfinch_1
#> Orphaning 1 task:
#> - f88a6a235f8dbd9627e3e6862daae49b
this will also “orphan” the task
Any tasks that were dependent on this task will now be marked as IMPOSSIBLE.
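The heartbeat walk-through above might be sketched as a single script. This assumes a local Redis server, that the worker information returned by rrq_worker_info() carries a pid field as in the worker banner shown earlier, and that killing the worker with tools::pskill() is an acceptable stand-in for a machine disappearing. The 10 second pause is padding for the heartbeat key to expire.

```r
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)

# Enable a 2 second heartbeat in the default worker configuration
rrq_worker_config_save("localhost", rrq_worker_config(heartbeat_period = 2))
w <- rrq_worker_spawn(1)
info <- rrq_worker_info()[[1]]

# Queue a slow task, let the worker start it, then kill the worker
t <- rrq_task_create_expr(Sys.sleep(3600))
Sys.sleep(1)
tools::pskill(info$pid)

# Wait for the heartbeat key to expire, then inspect the state
Sys.sleep(10)
key_exists <- obj$con$EXISTS(info$heartbeat_key)
status_before <- rrq_task_status(t)

# Detect the lost worker, which also orphans its task
rrq_worker_detect_exited()
status_after <- rrq_task_status(t)
```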
In a future version we will support automatic re-queuing of jobs assigned to disappeared workers.
There are several options to get started with Redis; the best one will likely depend on your platform and needs.
(Linux, macOS with docker desktop, Windows with docker desktop)
This is how we develop rrq because it’s easy to destroy and recreate the redis instance. Start the docker redis container like:
docker run --name redis --rm -d -p 127.0.0.1:6379:6379 redis
This will listen on port 6379 which is the Redis default. You can stop the container (deleting all data) with
docker stop redis
On Linux this is fairly straightforward, either by downloading and building the source code or by installing via apt or snap.
On macOS the source will compile, or you can install a redis server via homebrew
On Windows you can install redis via WSL. There have also been various ports.
If you have redis running on a different machine (this will be the case if you’re using redis to distribute tasks over a number of different machines) you will need to tell rrq and redux where to find it. The simplest way is to set the environment variable REDIS_HOST to the name of the machine if it is running with default ports, or set REDIS_URL if you need more control. Alternatively, when connecting to the server above, you can manually construct your redux::hiredis object and pass in any configuration option you need; see the documentation for redux::redis_config for details.
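As a sketch of the manual route: the host name redis.example.org and the queue id rrq:myqueue are placeholders, and the connection lines are left commented out because they need a reachable server.

```r
# Build and inspect a configuration without connecting
cfg <- redux::redis_config(host = "redis.example.org", port = 6379)
print(cfg)

# With a reachable server, the same options can be given to hiredis()
# and the resulting connection passed to the controller:
# con <- redux::hiredis(host = "redis.example.org", port = 6379)
# obj <- rrq::rrq_controller("rrq:myqueue", con = con)
```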