---
title: "hipercow"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{hipercow}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
```{r setup, include = FALSE}
source("common.R")
vignette_root <- new_hipercow_root_path()
fs::file_copy("simulation.R", vignette_root)
fs::file_copy("simulation-bulk.R", vignette_root)
set_vignette_root(vignette_root)
cleanup <- withr::with_dir(
  vignette_root,
  hipercow::hipercow_example_helper(with_logging = FALSE,
                                    new_directory = FALSE,
                                    initialise = FALSE))
```
Parallel computing on a cluster can be more challenging than running things locally because it's often the first time that you need to package up code to run elsewhere, and when things go wrong it's more difficult to get information on why things failed.
Much of the difficulty of getting things running involves working out what your code depends on, and getting that installed in the right place on a computer that you can't physically poke at. The next set of problems is dealing with the ballooning set of files that end up being created - templates, scripts, output files, etc.
The `hipercow` package aims to remove some of this pain, with the aim that running a task on the cluster should be (almost) as straightforward as running things locally, at least once some basic setup is done.
At the moment, this document assumes that we will be using the "Windows" cluster, which implies the existence of some future non-Windows cluster. Stay tuned.
This manual is structured in escalating complexity, following the chain of things that a hypothetical user might encounter as they move from their first steps on the cluster through to running enormous batches of tasks.
# Installing prerequisites
Install the required packages from our "r-universe". Be sure to run this in a fresh session.
```r
install.packages(
  "hipercow",
  repos = c("https://mrc-ide.r-universe.dev", "https://cloud.r-project.org"))
```
Once installed you can load the package with
```{r}
library(hipercow)
```
or use the package by prefixing the calls below with `hipercow::`, as you prefer.
Follow the platform-specific instructions in the vignette for the cluster you intend to use:
* Windows: `vignette("windows")`
# Filesystems and paths
We need a concept of a "root"; the point in the filesystem we can think of everything relative to. This will feel familiar to you if you have used git or orderly, as these all have a root (and this root will be a fine place to put your cluster work). Typically all paths will be *within* this root directory, and paths above it, or absolute paths in general, effectively cease to exist. If your project works this way then it's easy to move around, which is exactly what we need to do in order to run it on the cluster.
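As a small illustration of why keeping every path relative to the root makes a project easy to move, here is a sketch in base R. The directory names, the file `data/input.csv` and the helpers `read_input()` and `in_dir()` are made up for this example and are not part of hipercow.

```r
# Create a throwaway "project root" containing one data file
root <- tempfile("project-")
dir.create(file.path(root, "data"), recursive = TRUE)
writeLines(c("x,y", "1,2"), file.path(root, "data", "input.csv"))

# Code that only ever refers to paths relative to the root
read_input <- function() {
  read.csv(file.path("data", "input.csv"))
}

# Run a function with the working directory temporarily set to `dir`
in_dir <- function(dir, fun) {
  old <- setwd(dir)
  on.exit(setwd(old))
  fun()
}

first <- in_dir(root, read_input)

# Move the whole root somewhere else; the same code still works because
# nothing in it depends on where the root lives
moved <- tempfile("moved-")
file.rename(root, moved)
second <- in_dir(moved, read_input)

identical(first, second)
```

Had `read_input()` used an absolute path into the original location, the second call would have failed after the move.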
If you are using RStudio, then we strongly recommend using an [RStudio project](https://support.posit.co/hc/en-us/articles/200526207-Using-RStudio-Projects).
# Initialising
Run
```{r}
hipercow_init()
```
which will write things to a new path `hipercow/` within your working directory.
After initialisation you will typically want to configure a "driver", which controls how tasks are sent to clusters. At the moment the only option is the DIDE Windows cluster, so for practical work you would write:
```r
hipercow_configure(driver = "dide-windows")
```
however, for this vignette we will use a special "example" driver which simulates what the cluster will do (don't use this for anything yourself, it really won't help):
```{r}
hipercow_configure(driver = "example")
```
You can run initialisation and configuration in one step by running
```r
hipercow_init(driver = "dide-windows")
```
After initialisation and configuration you can see the computed configuration by running `hipercow_configuration()`:
```{r}
hipercow_configuration()
```
Here, you can see versions of important packages, information about where you are working, and information about how you intend to interact with the cluster. See `vignette("windows")` for example output you might expect on the Windows cluster, which includes information about mapping of your paths onto those of the cluster, the version of R you will use, and other information.
If you have issues with `hipercow` we will always want to see the output of `hipercow_configuration()`.
# Running your first task
The first time you use the tools (ever, in a while, or on a new machine) we recommend sending off a tiny task to make sure that everything is working as expected:
```{r create}
id <- task_create_expr(sessionInfo())
```
This creates a new task that will run the expression `sessionInfo()` on the cluster. The `task_create_expr()` function works by so-called ["non-standard evaluation"](https://adv-r.hadley.nz/metaprogramming.html): the expression is not evaluated in your R session, but sent to run on another machine.
The `id` returned is just an ugly hex string:
```{r}
id
```
```{r, include = FALSE}
# Ensure that the task is finished, makes the rest of the doc nicer to read
task_wait(id)
```
Many other functions accept this `id` as an argument. You can get the status of the task, which will have finished now because it really does not take very long:
```{r status}
task_status(id)
```
Once the task has completed you can inspect the result:
```{r result}
task_result(id)
```
Because we are using the "example" driver here, this is the same as the result that you'd get running `sessionInfo()` directly, just with more steps. See `vignette("windows")` for an example that runs on Windows.
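If non-standard evaluation is unfamiliar, the following base R sketch shows the general idea of capturing an expression without running it. This is only an illustration of the concept; it is not how hipercow is implemented, and the helper `capture()` is hypothetical.

```r
# quote() returns the expression itself rather than its value
expr <- quote(sqrt(16) + 1)
is.call(expr)

# Nothing has been computed so far; evaluation happens only on request
value <- eval(expr)

# A function can capture its argument unevaluated with substitute().
# The stop() call below is never run, so no error is raised here.
capture <- function(expr) substitute(expr)
captured <- capture(stop("this never runs locally"))
deparse(captured)
```

In the same spirit, the expression passed to `task_create_expr()` is saved at creation time and evaluated later, elsewhere.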
# Using functions you have written
It's unlikely that the code you want to run on the cluster is one of the functions built into R itself; more likely you have written a simulation or similar and you want to run *that* instead. In order to do this, we need to tell the cluster where to find your code. There are two broad places where code that you want to run is likely to be found: **script files** and **packages**. We start with the former here, and deal with packages in much more detail in `vignette("packages")`.
Suppose you have a file `simulation.R` containing some simulation:
```{r, results = "asis", echo = FALSE}
r_output(readLines("simulation.R"))
```
We can't run this on the cluster immediately, because the cluster does not know about the new function:
```{r create_walk1}
id <- task_create_expr(random_walk(0, 10))
task_wait(id)
task_status(id)
task_result(id)
```
(See `vignette("troubleshooting")` for more on failures.)
We need to tell hipercow to `source()` the file `simulation.R` before running the task. To do this we use `hipercow_environment_create()` to create an "environment" (not to be confused with R's environments) in which to run things:
```{r create_environment}
hipercow_environment_create(sources = "simulation.R")
```
Now we can run our simulation:
```{r create_walk2}
id <- task_create_expr(random_walk(0, 10))
task_wait(id)
task_result(id)
```
* You can have multiple environments and each task can be set to run in a different environment
* Each environment can source any number of source files, and load any number of packages
* This will become the mechanism by which parallel workers (via `parallel`, `future` or `rrq`) set up their environments
Read more about environments in `vignette("environments")`.
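The failure and the fix above can be mimicked locally in base R. In this sketch a fresh environment stands in for the clean R session on the cluster; the script contents and the function `double_it()` are hypothetical stand-ins for `simulation.R`, and the real mechanism inside hipercow may differ.

```r
# Write a small script defining one function
path <- tempfile(fileext = ".R")
writeLines("double_it <- function(x) 2 * x", path)

# A fresh environment whose parent is baseenv(), so that nothing defined
# in the local session is visible from it
worker <- new.env(parent = baseenv())
task <- quote(double_it(21))

# Before sourcing, the function cannot be found and we capture the error
before <- tryCatch(eval(task, worker),
                   error = function(e) conditionMessage(e))

# After sourcing the script into the environment, the same task succeeds
sys.source(path, envir = worker)
after <- eval(task, worker)

before
after
```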
# Getting information about tasks
Once you have created (and submitted) tasks, they will be queued by the cluster and eventually run. The hope is that we surface enough information to make it easy for you to see how things are going and what has gone wrong.
## Fetching information with `task_info()`
The primary function for fetching information about a task is `task_info()`:
```{r}
task_info(id)
```
This prints out core information about the task; its identifier (`r inline(id)`) and status (`success`), along with information about what sort of task it was, what expression it had, variables it used, the environment it executed in and the time that key events happened for the task (when it was created, started and finished).
This display is meant to be friendly; if you need to compute on this information, you can access the times by reading the `$times` element of the `task_info()` return value:
```{r}
task_info(id)$times
```
Likewise, the information about the task itself is within `$data`. To work with the underlying data you might just unclass the object to see the structure:
```{r}
unclass(task_info(id))
```
but note that the exact structure is subject to (infrequent) change.
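As an example of computing on the times, the sketch below works out how long a task waited and how long it ran. It uses mock values so that it is self-contained, and it assumes that the times are date-times named `created`, `started` and `finished`, following the description above; check the real `task_info(id)$times` output before relying on those names.

```r
# Mock times for illustration only; real values would come from
# task_info(id)$times
times <- as.POSIXct(
  c("2024-01-01 10:00:00", "2024-01-01 10:00:05", "2024-01-01 10:00:12"),
  tz = "UTC")
names(times) <- c("created", "started", "finished")  # assumed names

# Time spent queued, and time spent running, in seconds
waiting <- difftime(times[["started"]], times[["created"]], units = "secs")
running <- difftime(times[["finished"]], times[["started"]], units = "secs")

waiting
running
```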
## Fetching logs with `task_log_show`
Every task will produce some logs, and these can be an important part of understanding what they did and why they went wrong.
You can view the log with `task_log_show()`
```{r}
task_log_show(id)
```
This prints the contents of the logs to the screen; you can access the values directly with `task_log_value(id)`. The format of the logs will be generally the same for all tasks; after the header saying where we are running, some information about the task will be printed (its identifier, the time, details about the task itself), then any logs that come from calls to `message()` and `print()` within the queued function (within the "task logs" section; here that is empty because our task prints nothing). Finally, a summary will be printed with the final status, final time (and elapsed time), then any warnings that were produced will be flushed (see `vignette("troubleshooting")` for more on warnings).
There is a second log too, the "outer" log, which is generally less interesting so it is not the default. These logs come from the cluster scheduler itself and show the startup process that leads up to (and after) the code that hipercow itself runs. It will differ from driver to driver. In addition, this log may not be available forever; the Windows cluster retains it only for a couple of weeks:
```{r}
task_log_show(id, outer = TRUE)
```
The logs returned by `task_log_show(id, outer = FALSE)` are the logs generated by the statement containing `Rscript -e`.
## Watching logs with `task_log_watch`
If your task is still running, you can stream logs to your computer using `task_log_watch()`; this will print new logs line-by-line as they arrive (with a delay of up to 1s by default). This can be useful while debugging something to give the illusion that you're running it locally.
Using `Ctrl-C` (or `ESC` in RStudio) to escape will only stop log streaming and not the underlying task.
# Running many tasks at once
Running one task on the cluster is nice, because it takes the load off your laptop, but it's generally not why you're going through this process. More likely, you have **many, similar, tasks** that you want to set running at once. You might be:
* Fitting a model to a series of countries
* Exploring uncertainty in a parameter
* Running a series of stochastic processes
In all these cases, you would want to submit a group of **related** tasks, sharing a common function, but differing in the data passed into that function. We call this the "bulk interface", and it is the simplest and usually most effective way of getting started with parallel computing.
This sort of problem is referred to as ["embarrassingly parallel"](https://en.wikipedia.org/wiki/Embarrassingly_parallel); this is not a pejorative, it just means that your work decomposes into a bunch of independent chunks and all we have to do is start them. You are already familiar with things that can be run this way: anything that can be run with R's `lapply` could be parallelised.
There are two similar bulk creation functions, which differ based on the way the data you have are structured:
* `task_create_bulk_call` is used where you have a `list`, where each element represents the key input to some computation (this is similar to `lapply()`)
* `task_create_bulk_expr` is used where you have a `data.frame`, where each **row** represents the inputs to some computation (this is a little similar to `dplyr`'s [`rowwise`](https://dplyr.tidyverse.org/reference/rowwise.html) support)
## Bulk call, or "parallel map"
The bulk call interface is the one that might feel most familiar to you; it is modelled on ideas from functions like `lapply` (or, if you use `purrr`, its `map()` function). The idea is simple, we have a list of data and we apply some function to each element within it.
We'll start with a reminder of how `lapply` works, then adapt this to run in parallel on a cluster. Imagine that we want to run some simple simulation with a different parameter. In this example we simulate `n` samples from a normal distribution and compute the observed mean and variance:
```{r, results = "asis", echo = FALSE}
r_output(readLines("simulation-bulk.R"))
```
We can run it locally like this:
```{r}
source("simulation-bulk.R")
mysim(0, 1)
```
Suppose that we have a vector of means to run this with:
```{r}
mu <- c(0, 1, 2, 3, 4)
```
We can apply `mysim` to each of the elements of `mu` by writing:
```{r}
lapply(mu, mysim, sd = 1)
```
Of note here:
* Only the `mu` argument was iterated over
* We provided a `sd` argument that was passed through to every call to `mysim`
* We get back a list in return, the same length as `mu`, with each element the result of applying `mysim` to that element.
Nothing in the above said anything about the *order* in which these calculations were carried out; one might assume that we applied `mysim` to the first element of `mu` first, then the second, but that is just conjecture. This last point seems a bit silly, but is a useful condition to think about when considering what can be parallelised; if you can run a "loop" backwards and get the same answer (ignoring things like the specific draws from random number generating functions) then your problem is well suited to being parallelised.
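The "run it backwards" check can be tried directly in base R. The two functions below are hypothetical examples, chosen so that one passes the check and the other does not.

```r
x <- 1:5

# Independent work: each result depends only on its own input
square <- function(x) x^2
forwards <- lapply(x, square)
backwards <- rev(lapply(rev(x), square))
order_free <- identical(forwards, backwards)

# Dependent work: each step needs the result of the previous one
running_total <- function(x) {
  total <- 0
  out <- numeric(length(x))
  for (i in seq_along(x)) {
    total <- total + x[[i]]
    out[[i]] <- total
  }
  out
}
order_matters <- !identical(running_total(x), rev(running_total(rev(x))))

order_free
order_matters
```

The first computation gives the same answer in either direction, so its pieces could be run in any order, or all at once. The second does not, so it cannot simply be split into independent tasks.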
Our function `mysim` is in a file called `simulation-bulk.R`, which we'll add to our environment so that it's available on the cluster (alongside `random_walk` from above):
```{r}
hipercow_environment_create(sources = c("simulation.R", "simulation-bulk.R"))
```
We can then submit tasks to the cluster using `task_create_bulk_call`:
```{r}
bundle <- task_create_bulk_call(mysim, mu, args = list(sd = 1))
bundle
```
This creates a **task bundle** which groups together related tasks. There is a whole set of functions for working with bundles that behave similarly to the task query functions. So where `task_status()` retrieves the status for a single task, we can get the status for a bundle by running
```{r}
hipercow_bundle_status(bundle)
```
which returns a vector over all tasks included in the bundle. You can also "reduce" this status to the "worst" status over all tasks:
```{r}
hipercow_bundle_status(bundle, reduce = TRUE)
```
Similarly, you can wait for a whole bundle to complete
```{r}
hipercow_bundle_wait(bundle)
```
And then get the results as a list
```{r}
hipercow_bundle_result(bundle)
```
This flow (`create`, `wait`, `result`) is equivalent to `lapply` and produces data of the same shape in return, but the tasks will be carried out in parallel! Each task is submitted to the cluster and picked up by the first available node. You might submit 100 tasks and if the cluster is quiet, a few seconds later all of them will be running at the same time.
We might want to vary *both* `mu` and `sd`, in which case it might be convenient to keep track of our inputs in a `data.frame`:
```{r}
pars <- data.frame(mu = mu, sd = sqrt(mu + 1))
```
We can use `task_create_bulk_call` with this, too:
```{r}
b <- task_create_bulk_call(mysim, pars)
hipercow_bundle_wait(b)
hipercow_bundle_result(b)
```
This iterates over the data in a **row-wise** way. Note that this is very different to `lapply` which would iterate over **columns** (in practice we find that this is almost never what people want). The names of the columns must match the names of your function arguments and all columns must be used.
We could have passed additional arguments here too, for example changing `n`:
```{r}
b <- task_create_bulk_call(mysim, pars, args = list(n = 40))
```
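To make the difference between column-wise and row-wise iteration concrete, here is a base R sketch that runs entirely locally. The function `describe()` is a hypothetical stand-in for `mysim()`, and the row-wise loop only imitates the shape of what the bulk call does; it says nothing about how hipercow implements it.

```r
pars <- data.frame(mu = c(0, 1, 2), sd = c(1, 2, 3))
describe <- function(mu, sd) sprintf("mu=%g, sd=%g", mu, sd)

# lapply() sees a data.frame as a list of columns: one iteration per column
by_column <- lapply(pars, range)
length(by_column)

# Row-wise: one call per row, with columns matched to arguments by name
by_row <- lapply(seq_len(nrow(pars)), function(i) {
  do.call(describe, as.list(pars[i, , drop = FALSE]))
})
length(by_row)
by_row[[2]]
```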
## Bulk expression
We also support a bulk expression interface, which can be clearer than the above.
```{r}
b <- task_create_bulk_expr(mysim(mu, sd, n = 40), pars)
```
This would again work row-wise over `pars` but evaluate the expression in the first argument with the data found in the `data.frame`. This would allow you to use different column names if convenient:
```{r}
pars <- data.frame(mean = mu, stddev = sqrt(mu + 1))
b <- task_create_bulk_expr(mysim(mean, stddev, n = 40), pars)
```
```{r, include = FALSE}
# Flush any running tasks:
hipercow_bundle_wait(b)
```
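The idea behind the bulk expression interface can be sketched in base R: keep the expression unevaluated, then evaluate it once per row with that row's values supplying the variables. This is a local illustration only; `scale_value()` is a hypothetical, deterministic stand-in for `mysim()`, and hipercow's own implementation may differ.

```r
pars <- data.frame(mean = c(0, 1), stddev = c(1, 2))

# Hypothetical stand-in for mysim(), deterministic so results are easy to check
scale_value <- function(mu, sd, n) mu + sd * n

# The expression refers to the column names, not to the argument names
expr <- quote(scale_value(mean, stddev, n = 10))

# Evaluate the expression once per row, looking variables up in that row
# first and falling back to the calling environment for everything else
results <- lapply(seq_len(nrow(pars)), function(i) {
  eval(expr, as.list(pars[i, , drop = FALSE]))
})
results
```

Because variables are looked up by name in each row, the column names are free to differ from the function's argument names.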
## More on bundles
You can do most things to bundles that you can do to tasks:
Action | Single task | Bundle
-------+--------------+-------
Get result | `task_result` | `hipercow_bundle_result`
Wait for completion | `task_wait` | `hipercow_bundle_wait`
Retry failed tasks (see below) | `task_retry` | `hipercow_bundle_retry`
List | `task_list` | `hipercow_bundle_list`
Cancel | `task_cancel` | `hipercow_bundle_cancel`
Get log value | `task_log_value` | `hipercow_bundle_log_value`
There is no equivalent of `task_log_watch` or `task_log_show` because we can't easily do this for multiple tasks at the same time in a satisfactory way.
`hipercow_bundle_delete` will delete bundles, but leave tasks alone. `hipercow_purge` will delete tasks, causing actual deletion of data.
Some of these functions naturally have slightly different semantics to the single-task function; for example, `hipercow_bundle_result()` returns a list of results and `hipercow_bundle_wait()` has an option `fail_early` to control whether it should return `FALSE` as soon as any task fails.
## Picking bundles back up again later
You can use the `hipercow_bundle_list()` function to list known bundles:
```{r}
hipercow_bundle_list()
```
Each bundle has a name (automatically generated by default) and the time that it was created. If you have launched a bundle and for some reason lost your session (e.g., Windows Update has rebooted your computer) you can use this to get your ids back.
```{r}
name <- hipercow_bundle_list()$name[[1]]
bundle <- hipercow_bundle_load(name)
```
If you're not sure what you launched, you can use `task_info`:
```{r}
task_info(bundle$ids[[1]])
```
You can make this process a bit more friendly by setting your own name into the bundle when creating it using the `bundle_name` argument:
```{r}
pars <- data.frame(mean = mu, stddev = sqrt(mu + 1))
b <- task_create_bulk_expr(mysim(mean, stddev, n = 40), pars,
                           bundle_name = "final_runs_v2")
```
## Making bundles from tasks
You can also make a bundle yourself from a group of tasks; this may be convenient if you need to launch a number of tasks individually for some reason but want to then consider them together as a group.
```{r}
id1 <- task_create_expr(mysim(1, 2))
id2 <- task_create_expr(mysim(2, 2))
b <- hipercow_bundle_create(c(id1, id2), "my_new_bundle")
b
```
We can then use this bundle as above:
```{r}
hipercow_bundle_status(b)
hipercow_bundle_wait(b)
```
# Parallel tasks
So far, the tasks we submitted have been run using a single core on the cluster, with no special other requests made. Here is a simple example using two cores; we'll use `hipercow_resources()` to specify we want two cores on the cluster, and `hipercow_parallel()` to say that we want to set up two processes on those cores, using the `parallel` package. (We also support [`future`](https://future.futureverse.org/)).
```{r parallel_task}
resources <- hipercow_resources(cores = 2)
id <- task_create_expr(
  parallel::clusterApply(NULL, 1:2, function(x) Sys.sleep(5)),
  parallel = hipercow_parallel("parallel"),
  resources = resources)
task_wait(id)
task_info(id)
```
Both of our parallel tasks are to sleep for 5 seconds. We use `task_info()` to report how long it took for those two runs to execute; if they ran one-by-one, we'd expect around 10 seconds, but we are seeing a much shorter time than that, so our pair of processes are running at the same time.
For details on specifying resources and launching different kinds of parallel tasks, see `vignette("parallel")`.
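The same timing argument can be checked on your own machine with the `parallel` package alone. This sketch starts a local two-process cluster by hand, which is an assumption made so that the example is self-contained; in the task above no such setup code is needed and `NULL` is passed as the cluster.

```r
library(parallel)

# Start two local worker processes
cl <- makeCluster(2)

# Two sleeps of one second each, one per worker
elapsed <- system.time(
  clusterApply(cl, 1:2, function(x) Sys.sleep(1))
)[["elapsed"]]

stopCluster(cl)

# Run one after the other these would need at least 2 seconds
elapsed
```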
# Understanding where variables come from
Suppose our simulation started not from 0, but from some point that we have computed locally (say `x`, imaginatively)
```{r}
x <- 100
```
You can use this value to start the simulation by running:
```{r create_walk3}
id <- task_create_expr(random_walk(x, 10))
```
Here the `x` value has come from the environment where the expression passed into `task_create_expr()` was found (specifically, we use the [`rlang` "tidy evaluation"](https://rlang.r-lib.org/reference/topic-defuse.html) framework you might be familiar with from `dplyr` and friends).
```{r result3}
task_wait(id)
task_result(id)
```
If you pass in an expression that references a value that does not exist locally, you will get a (hopefully) informative error message when the task is created:
```{r, error = TRUE, eval = FALSE}
id <- task_create_expr(random_walk(starting_point, 10))
#> Error in `rlang::env_get_list()`:
#> ! Can't find `starting_point` in environment.
```
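A rough base R analogue of this lookup is sketched below. It is not hipercow's implementation (which, as noted above, uses `rlang`); it only shows the general idea of finding the variables an expression refers to and collecting their current values, and of failing early when one is missing.

```r
x <- 100
expr <- quote(random_walk(x, 10))

# Names of the variables (not functions) that the expression refers to
needed <- all.vars(expr)
needed

# Collect their current values from the environment we are working in
values <- mget(needed, envir = environment())
values

# A variable that does not exist is detected before anything is submitted
missing_expr <- quote(random_walk(starting_point, 10))
problem <- tryCatch(
  mget(all.vars(missing_expr), envir = environment()),
  error = function(e) conditionMessage(e))
problem
```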
# Cancelling tasks
You can cancel a task if it has been submitted and not completed, using `task_cancel()`.
For example, here's a task that will sleep for 10 seconds, which we submit to the cluster:
```{r}
id <- task_create_expr(Sys.sleep(10))
```
Having decided that this is a silly idea, we can try and cancel it:
```{r}
task_cancel(id)
task_status(id)
task_info(id)
```
You can cancel a task that is submitted (waiting to be picked up by a cluster) or running (though not all drivers will support this; we need to add this to the example driver still, which will improve this example!).
You can cancel many tasks at once by passing a vector of identifiers at the same time. Tasks that have finished (successfully or not) cannot be cancelled.
# Retrying tasks
There are lots of reasons why you might want to retry a task. For example:
* it failed but you think it might work next time
* you updated a package that it used, and want to try again with the new version
* you don't like the output from some stochastic function and want to generate new output
* you cancelled the task but want to try again now
You can retry tasks with `task_retry()`, which is easier than submitting a new task with the same content, and also preserves a link between retried tasks.
Our random walk will give slightly different results each time we use it, so we demonstrate the idea with that:
```{r}
id1 <- task_create_expr(random_walk(0, 10))
task_wait(id1)
task_result(id1)
```
Here we ran a random walk and it got to `r tail(task_result(id1), 1)`, which is clearly not what we were expecting. Let's try it again:
```{r}
id2 <- task_retry(id1)
```
Running `task_retry()` creates a *new* task, with a new id `r abbrev_id(id2)` compared with `r abbrev_id(id1)`.
Once this task has finished, we get a different result:
```{r}
task_wait(id2)
task_result(id2)
```
Much better!
We get a hint that this is a retried task from `task_info()`:
```{r}
task_info(id2)
```
You can see the full chain of retries here:
```{r}
task_info(id2)$retry_chain
```
Once a task has been retried it affects how you interact with the previous ids; by default they follow through to the most recent element in the chain:
```{r}
task_result(id1)
task_result(id2)
```
You can get the original result back by passing the argument `follow = FALSE`:
```{r}
task_result(id1, follow = FALSE)
task_result(id2)
```
Only tasks that have been completed (`success`, `failure` or `cancelled`) can be retried, and doing so adds a new task to the *end* of the chain; there is no branching. Retrying `id1` here would create the chain `id1 -> id2 -> id3`, and following would select `id3` for any of the three tasks in the chain.
You cannot currently change any property of a retried task; we may change this in future.
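The "follow to the end of the chain" rule can be pictured with a small base R sketch. The data structure and the helper `follow_chain()` are hypothetical and use placeholder ids; they illustrate the behaviour described above, not how hipercow stores retries.

```r
# Hypothetical record of retries: each name is a task that was retried,
# each value is the task that replaced it
retried_as <- c(id1 = "id2", id2 = "id3")

# Follow the chain from any id to the most recent task
follow_chain <- function(id, retried_as) {
  while (id %in% names(retried_as)) {
    id <- retried_as[[id]]
  }
  id
}

latest <- vapply(c("id1", "id2", "id3"), follow_chain, character(1),
                 retried_as = retried_as)
latest
```

Every id in the chain resolves to the last task, which matches the default behaviour of following retries; passing `follow = FALSE` corresponds to skipping this lookup.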
```{r, include = FALSE}
cleanup()
```