---
title: "Workers"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Workers}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
This vignette describes a pair of relatively advanced techniques for using a *second* queue embedded on top of the usual HPC scheduler in order to scale work; either running many more tasks than is convenient to run with the HPC scheduler, or running tasks that need more resources than you can easily get on a single cluster node.
If you have thousands and thousands of tasks to submit at once you may not want to flood the cluster with them all at once. Each task submission tends to be relatively slow on every platform that we have used, and submitting thousands of tasks will take minutes (even if you could submit 10 tasks a second, a thousand tasks will take almost two minutes, and a million tasks would take 27 hours!). Some cluster schedulers also slow down as the queue size increases, becoming less efficient at distributing work. And if you take up the whole cluster someone may come and find you in order to complain. At the same time, batching your tasks up into little bits and manually sending them off is a pain and work better done by a computer. An alternative is to submit a (relatively small) set of "workers" to the cluster, and then submit tasks to them. We call this the **"lightweight queue pattern"**.
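The submission-time figures quoted above are simple arithmetic. A quick sketch in R, where the rate of 10 submissions per second is the illustrative figure from the text rather than a measured value, and `submission_seconds` is a hypothetical helper rather than part of any package:

```r
# Hypothetical helper: time (in seconds) to submit n tasks at a fixed
# rate, given in tasks per second
submission_seconds <- function(n_tasks, rate_per_second = 10) {
  n_tasks / rate_per_second
}

submission_seconds(1000) / 60    # minutes for a thousand tasks
submission_seconds(1e6) / 3600   # hours for a million tasks
```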
The second use case is where you want to run some computation on the cluster that needs to run some of its calculations in parallel, as you might do following instructions in `vignette("parallel")`. Suppose that your cluster only has nodes with 32 cores, and you have some calculations that would benefit from more cores than this. Or suppose that you want to farm out a smaller number of sub-tasks, each of which will consume an entire node. You can use the approach described here to scale your tasks beyond a single node, and we call this the **"interprocess communication pattern"**.
Both of these patterns are enabled with our [`rrq`](https://mrc-ide.github.io/rrq) package, along with a [Redis](https://redis.io) server which is running on the cluster.
These are advanced topics, so be sure you're happy running tasks on the cluster before diving in here. You should also be prepared to make some fairly minor changes to your code to suit some limitations and constraints of this approach.
# Getting started
To get started, you will need the `rrq` package. It is installed automatically by hipercow, so you can skip this step if you want; otherwise install it with:
```r
install.packages(
  "rrq",
  repos = c("https://mrc-ide.r-universe.dev", "https://cloud.r-project.org"))
```
You'll want a very recent version; here we are using version 0.7.21.
``` r
library(hipercow)
hipercow_init(driver = "windows")
#> ✔ Initialised hipercow at '.' (/home/rfitzjoh/net/home/cluster/hipercow-vignette/hv-20241128-3181c95d685377)
#> ✔ Configured hipercow to use 'windows'
```
# The lightweight queue pattern
Most interaction with `rrq` is done via a "controller"; this is an object that you can use to submit tasks and query on their status:
``` r
r <- hipercow_rrq_controller()
#> ✔ Created new rrq queue 'rrq:7010f758'
```
The interface to this controller is subject to change, but many of the ideas will feel familiar from hipercow itself. The controller object is just a handle that can be used with most functions in the rrq package:
``` r
r
#>
```
The other thing you'll need are some workers. Let's submit a single worker to the cluster, and wait for it to become available:
``` r
resources <- hipercow_resources(queue = "Testing")
info <- hipercow_rrq_workers_submit(1, resources = resources)
#> ✔ Using existing rrq queue 'rrq:7010f758'
#> ✔ Submitted task '8b61e994eecba7e711ea4aa9fa1dff2a' using 'windows'
#> ✔ Created bundle 'cluttered_wildcat' with 1 task
info
#> queue_id worker_id task_id
#> 1 rrq:7010f758 rrq-7010f758-ea39208bac92 8b61e994eecba7e711ea4aa9fa1dff2a
#> bundle_name
#> 1 cluttered_wildcat
```
Because we are experimenting here, we are using the `Testing` queue, which should be used only for checking that your cluster scripts work.
The workers are submitted as task bundles and can be inspected using their bundle name like any other task:
``` r
hipercow_bundle_status(info$bundle_name)
#> [1] "running"
```
This worker will remain running for 10 minutes after the last piece of work it runs.
## Basic usage
We'll load the `rrq` package to make the calls a little clearer to read:
``` r
library(rrq)
```
Submitting a task works much the same as hipercow, except that rather than `task_create_expr` you will use `rrq_task_create_expr` and pass the controller as an argument:
``` r
id <- rrq_task_create_expr(runif(10))
```
As with hipercow, this `id` is a hex string:
``` r
id
#> [1] "ee7e3e3f5a6f119da1c243ba086b52b9"
```
There's nothing here to distinguish this from a task identifier in hipercow itself (both just use strings), so be careful with your workflows.
Once you have your task, interacting with it will feel familiar: you can query its status, wait on it and fetch the result:
``` r
rrq_task_status(id)
#> [1] "COMPLETE"
rrq_task_wait(id)
#> [1] TRUE
rrq_task_result(id)
#> [1] 0.090459940 0.003429649 0.362491860 0.017130037 0.775192023 0.263664258
#> [7] 0.627378736 0.694402343 0.057331173 0.546824257
```
The big difference here from hipercow is how fast this process should be; the roundtrip of a task here will be a (hopefully small) fraction of a second:
``` r
system.time({
  id <- rrq_task_create_expr(runif(10))
  rrq_task_wait(id)
  rrq_task_result(id)
})
#> user system elapsed
#> 0.001 0.001 0.038
```
## Scaling up
Let's submit 1,000 trivial tasks, using `rrq_task_create_bulk_expr`, taking the square root of the first thousand positive integers.
``` r
ids <- rrq_task_create_bulk_expr(sqrt(x), data.frame(x = 1:1000))
```
There's no equivalent of a task bundle in `rrq`; this just returns a vector of 1000 task identifiers. You can pass this vector in to `rrq_task_wait()` though, and then fetch the results using `rrq_task_results()` (note the pluralisation; `rrq_task_results()` always returns a list, while `rrq_task_result()` fetches a single task result).
``` r
ok <- rrq_task_wait(ids)
result <- rrq_task_results(ids)
```
This process has taken 2.12 seconds, which is likely much faster than submitting this many tasks directly.
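Because `rrq_task_results()` returns a list, you will usually want to simplify it before further analysis. A minimal sketch, using a stand-in list (`result_standin`, a hypothetical name) with the shape we would expect from the call above, so that it runs without a cluster. We assume here that results come back in the same order as the task identifiers:

```r
# Stand-in for the list returned by rrq_task_results(ids) above:
# one numeric scalar per task, assumed to be in the same order as the inputs
result_standin <- lapply(1:1000, sqrt)

# vapply() flattens the list and fails loudly if any element is not a
# single number, which is a cheap check that every task behaved
values <- vapply(result_standin, identity, numeric(1))
length(values)
head(values, 3)
```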
# Interprocess communication pattern
You can submit a task to the cluster that accesses a pool of workers. This is quite difficult to demonstrate in a clear way, but bear with us.
The general pattern we try to achieve here is this:
1. Submit a normal hipercow task to the cluster
2. This task distributes work over a set of workers; this might happen several times
3. Return some summary of this work from your main task
A practical example of this approach might be to submit a hipercow task that runs an orderly report and within that you run an MCMC where you send each chain to a different worker (which may each use multiple cores). This gives you three levels of parallelism and the ability to span past a single node fairly easily. Though you may need to use a pen and paper to keep track of what you're doing.
For this example, we'll use the "windows" driver and submit work to a series of three workers running on the DIDE windows cluster.
We'll submit a hipercow task that runs this small piece of code:
```r
example <- function(n) {
  ids <- rrq::rrq_task_create_bulk_call(sqrt, seq_len(n))
  ok <- rrq::rrq_task_wait(ids)
  stopifnot(ok)
  result <- rrq::rrq_task_results(ids)
  rrq::rrq_task_delete(ids)
  sum(unlist(result))
}
```
Hopefully this is fairly self-explanatory, if a bit pointless. Note that we delete the rrq tasks after completing them, which prevents rrq tasks accumulating.
This code needs to be available in the hipercow environment run by the task:
``` r
hipercow_environment_create("default", sources = "code.R")
#> ✔ Created environment 'default'
```
It would be nice if we had some more workers too; let's add a couple more:
``` r
info <- hipercow_rrq_workers_submit(2, resources = resources)
#> ✔ Using existing rrq queue 'rrq:7010f758'
#> ✔ Submitted 2 tasks using 'windows'
#> ✔ Created bundle 'twofaced_loon' with 2 tasks
```
Now submit a task that will call this function:
``` r
id <- task_create_expr(
  example(16),
  parallel = hipercow_parallel(use_rrq = TRUE),
  resources = resources)
#> ✔ Submitted task '71e64618d20f71852f7c0097d51799a5' using 'windows'
id
#> [1] "71e64618d20f71852f7c0097d51799a5"
```
Here, `id` is a *hipercow* task identifier. This sends an additional task to the queue, so now we have four things running on the cluster (three workers and one task). The task runs, picks up the controller, then uses it to distribute a (trivial) calculation over the three workers.
``` r
task_wait(id)
#> [1] TRUE
task_result(id)
#> [1] 44.4692
```
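As a sanity check, the same calculation can be run locally without rrq. This sketch (the name `example_local` is ours, not part of hipercow or rrq) replaces the distributed step with `vapply()` and should agree with the task result above:

```r
# Local equivalent of example(): same calculation, no workers involved
example_local <- function(n) {
  result <- vapply(seq_len(n), sqrt, numeric(1))
  sum(result)
}

example_local(16)
```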
You can see from the worker logs here the tasks being split between the workers:
``` r
rrq_worker_log_tail(n = 32)
#> worker_id child time command
#> 1 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 2 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 3 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 4 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 5 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 6 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 7 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_START
#> 8 rrq-7010f758-ea39208bac92 NA 1732816685 TASK_COMPLETE
#> 9 rrq-7010f758-bd7c13c3581f NA 1732816713 ALIVE
#> 10 rrq-7010f758-bd7c13c3581f NA 1732816713 ENVIR
#> 11 rrq-7010f758-bd7c13c3581f NA 1732816713 ENVIR
#> 12 rrq-7010f758-bd7c13c3581f NA 1732816713 QUEUE
#> 13 rrq-7010f758-917984e8738f NA 1732816713 ALIVE
#> 14 rrq-7010f758-917984e8738f NA 1732816713 ENVIR
#> 15 rrq-7010f758-917984e8738f NA 1732816713 ENVIR
#> 16 rrq-7010f758-917984e8738f NA 1732816713 QUEUE
#> 17 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 18 rrq-7010f758-917984e8738f NA 1732816744 TASK_START
#> 19 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_START
#> 20 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 21 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 22 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_COMPLETE
#> 23 rrq-7010f758-917984e8738f NA 1732816744 TASK_COMPLETE
#> 24 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_START
#> 25 rrq-7010f758-917984e8738f NA 1732816744 TASK_START
#> 26 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 27 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 28 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 29 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 30 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 31 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 32 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 33 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 34 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 35 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 36 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 37 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 38 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 39 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 40 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 41 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 42 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 43 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 44 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 45 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_START
#> 46 rrq-7010f758-ea39208bac92 NA 1732816744 TASK_COMPLETE
#> 47 rrq-7010f758-bd7c13c3581f NA 1732816744 TASK_COMPLETE
#> 48 rrq-7010f758-917984e8738f NA 1732816744 TASK_COMPLETE
#> message
#> 1 fac7edb4aba7afb25a5f5fb89dd9953e
#> 2 fac7edb4aba7afb25a5f5fb89dd9953e
#> 3 1498b3eea50b6a6e6e3d77af58bd8221
#> 4 1498b3eea50b6a6e6e3d77af58bd8221
#> 5 9882b0b7f866bd795c609a0050046127
#> 6 9882b0b7f866bd795c609a0050046127
#> 7 fa7582fb84ac23c84daca75876249cf9
#> 8 fa7582fb84ac23c84daca75876249cf9
#> 9
#> 10 new
#> 11 create
#> 12 default
#> 13
#> 14 new
#> 15 create
#> 16 default
#> 17 da0da47f97109e83a1058b190c9f8332
#> 18 1492f25d1b38f004ff5fd2981afb7cb9
#> 19 70533fd8d88fa4da0381b9f11b54731a
#> 20 da0da47f97109e83a1058b190c9f8332
#> 21 a24702958ddc18f1acb762f4841535e6
#> 22 70533fd8d88fa4da0381b9f11b54731a
#> 23 1492f25d1b38f004ff5fd2981afb7cb9
#> 24 8b74ff24f7db85876ffa7da746711ae6
#> 25 5c366be9197c398bc25afbeef925de4c
#> 26 a24702958ddc18f1acb762f4841535e6
#> 27 166b5210958ad0ca2997b4d32d5e84aa
#> 28 166b5210958ad0ca2997b4d32d5e84aa
#> 29 420cb53445a3e1c04dee89751836eb4d
#> 30 420cb53445a3e1c04dee89751836eb4d
#> 31 34e98c52297f50b9ce5f3bd88f011276
#> 32 34e98c52297f50b9ce5f3bd88f011276
#> 33 db9cb6dd182f537e1bf086854c1fd2ec
#> 34 db9cb6dd182f537e1bf086854c1fd2ec
#> 35 18aa71b506647566a08b83ac4c4efa06
#> 36 18aa71b506647566a08b83ac4c4efa06
#> 37 1876225bb4f5a0b9ed103f87c4589c6e
#> 38 1876225bb4f5a0b9ed103f87c4589c6e
#> 39 d50cdad8eec494e653f6a0c4bb5dc27c
#> 40 d50cdad8eec494e653f6a0c4bb5dc27c
#> 41 c42185da2edacbec230f13c912938759
#> 42 c42185da2edacbec230f13c912938759
#> 43 14f3608cc4663a84576108cf9822421f
#> 44 14f3608cc4663a84576108cf9822421f
#> 45 97ad2df3d8cc470a32879d7992feb617
#> 46 97ad2df3d8cc470a32879d7992feb617
#> 47 8b74ff24f7db85876ffa7da746711ae6
#> 48 5c366be9197c398bc25afbeef925de4c
```
This example is trivial, but you could submit 10 workers each using a 32-core node, and then use a single-core task to farm out a series of large simulations across your bank of computers. Or create 500 single-core workers (so ~25% of the cluster) and smash through a huge number of simulations with minimal overhead.
# Tricks and tips
This section will expand as we document patterns that have been useful.
## Controlling the worker environment
The workers will use the `rrq` environment if it exists, failing that the `default` environment. So if you need different packages and sources loaded on the workers than on your normal tasks, you can do this by creating a different environment:
``` r
hipercow_environment_create("rrq", packages = "cowsay")
#> ✔ Created environment 'rrq'
#> ℹ Refreshing existing rrq worker environments
#> ✔ Using existing rrq queue 'rrq:7010f758'
```
You can submit your workers with any resources and parallel control you want (see `vignette("parallel")` for details); pass these as `resources` and `parallel` to `hipercow_rrq_workers_submit()`.
# General considerations
## Stopping redundant workers
By default, workers will live for 10 minutes after finishing their last task. This means that most of the time that you use workers you can largely forget about cleanup. If you want to be polite and give up these resources early (this would be important if you wanted to launch new workers, or if you were using a large fraction of the cluster), you can tell them to stop after completion of the last job:
``` r
hipercow_rrq_stop_workers_once_idle()
#> ✔ Using existing rrq queue 'rrq:7010f758'
#> ✔ Sent message to 3 workers
#> ℹ Workers will stop 60 seconds after their last task
#> ℹ Current worker status: IDLE (3)
```
## Permanence
You should not treat data in a submitted task as permanent; it is subject to deletion at any point! So your aim should be to pull the data out of rrq as soon as you can. Practically we won't delete data from the database for at least a few days after creation, but we make no guarantees. We'll describe cleanup here later.
We reserve the right to delete things from the Redis database without warning, though we will try and be polite about doing this.
## Object storage
Redis is an *in memory* datastore, which means that all the inputs and outputs from your tasks are stored in memory on the head node. This means that you do need to be careful about what you store as part of your tasks. We will refuse to save any object larger than 100KB once serialised (approximately the size of a file created by `saveRDS()` without using compression).
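To get a feel for whether an object is likely to be too large, you can measure its serialised size locally. This is a sketch only: `serialised_size` is a hypothetical helper, and treating 100KB as 100,000 bytes is our assumption, so the exact check applied by rrq may differ.

```r
# Hypothetical helper: number of bytes an object occupies once serialised
# (serialize(x, NULL) returns a raw vector, one element per byte)
serialised_size <- function(x) {
  length(serialize(x, NULL))
}

limit <- 100 * 1000  # assumed meaning of "100KB"; not taken from rrq itself

serialised_size(runif(10)) < limit    # a small vector fits comfortably
serialised_size(runif(1e5)) < limit   # 1e5 doubles need ~800,000 bytes
```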
We encourage you to make use of rrq's task deletion using `rrq::rrq_task_delete()` to delete tasks once you are done with them. You can pass a long vector efficiently into this function.
If you need to save large outputs you will need to write them to a file (e.g., with `saveRDS()`) rather than returning them from the function or expression set as the target of the rrq task. If you are submitting a very large number of tasks that take a short and deterministic time to run this can put a lot of load on the file server, so be sure you are using a project share and not a personal share when using the windows cluster (see `vignette("windows")`).
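One way to follow this advice is to have the task write its output to disk and return only the path. A minimal sketch, run locally here so that it works without a cluster; the function name `simulate_to_file`, the file naming scheme and the stand-in matrix are all hypothetical, and on the cluster `dir` would need to be a location that both the workers and your own session can see:

```r
# Hypothetical task function: writes its (large) result to disk and
# returns only the path, which is small enough to pass back through rrq
simulate_to_file <- function(i, dir) {
  dir.create(dir, showWarnings = FALSE, recursive = TRUE)
  result <- matrix(runif(1e5), ncol = 10)  # stand-in for a large output
  path <- file.path(dir, sprintf("result-%04d.rds", i))
  saveRDS(result, path)
  path
}

# Run locally to show the shape of the workflow
path <- simulate_to_file(1, file.path(tempdir(), "outputs"))
dim(readRDS(path))
```

The value returned from the task is then a short string, well under the size limit, and the caller reads the real result with `readRDS()`.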