--- title: "Workers" author: "Rich FitzJohn" date: "2021-08-17" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Workers} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- # Running heaps of jobs without annoying your colleagues If you have thousands and thousands of jobs to submit at once you may not want to flood the cluster with them all at once. Each job submission is relatively slow (the HPC tools that the web interface has to use are relatively slow). The actual queue that the cluster uses doesn't seem to like processing tens of thousands of job, and can slow down. And if you take up the whole cluster someone may come and knock on your office and complain at you. At the same time, batching your jobs up into little bits and manually sending them off is a pain and work better done by a computer. An alternative is to submit a set of "workers" to the cluster, and then submit jobs to them. This is done with the [`rrq`](https://github.com/mrc-ide/rrq) package, along with a [`redis`](https://redis.io) server running on the cluster. ## Overview ## Getting started To get started you will need to install the `rrq` package locally. ```r drat::add("mrc-ide") install.packages("rrq") ``` Then construct the context as before ```r root <- "contexts" ctx <- context::context_save(root, sources = "mysources.R") #> [ open:db ] rds ``` There are two ways we can proceed from here; the first - "workers" - is very similar to the non-worker workflow and is described first. The second - "rrq" - is a bit more involved and is described second. ## Workers Then configure and create the queue; the `use_workers` argument is important here as it: * ensures that the `rrq` package is available on the cluster, where your workers will run * changes the behaviour of the `$enqueue` method so that jobs are not sent to the HPC scheduler but to the `rrq` scheduler * enables the `$submit_workers` method which you will use to create workers on the cluster However, everything else will appear the same. ```r config <- didehpc::didehpc_config(use_workers = TRUE) obj <- didehpc::queue_didehpc(ctx, config = config) #> Loading context 02a9261fe4e9e6554d76936ecb35cef0 #> [ context ] 02a9261fe4e9e6554d76936ecb35cef0 #> [ library ] #> [ namespace ] #> [ source ] mysources.R #> Running installation script on cluster #> ,:\ /:. #> // \_()_/ \\ #> || | | || CONAN THE LIBRARIAN #> || | | || Library: Q:\didehpc\20210817-145020\contexts\lib\windows\4.0 #> || |____| || Bootstrap: T:\conan\bootstrap\4.0 #> \\ / || \ // Cache: Q:\didehpc\20210817-145020\contexts\conan\cache/pkg #> `:/ || \;' Policy: lazy #> || Repos: #> || * https://mrc-ide.github.io/didehpc-pkgs #> XX * https://cloud.r-project.org #> XX Packages: #> XX * rrq #> XX * callr #> OO #> `' #> i Loading metadata database #> v Loading metadata database ... done #> i Getting 8 pkgs (2.79 MB), 1 cached #> v Got rrq 0.4.4 (source) (118.54 kB) #> v Got prettyunits 1.1.1 (windows) (37.72 kB) #> v Got progress 1.2.2 (windows) (85.86 kB) #> v Got docopt 0.7.1 (windows) (245.69 kB) #> v Got hms 1.1.0 (windows) (104.22 kB) #> v Got redux 1.1.0 (windows) (287.97 kB) #> v Got callr 3.7.0 (windows) (439.25 kB) #> v Got processx 3.5.2 (windows) (1.25 MB) #> v Got ps 1.6.0 (windows) (775.40 kB) #> v Installed docopt 0.7.1 (719ms) #> v Installed hms 1.1.0 (829ms) #> v Installed progress 1.2.2 (907ms) #> v Installed prettyunits 1.1.1 (1s) #> v Installed callr 3.7.0 (1.1s) #> v Installed redux 1.1.0 (1.3s) #> i Building rrq 0.4.4 #> v Installed processx 3.5.2 (6.7s) #> v Installed ps 1.6.0 (6.8s) #> v Built rrq 0.4.4 (8s) #> v Installed rrq 0.4.4 (438ms) #> v Summary: 9 new 15 kept in 27.8s #> Done! ``` You can now submit ```r t <- obj$enqueue(random_walk(0, 10)) ``` This job will stay pending forever as the HPC scheduler will never run it ```r t$status() #> [1] "PENDING" t$times() #> task_id submitted started finished #> 1 cb4d883580667f393a2945a05688620a 2021-08-17 14:53:51 #> waiting running idle #> 1 0.2062078 NA NA ``` You must submit actual workers in order to actually run things. This could have been done before submitting the tasks, though workers will time out after 10 minutes of inactivity, if you have very many jobs to save your workers might exit before the work starts! The argument is the number of workers to submit. Each worker is equivalent to a job that your configuration would otherwise create (in terms of cores selected). ```r workers <- obj$submit_workers(2) #> Submitting 2 workers with base name 'epicurean_xiaosaurus' #> submitting (-) [===================================] 100% | giving up in 600 s workers #> [1] "epicurean_xiaosaurus_1" "epicurean_xiaosaurus_2" ``` All workers get names in the form `__` so that you can remember which workers you set off. They will turn off after 10 minutes of inactivity by default (you can tweak this with the `worker_timeout` argument to `didehpc_config` or by sending a `TIMEOUT_SET` message). One advantage over the usual queuing approach here is that you will not wait for anyone else's jobs to complete once you have reserved your workers. ```r t$wait(10) #> (-) waiting for cb4d883...20a, giving up in 9.5 s #> [1] 0.92566336 -0.58321241 0.54753392 0.72564391 0.51248705 -0.22445805 #> [7] 0.03966805 -0.25494044 -0.16625861 -1.97337247 ``` We're going to interact with the rrq object a bit ```r rrq <- obj$rrq_controller() rrq #> #> Public: #> bulk_wait: function (x, timeout = Inf, time_poll = 1, progress = NULL, delete = TRUE) #> con: redis_api, R6 #> deferred_list: function () #> destroy: function (delete = TRUE, worker_stop_type = "message", worker_stop_timeout = 0) #> enqueue: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE, #> enqueue_: function (expr, envir = parent.frame(), queue = NULL, separate_process = FALSE, #> enqueue_bulk: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL, #> enqueue_bulk_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL, #> envir: function (create, notify = TRUE) #> initialize: function (queue_id, con = redux::hiredis()) #> keys: list #> lapply: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL, #> lapply_: function (x, fun, ..., dots = NULL, envir = parent.frame(), queue = NULL, #> message_get_response: function (message_id, worker_ids = NULL, named = TRUE, delete = FALSE, #> message_has_response: function (message_id, worker_ids = NULL, named = TRUE) #> message_response_ids: function (worker_id) #> message_send: function (command, args = NULL, worker_ids = NULL) #> message_send_and_wait: function (command, args = NULL, worker_ids = NULL, named = TRUE, #> queue_id: 02a9261fe4e9e6554d76936ecb35cef0 #> queue_length: function (queue = NULL) #> queue_list: function (queue = NULL) #> queue_remove: function (task_ids, queue = NULL) #> task_cancel: function (task_id, wait = TRUE, delete = TRUE) #> task_data: function (task_id) #> task_delete: function (task_ids, check = TRUE) #> task_exists: function (task_ids = NULL) #> task_list: function () #> task_overview: function (task_ids = NULL) #> task_position: function (task_ids, missing = 0L, queue = NULL) #> task_preceeding: function (task_id, queue = NULL) #> task_progress: function (task_id) #> task_result: function (task_id) #> task_status: function (task_ids = NULL) #> task_wait: function (task_id, timeout = Inf, time_poll = 1, progress = NULL) #> tasks_result: function (task_ids) #> tasks_wait: function (task_ids, timeout = Inf, time_poll = 1, progress = NULL) #> worker_config_list: function () #> worker_config_read: function (name) #> worker_config_save: function (name, time_poll = NULL, timeout = NULL, queue = NULL, #> worker_delete_exited: function (worker_ids = NULL) #> worker_detect_exited: function () #> worker_info: function (worker_ids = NULL) #> worker_len: function () #> worker_list: function () #> worker_list_exited: function () #> worker_load: function (worker_ids = NULL) #> worker_log_tail: function (worker_ids = NULL, n = 1) #> worker_process_log: function (worker_id) #> worker_status: function (worker_ids = NULL) #> worker_stop: function (worker_ids = NULL, type = "message", timeout = 0, time_poll = 0.05, #> worker_task_id: function (worker_ids = NULL) #> Private: #> scripts: list #> store: object_store, R6 ``` This is another R6 object, though this one at least has decent documentation - see the `rrq::rrq_controller` for details of each method You can see what your workers have been up to with the `workers_log_tail` command: ```r rrq$worker_log_tail(n = Inf) #> worker_id time command #> 1 epicurean_xiaosaurus_1 1629208433 ALIVE #> 2 epicurean_xiaosaurus_1 1629208433 TASK_START #> 3 epicurean_xiaosaurus_2 1629208434 ALIVE #> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE #> message #> 1 #> 2 977ba6cb22c03767d8bb2386b0c2b271 #> 3 #> 4 977ba6cb22c03767d8bb2386b0c2b271 ``` The `time` column represents seconds - relative seconds should still be useful here. As before, logging works on a per-task basis: ```r t$log() #> [ open:db ] rds #> [ context ] 02a9261fe4e9e6554d76936ecb35cef0 #> [ library ] #> [ namespace ] #> [ source ] mysources.R #> [ root ] contexts #> [ context ] 02a9261fe4e9e6554d76936ecb35cef0 #> [ task ] cb4d883580667f393a2945a05688620a #> [ expr ] random_walk(0, 10) #> [ start ] 2021-08-17 14:53:54.075 #> [ ok ] #> [ end ] 2021-08-17 14:53:54.231 ``` Find out how long your workers will persist for: ```r rrq$message_send_and_wait("TIMEOUT_GET", worker_ids = workers) #> $epicurean_xiaosaurus_1 #> timeout remaining #> 600 600 #> #> $epicurean_xiaosaurus_2 #> timeout remaining #> 600.0000 599.4219 ``` Other than that, hopefully everything else continues as normal. We can submit a bunch of jobs and run them using `$lapply`: ```r sizes <- 3:8 grp <- obj$lapply(sizes, random_walk, x = 0) #> Creating bundle: 'robust_krill' #> [ bulk ] Creating 6 tasks #> submitting 6 tasks ``` Task status: ```r grp$status() #> e505c76477549a36f04625d810b494e1 8f2525389b6ba6ca19034567599a42be #> "PENDING" "PENDING" #> 579c132c388fb39f88992691a9ff16a1 bacdb4853335a3bc3ff9008d3bc09a93 #> "PENDING" "PENDING" #> 5aa65c5b6b3b41b5afdf471a2e75886b 001a83fce62afb5eb9c49eda46d4c3cf #> "PENDING" "PENDING" ``` Collect the results: ```r res <- grp$wait(5) #> (-) [=======================>------------------------] 50% | giving up in 4 s #> (\) [=======================>------------------------] 50% | giving up in 4 s #> (|) [================================================] 100% | giving up in 3 s res #> [[1]] #> [1] -0.7841728 -1.9470967 -2.1056223 #> #> [[2]] #> [1] -0.4309173 -1.6581243 -0.4737780 0.4492074 #> #> [[3]] #> [1] -1.4924993 -2.0190762 -0.6046004 -1.3086768 -1.4110236 #> #> [[4]] #> [1] -1.184490257 -0.196637540 -0.191229471 -0.004466775 0.483852828 #> [6] 1.066925267 #> #> [[5]] #> [1] 0.6912883 -0.6199649 -1.4499917 -2.5812726 -1.4212357 -1.4081750 -2.2605613 #> #> [[6]] #> [1] -0.54840267 0.05881601 0.71391542 1.40774062 1.85894605 3.56734897 #> [7] 4.74281970 5.10295235 ``` While workers will turn off automatically, it's polite to turn them off as soon as you're done using `obj$stop_workers()` Alternatively, after submitting a bunch of jobs you can run ```r rrq$message_send("TIMEOUT_SET", 0) ``` which will mean that the workers will stop immediately after not receiving a task (so after they finish processing all your jobs they'll stop one by one). Practically this still takes one minute because that's the polling timeout time (I may be able to improve this later). ```r obj$stop_workers() Sys.sleep(1) rrq$worker_log_tail(workers, n = Inf) #> worker_id time command #> 1 epicurean_xiaosaurus_1 1629208433 ALIVE #> 2 epicurean_xiaosaurus_1 1629208433 TASK_START #> 3 epicurean_xiaosaurus_2 1629208434 ALIVE #> 4 epicurean_xiaosaurus_1 1629208434 TASK_COMPLETE #> 5 epicurean_xiaosaurus_1 1629208435 MESSAGE #> 6 epicurean_xiaosaurus_1 1629208435 RESPONSE #> 7 epicurean_xiaosaurus_2 1629208435 MESSAGE #> 8 epicurean_xiaosaurus_2 1629208435 RESPONSE #> 9 epicurean_xiaosaurus_1 1629208435 TASK_START #> 10 epicurean_xiaosaurus_2 1629208435 TASK_START #> 11 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE #> 12 epicurean_xiaosaurus_1 1629208436 TASK_START #> 13 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE #> 14 epicurean_xiaosaurus_2 1629208436 TASK_START #> 15 epicurean_xiaosaurus_1 1629208436 TASK_COMPLETE #> 16 epicurean_xiaosaurus_1 1629208436 TASK_START #> 17 epicurean_xiaosaurus_2 1629208436 TASK_COMPLETE #> 18 epicurean_xiaosaurus_2 1629208436 TASK_START #> 19 epicurean_xiaosaurus_1 1629208437 TASK_COMPLETE #> 20 epicurean_xiaosaurus_2 1629208437 TASK_COMPLETE #> 21 epicurean_xiaosaurus_1 1629208437 MESSAGE #> 22 epicurean_xiaosaurus_1 1629208437 RESPONSE #> 23 epicurean_xiaosaurus_1 1629208437 STOP #> 24 epicurean_xiaosaurus_2 1629208437 MESSAGE #> 25 epicurean_xiaosaurus_2 1629208437 RESPONSE #> 26 epicurean_xiaosaurus_2 1629208437 STOP #> message #> 1 #> 2 977ba6cb22c03767d8bb2386b0c2b271 #> 3 #> 4 977ba6cb22c03767d8bb2386b0c2b271 #> 5 TIMEOUT_GET #> 6 TIMEOUT_GET #> 7 TIMEOUT_GET #> 8 TIMEOUT_GET #> 9 96b4a731efe9bf2efb5e3e7e6aef781a #> 10 1df4287014a01c65f807c39cc84e5482 #> 11 96b4a731efe9bf2efb5e3e7e6aef781a #> 12 26c86df13bf2c438bd3d09db57a3627a #> 13 1df4287014a01c65f807c39cc84e5482 #> 14 7cbb27b699ffc184723d884e896a1c30 #> 15 26c86df13bf2c438bd3d09db57a3627a #> 16 e544f4fafc61746df4b5d6b80b95338d #> 17 7cbb27b699ffc184723d884e896a1c30 #> 18 b03fa5fd3f1509e6e98e9707cce0d445 #> 19 e544f4fafc61746df4b5d6b80b95338d #> 20 b03fa5fd3f1509e6e98e9707cce0d445 #> 21 STOP #> 22 STOP #> 23 OK (BYE) #> 24 STOP #> 25 STOP #> 26 OK (BYE) rrq$destroy() ``` ## rrq In this model, we create a very lightweight queue which in turn creates very lightweight tasks. This avoids even more overhead than the approach above, though it can be more difficult to debug because less information is saved. Rather than round-tripping data through the disk, everything goes via the redis server. The first part here looks very similar, except that we use `use_rrq = TRUE` rather than `use_workers` ```r config <- didehpc::didehpc_config(use_rrq = TRUE) obj <- didehpc::queue_didehpc(ctx, config = config) #> Loading context 02a9261fe4e9e6554d76936ecb35cef0 #> [ context ] 02a9261fe4e9e6554d76936ecb35cef0 #> [ library ] #> [ namespace ] #> [ source ] mysources.R ``` We still submit workers ```r workers <- obj$submit_workers(10) #> Submitting 10 workers with base name 'vegetarian_alaskajingle' #> submitting (-) [======>----------------------------] 20% | giving up in 599 s #> submitting (\) [=========>-------------------------] 30% | giving up in 599 s #> submitting (|) [=============>---------------------] 40% | giving up in 598 s #> submitting (/) [=================>-----------------] 50% | giving up in 597 s #> submitting (-) [====================>--------------] 60% | giving up in 597 s #> submitting (\) [=======================>-----------] 70% | giving up in 596 s #> submitting (|) [===========================>-------] 80% | giving up in 596 s #> submitting (/) [===============================>---] 90% | giving up in 596 s #> submitting (-) [===================================] 100% | giving up in 595 s workers #> [1] "vegetarian_alaskajingle_1" "vegetarian_alaskajingle_2" #> [3] "vegetarian_alaskajingle_3" "vegetarian_alaskajingle_4" #> [5] "vegetarian_alaskajingle_5" "vegetarian_alaskajingle_6" #> [7] "vegetarian_alaskajingle_7" "vegetarian_alaskajingle_8" #> [9] "vegetarian_alaskajingle_9" "vegetarian_alaskajingle_10" ``` To send tasks to these workers we directly use the `rrq_controller` object - we'll not use the `queue_didehpc` object from this point. ```r rrq <- obj$rrq_controller() ``` This will look and act a lot like the main didehpc queue controller, but with a few differences. Tasks will come back as plain strings rather than user-friendly objects and `lapply` and `enqueue_bulk` are now blocking operations by default. Most tasks will clean up after they delete rather than leaving a persistent record on disk. The payback for this is potentially very fast task turnarounds and better behaviour with the disk under heavy load. ```r t <- rrq$enqueue(sin(1)) rrq$task_wait(t, 10) #> [1] 0.841471 ``` For example; submitting 50 trivial tasks to our pool of workers and retrieving the results: ```r system.time(res <- rrq$lapply(1:50, sin)) #> user system elapsed #> 0.026 0.000 0.325 ``` or 500 tasks: ```r system.time(res <- rrq$lapply(1:500, sin, progress = FALSE)) #> user system elapsed #> 0.161 0.052 1.476 ``` Across the network, the latency here is ~1/600 s per task. On fi--didemrchnb it will hopefully be a bit faster because of the infiniband network. ```r rrq$worker_stop() rrq$destroy() ``` It is theoretically possible to submit a cluster job that creates an `rrq_controller` and controls the second queue. To do that you need to write function like: ``` get_rrq_controller <- function(x, ...) { queue_id <- Sys.getenv("CONTEXT_ID", "") stopifnot(queue_id != "") rrq::rrq_controller$new((queue_id) } ``` within your sources, then you can use it in place of running (say) a `lapply()` call in your code. This approach allows a relatively simple form of inter-process communication. Talk to Rich if this is something you might have a use for, if you have simulation needs that are larger than a single node. ## Advanced use: different worker and task resources Suppose that we want to submit a job where we have a single process which orchestrates a group of workers each of which takes up an entire node. We used this pattern in the covid response where we wanted to run different MCMC chains on different nodes, each using 32 cores, but we also needed a single "controlling" process to organise collecting results from these nodes. What we want to do do is specify a different set of resources to be used by the workers than by tasks submitted by `$enqueue()`. Note that this only makes sense when using `use_rrq = TRUE`. For example, to submit a controlling process that uses the `GeneralNodes` template and one core (the default) but worker processes that can use 8 cores, we might write ```r config <- didehpc::didehpc_config( use_rrq = TRUE, worker_resource = didehpc::worker_resource(cores = 8)) config #> #> - cluster: fi--dideclusthn #> - credentials: #> - username: rfitzjoh #> - password: ******************* #> - username: rfitzjoh #> - resource: #> - template: GeneralNodes #> - parallel: FALSE #> - count: 1 #> - type: Cores #> - shares: #> - home: (local) /home/rich/net/home => \\fi--san03.dide.ic.ac.uk\homes\rfitzjoh => Q: (remote) #> - temp: (local) /home/rich/net/temp => \\fi--didef3.dide.ic.ac.uk\tmp => T: (remote) #> - use_workers: FALSE #> - use_rrq: TRUE #> - worker_timeout: 600 #> - conan_bootstrap: TRUE #> - r_version: 4.0.3 #> - use_java: FALSE #> - redis_host: fi--dideclusthn.dide.ic.ac.uk #> - worker_resource: #> - template: GeneralNodes #> - parallel: TRUE #> - count: 8 #> - type: Cores ``` See the `worker_resource` section here indicates different resources to the `resource` section. Now, when submitting workers with `obj$submit_workers` each worker will be able to use 8 cores, but a task submitted by `$enqueue()` will only be able to use one. You might then submit a task that uses the `get_rrq_controller` trick above as part of your single core job, which can then farm out work to your workers using the `rrq` queue object. ## Cleaning up Above we use `rrq$destroy()` to clean up every trace of the queue. Periodically we may flush the entire Redis database, or set all keys to expire after a day or so. Do not leave any important information in here please.