In a perfect world, nothing would fail, and this vignette would not be needed. But here we are.
When you run tasks within a single process and things go wrong, you know immediately you see it happen. With a queuing system like rrq it’s less obvious when something bad has happened because the task is running on another process, possibly on a different machine!
rrq
picks this up and marks the task
as errored, and moves on to the next thing. You can then see that this
error has happened as its status.This vignette discusses how you can mitigate against and recover from these sorts of issues. We start with the expected errors and build up to considerations for creating a fault tolerant queue.
We start with the simplest sort of fault, and can just as easily
happen locally as remotely with rrq
. In this case the
handling is fairly well defined and there’s not much you need to do.
This section is not really “fault tolerance” at all, but simply how rrq
handles errors and what you can do about it.
Consider this simple function which would fit a linear model between two variables:
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "fault.R"))
w <- rrq_worker_spawn()
#> ℹ Spawning 1 worker with prefix 'loverless_cattle'
In the happy case, everything works as expected:
x <- runif(5)
y <- 2 * x + rnorm(length(x), 0, 0.2)
t <- rrq_task_create_expr(fit_model(x, y))
rrq_task_wait(t, 10)
#> [1] TRUE
rrq_task_status(t)
#> [1] "COMPLETE"
rrq_task_result(t)
#>
#> Call:
#> lm(formula = y ~ x)
#>
#> Coefficients:
#> (Intercept) x
#> -0.05859 2.24645
But if we provide invalid input, the task will error:
t <- rrq_task_create_expr(fit_model(x, NULL))
rrq_task_wait(t, 10)
#> [1] FALSE
rrq_task_result(t)
#> <rrq_task_error>
#> from: model.frame.default(formula = y ~ x, drop.unused.levels = TRUE)
#> error: invalid type (NULL) for variable 'y'
#> queue: rrq:8777a78c
#> task: 8706f6c5f86f961e2b292e4b86a0594d
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
The first thing to note here is that the task does not throw an error
when you fetch the result, either via task_wait
as above,
or via task_result
:
r <- rrq_task_result(t)
r
#> <rrq_task_error>
#> from: model.frame.default(formula = y ~ x, drop.unused.levels = TRUE)
#> error: invalid type (NULL) for variable 'y'
#> queue: rrq:8777a78c
#> task: 8706f6c5f86f961e2b292e4b86a0594d
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
However, the result will be an object of rrq_task_error
which you can test for using inherits
:
The default behaviour of rrq
is not to error when
fetching a task as that would require that you use tryCatch
everywhere where you retrieve tasks that might have failed, and because
errors are often interesting themselves. For example,
rrq_task_error
objects include stack traces alongside the
error:
r$trace
#> ▆
#> 1. ├─rlang::try_fetch(...)
#> 2. │ ├─base::tryCatch(...)
#> 3. │ │ └─base (local) tryCatchList(expr, classes, parentenv, handlers)
#> 4. │ │ └─base (local) tryCatchOne(expr, names, parentenv, handlers[[1L]])
#> 5. │ │ └─base (local) doTryCatch(return(expr), name, parentenv, handler)
#> 6. │ └─base::withCallingHandlers(...)
#> 7. ├─base::eval(task$expr, envir)
#> 8. │ └─base::eval(task$expr, envir)
#> 9. │ └─fit_model(x, NULL)
#> 10. │ ├─stats::lm(y ~ x)
#> 11. │ │ └─base::eval(mf, parent.frame())
#> 12. │ │ └─base::eval(mf, parent.frame())
#> 13. │ ├─stats::model.frame(formula = y ~ x, drop.unused.levels = TRUE)
#> 14. │ └─stats::model.frame.default(formula = y ~ x, drop.unused.levels = TRUE)
#> 15. └─base::.handleSimpleError(...)
#> 16. └─rlang (local) h(simpleError(msg, call))
#> 17. └─handlers[[3L]](cnd)
These are rlang
stack traces, which are somewhat richer
than those produced by traceback()
, containing the same set
of stacks but arranged in a tree. See the documentation
there for details. This error object will also include any warnings
captured while the task ran.
Objects of class rrq_task_error
inherit from
error
and condition
so, once thrown, will
behave as expected in programs using errors for flow control (e.g., with
tryCatch
); you can throw them yourself with
stop()
:
stop(r)
#> Error in model.frame.default(formula = y ~ x, drop.unused.levels = TRUE): invalid type (NULL) for variable 'y'
You can also change the default behaviour to error on failure by
passing error = TRUE
to rrq_task_result()
or
rrq_tasks_result()
, which will immediately rethrow the
error in your R session (your program could then stop or you could again
catch it with tryCatch
):
rrq_task_result(t, error = TRUE)
#> Error in model.frame.default(formula = y ~ x, drop.unused.levels = TRUE): invalid type (NULL) for variable 'y'
This process is unchanged if the task is run in a separate process
(with separate_process = TRUE
passed to
enqueue
), with the same status and return type, and with
the trace information available after failure.
Running tasks in separate processes (e.g.,
rrq_task_create_expr(mytask(), separate_process = TRUE)
) is
the simplest way of making things more resilient because this creates a
layer of isolation between the worker and the task. If your task crashes
R (e.g., a segmentation fault due to a bug in your C/C++ code) or is
killed by the operating system then the worker process survives and can
update the keys in Redis directly to advertise this fact. This is
generally much nicer than when the worker dies and the task status
cannot be updated.
The downside of using separate processes is that it is much slower;
compare the time taken to queue, run an retrieve a trivial task run in
the same worker process (look at the elapsed
entry)
system.time(
rrq_task_wait(rrq_task_create_expr(identity(1)), 10))
#> user system elapsed
#> 0.001 0.001 0.002
with the same task run in a separate process on the worker
system.time(
rrq_task_wait(rrq_task_create_expr(identity(1), separate_process = TRUE), 10))
#> user system elapsed
#> 0.00 0.00 0.27
We expect this difference to be ~100 fold in local workers. Almost all the cost is due to the overhead of starting and terminating a fresh R session. If your queue uses a lot of packages there will be additional overhead here too as these are loaded. However, this cost is fixed and will decrease as a fraction of the total running time as the running time of your task increases. So for long-running tasks the additional safety of separate processes is probably worthwhile. For smaller tasks you may want to make sure you run a heartbeat process so that you can handle failures via that route.
Consider running some long running job; this one simply sleeps for an hour
To simulate the process crashing, we’ve killed it. Because we have
queued this on a separate process we can use
rrq_task_info()
to fetch the process id (PID) of the
process that the task is running in (this is different to that of the
worker).
Because the task was run in a separate process, our worker could detect that the task has died unexpectedly:
This is different to the error status in the previous section (that was ERROR). Note that if we had not run the task in a separate process the task status would be unchanged as RUNNING) because nothing could ever update the task status!
Retrieving the result has similar behaviour to the error case; we
don’t throw but instead return an object of class
rrq_task_error
(which also inherits from error
and condition
). However, this time there’s really not much
extra information in the error:
r <- rrq_task_result(t)
r
#> <rrq_task_error>
#> error: Task not successful: DIED
#> queue: rrq:8777a78c
#> task: c845b831c3df187ba772cbe51b5ec8aa
#> status: DIED
#> * To throw this error, use stop() with it
There’s also no trace available
In this section we outline what you can do about unexplained and unreported failures in your task, or loss of workers. This will typically be that your worker has crashed (due to your task crashing it perhaps), killed (e.g., by the operating system or an administrator) or the loss of the machine that it is working on.
In order to enable fault tolerance for this sort of issue, you first
need to enable a “heartbeat” on the worker processes. This is a second
process on each worker that periodically writes to the Redis database on
a key that will expire in a time slightly longer than that period, in
effect making a dead man’s
switch - see rrq::rrq_heartbeat
for details. So if the
worker process dies for any reason, then after a while we’ll detect that
as its key has expired. We can then take some action.
To enable the heartbeat, save a worker configuration before starting a worker:
When you spawn a worker it will pick up this configuration, and we’ll be able to detect if it has died.
In order to simulate the loss of the worker, we will kill it after starting a long-running task:
Now, we can queue some long running process, which this worker will start:
The worker will pick the task up fairly quickly, and the status will
change to RUNNING
:
We kill the worker (simulating the job crashing, or the machine turning off, etc):
Because the worker has been killed, it can’t write to redis to tell us that the task can’t be completed, so this status will never change:
The heartbeat will persist for 3 times the period given above (this multiplier is not configurable and while any number greater than 1 should be OK, we picked this as it allows for occasional network connectivity issues or slowness on the node - we may reduce it in a future version). This means that after 9s (3 * 3s) the key will have expired:
We can then use the worker_detect_exited()
method to
clean up
rrq_worker_detect_exited()
#> Lost 1 worker:
#> - lovesick_bobolink_1
#> Orphaning 1 task:
#> - cc995285ca5d639e3bed3ab958a7b241
At this point, the statuses of our task and worker are correct:
Fetching the task result provides the same DIED error as above:
rrq_task_result(t)
#> <rrq_task_error>
#> error: Task not successful: DIED
#> queue: rrq:8777a78c
#> task: cc995285ca5d639e3bed3ab958a7b241
#> status: DIED
#> * To throw this error, use stop() with it
This is still not terribly useful, as we have not provided any mechanism to automatically requeue a lost task or restart a dead worker, which we cover in the next section.
Regardless of how your tasks got to be in a broken situation, the mechanism for getting them out of this situation remains the same; you want to retry the task:
Alternatively, perhaps your task ran to successful completion but you simply want to rerun it (e.g., some stochastic algorithm that is displaying a non-fatal pathology).
You can use rrq_task_retry
to retry any number of tasks
that have completed (in one of the terminal states - including
COMPLETE
but also ERROR
,
CANCELLED
DIED
or TIMEOUT
). This
is nondestructive to any of the task data, in particular its result, so
you will still have access to the failed run and its stack trace (but
see below).
In this example, we’ll run some “model” that may or may not converge, and we want to retry until it does. This represents some badly behaved task with a stochastic component that will eventually work if tried enough times:
stochastic_failure <- function(p) {
x <- runif(1)
if (x < p) {
stop(sprintf("Convergence failure - x is only %0.2f", x))
}
x
}
We can create a new environment for our worker, to pick up this function:
rrq_worker_envir_set(rrq_envir(sources = "fault2.R"))
w <- rrq::rrq_worker_spawn()
#> ℹ Spawning 1 worker with prefix 'polonium_uromastyxspinipes'
Now, we enqueue a task as usual, then wait on it
t1 <- rrq_task_create_expr(stochastic_failure(0.5))
rrq_task_wait(t1, timeout = 10)
#> [1] FALSE
rrq_task_result(t1)
#> <rrq_task_error>
#> from: stochastic_failure(0.5)
#> error: Convergence failure - x is only 0.27
#> queue: rrq:8777a78c
#> task: 64ddd6f44b196d8f3daa08900b73c86c
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
Oh no, this task has failed!
We can use the task_retry
method to requeue this job,
meaning that it will run exactly as before. However, the random number
stream has moved on and this task will return a different answer -
perhaps it will work this time?
t2 <- rrq_task_retry(t1)
rrq_task_wait(t2, timeout = 10)
#> [1] FALSE
rrq_task_result(t2)
#> <rrq_task_error>
#> from: stochastic_failure(0.5)
#> error: Convergence failure - x is only 0.37
#> queue: rrq:8777a78c
#> task: 486f5531b0e093a00c4d94f81e74a7a0
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
The task has failed again. Note here that task_retry
has
taken a task id and returned a new task id, which we have waited on.
However, we could also have used the original id (more on this
below).
Let’s give it one more go:
t3 <- rrq_task_retry(t2)
rrq_task_wait(t3, timeout = 10)
#> [1] TRUE
rrq_task_result(t3)
#> [1] 0.5728534
Success! The task has run to completion and we no longer have an error.
When you retry a task, rrq sets up redirects that point from the old task id to the new one, and in most cases you can use whichever you find more convenient. For example, we can now read the successful task result from any of the task ids:
rrq_task_results(c(t1, t2, t3))
#> [[1]]
#> [1] 0.5728534
#>
#> [[2]]
#> [1] 0.5728534
#>
#> [[3]]
#> [1] 0.5728534
To prevent this, pass follow = FALSE
to return the
original result
rrq_task_results(c(t1, t2, t3), follow = FALSE)
#> [[1]]
#> <rrq_task_error>
#> from: stochastic_failure(0.5)
#> error: Convergence failure - x is only 0.27
#> queue: rrq:8777a78c
#> task: 64ddd6f44b196d8f3daa08900b73c86c
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
#>
#> [[2]]
#> <rrq_task_error>
#> from: stochastic_failure(0.5)
#> error: Convergence failure - x is only 0.37
#> queue: rrq:8777a78c
#> task: 486f5531b0e093a00c4d94f81e74a7a0
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
#>
#> [[3]]
#> [1] 0.5728534
Here, we can see that the first two times we ran the task they
failed, along with the error and any backtraces. The same logic applies
to other functions, such as rrq_task_status()
rrq_task_status(c(t1, t2, t3))
#> [1] "COMPLETE" "COMPLETE" "COMPLETE"
rrq_task_status(c(t1, t2, t3), follow = FALSE)
#> [1] "MOVED" "MOVED" "COMPLETE"
Note that the task status above has overwritten the original status (previously it was ERROR, but now it is MOVED). However, you can always read the original status in the error object’s status field, should you need it.
You can also extract times that events happened with
rrq_task_times()
rrq_task_times(c(t1, t2, t3))
#> submit start complete moved
#> 64ddd6f44b196d8f3daa08900b73c86c 1721033848 1721033848 1721033848 NA
#> 486f5531b0e093a00c4d94f81e74a7a0 1721033848 1721033848 1721033848 NA
#> 1df424045659911ead3673895160f480 1721033848 1721033848 1721033848 NA
rrq_task_times(c(t1, t2, t3), follow = FALSE)
#> submit start complete moved
#> 64ddd6f44b196d8f3daa08900b73c86c 1721033848 1721033848 1721033848 1721033848
#> 486f5531b0e093a00c4d94f81e74a7a0 1721033848 1721033848 1721033848 1721033848
#> 1df424045659911ead3673895160f480 1721033848 1721033848 1721033848 NA
This means that most of the time you can ignore that a task has been retried and work with whatever task id you have handy.
You can inspect the chain of tasks by looking at the return values
rrq_task_info()
. In the above example we have a chain of
three tasks
r backquote(paste0(substr(c(t1, t2, t3), 1, 6), "...", collapse = " -> "))
,
which we can discover this way:
rrq_task_info(t1)
#> $id
#> [1] "64ddd6f44b196d8f3daa08900b73c86c"
#>
#> $status
#> [1] "MOVED"
#>
#> $queue
#> [1] "default"
#>
#> $separate_process
#> [1] FALSE
#>
#> $timeout
#> NULL
#>
#> $worker
#> [1] "polonium_uromastyxspinipes_1"
#>
#> $pid
#> NULL
#>
#> $depends
#> $depends$up
#> NULL
#>
#> $depends$down
#> NULL
#>
#>
#> $moved
#> $moved$up
#> NULL
#>
#> $moved$down
#> [1] "486f5531b0e093a00c4d94f81e74a7a0" "1df424045659911ead3673895160f480"
Here, the moved
field shows the tasks that are upstream
(up
) and downstream (down
) of this task. For
tasks that have never been retried both elements are NULL
.
The elements are always ordered from oldest to newest, and chains of
tasks are always linear (i.e., there is no forking).
For the middle task in the chain, both up
and
down
point at different tasks:
rrq_task_info(t2)$moved
#> $up
#> [1] "64ddd6f44b196d8f3daa08900b73c86c"
#>
#> $down
#> [1] "1df424045659911ead3673895160f480"
and for the leaf task:
It is possible to change the default behaviour of follow
to not follow through a task chain. Doing this might be slightly more
efficient in cases where you are interested in running large numbers of
small tasks and do not want to retry failed tasks. This is because
almost every request that involves a task id will have to check to see
if it has been moved.
To change this behaviour, pass the follow
argument to
the queue constructor, for example:
obj_nofollow <- rrq_controller(obj$queue_id, follow = FALSE)
rrq_task_result(t1, controller = obj_nofollow)
#> <rrq_task_error>
#> from: stochastic_failure(0.5)
#> error: Convergence failure - x is only 0.27
#> queue: rrq:8777a78c
#> task: 64ddd6f44b196d8f3daa08900b73c86c
#> status: ERROR
#> * To throw this error, use stop() with it
#> * This error has a stack trace, use '$trace' to see it
Here, we would create a queue controller object with the default
follow behaviour set to FALSE
and so accessing a task
result of a moved task would not search down through any potential
retries unless follow = TRUE
was explicitly passed.
Deletion will operate on all tasks in a chain. So if you delete a task that has been retried then it deletes everything upstream (up to the original task) and downstream (down to the last time that the task was retried). This is because otherwise it is too easy to end up with inconsistent state. So if we run
all three tasks in the chain are deleted:
Currently this system does not allow you to do a few things that would be useful
We will relax these limitations soon.