Package 'queuer'

Title: Queue Tasks
Description: Queue tasks to a number of backends.
Authors: Rich FitzJohn [aut, cre], Imperial College of Science, Technology and Medicine [cph]
Maintainer: Rich FitzJohn <[email protected]>
License: MIT + file LICENSE
Version: 0.5.0
Built: 2024-12-16 02:57:45 UTC
Source: https://github.com/mrc-ide/queuer

Help Index


Progress bar with timeout

Description

Progress bar with timeout

Usage

progress_timeout(total, timeout, ..., show = NULL, label = NULL, digits = 0)

Arguments

total

Total number of expected things. Use NULL if you want to wait on a single thing

timeout

The number of seconds to wait

...

Additional arguments to progress::progress_bar$new

show

Flag to indicate if the bar should be displayed. If NULL then the global options queuer.progress_show and queuer.progress_suppress are used to determine if the bar is shown (with suppress overriding show). This will evaluate to TRUE by default. A logical flag overrides queuer.progress_show but not queuer.progress_suppress.

label

An optional label to prefix the timeout bar with (will not be padded with space), or in the case of total = NULL to indicate what is being waited on.

digits

The number of digits of accuracy to display the remaining time in
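
The precedence rules described for show can be sketched in base R. This is only an illustration of the documented behaviour, not the package's implementation: resolve_show() is a hypothetical helper, and only the option names queuer.progress_show and queuer.progress_suppress come from the text above.

```r
# Hypothetical helper, not part of queuer: mirrors the documented rule only.
resolve_show <- function(show = NULL) {
  # Suppression always wins, whatever `show` says.
  if (isTRUE(getOption("queuer.progress_suppress", FALSE))) {
    return(FALSE)
  }
  # With no explicit flag, fall back on the option (TRUE by default).
  if (is.null(show)) {
    show <- getOption("queuer.progress_show", TRUE)
  }
  isTRUE(show)
}

# Save the current options so they can be restored afterwards.
old <- options(queuer.progress_show = FALSE, queuer.progress_suppress = NULL)
default_off <- resolve_show()      # the option switches the bar off
flag_on <- resolve_show(TRUE)      # an explicit flag overrides the option
options(queuer.progress_suppress = TRUE)
suppressed <- resolve_show(TRUE)   # suppression beats the explicit flag
options(old)
```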


Base class for queues

Description

A base class, on top of which queues can be developed. This includes all methods except for support for actually submitting tasks.

Public fields

context

The context object

Methods

Public methods


Method new()

Constructor

Usage
queue_base$new(context_id, root = NULL, initialize = TRUE)
Arguments
context_id

A context identifier; either a context object or a name/id of a saved context (see context::context_save()).

root

Root path to load contexts from, if using a string identifier for context_id. If context_id is a context object then root must be NULL.

initialize

Logical, indicating if the context should be loaded immediately. If you want to run tasks this must be TRUE, but to query it can be FALSE. See context::context_load() and the $initialize_context() method.


Method initialize_context()

Load the context. This causes the packages to be loaded and all script files to be sourced. This is required before any tasks can be queued, because we need to check against this environment to work out what is available on any workers.

Usage
queue_base$initialize_context()

Method task_list()

List all tasks known to this queue

Usage
queue_base$task_list()

Method task_status()

Return the status of selected tasks

Usage
queue_base$task_status(task_ids = NULL, named = TRUE)
Arguments
task_ids

Task identifiers to query. If NULL, then all tasks are queried.

named

Logical, indicating if the status result should be named by task id.


Method task_times()

Return the times taken by tasks as a data.frame

Usage
queue_base$task_times(task_ids = NULL, unit_elapsed = "secs", sorted = TRUE)
Arguments
task_ids

Task identifiers to query. If NULL, then all tasks are queried.

unit_elapsed

Time unit to use for the elapsed fields

sorted

Logical, indicating if the fields should be sorted by submitted time.
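
As a rough illustration of what unit_elapsed and sorted control, the sketch below builds a small times table in base R. It assumes that unit_elapsed accepts the same unit names as base R's difftime() (the default "secs" is one of them). The column names and task ids are made up for the example and are not taken from the package.

```r
# Made-up submission and completion times for two tasks.
submitted <- as.POSIXct(c("2024-01-01 10:05:00", "2024-01-01 10:00:00"),
                        tz = "UTC")
finished <- submitted + c(90, 30)  # 90 and 30 seconds later

times <- data.frame(
  task_id = c("aa", "bb"),
  submitted = submitted,
  # Elapsed time expressed in the requested unit (here minutes).
  elapsed = as.numeric(difftime(finished, submitted, units = "mins"))
)

# sorted = TRUE corresponds to ordering rows by submission time.
times <- times[order(times$submitted), ]
```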


Method task_get()

Retrieve a task by id

Usage
queue_base$task_get(task_id, check_exists = TRUE)
Arguments
task_id

A task identifier (hexadecimal string)

check_exists

Logical, indicating if we should check that the task exists.


Method task_result()

Retrieve a task's result

Usage
queue_base$task_result(task_id)
Arguments
task_id

A task identifier (hexadecimal string)


Method task_delete()

Delete tasks

Usage
queue_base$task_delete(task_ids)
Arguments
task_ids

A vector of task identifiers (each a hexadecimal string)


Method task_retry_failed()

Retry failed tasks

Usage
queue_base$task_retry_failed(task_ids)
Arguments
task_ids

A vector of task identifiers (each a hexadecimal string)


Method task_bundle_list()

List all known task bundles

Usage
queue_base$task_bundle_list()

Method task_bundle_info()

List all known task bundles along with information about what was run and when.

Usage
queue_base$task_bundle_info()

Method task_bundle_get()

Get a task bundle by name

Usage
queue_base$task_bundle_get(name)
Arguments
name

A task bundle identifier (a string of the form adjective_animal)


Method task_bundle_retry_failed()

Retry failed tasks in a bundle

Usage
queue_base$task_bundle_retry_failed(name)
Arguments
name

A task bundle identifier (a string of the form adjective_animal)


Method enqueue()

Queue a task

Usage
queue_base$enqueue(
  expr,
  envir = parent.frame(),
  submit = TRUE,
  name = NULL,
  depends_on = NULL
)
Arguments
expr

An unevaluated expression to put on the queue

envir

The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.

submit

Logical indicating if the task should be submitted

name

Optional name for the task

depends_on

Optional vector of task ids to depend on
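
The role of envir can be illustrated with base R alone. The sketch below finds the variables an expression refers to and collects their values from an environment, which is roughly the idea described for sum(1 + a). collect_locals() is a hypothetical helper written for this example; the package may identify dependent variables differently.

```r
# Hypothetical helper: gather the values of variables used by `expr`
# that are defined directly in `envir`.
collect_locals <- function(expr, envir = parent.frame()) {
  vars <- all.vars(expr)  # variable names only; function names are excluded
  found <- vapply(vars, exists, logical(1), envir = envir, inherits = FALSE)
  mget(vars[found], envir = envir)
}

e <- new.env()
assign("a", 41, envir = e)
expr <- quote(sum(1 + a))

locals <- collect_locals(expr, e)  # list(a = 41)

# Evaluate "remotely": a fresh environment holding only the copied values.
result <- eval(expr, list2env(locals, parent = baseenv()))
```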


Method enqueue_()

Queue a task

Usage
queue_base$enqueue_(
  expr,
  envir = parent.frame(),
  submit = TRUE,
  name = NULL,
  depends_on = NULL
)
Arguments
expr

A quoted expression to put on the queue

envir

The environment that you would run this expression in locally. This will be used to copy across any dependent variables. For example, if your expression is sum(1 + a), we will also send the value of a to the worker along with the expression.

submit

Logical indicating if the task should be submitted

name

Optional name for the task

depends_on

Optional vector of task ids to depend on
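
The difference between an unevaluated expression (enqueue) and a quoted expression (enqueue_) follows a common R pattern, sketched below in base R. capture() and capture_() are hypothetical stand-ins for the two methods and only return the expression rather than queuing anything.

```r
# Standard-evaluation form: expects an already quoted expression.
capture_ <- function(expr) {
  stopifnot(is.language(expr))
  expr
}

# Non-standard-evaluation form: captures what the caller typed.
capture <- function(expr) {
  capture_(substitute(expr))
}

direct <- capture(sin(1) + 2)
quoted <- capture_(quote(sin(1) + 2))

# The quoted form is convenient when building expressions in code.
n <- 5
built <- capture_(bquote(sqrt(.(n))))
```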


Method enqueue_bulk()

Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call, when used with a data.frame argument, which is typically what is provided. Rather than $lapply(), which applies FUN to each element of X, $enqueue_bulk() will apply over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b this should feel intuitive.

Usage
queue_base$enqueue_bulk(
  X,
  FUN,
  ...,
  do_call = TRUE,
  envir = parent.frame(),
  timeout = 0,
  time_poll = 1,
  progress = NULL,
  name = NULL,
  overwrite = FALSE,
  depends_on = NULL
)
Arguments
X

Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.

FUN

A function

...

Additional arguments to add to every call to FUN

do_call

Logical, indicating if each row of X should be treated as if it was do.call(FUN, X[i, ]) - typically this is what you want.

envir

The environment to use to try and find the function

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

name

Optional name for a created bundle

overwrite

Logical, indicating if we should overwrite any bundle that exists with name name.

depends_on

Optional task ids to depend on (see context::bulk_task_save()).
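
The row-wise behaviour with do_call = TRUE can be previewed locally in base R before sending anything to a queue. The sketch below mirrors the do.call(FUN, X[i, ]) description; f and X are example values, and nothing here calls queuer itself.

```r
f <- function(a, b) a + b

# Column names match the argument names of f.
X <- data.frame(a = 1:3, b = c(10, 20, 30))

# Turn each row into a named argument list: list(a = ..., b = ...).
rows <- lapply(seq_len(nrow(X)), function(i) {
  as.list(X[i, , drop = FALSE])
})

# One call per row, spreading the columns out as arguments.
local_results <- lapply(rows, function(args) do.call(f, args))
```

Running this locally first is a cheap way to check that the column names and the function signature agree.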


Method lapply()

Apply a function over a list of data. This is equivalent to using $enqueue() over each element in the list.

Usage
queue_base$lapply(
  X,
  FUN,
  ...,
  envir = parent.frame(),
  timeout = 0,
  time_poll = 1,
  progress = NULL,
  name = NULL,
  overwrite = FALSE,
  depends_on = NULL
)
Arguments
X

A list of data to apply our function against

FUN

A function to be applied to each element of X

...

Additional arguments to add to every call to FUN

envir

The environment to use to try and find the function

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

name

Optional name for a created bundle

overwrite

Logical, indicating if we should overwrite any bundle that exists with name name.

depends_on

Optional task ids to depend on (see context::bulk_task_save()).
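
The timeout and time_poll arguments describe a poll-until-done loop. A minimal sketch of that idea in base R follows, assuming a hypothetical is_done() callback in place of a real status check. wait_until() is not a queuer function, and the sketch makes no claim about how the package treats the default timeout = 0.

```r
# Hypothetical helper: poll `is_done` every `time_poll` seconds and
# throw an error once `timeout` seconds have passed.
wait_until <- function(is_done, timeout = 1, time_poll = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    if (is_done()) {
      return(invisible(TRUE))
    }
    if (Sys.time() >= deadline) {
      stop("Timed out waiting for tasks")
    }
    Sys.sleep(time_poll)
  }
}

# A stand-in for a task that completes on the third poll.
calls <- 0
done_after_three <- function() {
  calls <<- calls + 1
  calls >= 3
}
ok <- wait_until(done_after_three, timeout = 5, time_poll = 0.01)

# A task that never completes triggers the timeout error.
timed_out <- tryCatch(
  wait_until(function() FALSE, timeout = 0.05, time_poll = 0.01),
  error = function(e) "timeout"
)
```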


Method mapply()

A wrapper like mapply

Send a bulk set of tasks to your workers. This function is a bit like a mash-up of Map and do.call, when used with a data.frame argument, which is typically what is provided. Rather than $lapply(), which applies FUN to each element of X, $enqueue_bulk() will apply over each row of X, spreading the columns out as arguments. If you have a function f(a, b) and a data.frame with columns a and b this should feel intuitive.

Usage
queue_base$mapply(
  FUN,
  ...,
  MoreArgs = NULL,
  envir = parent.frame(),
  timeout = 0,
  time_poll = 1,
  progress = NULL,
  name = NULL,
  use_names = TRUE,
  overwrite = FALSE,
  depends_on = NULL
)
Arguments
FUN

A function

...

Additional arguments to add to every call to FUN

MoreArgs

As for mapply, additional arguments that apply to every function call.

envir

The environment to use to try and find the function

timeout

Optional timeout, in seconds, after which an error will be thrown if the task has not completed.

time_poll

Optional time with which to "poll" for completion.

progress

Optional logical indicating if a progress bar should be displayed. If NULL we fall back on the value of the global option rrq.progress, and if that is unset display a progress bar if in an interactive session.

name

Optional name for a created bundle

use_names

Use names

overwrite

Logical, indicating if we should overwrite any bundle that exists with name name.

depends_on

Optional task ids to depend on (see context::bulk_task_save()).

X

Typically a data.frame, which you want to apply FUN over, row-wise. The names of the data.frame must match the arguments of your function.
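
For readers less familiar with base R's mapply, the sketch below shows how parallel vectors plus MoreArgs correspond to the row-wise data.frame form described above. It uses only base R; f, a, b and scale are example names, and no queue is involved.

```r
f <- function(a, b, scale = 1) (a + b) * scale

a <- 1:3
b <- c(10, 20, 30)

# Parallel vectors, with one extra argument shared by every call.
via_mapply <- mapply(f, a = a, b = b,
                     MoreArgs = list(scale = 2), SIMPLIFY = FALSE)

# The same calls expressed row-wise over a data.frame.
X <- data.frame(a = a, b = b)
via_rows <- lapply(seq_len(nrow(X)), function(i) {
  do.call(f, c(as.list(X[i, , drop = FALSE]), list(scale = 2)))
})
```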


Method submit()

Submit a task into a queue. This is a stub method and must be overridden by a derived class for the queue to do anything.

Usage
queue_base$submit(task_ids, names = NULL, depends_on = NULL)
Arguments
task_ids

Vector of tasks to submit

names

Optional vector of names of tasks

depends_on

Optional named list of task ids to vectors of dependencies, e.g. list("t3" = c("t", "t1"), "t4" = "t")
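
To make the shape of depends_on concrete, the sketch below uses a named list of that form and works out which tasks could start given a set of completed tasks. ready_tasks() is a hypothetical helper for illustration only; how a derived queue class actually schedules dependencies is up to that class.

```r
# Names are task ids; values are the ids each task depends on.
depends_on <- list("t3" = c("t", "t1"), "t4" = "t")

# Hypothetical helper: tasks whose dependencies are all complete.
ready_tasks <- function(task_ids, depends_on, complete) {
  ok <- vapply(task_ids, function(id) {
    # Tasks absent from `depends_on` have no dependencies.
    all(depends_on[[id]] %in% complete)
  }, logical(1))
  task_ids[ok]
}

first <- ready_tasks(c("t3", "t4"), depends_on, complete = "t")
later <- ready_tasks(c("t3", "t4"), depends_on, complete = c("t", "t1"))
```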


Method unsubmit()

Unsubmit a task from the queue. This is a stub method and must be overridden by a derived class for the queue to do anything.

Usage
queue_base$unsubmit(task_ids)
Arguments
task_ids

Vector of tasks to unsubmit


Combine task bundles

Description

Combine two or more task bundles

Usage

task_bundle_combine(..., bundles = list(...), name = NULL, overwrite = FALSE)

Arguments

...

Any number of task bundles

bundles

A list of bundles (used in place of ... and probably more useful for programming).

name

Group name

overwrite

Logical indicating if an existing bundle with the same name should be overwritten. If FALSE and a bundle with this name already exists, an error will be thrown.

Details

For now task bundles must have the same function to be combined.
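
The signature pattern ..., bundles = list(...) and the same-function restriction can be sketched in base R. In the example below a bundle is modelled as a plain list with hypothetical fun and ids fields, and combine_sketch() is an illustrative stand-in, not the package's implementation.

```r
# Hypothetical stand-in: bundles arrive via `...` or as one list.
combine_sketch <- function(..., bundles = list(...)) {
  if (length(bundles) < 2) {
    stop("Provide at least two bundles")
  }
  funs <- lapply(bundles, function(b) b$fun)
  # Every bundle must use the same function as the first one.
  same <- vapply(funs[-1], identical, logical(1), funs[[1]])
  if (!all(same)) {
    stop("Bundles must share the same function")
  }
  ids <- unlist(lapply(bundles, function(b) b$ids), use.names = FALSE)
  list(fun = funs[[1]], ids = ids)
}

b1 <- list(fun = "sin", ids = c("a1", "a2"))
b2 <- list(fun = "sin", ids = "b1")
b3 <- list(fun = "cos", ids = "c1")

interactive_style <- combine_sketch(b1, b2)
programmatic_style <- combine_sketch(bundles = list(b1, b2))
mismatch <- tryCatch(combine_sketch(b1, b3), error = function(e) "error")
```

The bundles argument is the form to reach for when the set of bundles is itself computed, for example from a list built in a loop.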