Title: | Queue Tasks |
---|---|
Description: | Queue tasks to a number of backends. |
Authors: | Rich FitzJohn [aut, cre], Imperial College of Science, Technology and Medicine [cph] |
Maintainer: | Rich FitzJohn <[email protected]> |
License: | MIT + file LICENSE |
Version: | 0.5.0 |
Built: | 2024-10-17 03:28:37 UTC |
Source: | https://github.com/mrc-ide/queuer |
Progress bar with timeout
progress_timeout(total, timeout, ..., show = NULL, label = NULL, digits = 0)
total |
Total number of expected things. Use |
timeout |
The number of seconds to wait |
... |
Additional arguments to
|
show |
Flag to indicate if the bar should be displayed. If
|
label |
An optional label to prefix the timeout bar with
(will not be padded with space), or in the case of |
digits |
The number of digits of accuracy to display the remaining time in |
A base class, on top of which queues can be developed. This includes all methods except for support for actually submitting tasks.
context
The context object
new()
Constructor
queue_base$new(context_id, root = NULL, initialize = TRUE)
context_id
A context identifier; either a context object or a name/id of a saved context (see context::context_save()).
root
Root path to load contexts from, if using a string identifier for context_id. If context_id is a context object then root must be NULL.
initialize
Logical, indicating if the context should be loaded immediately. If you want to run tasks this must be TRUE, but to query it can be FALSE. See context::context_load() and the $initialize_context() method.
initialize_context()
Load the context. This causes the packages to be loaded and all script files to be sourced. This is required before any tasks can be queued, because we need to check against this environment to work out what is available on any workers.
queue_base$initialize_context()
task_list()
List all tasks known to this queue
queue_base$task_list()
task_status()
Return the status of selected tasks
queue_base$task_status(task_ids = NULL, named = TRUE)
task_ids
Task identifiers to query. If NULL, then all tasks are queried.
named
Logical, indicating if the status result should be named by task id.
task_times()
Return the times taken by tasks as a data.frame
queue_base$task_times(task_ids = NULL, unit_elapsed = "secs", sorted = TRUE)
task_ids
Task identifiers to query. If NULL, then all tasks are queried.
unit_elapsed
Time unit to use for the elapsed fields
sorted
Logical, indicating if the fields should be sorted by submitted time.
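As a rough illustration of what unit_elapsed and sorted control, the following base R sketch computes elapsed times from timestamps and orders the rows by submission time. The data, the column names (waiting, running) and the helper elapsed_times are assumptions made for this example; they are not taken from the package, and the package may compute these values differently.

```r
# Hypothetical task timestamps; the column names are illustrative only.
base <- as.POSIXct("2024-01-01 12:00:00", tz = "UTC")
times <- data.frame(
  task_id = c("aa11", "bb22", "cc33"),
  submitted = base + c(20, 0, 10),
  started = base + c(25, 5, 12),
  finished = base + c(85, 35, 72),
  stringsAsFactors = FALSE
)

# Hypothetical helper, not part of the package.
elapsed_times <- function(times, unit_elapsed = "secs", sorted = TRUE) {
  # difftime() accepts units such as "secs", "mins" and "hours"
  times$waiting <- as.numeric(
    difftime(times$started, times$submitted, units = unit_elapsed))
  times$running <- as.numeric(
    difftime(times$finished, times$started, units = unit_elapsed))
  if (sorted) {
    # Earliest submitted task first
    times <- times[order(times$submitted), ]
    rownames(times) <- NULL
  }
  times
}

res <- elapsed_times(times, unit_elapsed = "mins")
```

Here res lists the tasks in submission order, with the waiting and running columns expressed in minutes.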
task_get()
Retrieve a task by id
queue_base$task_get(task_id, check_exists = TRUE)
task_id
A task identifier (hexadecimal string)
check_exists
Logical, indicating if we should check that the task exists.
task_result()
Retrieve a task's result
queue_base$task_result(task_id)
task_id
A task identifier (hexadecimal string)
task_delete()
Delete tasks
queue_base$task_delete(task_ids)
task_ids
A vector of task identifiers (each a hexadecimal string)
task_retry_failed()
Retry failed tasks
queue_base$task_retry_failed(task_ids)
task_ids
A vector of task identifiers (each a hexadecimal string)
task_bundle_list()
List all known task bundles
queue_base$task_bundle_list()
task_bundle_info()
List all known task bundles along with information about what was run and when.
queue_base$task_bundle_info()
task_bundle_get()
Get a task bundle by name
queue_base$task_bundle_get(name)
name
A task bundle identifier (a string of the form adjective_animal)
task_bundle_retry_failed()
Retry failed tasks in a bundle
queue_base$task_bundle_retry_failed(name)
name
A task bundle identifier (a string of the form adjective_animal)
enqueue()
Queue a task
queue_base$enqueue( expr, envir = parent.frame(), submit = TRUE, name = NULL, depends_on = NULL )
expr
An unevaluated expression to put on the queue
envir
The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.
submit
Logical indicating if the task should be submitted
name
Optional name for the task
depends_on
Optional vector of task ids to depend on
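The idea of sending dependent variables along with an unevaluated expression can be sketched in base R. The helper capture_task below is hypothetical and only mimics the behaviour described for expr and envir; it is not how the package is implemented.

```r
# Hypothetical helper: capture an expression without evaluating it and
# collect the values of the variables it refers to.
capture_task <- function(expr, envir = parent.frame()) {
  expr <- substitute(expr)   # keep the expression unevaluated
  vars <- all.vars(expr)     # variable names used, e.g. "a" (not "sum")
  # Keep only the names that can be found from the calling environment
  vars <- vars[vapply(vars, exists, logical(1), envir = envir)]
  list(expr = expr, variables = mget(vars, envir = envir, inherits = TRUE))
}

a <- 2
task <- capture_task(sum(1 + a))

# A stand-in "worker" evaluates the expression using only what was shipped
worker_env <- list2env(task$variables, parent = baseenv())
result <- eval(task$expr, worker_env)
```

In this sketch task$expr holds the call sum(1 + a), task$variables holds the value of a, and the stand-in worker can evaluate the expression without access to the original environment.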
enqueue_()
Queue a task
queue_base$enqueue_( expr, envir = parent.frame(), submit = TRUE, name = NULL, depends_on = NULL )
expr
A quoted expression to put on the queue
envir
The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.
submit
Logical indicating if the task should be submitted
name
Optional name for the task
depends_on
Optional vector of task ids to depend on
enqueue_bulk()
Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call, when used with a data.frame argument, which is typically what is provided. Rather than $lapply(), which applies FUN to each element of X, enqueue_bulk will apply FUN over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b, this should feel intuitive.
queue_base$enqueue_bulk( X, FUN, ..., do_call = TRUE, envir = parent.frame(), timeout = 0, time_poll = 1, progress = NULL, name = NULL, overwrite = FALSE, depends_on = NULL )
X
Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.
FUN
A function
...
Additional arguments to add to every call to FUN
do_call
Logical, indicating if each row of X should be treated as if it was do.call(FUN, X[i, ]); typically this is what you want.
envir
The environment to use to try and find the function
timeout
Optional timeout, in seconds, after which an error will be thrown if the task has not completed.
time_poll
Optional time with which to "poll" for completion.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
name
Optional name for a created bundle
overwrite
Logical, indicating if we should overwrite any bundle that exists with name name.
depends_on
Optional task ids to depend on (see context::bulk_task_save()).
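The row-wise behaviour described for do_call = TRUE can be pictured with a local sketch in base R that involves no queue at all. The helper bulk_calls is hypothetical; it only shows how each row of a data.frame is spread out as named arguments, with the extra arguments from ... added to every call.

```r
# Local, in-memory sketch of the row-wise semantics (no queue involved).
# `bulk_calls` is a hypothetical helper, not part of the package.
bulk_calls <- function(X, FUN, ...) {
  lapply(seq_len(nrow(X)), function(i) {
    # One row becomes a named argument list: list(a = ..., b = ...)
    args <- c(as.list(X[i, , drop = FALSE]), list(...))
    do.call(FUN, args)
  })
}

f <- function(a, b, scale = 1) (a + b) * scale
X <- data.frame(a = 1:3, b = c(10, 20, 30))

res <- bulk_calls(X, f, scale = 2)
```

Each element of res corresponds to one row of X, so the column names a and b must match the argument names of f.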
lapply()
Apply a function over a list of data. This is equivalent to using $enqueue() over each element in the list.
queue_base$lapply( X, FUN, ..., envir = parent.frame(), timeout = 0, time_poll = 1, progress = NULL, name = NULL, overwrite = FALSE, depends_on = NULL )
X
A list of data to apply our function against
FUN
A function to be applied to each element of X
...
Additional arguments to add to every call to FUN
envir
The environment to use to try and find the function
timeout
Optional timeout, in seconds, after which an error will be thrown if the task has not completed.
time_poll
Optional time with which to "poll" for completion.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
name
Optional name for a created bundle
overwrite
Logical, indicating if we should overwrite any bundle that exists with name name.
depends_on
Optional task ids to depend on (see context::bulk_task_save()).
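The interplay of timeout and time_poll follows a common polling pattern, sketched below in base R. The helper wait_until and the stand-in is_done function are hypothetical, and the sketch makes no claim about how the package itself treats particular values such as timeout = 0.

```r
# Hypothetical helper: poll `is_done()` every `time_poll` seconds and
# throw an error if it is still FALSE once `timeout` seconds have passed.
wait_until <- function(is_done, timeout = 0, time_poll = 1) {
  deadline <- Sys.time() + timeout
  repeat {
    if (is_done()) {
      return(invisible(TRUE))
    }
    if (Sys.time() >= deadline) {
      stop("task not completed in time")
    }
    Sys.sleep(time_poll)
  }
}

# A stand-in for a task that completes on the third poll
polls <- 0
is_done <- function() {
  polls <<- polls + 1
  polls >= 3
}
ok <- wait_until(is_done, timeout = 5, time_poll = 0.01)

# A task that never completes triggers the timeout error
err <- tryCatch(
  wait_until(function() FALSE, timeout = 0.05, time_poll = 0.01),
  error = function(e) conditionMessage(e))
```

A shorter time_poll notices completion sooner at the cost of more frequent checks.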
mapply()
A wrapper like mapply. Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call, when used with a data.frame argument, which is typically what is provided. Rather than $lapply(), which applies FUN to each element of X, enqueue_bulk will apply FUN over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b, this should feel intuitive.
queue_base$mapply( FUN, ..., MoreArgs = NULL, envir = parent.frame(), timeout = 0, time_poll = 1, progress = NULL, name = NULL, use_names = TRUE, overwrite = FALSE, depends_on = NULL )
FUN
A function
...
Additional arguments to add to every call to FUN
MoreArgs
As for mapply, additional arguments that apply to every function call.
envir
The environment to use to try and find the function
timeout
Optional timeout, in seconds, after which an error will be thrown if the task has not completed.
time_poll
Optional time with which to "poll" for completion.
progress
Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.
name
Optional name for a created bundle
use_names
Use names
overwrite
Logical, indicating if we should overwrite any bundle that exists with name name.
depends_on
Optional task ids to depend on (see context::bulk_task_save()).
X
Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.
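For readers less familiar with the base R function this method is modelled on, the following shows how mapply separates vectorised arguments from MoreArgs. It uses only base R and runs locally; the function f and its arguments are made up for the example, and whether the queue method matches base mapply in every detail is not stated here.

```r
f <- function(a, b, scale) (a + b) * scale

# Base R: iterate over `a` and `b` in parallel, while `scale` from
# MoreArgs is passed unchanged to every call.
res <- mapply(f, a = 1:3, b = c(10, 20, 30),
              MoreArgs = list(scale = 2), SIMPLIFY = FALSE)
```

With SIMPLIFY = FALSE the result is a list with one element per call, which is the closest local analogue of one task per call.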
submit()
Submit a task into a queue. This is a stub method and must be overridden by a derived class for the queue to do anything.
queue_base$submit(task_ids, names = NULL, depends_on = NULL)
task_ids
Vector of tasks to submit
names
Optional vector of names of tasks
depends_on
Optional named list of task ids to vectors of dependencies, e.g. list("t3" = c("t", "t1"), "t4" = "t")
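The shape of depends_on can be made concrete with a small base R sketch. The helper check_depends_on is hypothetical, and the rule it enforces (every name in depends_on must be one of the submitted task_ids) is an assumption made for illustration, not documented behaviour.

```r
# Hypothetical validation helper for a `depends_on` list.
# Assumption: names of `depends_on` refer to tasks being submitted.
check_depends_on <- function(task_ids, depends_on) {
  stopifnot(is.list(depends_on), !is.null(names(depends_on)))
  unknown <- setdiff(names(depends_on), task_ids)
  if (length(unknown) > 0) {
    stop("dependencies given for tasks that are not being submitted: ",
         paste(unknown, collapse = ", "))
  }
  invisible(TRUE)
}

task_ids <- c("t3", "t4")
# "t3" waits for "t" and "t1"; "t4" waits for "t" only
depends_on <- list("t3" = c("t", "t1"), "t4" = "t")
ok <- check_depends_on(task_ids, depends_on)

# Number of dependencies per task
n_deps <- lengths(depends_on)
```

A derived class that overrides submit() could run a check of this kind before handing the tasks to its backend.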
unsubmit()
Unsubmit a task from the queue. This is a stub method and must be overridden by a derived class for the queue to do anything.
queue_base$unsubmit(task_ids)
task_ids
Vector of tasks to unsubmit
Combine two or more task bundles
task_bundle_combine(..., bundles = list(...), name = NULL, overwrite = FALSE)
... |
Any number of task bundles |
bundles |
A list of bundles (used in place of |
name |
Group name |
overwrite |
Logical indicating if an existing bundle with the
same name should be overwritten. If |
For now task bundles must have the same function to be combined.