This vignette describes a pair of relatively advanced techniques for using a second queue, embedded on top of the usual HPC scheduler, to scale work: either running many more tasks than is convenient to run with the HPC scheduler, or running tasks that need more resources than you can easily get on a single cluster node.
If you have many thousands of tasks to submit, you may not want to flood the cluster with them all at once. Each task submission tends to be relatively slow on every platform that we have used, and submitting thousands of tasks will take minutes (even if you could submit 10 tasks a second, a thousand tasks would take almost two minutes, and a million tasks would take over 27 hours!). Some cluster schedulers also slow down as the queue size increases, becoming less efficient at distributing work. And if you take up the whole cluster, someone may come and find you in order to complain. At the same time, batching your tasks into small groups and manually sending them off is a pain, and work better done by a computer. An alternative is to submit a (relatively small) set of “workers” to the cluster, and then submit tasks to them. We call this the “lightweight queue pattern”.
The second use case is where you want to run some computation on the cluster that needs to run some of its calculations in parallel, as you might do following the instructions in vignette("parallel"). Suppose that your cluster only has nodes with 32 cores, though, and you have some calculations that would benefit from more cores than this. Or suppose that you want to farm out some smaller number of sub-tasks, each of which will consume an entire node. You can use the approach described here to scale your tasks beyond a single node; we call this the “interprocess communication pattern”.
Both of these patterns are enabled by our rrq package, along with a Redis server which runs on the cluster.
These are advanced topics, so be sure you’re happy running tasks on the cluster before diving in here. You should also be prepared to make some fairly minor changes to your code to suit some limitations and constraints of this approach.
To get started, you will need the rrq package if you do not have it already (rrq will be installed automatically by hipercow, so you can skip this step if you want):
install.packages(
"rrq",
repos = c("https://mrc-ide.r-universe.dev", "https://cloud.r-project.org"))
You’ll want a very recent version; here we are using version 0.7.21.
Most interaction with rrq is done via a “controller”; this is an object that you can use to submit tasks and query their status:
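# A sketch: hipercow_rrq_controller() creates the controller; we believe it also
# registers it as rrq's default controller, which is why most of the rrq calls
# below do not need it passed explicitly
r <- hipercow_rrq_controller()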
The interface to this controller is subject to change, but many of the ideas will feel familiar from hipercow itself. The controller object is just a handle that can be used with most functions in the rrq package.
The other thing you’ll need is some workers. Let’s submit a single worker to the cluster and wait for it to become available:
resources <- hipercow_resources(queue = "Testing")
info <- hipercow_rrq_workers_submit(1, resources = resources)
#> ✔ Using existing rrq queue 'rrq:7010f758'
#> ✔ Submitted task '8b61e994eecba7e711ea4aa9fa1dff2a' using 'windows'
#> ✔ Created bundle 'cluttered_wildcat' with 1 task
#> Waiting for worker to start [31.5s]
info
#> queue_id worker_id task_id
#> 1 rrq:7010f758 rrq-7010f758-ea39208bac92 8b61e994eecba7e711ea4aa9fa1dff2a
#> bundle_name
#> 1 cluttered_wildcat
(Because we are experimenting here, we are using the Testing queue, which should be used only for finding out if your cluster scripts are working.)
The workers are submitted as task bundles and can be inspected using their bundle name like any other task:
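# A sketch: the bundle name comes from the info object returned above
hipercow_bundle_status(info$bundle_name[[1]])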
This worker will remain alive for 10 minutes after the last piece of work it runs.
We’ll load the rrq package to make the calls a little clearer to read:
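library(rrq)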
Submitting a task works much the same as in hipercow, except that rather than task_create_expr() you will use rrq_task_create_expr() and pass the controller as an argument:
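# A sketch: the exact expression is assumed; the result fetched below suggests
# it was runif(10)
id <- rrq_task_create_expr(runif(10), controller = r)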
As with hipercow, this id is a hex string. There’s nothing here to distinguish it from a task identifier in hipercow itself (both are just strings), so be careful with your workflows.
Once you have your task, interacting with it will feel familiar: you can query its status, wait on it, and fetch the result:
rrq_task_status(id)
#> [1] "COMPLETE"
rrq_task_wait(id)
#> [1] TRUE
rrq_task_result(id)
#> [1] 0.090459940 0.003429649 0.362491860 0.017130037 0.775192023 0.263664258
#> [7] 0.627378736 0.694402343 0.057331173 0.546824257
The big difference here from hipercow is how fast this process should be; the roundtrip of a task here will be a (hopefully small) fraction of a second:
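# A sketch of timing the roundtrip; the use of system.time() here is illustrative
system.time({
  id <- rrq_task_create_expr(runif(1))
  rrq_task_wait(id)
  rrq_task_result(id)
})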
Let’s submit 1,000 trivial tasks using rrq_task_create_bulk_expr(), taking the square root of each of the first thousand positive integers.
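A sketch of the call (the exact chunk is assumed):

# rrq_task_create_bulk_expr() evaluates the expression once per row of the data.frame
ids <- rrq_task_create_bulk_expr(sqrt(x), data.frame(x = 1:1000))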
There’s no equivalent of a task bundle in rrq; this just returns a vector of 1000 task identifiers. You can pass this vector into rrq_task_wait(), though, and then fetch the results using rrq_task_results() (note the pluralisation: rrq_task_results() always returns a list, while rrq_task_result() fetches a single task result).
ok <- rrq_task_wait(ids)
#> Waiting for tasks (13 waiting, 0 running, 987 finished) [2s]
result <- rrq_task_results(ids)
This process has taken 2.12 seconds, which is likely much faster than submitting this many tasks directly.
You can submit a task to the cluster that accesses a pool of workers. This is quite difficult to demonstrate in a clear way, but bear with us. The general pattern we try to achieve here is this: a single hipercow task acts as the controller, farming its calculations out over a set of rrq workers.
A practical example of this approach might be to submit a hipercow task that runs an orderly report, within which you run an MCMC where you send each chain to a different worker (each of which may use multiple cores). This gives you three levels of parallelism and the ability to span beyond a single node fairly easily, though you may need a pen and paper to keep track of what you’re doing.
For this example, we’ll use the “windows” driver and submit work to a series of three workers running on the DIDE windows cluster.
We’ll submit a hipercow task that runs this small piece of code:
example <- function(n) {
ids <- rrq::rrq_task_create_bulk_call(sqrt, seq_len(n))
ok <- rrq::rrq_task_wait(ids)
stopifnot(ok)
result <- rrq::rrq_task_results(ids)
rrq::rrq_task_delete(ids)
sum(unlist(result))
}
Hopefully this is fairly self-explanatory, if a bit pointless. Note that we delete the rrq tasks after fetching their results, which prevents completed tasks from accumulating in Redis.
This code needs to be available in the hipercow environment run by the task:
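# A sketch: we assume example() has been saved into a source file, hypothetically
# called "example.R", and add it to the default environment
hipercow_environment_create(sources = "example.R")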
It would be nice if we had some more workers too; let’s add a couple more:
info <- hipercow_rrq_workers_submit(2, resources = resources)
#> ✔ Using existing rrq queue 'rrq:7010f758'
#> ✔ Submitted 2 tasks using 'windows'
#> ✔ Created bundle 'twofaced_loon' with 2 tasks
#> Waiting for workers (1 waiting, 0 running, 1 finished) [26.5s]
Now submit a task that will call this function:
id <- task_create_expr(
example(16),
parallel = hipercow_parallel(use_rrq = TRUE),
resources = resources)
#> ✔ Submitted task '71e64618d20f71852f7c0097d51799a5' using 'windows'
id
#> [1] "71e64618d20f71852f7c0097d51799a5"
Here, id is a hipercow task identifier. This sends an additional task to the queue, so now we have four things running on the cluster (three workers and one task). The task runs, picks up the controller, then uses it to distribute a (trivial) calculation over the three workers.
You can see from the worker logs how the tasks were split between the workers:
rrq_worker_log_tail(n = 32)
#> worker_id child time command
#> 1 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 2 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 3 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 4 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 5 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 6 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 7 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 8 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 9 rrq-7010f758-bd7c13c3581f NA 1732816713 ALIVE
#> 10 rrq-7010f758-bd7c13c3581f NA 1732816713 ENVIR
#> 11 rrq-7010f758-bd7c13c3581f NA 1732816713 ENVIR
#> 12 rrq-7010f758-bd7c13c3581f NA 1732816713 QUEUE
#> 13 rrq-7010f758-917984e8738f NA 1732816713 ALIVE
#> 14 rrq-7010f758-917984e8738f NA 1732816713 ENVIR
#> 15 rrq-7010f758-917984e8738f NA 1732816713 ENVIR
#> 16 rrq-7010f758-917984e8738f NA 1732816713 QUEUE
#> 17 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 18 rrq-7010f758-917984e8738f NA 1732816744 TASK_START
#> 19 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_START
#> 20 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 21 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 22 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_COMPLETE
#> 23 rrq-7010f758-917984e8738f NA 1732816744 TASK_COMPLETE
#> 24 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_START
#> 25 rrq-7010f758-917984e8738f NA 1732816744 TASK_START
#> 26 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 27 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 28 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 29 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 30 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 31 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 32 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 33 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 34 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 35 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 36 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 37 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 38 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 39 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 40 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 41 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 42 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 43 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 44 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 45 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 46 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 47 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_COMPLETE
#> 48 rrq-7010f758-917984e8738f NA 1732816744 TASK_COMPLETE
#> message
#> 1 fac7edb4aba7afb25a5f5fb89dd9953e
#> 2 fac7edb4aba7afb25a5f5fb89dd9953e
#> 3 1498b3eea50b6a6e6e3d77af58bd8221
#> 4 1498b3eea50b6a6e6e3d77af58bd8221
#> 5 9882b0b7f866bd795c609a0050046127
#> 6 9882b0b7f866bd795c609a0050046127
#> 7 fa7582fb84ac23c84daca75876249cf9
#> 8 fa7582fb84ac23c84daca75876249cf9
#> 9
#> 10 new
#> 11 create
#> 12 default
#> 13
#> 14 new
#> 15 create
#> 16 default
#> 17 da0da47f97109e83a1058b190c9f8332
#> 18 1492f25d1b38f004ff5fd2981afb7cb9
#> 19 70533fd8d88fa4da0381b9f11b54731a
#> 20 da0da47f97109e83a1058b190c9f8332
#> 21 a24702958ddc18f1acb762f4841535e6
#> 22 70533fd8d88fa4da0381b9f11b54731a
#> 23 1492f25d1b38f004ff5fd2981afb7cb9
#> 24 8b74ff24f7db85876ffa7da746711ae6
#> 25 5c366be9197c398bc25afbeef925de4c
#> 26 a24702958ddc18f1acb762f4841535e6
#> 27 166b5210958ad0ca2997b4d32d5e84aa
#> 28 166b5210958ad0ca2997b4d32d5e84aa
#> 29 420cb53445a3e1c04dee89751836eb4d
#> 30 420cb53445a3e1c04dee89751836eb4d
#> 31 34e98c52297f50b9ce5f3bd88f011276
#> 32 34e98c52297f50b9ce5f3bd88f011276
#> 33 db9cb6dd182f537e1bf086854c1fd2ec
#> 34 db9cb6dd182f537e1bf086854c1fd2ec
#> 35 18aa71b506647566a08b83ac4c4efa06
#> 36 18aa71b506647566a08b83ac4c4efa06
#> 37 1876225bb4f5a0b9ed103f87c4589c6e
#> 38 1876225bb4f5a0b9ed103f87c4589c6e
#> 39 d50cdad8eec494e653f6a0c4bb5dc27c
#> 40 d50cdad8eec494e653f6a0c4bb5dc27c
#> 41 c42185da2edacbec230f13c912938759
#> 42 c42185da2edacbec230f13c912938759
#> 43 14f3608cc4663a84576108cf9822421f
#> 44 14f3608cc4663a84576108cf9822421f
#> 45 97ad2df3d8cc470a32879d7992feb617
#> 46 97ad2df3d8cc470a32879d7992feb617
#> 47 8b74ff24f7db85876ffa7da746711ae6
#> 48 5c366be9197c398bc25afbeef925de4c
This example is trivial, but you could submit 10 workers, each using a 32-core node, and then use a single-core task to farm out a series of large simulations across your bank of computers. Or create 500 single-core workers (so ~25% of the cluster) and smash through a huge number of simulations with minimal overhead.
This section will expand as we document patterns that have been useful.
The workers will use the rrq environment if it exists, failing that the default environment. So if you need different packages and sources loaded on the workers than on your normal tasks, you can do this by creating a separate environment:
hipercow_environment_create("rrq", packages = "cowsay")
#> ✔ Created environment 'rrq'
#> ℹ Refreshing existing rrq worker environments
#> ✔ Using existing rrq queue 'rrq:7010f758'
You can submit your workers with any resources and parallel control you want (see vignette("parallel") for details); pass these as resources and parallel to hipercow_rrq_workers_submit().
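For example (a sketch; the numbers here are illustrative):

# Ten workers, each taking a whole 32-core node
resources <- hipercow_resources(cores = 32)
info <- hipercow_rrq_workers_submit(10, resources = resources)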
By default, workers will live for 10 minutes after finishing their last task, which means that most of the time you use workers you can largely forget about cleanup. If you want to be polite and give up these resources early (this would be important if you wanted to launch new workers, or if you were using a large fraction of the cluster), you can tell the workers to stop after completing their last job:
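# A sketch: we believe rrq workers understand a TIMEOUT_SET message, and a
# timeout of 0 asks each worker to exit as soon as it is next idle
rrq_message_send("TIMEOUT_SET", 0)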
You should not treat data in a submitted task as permanent; it is subject to deletion at any point! Your aim should be to pull the data out of rrq as soon as you can. In practice we won’t delete data from the database for at least a few days after creation, but we make no guarantees: we reserve the right to delete things from the Redis database without warning, though we will try to be polite about doing this. We’ll describe cleanup here later.
Redis is an in-memory datastore, which means that all the inputs and outputs from your tasks are stored in memory on the head node. You therefore need to be careful about what you store as part of your tasks: we will refuse to save any object larger than 100KB once serialised (approximately the size of a file created by saveRDS() without compression).
We encourage you to use rrq::rrq_task_delete() to delete tasks once you are done with them; you can efficiently pass a long vector of task identifiers into this function.
If you need to save large outputs you will need to write them to a file (e.g., with saveRDS()) rather than returning them from the function or expression set as the target of the rrq task. If you are submitting a very large number of tasks that take a short and deterministic time to run, this can put a lot of load on the file server, so be sure you are using a project share and not a personal share when using the windows cluster (see vignette("windows")).
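A minimal sketch of this pattern (all names here are illustrative):

# Write a large result to a file on the shared filesystem and return only its path
simulate_large <- function(n, dest) {
  result <- matrix(runif(n * n), n, n)  # stand-in for a large simulation output
  saveRDS(result, dest)                 # save to disk rather than returning it
  dest                                  # the task's return value stays tiny
}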