--- title: "Fault tolerance" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Fault tolerance} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- In a perfect world, nothing would fail, and this vignette would not be needed. But here we are. When you run tasks within a single process and things go wrong, you know immediately you see it happen. With a queuing system like rrq it's less obvious when something bad has happened because the task is running on another process, possibly on a different machine! * Plain errors are the simplest sort of fault to recover from. Your task throws an error, `rrq` picks this up and marks the task as errored, and moves on to the next thing. You can then see that this error has happened as its status. * If your task crashes the process it can take out the worker (depending on how you have configured rrq; see below). In this case nothing updates the task status, and your task appears to stay running forever. This situation also arises if your process is killed by the operating system (e.g., out of memory) or via some other condition (e.g., an administrator terminating the process). * If your task is running on another machine and that machine becomes unreachable (switched off, disconnects from the network, etc) your task will never appear to finish. This vignette discusses how you can mitigate against and recover from these sorts of issues. We start with the expected errors and build up to considerations for creating a fault tolerant queue. ## Error handling We start with the simplest sort of fault, and can just as easily happen locally as remotely with `rrq`. In this case the handling is fairly well defined and there's not much you need to do. This section is not really "fault tolerance" at all, but simply how rrq handles errors and what you can do about it. Consider this simple function which would fit a linear model between two variables: ```r fit_model <- function(x, y) { lm(y ~ x) } ``` ```r library(rrq) obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4))) rrq_default_controller_set(obj) rrq_worker_envir_set(rrq_envir(sources = "fault.R")) w <- rrq_worker_spawn() #> ℹ Spawning 1 worker with prefix 'loverless_cattle' ``` In the happy case, everything works as expected: ```r x <- runif(5) y <- 2 * x + rnorm(length(x), 0, 0.2) t <- rrq_task_create_expr(fit_model(x, y)) rrq_task_wait(t, 10) #> [1] TRUE rrq_task_status(t) #> [1] "COMPLETE" rrq_task_result(t) #> #> Call: #> lm(formula = y ~ x) #> #> Coefficients: #> (Intercept) x #> -0.05859 2.24645 ``` But if we provide invalid input, the task will error: ```r t <- rrq_task_create_expr(fit_model(x, NULL)) rrq_task_wait(t, 10) #> [1] FALSE rrq_task_result(t) #> #> from: model.frame.default(formula = y ~ x, drop.unused.levels = TRUE) #> error: invalid type (NULL) for variable 'y' #> queue: rrq:8777a78c #> task: 8706f6c5f86f961e2b292e4b86a0594d #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it ``` The first thing to note here is that the task does not throw an error when you fetch the result, either via `task_wait` as above, or via `task_result`: ```r r <- rrq_task_result(t) r #> #> from: model.frame.default(formula = y ~ x, drop.unused.levels = TRUE) #> error: invalid type (NULL) for variable 'y' #> queue: rrq:8777a78c #> task: 8706f6c5f86f961e2b292e4b86a0594d #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it ``` However, the result will be an object of `rrq_task_error` which you can test for using `inherits`: ```r inherits(r, "rrq_task_error") #> [1] TRUE ``` The default behaviour of `rrq` is not to error when fetching a task as that would require that you use `tryCatch` everywhere where you retrieve tasks that might have failed, and because errors are often interesting themselves. For example, `rrq_task_error` objects include stack traces alongside the error: ```r r$trace #> ▆ #> 1. ├─rlang::try_fetch(...) #> 2. │ ├─base::tryCatch(...) #> 3. │ │ └─base (local) tryCatchList(expr, classes, parentenv, handlers) #> 4. │ │ └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]]) #> 5. │ │ └─base (local) doTryCatch(return(expr), name, parentenv, handler) #> 6. │ └─base::withCallingHandlers(...) #> 7. ├─base::eval(task$expr, envir) #> 8. │ └─base::eval(task$expr, envir) #> 9. │ └─fit_model(x, NULL) #> 10. │ ├─stats::lm(y ~ x) #> 11. │ │ └─base::eval(mf, parent.frame()) #> 12. │ │ └─base::eval(mf, parent.frame()) #> 13. │ ├─stats::model.frame(formula = y ~ x, drop.unused.levels = TRUE) #> 14. │ └─stats::model.frame.default(formula = y ~ x, drop.unused.levels = TRUE) #> 15. └─base::.handleSimpleError(...) #> 16. └─rlang (local) h(simpleError(msg, call)) #> 17. └─handlers[[3L]](cnd) ``` These are `rlang` stack traces, which are somewhat richer than those produced by `traceback()`, containing the same set of stacks but arranged in a tree. See the [documentation there](https://rlang.r-lib.org/reference/trace_back.html) for details. This error object will also include any warnings captured while the task ran. Objects of class `rrq_task_error` inherit from `error` and `condition` so, once thrown, will behave as expected in programs using errors for flow control (e.g., with `tryCatch`); you can throw them yourself with `stop()`: ```r stop(r) #> Error in model.frame.default(formula = y ~ x, drop.unused.levels = TRUE): invalid type (NULL) for variable 'y' ``` You can also change the default behaviour to error on failure by passing `error = TRUE` to `rrq_task_result()` or `rrq_tasks_result()`, which will immediately rethrow the error in your R session (your program could then stop or you could again catch it with `tryCatch`): ```r rrq_task_result(t, error = TRUE) #> Error in model.frame.default(formula = y ~ x, drop.unused.levels = TRUE): invalid type (NULL) for variable 'y' ``` This process is unchanged if the task is run in a separate process (with `separate_process = TRUE` passed to `enqueue`), with the same status and return type, and with the trace information available after failure. ## Increasing resilience via separate processes Running tasks in separate processes (e.g., `rrq_task_create_expr(mytask(), separate_process = TRUE)`) is the simplest way of making things more resilient because this creates a layer of isolation between the worker and the task. If your task crashes R (e.g., a segmentation fault due to a bug in your C/C++ code) or is killed by the operating system then the worker process survives and can update the keys in Redis directly to advertise this fact. This is generally much nicer than when the worker dies and the task status cannot be updated. The downside of using separate processes is that it is much slower; compare the time taken to queue, run an retrieve a trivial task run in the same worker process (look at the `elapsed` entry) ```r system.time( rrq_task_wait(rrq_task_create_expr(identity(1)), 10)) #> user system elapsed #> 0.001 0.001 0.002 ``` with the same task run in a separate process on the worker ```r system.time( rrq_task_wait(rrq_task_create_expr(identity(1), separate_process = TRUE), 10)) #> user system elapsed #> 0.00 0.00 0.27 ``` We expect this difference to be ~100 fold in local workers. Almost all the cost is due to the overhead of starting and terminating a fresh R session. If your queue uses a lot of packages there will be additional overhead here too as these are loaded. However, this cost is fixed and will decrease as a fraction of the total running time as the running time of your task increases. So for long-running tasks the additional safety of separate processes is probably worthwhile. For smaller tasks you may want to make sure you run a heartbeat process so that you can handle failures via that route. Consider running some long running job; this one simply sleeps for an hour ```r t <- rrq_task_create_expr(Sys.sleep(3600), separate_process = TRUE) ``` To simulate the process crashing, we've killed it. Because we have queued this on a separate process we can use `rrq_task_info()` to fetch the process id (PID) of the process that the task is running in (this is different to that of the worker). ```r info <- rrq_task_info(t) tools::pskill(info$pid) ``` Because the task was run in a separate process, our worker could detect that the task has died unexpectedly: ```r rrq_task_status(t) #> [1] "DIED" ``` This is different to the error status in the previous section (that was ERROR). Note that if we had not run the task in a separate process the task status would be unchanged as RUNNING) because nothing could ever update the task status! Retrieving the result has similar behaviour to the error case; we don't throw but instead return an object of class `rrq_task_error` (which also inherits from `error` and `condition`). However, this time there's really not much extra information in the error: ```r r <- rrq_task_result(t) r #> #> error: Task not successful: DIED #> queue: rrq:8777a78c #> task: c845b831c3df187ba772cbe51b5ec8aa #> status: DIED #> * To throw this error, use stop() with it ``` There's also no trace available ```r r$trace #> NULL ``` ## Loss of workers In this section we outline what you can do about unexplained and unreported failures in your task, or loss of workers. This will typically be that your worker has crashed (due to your task crashing it perhaps), killed (e.g., by the operating system or an administrator) or the loss of the machine that it is working on. In order to enable fault tolerance for this sort of issue, you first need to enable a "heartbeat" on the worker processes. This is a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a [dead man's switch](https://en.wikipedia.org/wiki/Dead_man%27s_switch) - see `rrq::rrq_heartbeat` for details. So if the worker process dies for any reason, then after a while we'll detect that as its key has expired. We can then take some action. To enable the heartbeat, save a worker configuration before starting a worker: ```r rrq_worker_config_save( "localhost", rrq_worker_config(heartbeat_period = 3)) ``` When you spawn a worker it will pick up this configuration, and we'll be able to detect if it has died. ```r w <- rrq_worker_spawn() #> ℹ Spawning 1 worker with prefix 'lovesick_bobolink' ``` In order to simulate the loss of the worker, we will kill it after starting a long-running task: Now, we can queue some long running process, which this worker will start: ```r t <- rrq_task_create_expr(Sys.sleep(3600)) ``` The worker will pick the task up fairly quickly, and the status will change to `RUNNING`: ```r rrq_task_status(t) #> [1] "RUNNING" rrq_worker_status(w$id) #> lovesick_bobolink_1 #> "BUSY" ``` We kill the worker (simulating the job crashing, or the machine turning off, etc): ```r w$kill() ``` Because the worker has been killed, it can't write to redis to tell us that the task can't be completed, so this status will never change: ```r rrq_task_status(t) #> [1] "RUNNING" rrq_worker_status(w$id) #> lovesick_bobolink_1 #> "BUSY" ``` The heartbeat will persist for 3 times the period given above (this multiplier is not configurable and while any number greater than 1 should be OK, we picked this as it allows for occasional network connectivity issues or slowness on the node - we may reduce it in a future version). This means that after 9s (3 * 3s) the key will have expired: ```r Sys.sleep(10) ``` We can then use the `worker_detect_exited()` method to clean up ```r rrq_worker_detect_exited() #> Lost 1 worker: #> - lovesick_bobolink_1 #> Orphaning 1 task: #> - cc995285ca5d639e3bed3ab958a7b241 ``` At this point, the statuses of our task and worker are correct: ```r rrq_task_status(t) #> [1] "DIED" rrq_worker_status(w$id) #> lovesick_bobolink_1 #> "LOST" ``` Fetching the task result provides the same DIED error as above: ```r rrq_task_result(t) #> #> error: Task not successful: DIED #> queue: rrq:8777a78c #> task: cc995285ca5d639e3bed3ab958a7b241 #> status: DIED #> * To throw this error, use stop() with it ``` This is still not terribly useful, as we have not provided any mechanism to automatically requeue a lost task or restart a dead worker, which we cover in the next section. ## Retrying tasks Regardless of how your tasks got to be in a broken situation, the mechanism for getting them out of this situation remains the same; you want to retry the task: * Your task might have errored because of some resource being unavailable or full and now it is back on line * The worker might have died because someone turned off the machine * Your memory errors only happen sometimes and you feel lucky (in this case you should also fix your program!) Alternatively, perhaps your task ran to successful completion but you simply want to rerun it (e.g., some stochastic algorithm that is displaying a non-fatal pathology). You can use `rrq_task_retry` to retry any number of tasks that have *completed* (in one of the terminal states - including `COMPLETE` but also `ERROR`, `CANCELLED` `DIED` or `TIMEOUT`). This is nondestructive to any of the task data, in particular its result, so you will still have access to the failed run and its stack trace (but see below). ### An example where retrying fixes an error In this example, we'll run some "model" that may or may not converge, and we want to retry until it does. This represents some badly behaved task with a stochastic component that will eventually work if tried enough times: ```r stochastic_failure <- function(p) { x <- runif(1) if (x < p) { stop(sprintf("Convergence failure - x is only %0.2f", x)) } x } ``` We can create a new environment for our worker, to pick up this function: ```r rrq_worker_envir_set(rrq_envir(sources = "fault2.R")) w <- rrq::rrq_worker_spawn() #> ℹ Spawning 1 worker with prefix 'polonium_uromastyxspinipes' ``` Now, we enqueue a task as usual, then wait on it ```r t1 <- rrq_task_create_expr(stochastic_failure(0.5)) rrq_task_wait(t1, timeout = 10) #> [1] FALSE rrq_task_result(t1) #> #> from: stochastic_failure(0.5) #> error: Convergence failure - x is only 0.27 #> queue: rrq:8777a78c #> task: 64ddd6f44b196d8f3daa08900b73c86c #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it ``` Oh no, this task has failed! We can use the `task_retry` method to requeue this job, meaning that it will run exactly as before. However, the random number stream has moved on and this task will return a different answer - perhaps it will work this time? ```r t2 <- rrq_task_retry(t1) rrq_task_wait(t2, timeout = 10) #> [1] FALSE rrq_task_result(t2) #> #> from: stochastic_failure(0.5) #> error: Convergence failure - x is only 0.37 #> queue: rrq:8777a78c #> task: 486f5531b0e093a00c4d94f81e74a7a0 #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it ``` The task has failed again. Note here that `task_retry` has taken a task id and returned a new task id, which we have waited on. However, we could also have used the original id (more on this below). Let's give it one more go: ```r t3 <- rrq_task_retry(t2) rrq_task_wait(t3, timeout = 10) #> [1] TRUE rrq_task_result(t3) #> [1] 0.5728534 ``` Success! The task has run to completion and we no longer have an error. When you retry a task, rrq sets up redirects that point from the old task id to the new one, and in most cases you can use whichever you find more convenient. For example, we can now read the successful task result from any of the task ids: ```r rrq_task_results(c(t1, t2, t3)) #> [[1]] #> [1] 0.5728534 #> #> [[2]] #> [1] 0.5728534 #> #> [[3]] #> [1] 0.5728534 ``` To prevent this, pass `follow = FALSE` to return the original result ```r rrq_task_results(c(t1, t2, t3), follow = FALSE) #> [[1]] #> #> from: stochastic_failure(0.5) #> error: Convergence failure - x is only 0.27 #> queue: rrq:8777a78c #> task: 64ddd6f44b196d8f3daa08900b73c86c #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it #> #> [[2]] #> #> from: stochastic_failure(0.5) #> error: Convergence failure - x is only 0.37 #> queue: rrq:8777a78c #> task: 486f5531b0e093a00c4d94f81e74a7a0 #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it #> #> [[3]] #> [1] 0.5728534 ``` Here, we can see that the first two times we ran the task they failed, along with the error and any backtraces. The same logic applies to other functions, such as `rrq_task_status()` ```r rrq_task_status(c(t1, t2, t3)) #> [1] "COMPLETE" "COMPLETE" "COMPLETE" rrq_task_status(c(t1, t2, t3), follow = FALSE) #> [1] "MOVED" "MOVED" "COMPLETE" ``` Note that the task status above has overwritten the original status (previously it was ERROR, but now it is MOVED). However, you can always read the original status in the error object's status field, should you need it. You can also extract times that events happened with `rrq_task_times()` ```r rrq_task_times(c(t1, t2, t3)) #> submit start complete moved #> 64ddd6f44b196d8f3daa08900b73c86c 1721033848 1721033848 1721033848 NA #> 486f5531b0e093a00c4d94f81e74a7a0 1721033848 1721033848 1721033848 NA #> 1df424045659911ead3673895160f480 1721033848 1721033848 1721033848 NA rrq_task_times(c(t1, t2, t3), follow = FALSE) #> submit start complete moved #> 64ddd6f44b196d8f3daa08900b73c86c 1721033848 1721033848 1721033848 1721033848 #> 486f5531b0e093a00c4d94f81e74a7a0 1721033848 1721033848 1721033848 1721033848 #> 1df424045659911ead3673895160f480 1721033848 1721033848 1721033848 NA ``` This means that most of the time you can ignore that a task has been retried and work with whatever task id you have handy. You can inspect the chain of tasks by looking at the return values `rrq_task_info()`. In the above example we have a chain of three tasks `r backquote(paste0(substr(c(t1, t2, t3), 1, 6), "...", collapse = " -> "))`, which we can discover this way: ```r rrq_task_info(t1) #> $id #> [1] "64ddd6f44b196d8f3daa08900b73c86c" #> #> $status #> [1] "MOVED" #> #> $queue #> [1] "default" #> #> $separate_process #> [1] FALSE #> #> $timeout #> NULL #> #> $worker #> [1] "polonium_uromastyxspinipes_1" #> #> $pid #> NULL #> #> $depends #> $depends$up #> NULL #> #> $depends$down #> NULL #> #> #> $moved #> $moved$up #> NULL #> #> $moved$down #> [1] "486f5531b0e093a00c4d94f81e74a7a0" "1df424045659911ead3673895160f480" ``` Here, the `moved` field shows the tasks that are upstream (`up`) and downstream (`down`) of this task. For tasks that have never been retried both elements are `NULL`. The elements are always ordered from oldest to newest, and chains of tasks are always linear (i.e., there is no forking). For the middle task in the chain, both `up` and `down` point at different tasks: ```r rrq_task_info(t2)$moved #> $up #> [1] "64ddd6f44b196d8f3daa08900b73c86c" #> #> $down #> [1] "1df424045659911ead3673895160f480" ``` and for the leaf task: ```r rrq_task_info(t3)$moved #> $up #> [1] "64ddd6f44b196d8f3daa08900b73c86c" "486f5531b0e093a00c4d94f81e74a7a0" #> #> $down #> NULL ``` ### Performance considerations It is possible to change the default behaviour of `follow` to not follow through a task chain. Doing this might be slightly more efficient in cases where you are interested in running large numbers of small tasks and do not want to retry failed tasks. This is because almost every request that involves a task id will have to check to see if it has been moved. To change this behaviour, pass the `follow` argument to the queue constructor, for example: ```r obj_nofollow <- rrq_controller(obj$queue_id, follow = FALSE) rrq_task_result(t1, controller = obj_nofollow) #> #> from: stochastic_failure(0.5) #> error: Convergence failure - x is only 0.27 #> queue: rrq:8777a78c #> task: 64ddd6f44b196d8f3daa08900b73c86c #> status: ERROR #> * To throw this error, use stop() with it #> * This error has a stack trace, use '$trace' to see it ``` Here, we would create a queue controller object with the default follow behaviour set to `FALSE` and so accessing a task result of a moved task would not search down through any potential retries unless `follow = TRUE` was explicitly passed. ### Deleting tasks that have been retried Deletion will operate on all tasks in a chain. So if you delete a task that has been retried then it deletes everything upstream (up to the original task) and downstream (down to the last time that the task was retried). This is because otherwise it is too easy to end up with inconsistent state. So if we run ```r rrq_task_delete(t1) ``` all three tasks in the chain are deleted: ```r rrq_task_status(c(t1, t2, t3), follow = FALSE) #> [1] "MISSING" "MISSING" "MISSING" ``` ### Going further Currently this system does not allow you to do a few things that would be useful * You can't change queue when you retry a task (for configurations with multiple queues) * You can't automatically retry a task on failure (e.g., try this up to 5 times before giving up entirely) * You can't retry tasks that have become impossible due to failure of a dependent task, even if that task has been retried and since succeeded We will relax these limitations soon.