Workers

Running heaps of jobs without annoying your colleagues

If you have thousands and thousands of jobs to submit at once, you may not want to flood the cluster with them all in one go. Each job submission is relatively slow (the HPC tools that the web interface has to use are relatively slow), the queue that the cluster uses does not cope well with tens of thousands of jobs and can slow down, and if you take up the whole cluster someone may come and knock on your office door to complain. At the same time, batching your jobs up into small groups and sending them off manually is a pain, and work better done by a computer.

An alternative is to submit a set of “workers” to the cluster, and then submit jobs to them. This is done with the rrq package, along with a redis server running on the cluster.

Overview

Getting started

To get started you will need to install the rrq package locally.

drat::add("mrc-ide")
install.packages("rrq")

Then construct the context as before

root <- "contexts"
ctx <- context::context_save(root, sources = "mysources.R")
#> [ open:db   ]  rds

There are two ways we can proceed from here. The first, “workers”, is very similar to the non-worker workflow and is described first. The second, “rrq”, is a bit more involved and is described after that.

Workers

Then configure and create the queue; the use_workers argument is important here as it:

  • ensures that the rrq package is available on the cluster, where your workers will run
  • changes the behaviour of the $enqueue method so that jobs are not sent to the HPC scheduler but to the rrq scheduler
  • enables the $submit_workers method which you will use to create workers on the cluster

Aside from this, everything will appear the same.

config <- didehpc::didehpc_config(use_workers = TRUE)
obj <- didehpc::queue_didehpc(ctx, config = config)
#> Loading context 02a9261fe4e9e6554d76936ecb35cef0
#> [ context   ]  02a9261fe4e9e6554d76936ecb35cef0
#> [ library   ]
#> [ namespace ]
#> [ source    ]  mysources.R
#> Running installation script on cluster
#>   ,:\      /:.
#>  //  \_()_/  \\
#> ||   |    |   ||  CONAN THE LIBRARIAN
#> ||   |    |   ||  Library:   Q:\didehpc\20210817-145020\contexts\lib\windows\4.0
#> ||   |____|   ||  Bootstrap: T:\conan\bootstrap\4.0
#>  \\  / || \  //   Cache:     Q:\didehpc\20210817-145020\contexts\conan\cache/pkg
#>   `:/  ||  \;'    Policy:    lazy
#>        ||         Repos:
#>        ||           * https://mrc-ide.github.io/didehpc-pkgs
#>        XX           * https://cloud.r-project.org
#>        XX         Packages:
#>        XX           * rrq
#>        XX           * callr
#>        OO
#>        `'
#> i Loading metadata database
#> v Loading metadata database ... done
#> i Getting 8 pkgs (2.79 MB), 1 cached
#> v Got rrq 0.4.4 (source) (118.54 kB)
#> v Got prettyunits 1.1.1 (windows) (37.72 kB)
#> v Got progress 1.2.2 (windows) (85.86 kB)
#> v Got docopt 0.7.1 (windows) (245.69 kB)
#> v Got hms 1.1.0 (windows) (104.22 kB)
#> v Got redux 1.1.0 (windows) (287.97 kB)
#> v Got callr 3.7.0 (windows) (439.25 kB)
#> v Got processx 3.5.2 (windows) (1.25 MB)
#> v Got ps 1.6.0 (windows) (775.40 kB)
#> v Installed docopt 0.7.1  (719ms)
#> v Installed hms 1.1.0  (829ms)
#> v Installed progress 1.2.2  (907ms)
#> v Installed prettyunits 1.1.1  (1s)
#> v Installed callr 3.7.0  (1.1s)
#> v Installed redux 1.1.0  (1.3s)
#> i Building rrq 0.4.4
#> v Installed processx 3.5.2  (6.7s)
#> v Installed ps 1.6.0  (6.8s)
#> v Built rrq 0.4.4 (8s)
#> v Installed rrq 0.4.4  (438ms)
#> v Summary:   9 new   15 kept  in 27.8s
#> Done!

You can now submit tasks in the usual way:

t <- obj$enqueue(random_walk(0, 10))

This job will stay pending forever, as the HPC scheduler will never run it:

t$status()
#> [1] "PENDING"
t$times()
#>                            task_id           submitted started finished
#> 1 cb4d883580667f393a2945a05688620a 2021-08-17 14:53:51    <NA>     <NA>
#>     waiting running idle
#> 1 0.2062078      NA   NA

You must submit actual workers in order to run things. This could have been done before submitting the tasks, though workers will time out after 10 minutes of inactivity, so if you have very many jobs to save, your workers might exit before the work starts!

The argument is the number of workers to submit. Each worker is equivalent to a job that your configuration would otherwise create (in terms of cores selected).

workers <- obj$submit_workers(2)
#> Submitting 2 workers with base name 'epicurean_xiaosaurus'
#> submitting (-) [===================================] 100% | giving up in 600 s
workers
#> [1] "epicurean_xiaosaurus_1" "epicurean_xiaosaurus_2"

All workers get names in the form <adjective>_<animal>_<integer> so that you can remember which workers you set off. They will turn off after 10 minutes of inactivity by default (you can tweak this with the worker_timeout argument to didehpc_config or by sending a TIMEOUT_SET message).
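
For example, to give workers an hour of idle time before they exit, a minimal sketch (the one-hour value here is illustrative only):

config <- didehpc::didehpc_config(use_workers = TRUE, worker_timeout = 3600)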

One advantage over the usual queuing approach here is that you will not wait for anyone else’s jobs to complete once you have reserved your workers.

t$wait(10)
#> (-) waiting for cb4d883...20a, giving up in 9.5 s
#>  [1]  0.92566336 -0.58321241  0.54753392  0.72564391  0.51248705 -0.22445805
#>  [7]  0.03966805 -0.25494044 -0.16625861 -1.97337247

We’re going to interact with the rrq controller object a bit:

rrq <- obj$rrq_controller()
rrq
#> <rrq_controller>
#>   Public:
#>     bulk_wait: function (x, timeout = Inf, time_poll = 1, progress = NULL, delete = TRUE)
#>     con: redis_api, R6
#>     deferred_list: function ()
#>     destroy: function (delete = TRUE, worker_stop_type = "message", worker_stop_timeout = 0)
#>     enqueue: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE,
#>     enqueue_: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE,
#>     enqueue_bulk: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#>     enqueue_bulk_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#>     envir: function (create, notify = TRUE)
#>     initialize: function (queue_id, con = redux::hiredis())
#>     keys: list
#>     lapply: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#>     lapply_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL,
#>     message_get_response: function (message_id, worker_ids = NULL, named = TRUE, delete = FALSE,
#>     message_has_response: function (message_id, worker_ids = NULL, named = TRUE)
#>     message_response_ids: function (worker_id)
#>     message_send: function (command, args = NULL, worker_ids = NULL)
#>     message_send_and_wait: function (command, args = NULL, worker_ids = NULL, named = TRUE,
#>     queue_id: 02a9261fe4e9e6554d76936ecb35cef0
#>     queue_length: function (queue = NULL)
#>     queue_list: function (queue = NULL)
#>     queue_remove: function (task_ids, queue = NULL)
#>     task_cancel: function (task_id, wait = TRUE, delete = TRUE)
#>     task_data: function (task_id)
#>     task_delete: function (task_ids, check = TRUE)
#>     task_exists: function (task_ids = NULL)
#>     task_list: function ()
#>     task_overview: function (task_ids = NULL)
#>     task_position: function (task_ids, missing = 0L, queue = NULL)
#>     task_preceeding: function (task_id, queue = NULL)
#>     task_progress: function (task_id)
#>     task_result: function (task_id)
#>     task_status: function (task_ids = NULL)
#>     task_wait: function (task_id, timeout = Inf, time_poll = 1, progress = NULL)
#>     tasks_result: function (task_ids)
#>     tasks_wait: function (task_ids, timeout = Inf, time_poll = 1, progress = NULL)
#>     worker_config_list: function ()
#>     worker_config_read: function (name)
#>     worker_config_save: function (name, time_poll = NULL, timeout = NULL, queue = NULL,
#>     worker_delete_exited: function (worker_ids = NULL)
#>     worker_detect_exited: function ()
#>     worker_info: function (worker_ids = NULL)
#>     worker_len: function ()
#>     worker_list: function ()
#>     worker_list_exited: function ()
#>     worker_load: function (worker_ids = NULL)
#>     worker_log_tail: function (worker_ids = NULL, n = 1)
#>     worker_process_log: function (worker_id)
#>     worker_status: function (worker_ids = NULL)
#>     worker_stop: function (worker_ids = NULL, type = "message", timeout = 0, time_poll = 0.05,
#>     worker_task_id: function (worker_ids = NULL)
#>   Private:
#>     scripts: list
#>     store: object_store, R6

This is another R6 object, though this one at least has decent documentation; see ?rrq::rrq_controller for details of each method.

You can see what your workers have been up to with the worker_log_tail method:

rrq$worker_log_tail(n = Inf)
#>                worker_id       time       command
#> 1 epicurean_xiaosaurus_1 1629208433         ALIVE
#> 2 epicurean_xiaosaurus_1 1629208433    TASK_START
#> 3 epicurean_xiaosaurus_2 1629208434         ALIVE
#> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE
#>                            message
#> 1
#> 2 977ba6cb22c03767d8bb2386b0c2b271
#> 3
#> 4 977ba6cb22c03767d8bb2386b0c2b271

The time column is in seconds (here, seconds since the Unix epoch); the relative times between entries should still be useful.
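
If you want human-readable timestamps you can convert that column yourself; a small sketch, assuming the values are epoch seconds as above:

logs <- rrq$worker_log_tail(n = Inf)
logs$time <- as.POSIXct(logs$time, origin = "1970-01-01")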

As before, logging works on a per-task basis:

t$log()
#> [ open:db   ]  rds
#> [ context   ]  02a9261fe4e9e6554d76936ecb35cef0
#> [ library   ]
#> [ namespace ]
#> [ source    ]  mysources.R
#> [ root      ]  contexts
#> [ context   ]  02a9261fe4e9e6554d76936ecb35cef0
#> [ task      ]  cb4d883580667f393a2945a05688620a
#> [ expr      ]  random_walk(0, 10)
#> [ start     ]  2021-08-17 14:53:54.075
#> [ ok        ]
#> [ end       ]  2021-08-17 14:53:54.231

Find out how long your workers will persist for:

rrq$message_send_and_wait("TIMEOUT_GET", worker_ids = workers)
#> $epicurean_xiaosaurus_1
#>   timeout remaining
#>       600       600
#>
#> $epicurean_xiaosaurus_2
#>   timeout remaining
#>  600.0000  599.4219

Other than that, hopefully everything else continues as normal. We can submit a bunch of jobs and run them using $lapply:

sizes <- 3:8
grp <- obj$lapply(sizes, random_walk, x = 0)
#> Creating bundle: 'robust_krill'
#> [ bulk      ]  Creating 6 tasks
#> submitting 6 tasks

Task status:

grp$status()
#> e505c76477549a36f04625d810b494e1 8f2525389b6ba6ca19034567599a42be
#>                        "PENDING"                        "PENDING"
#> 579c132c388fb39f88992691a9ff16a1 bacdb4853335a3bc3ff9008d3bc09a93
#>                        "PENDING"                        "PENDING"
#> 5aa65c5b6b3b41b5afdf471a2e75886b 001a83fce62afb5eb9c49eda46d4c3cf
#>                        "PENDING"                        "PENDING"

Collect the results:

res <- grp$wait(5)
#> (-) [=======================>------------------------] 50% | giving up in 4 s
#> (\) [=======================>------------------------] 50% | giving up in 4 s
#> (|) [================================================] 100% | giving up in 3 s
res
#> [[1]]
#> [1] -0.7841728 -1.9470967 -2.1056223
#>
#> [[2]]
#> [1] -0.4309173 -1.6581243 -0.4737780  0.4492074
#>
#> [[3]]
#> [1] -1.4924993 -2.0190762 -0.6046004 -1.3086768 -1.4110236
#>
#> [[4]]
#> [1] -1.184490257 -0.196637540 -0.191229471 -0.004466775  0.483852828
#> [6]  1.066925267
#>
#> [[5]]
#> [1]  0.6912883 -0.6199649 -1.4499917 -2.5812726 -1.4212357 -1.4081750 -2.2605613
#>
#> [[6]]
#> [1] -0.54840267  0.05881601  0.71391542  1.40774062  1.85894605  3.56734897
#> [7]  4.74281970  5.10295235

While workers will turn off automatically, it’s polite to turn them off as soon as you’re done, using obj$stop_workers().

Alternatively, after submitting a bunch of jobs you can run

rrq$message_send("TIMEOUT_SET", 0)

which means that the workers will stop as soon as they do not receive a task (so after they finish processing all your jobs they’ll stop one by one). In practice this still takes about a minute because that’s the polling timeout (I may be able to improve this later).

obj$stop_workers()
Sys.sleep(1)
rrq$worker_log_tail(workers, n = Inf)
#>                 worker_id       time       command
#> 1  epicurean_xiaosaurus_1 1629208433         ALIVE
#> 2  epicurean_xiaosaurus_1 1629208433    TASK_START
#> 3  epicurean_xiaosaurus_2 1629208434         ALIVE
#> 4  epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE
#> 5  epicurean_xiaosaurus_1 1629208435       MESSAGE
#> 6  epicurean_xiaosaurus_1 1629208435      RESPONSE
#> 7  epicurean_xiaosaurus_2 1629208435       MESSAGE
#> 8  epicurean_xiaosaurus_2 1629208435      RESPONSE
#> 9  epicurean_xiaosaurus_1 1629208435    TASK_START
#> 10 epicurean_xiaosaurus_2 1629208435    TASK_START
#> 11 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE
#> 12 epicurean_xiaosaurus_1 1629208436    TASK_START
#> 13 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE
#> 14 epicurean_xiaosaurus_2 1629208436    TASK_START
#> 15 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE
#> 16 epicurean_xiaosaurus_1 1629208436    TASK_START
#> 17 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE
#> 18 epicurean_xiaosaurus_2 1629208436    TASK_START
#> 19 epicurean_xiaosaurus_1 1629208437 TASK_COMPLETE
#> 20 epicurean_xiaosaurus_2 1629208437 TASK_COMPLETE
#> 21 epicurean_xiaosaurus_1 1629208437       MESSAGE
#> 22 epicurean_xiaosaurus_1 1629208437      RESPONSE
#> 23 epicurean_xiaosaurus_1 1629208437          STOP
#> 24 epicurean_xiaosaurus_2 1629208437       MESSAGE
#> 25 epicurean_xiaosaurus_2 1629208437      RESPONSE
#> 26 epicurean_xiaosaurus_2 1629208437          STOP
#>                             message
#> 1
#> 2  977ba6cb22c03767d8bb2386b0c2b271
#> 3
#> 4  977ba6cb22c03767d8bb2386b0c2b271
#> 5                       TIMEOUT_GET
#> 6                       TIMEOUT_GET
#> 7                       TIMEOUT_GET
#> 8                       TIMEOUT_GET
#> 9  96b4a731efe9bf2efb5e3e7e6aef781a
#> 10 1df4287014a01c65f807c39cc84e5482
#> 11 96b4a731efe9bf2efb5e3e7e6aef781a
#> 12 26c86df13bf2c438bd3d09db57a3627a
#> 13 1df4287014a01c65f807c39cc84e5482
#> 14 7cbb27b699ffc184723d884e896a1c30
#> 15 26c86df13bf2c438bd3d09db57a3627a
#> 16 e544f4fafc61746df4b5d6b80b95338d
#> 17 7cbb27b699ffc184723d884e896a1c30
#> 18 b03fa5fd3f1509e6e98e9707cce0d445
#> 19 e544f4fafc61746df4b5d6b80b95338d
#> 20 b03fa5fd3f1509e6e98e9707cce0d445
#> 21                             STOP
#> 22                             STOP
#> 23                         OK (BYE)
#> 24                             STOP
#> 25                             STOP
#> 26                         OK (BYE)
rrq$destroy()

rrq

In this model, we create a very lightweight queue which in turn creates very lightweight tasks. This avoids even more overhead than the approach above, though it can be more difficult to debug because less information is saved. Rather than round-tripping data through the disk, everything goes via the redis server.

The first part here looks very similar, except that we use use_rrq = TRUE rather than use_workers:

config <- didehpc::didehpc_config(use_rrq = TRUE)
obj <- didehpc::queue_didehpc(ctx, config = config)
#> Loading context 02a9261fe4e9e6554d76936ecb35cef0
#> [ context   ]  02a9261fe4e9e6554d76936ecb35cef0
#> [ library   ]
#> [ namespace ]
#> [ source    ]  mysources.R

We still submit workers:

workers <- obj$submit_workers(10)
#> Submitting 10 workers with base name 'vegetarian_alaskajingle'
#> submitting (-) [======>----------------------------] 20% | giving up in 599 s
#> submitting (\) [=========>-------------------------] 30% | giving up in 599 s
#> submitting (|) [=============>---------------------] 40% | giving up in 598 s
#> submitting (/) [=================>-----------------] 50% | giving up in 597 s
#> submitting (-) [====================>--------------] 60% | giving up in 597 s
#> submitting (\) [=======================>-----------] 70% | giving up in 596 s
#> submitting (|) [===========================>-------] 80% | giving up in 596 s
#> submitting (/) [===============================>---] 90% | giving up in 596 s
#> submitting (-) [===================================] 100% | giving up in 595 s
workers
#>  [1] "vegetarian_alaskajingle_1"  "vegetarian_alaskajingle_2"
#>  [3] "vegetarian_alaskajingle_3"  "vegetarian_alaskajingle_4"
#>  [5] "vegetarian_alaskajingle_5"  "vegetarian_alaskajingle_6"
#>  [7] "vegetarian_alaskajingle_7"  "vegetarian_alaskajingle_8"
#>  [9] "vegetarian_alaskajingle_9"  "vegetarian_alaskajingle_10"

To send tasks to these workers we use the rrq_controller object directly; we’ll not use the queue_didehpc object from this point on.

rrq <- obj$rrq_controller()

This will look and act a lot like the main didehpc queue controller, but with a few differences. Tasks will come back as plain strings rather than user-friendly objects, and lapply and enqueue_bulk are now blocking operations by default. Most tasks will clean up after themselves once their results are collected, rather than leaving a persistent record on disk. The payback for this is potentially very fast task turnaround and better behaviour with the disk under heavy load.

t <- rrq$enqueue(sin(1))
rrq$task_wait(t, 10)
#> [1] 0.841471

For example, submitting 50 trivial tasks to our pool of workers and retrieving the results:

system.time(res <- rrq$lapply(1:50, sin))
#>    user  system elapsed
#>   0.026   0.000   0.325

or 500 tasks:

system.time(res <- rrq$lapply(1:500, sin, progress = FALSE))
#>    user  system elapsed
#>   0.161   0.052   1.476

Across the network, the latency here is ~1/600 s per task. On fi--didemrchnb it will hopefully be a bit faster because of the infiniband network.

rrq$worker_stop()
rrq$destroy()

It is theoretically possible to submit a cluster job that creates an rrq_controller and controls the second queue. To do that you need to write a function like:

get_rrq_controller <- function(x, ...) {
  ## The rrq queue id is the context id, which cluster tasks can read
  ## from the CONTEXT_ID environment variable
  queue_id <- Sys.getenv("CONTEXT_ID", "")
  stopifnot(queue_id != "")
  rrq::rrq_controller$new(queue_id)
}

within your sources; you can then use it in place of running (say) an lapply() call in your code, as sketched below. This approach allows a relatively simple form of inter-process communication. Talk to Rich if this is something you might have a use for, e.g., if you have simulation needs that are larger than a single node.
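
As a rough illustration (the run_batch function below is hypothetical; it would live in your sources alongside get_rrq_controller), a cluster task could farm work out to the rrq workers like this:

run_batch <- function(n) {
  ## Connect to the same rrq queue that the workers are listening on
  rrq <- get_rrq_controller()
  ## Distribute n trivial tasks across the workers; lapply blocks until done
  rrq$lapply(seq_len(n), sin, progress = FALSE)
}

You would then submit run_batch(500) itself with obj$enqueue() and let it distribute its work over the workers.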

Advanced use: different worker and task resources

Suppose that we want to submit a job where we have a single process which orchestrates a group of workers each of which takes up an entire node. We used this pattern in the covid response where we wanted to run different MCMC chains on different nodes, each using 32 cores, but we also needed a single “controlling” process to organise collecting results from these nodes.

What we want to do is specify a different set of resources to be used by the workers than by tasks submitted by $enqueue(). Note that this only makes sense when using use_rrq = TRUE. For example, to submit a controlling process that uses the GeneralNodes template and one core (the default) but worker processes that can use 8 cores, we might write:

config <- didehpc::didehpc_config(
  use_rrq = TRUE,
  worker_resource = didehpc::worker_resource(cores = 8))
config
#> <didehpc_config>
#>  - cluster: fi--dideclusthn
#>  - credentials:
#>     - username: rfitzjoh
#>     - password: *******************
#>  - username: rfitzjoh
#>  - resource:
#>     - template: GeneralNodes
#>     - parallel: FALSE
#>     - count: 1
#>     - type: Cores
#>  - shares:
#>     - home: (local) /home/rich/net/home => \\fi--san03.dide.ic.ac.uk\homes\rfitzjoh => Q: (remote)
#>     - temp: (local) /home/rich/net/temp => \\fi--didef3.dide.ic.ac.uk\tmp => T: (remote)
#>  - use_workers: FALSE
#>  - use_rrq: TRUE
#>  - worker_timeout: 600
#>  - conan_bootstrap: TRUE
#>  - r_version: 4.0.3
#>  - use_java: FALSE
#>  - redis_host: fi--dideclusthn.dide.ic.ac.uk
#>  - worker_resource:
#>     - template: GeneralNodes
#>     - parallel: TRUE
#>     - count: 8
#>     - type: Cores

Note that the worker_resource section here indicates different resources from the resource section.

Now, when submitting workers with obj$submit_workers, each worker will be able to use 8 cores, but a task submitted by $enqueue() will only be able to use one. You might then submit a task that uses the get_rrq_controller trick above as part of your single-core job, which can then farm out work to your workers using the rrq queue object.
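
Putting the pieces together, the overall pattern might look something like this (a sketch only; the orchestrate() function is hypothetical and would live in your sources, using the get_rrq_controller trick above to farm work out):

obj <- didehpc::queue_didehpc(ctx, config = config)
## Each worker gets the worker_resource (8 cores here)
workers <- obj$submit_workers(4)
## The controlling task gets the ordinary single-core resource
t <- obj$enqueue(orchestrate())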

Cleaning up

Above, we used rrq$destroy() to clean up every trace of the queue. Periodically we may flush the entire Redis database, or set all keys to expire after a day or so, so please do not leave any important information in it.