In addition to passing tasks (and results) between a controller and workers, the controller can also send “messages” to workers. This vignette shows what the possible messages do.
Generally, you will not need to read this vignette unless you are either programming against rrq in interesting ways, or you have unusual needs. Some of the interfaces here may be useful for debugging (particularly directing particular workers to execute particular pieces of code).
In order to do this, we’re going to need a queue and a worker:
library(rrq)
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
logdir <- tempfile()
w <- rrq_worker_spawn(logdir = logdir)
#> ℹ Spawning 1 worker with prefix 'linear_atlanticbluetang'
worker_id <- w$id
On startup the worker log contains:
[2024-11-11 10:32:39.497445] ALIVE
[2024-11-11 10:32:39.498483] ENVIR new
[2024-11-11 10:32:39.498956] QUEUE default
__
______________ _/ /
______ / ___/ ___/ __ `/ /_____
/_____/ / / / / / /_/ /_/_____/
______ /_/ /_/ \__, (_) ______
/_____/ /_/ /_____/
worker: linear_atlanticbluetang_1
config: localhost
rrq_version: 0.7.20
platform: x86_64-pc-linux-gnu (64-bit)
running: Ubuntu 24.04 LTS
hostname: wpia-dide242
username: pl2113
queue: rrq:10a1ca54:queue:default
wd: /home/pl2113/Work/rrq/vignettes_src
pid: 3167060
redis_host: 127.0.0.1
redis_port: 6379
heartbeat_key: <not set>
offload_path: <not set>
Because one of the main effects of messages is to print to the worker logfile, we’ll print this fairly often.
The queue sends a message for one or more workers to process. The message has an identifier that is derived from the current time. Messages are written to a first-in-first-out queue, per worker, and are processed independently by workers who do not look to see if other workers have messages or are processing them.
As soon as a worker has finished processing any current job it will process the message (it must wait to finish a current job but will not start any further jobs).
Once the message has been processed (see below) a response will be written to a response list with the same identifier as the message.
Some messages interact with the worker timeout:
PING, ECHO, EVAL, INFO, PAUSE, RESUME and REFRESH will reset the timer, as if a task had been run
TIMEOUT_SET explicitly interacts with the timer
TIMEOUT_GET does not reset the timer, reporting the remaining time
STOP causes the worker to exit, so has no interaction with the timer
PING
The PING message simply asks the worker to return PONG. It’s useful for diagnosing communication issues because it does so little.
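A minimal sketch of sending the message, assuming the default controller and the worker set up above. `rrq_message_send()` returns the message id, which is kept here as `message_id` for use in the rest of this section.

```r
library(rrq)

# Send PING to every worker known to the default controller and keep
# the returned id so that the response can be looked up later.
message_id <- rrq_message_send("PING")
message_id
```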
The message id is going to be useful for getting responses. It is derived from the current time according to Redis, which is the central reference point of time for the whole system.
[2024-11-11 10:32:39.690975] MESSAGE PING
PONG
[2024-11-11 10:32:39.692034] RESPONSE PING
The logfile prints:
the request for the PING (MESSAGE PING)
PONG, written to the R message stream
the response (RESPONSE PING), which means that something is written to the response stream
We can access the same bits of information in the worker log:
rrq_worker_log_tail(n = Inf)
#> worker_id child time command message
#> 1 linear_atlanticbluetang_1 NA 1731321159 ALIVE
#> 2 linear_atlanticbluetang_1 NA 1731321159 ENVIR new
#> 3 linear_atlanticbluetang_1 NA 1731321159 QUEUE default
#> 4 linear_atlanticbluetang_1 NA 1731321160 MESSAGE PING
#> 5 linear_atlanticbluetang_1 NA 1731321160 RESPONSE PING
This includes the ALIVE message as the worker comes up.
Inspecting the logs is fine for interactive use, but it will often be more useful to poll for a response.
We already know that our worker has a response, but we can ask anyway:
Or inversely we can ask what messages a given worker has responses for:
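Both checks can be sketched as follows, assuming `message_id` holds the id returned when the PING was sent and `worker_id` is the worker spawned above.

```r
library(rrq)

# Has each worker that received the message responded yet?
rrq_message_has_response(message_id)

# Which message ids does this worker currently hold responses for?
rrq_message_response_ids(worker_id)
```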
To fetch the responses from all workers it was sent to (always returning a named list):
or to fetch the response from a given worker:
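A short sketch of both forms, again assuming the `message_id` and `worker_id` from above. Neither call deletes the response, so it can still be fetched afterwards.

```r
library(rrq)

# Responses from every worker the message was sent to (a named list).
rrq_message_get_response(message_id)

# Response from one particular worker only.
rrq_message_get_response(message_id, worker_id)
```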
The response can be deleted by passing delete = TRUE to this function:
rrq_message_get_response(message_id, worker_id, delete = TRUE)
#> $linear_atlanticbluetang_1
#> [1] "PONG"
after which fetching the response again will throw an error:
rrq_message_get_response(message_id, worker_id)
#> Error in `rrq_message_get_response()`:
#> ! Response missing for worker: 'linear_atlanticbluetang_1'
There is also a timeout argument that lets you wait until a response is ready (as in rrq_task_wait()).
rrq_task_create_expr(Sys.sleep(2))
#> [1] "789b5e06abbad488f770c1bcc5e87545"
message_id <- rrq_message_send("PING")
rrq_message_get_response(
  message_id, worker_id, delete = TRUE, timeout = 10)
#> $linear_atlanticbluetang_1
#> [1] "PONG"
Looking at the log will show what went on here:
rrq_worker_log_tail(n = 4)
#> worker_id child time command
#> 1 linear_atlanticbluetang_1 NA 1731321160 TASK_START
#> 2 linear_atlanticbluetang_1 NA 1731321162 TASK_COMPLETE
#> 3 linear_atlanticbluetang_1 NA 1731321162 MESSAGE
#> 4 linear_atlanticbluetang_1 NA 1731321162 RESPONSE
#> message
#> 1 789b5e06abbad488f770c1bcc5e87545
#> 2 789b5e06abbad488f770c1bcc5e87545
#> 3 PING
#> 4 PING
Because the message is only processed after the task has completed, the response takes a while to come back. Equivalently, from the worker log:
[2024-11-11 10:32:39.838001] TASK_START 789b5e06abbad488f770c1bcc5e87545
[2024-11-11 10:32:41.844936] TASK_COMPLETE 789b5e06abbad488f770c1bcc5e87545
[2024-11-11 10:32:41.846925] MESSAGE PING
PONG
[2024-11-11 10:32:41.848314] RESPONSE PING
ECHO
This is basically like PING and not very interesting; it prints an arbitrary string to the log. It always returns "OK" as a response.
message_id <- rrq_message_send("ECHO", "hello world!")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $linear_atlanticbluetang_1
#> [1] "OK"
[2024-11-11 10:32:42.366155] MESSAGE ECHO
hello world!
[2024-11-11 10:32:42.366688] RESPONSE ECHO
INFO
The INFO command refreshes and returns the worker information.
We already have a copy of the worker info; it was created when the worker started up:
rrq_worker_info()[[worker_id]]
#> <rrq_worker_info>
#> worker: linear_atlanticbluetang_1
#> config: localhost
#> rrq_version: 0.7.20
#> platform: x86_64-pc-linux-gnu (64-bit)
#> running: Ubuntu 24.04 LTS
#> hostname: wpia-dide242
#> username: pl2113
#> queue: rrq:10a1ca54:queue:default
#> wd: /home/pl2113/Work/rrq/vignettes_src
#> pid: 3167060
#> redis_host: 127.0.0.1
#> redis_port: 6379
#> heartbeat_key: NULL
#> offload_path: NULL
We can force the worker to refresh:
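A minimal sketch of that step, assuming the default controller set up above. As with PING, the returned id is needed to collect the response.

```r
library(rrq)

# Ask the worker to refresh and return its information.
message_id <- rrq_message_send("INFO")
```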
Here’s the new worker information, complete with an updated envir field:
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $linear_atlanticbluetang_1
#> $linear_atlanticbluetang_1$worker
#> [1] "linear_atlanticbluetang_1"
#>
#> $linear_atlanticbluetang_1$config
#> [1] "localhost"
#>
#> $linear_atlanticbluetang_1$rrq_version
#> [1] "0.7.20"
#>
#> $linear_atlanticbluetang_1$platform
#> [1] "x86_64-pc-linux-gnu (64-bit)"
#>
#> $linear_atlanticbluetang_1$running
#> [1] "Ubuntu 24.04 LTS"
#>
#> $linear_atlanticbluetang_1$hostname
#> [1] "wpia-dide242"
#>
#> $linear_atlanticbluetang_1$username
#> [1] "pl2113"
#>
#> $linear_atlanticbluetang_1$queue
#> [1] "rrq:10a1ca54:queue:default"
#>
#> $linear_atlanticbluetang_1$wd
#> [1] "/home/pl2113/Work/rrq/vignettes_src"
#>
#> $linear_atlanticbluetang_1$pid
#> [1] 3167060
#>
#> $linear_atlanticbluetang_1$redis_host
#> [1] "127.0.0.1"
#>
#> $linear_atlanticbluetang_1$redis_port
#> [1] 6379
#>
#> $linear_atlanticbluetang_1$heartbeat_key
#> NULL
#>
#> $linear_atlanticbluetang_1$offload_path
#> NULL
EVAL
Evaluate an arbitrary R expression, passed as a string (not as any sort of unevaluated or quoted expression). This expression is evaluated in the global environment, which is not the environment in which queued code is evaluated.
message_id <- rrq_message_send("EVAL", "1 + 1")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $linear_atlanticbluetang_1
#> [1] 2
This could be used to evaluate code that has side effects, such as installing packages. However, because of limitations in how R loads packages, the only way to update and reload a package is to restart the worker.
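As an illustration of the debugging use mentioned at the start of this vignette, the sketch below directs code at one specific worker. It assumes the `worker_id` from above; the expressions are arbitrary examples, and `rrq_message_send_and_wait()` (used later in this vignette) sends the message and collects the response in one step.

```r
library(rrq)

# Direct the expression at one worker only; it is passed as a string.
rrq_message_send_and_wait("EVAL", "Sys.getpid()", worker_ids = worker_id)

# Any string that parses as R code works, for example listing the
# namespaces currently loaded on the worker.
rrq_message_send_and_wait("EVAL", "loadedNamespaces()",
                          worker_ids = worker_id)
```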
PAUSE / RESUME
The PAUSE / RESUME messages can be used to prevent workers from picking up new work (and then allowing them to start again).
rrq_worker_status()
#> linear_atlanticbluetang_1
#> "IDLE"
message_id <- rrq_message_send("PAUSE")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $linear_atlanticbluetang_1
#> [1] "OK"
rrq_worker_status()
#> linear_atlanticbluetang_1
#> "PAUSED"
Once paused, workers ignore tasks, which stay on the queue:
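A small sketch of this, assuming the worker is still paused. The name `task_id` is just an illustrative variable; the task should remain queued rather than being picked up.

```r
library(rrq)

# Queue a task while the worker is paused, then check its status; it
# should not have been picked up by the paused worker.
task_id <- rrq_task_create_expr(runif(5))
rrq_task_status(task_id)
```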
Sending a RESUME message unpauses the worker:
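A minimal sketch, assuming the paused worker and `worker_id` from above. Once the worker has acknowledged the message it should no longer report as paused.

```r
library(rrq)

# Ask the worker to resume, wait for its acknowledgement, then check
# its status again.
message_id <- rrq_message_send("RESUME")
rrq_message_get_response(message_id, worker_id, timeout = 10)
rrq_worker_status()
```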
TIMEOUT_SET / TIMEOUT_GET
Workers will quit after being left idle for more than a certain time; this is their timeout. Only processing tasks counts as work (not messages). You can query the timeout with TIMEOUT_GET and set it with TIMEOUT_SET. For our worker above the timeout is infinite; it will never quit:
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $linear_atlanticbluetang_1
#> timeout_idle remaining
#> Inf Inf
We can set this to a finite value, in seconds:
rrq_message_send_and_wait("TIMEOUT_SET", 600, worker_ids = worker_id)
#> $linear_atlanticbluetang_1
#> [1] "OK"
Here the timeout is set to 10 minutes (600s).
Once set, TIMEOUT_GET returns the length of time remaining before the worker exits:
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $linear_atlanticbluetang_1
#> timeout_idle remaining
#> 600.0000 599.9358
Sys.sleep(5)
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $linear_atlanticbluetang_1
#> timeout_idle remaining
#> 600.0000 594.8764
One useful pattern is to send work to workers, then set the timeout to zero. This means that when work is complete they will exit (almost) immediately:
ids <- rrq_task_create_bulk_call(function(x) {
  Sys.sleep(0.5)
  runif(x)
}, 1:5)
rrq_message_send("TIMEOUT_SET", 0, worker_id)
rrq_task_wait(ids)
#> [1] TRUE
rrq_task_results(ids)
#> [[1]]
#> [1] 0.3373661
#>
#> [[2]]
#> [1] 0.9112179 0.3083427
#>
#> [[3]]
#> [1] 0.08386948 0.35809694 0.70711586
#>
#> [[4]]
#> [1] 0.3996886 0.4476056 0.1349520 0.5158503
#>
#> [[5]]
#> [1] 0.59790526 0.09680605 0.53178950 0.58914220 0.15977853
The worker will remain idle for 60s (by default), which is the length of time that one poll for work lasts, and will then exit.
There are other messages that are typically sent via higher-level controller functions rather than directly:
REFRESH: requests that the worker refresh its evaluation environment. Typically used via rrq_worker_envir_set()
STOP: sent with an informational message as an argument, requests that the worker stop. Typically used via rrq_worker_stop()
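A hedged sketch of those two calls, assuming a worker that is still running (the worker used above will already have exited once its timeout was set to zero). The `create` function and the `multiplier` value it defines are hypothetical illustrations, not part of rrq.

```r
library(rrq)

# Define how worker environments are built. By default running workers
# are notified of the change, which is where REFRESH comes in.
create <- function(envir) {
  envir$multiplier <- 2  # hypothetical value made available to tasks
}
rrq_worker_envir_set(create)

# Ask the worker to stop, waiting up to 10 seconds for it to exit.
rrq_worker_stop(worker_id, timeout = 10)
```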