---
title: "messages"
output: rmarkdown::html_vignette
vignette: >
%\VignetteIndexEntry{messages}
%\VignetteEngine{knitr::rmarkdown}
%\VignetteEncoding{UTF-8}
---
In addition to passing tasks (and results) between a controller and
workers, the controller can also send "messages" to workers. This
vignette shows what the possible messages do.
In order to do this, we're going to need a queue and a worker:
```r
library(rrq)
id <- paste0("rrq:", ids::random_id(bytes = 4))
obj <- rrq_controller(id)
rrq_default_controller_set(obj)
logdir <- tempfile()
w <- rrq_worker_spawn(logdir = logdir)
#> ℹ Spawning 1 worker with prefix 'boiling_cockatoo'
worker_id <- w$id
```
On startup the worker log contains:
```plain
[2024-04-23 09:15:49.424189] ALIVE
[2024-04-23 09:15:49.425172] ENVIR new
[2024-04-23 09:15:49.425903] QUEUE default
__
______________ _/ /
______ / ___/ ___/ __ `/ /_____
/_____/ / / / / / /_/ /_/_____/
______ /_/ /_/ \__, (_) ______
/_____/ /_/ /_____/
worker: boiling_cockatoo_1
config: localhost
rrq_version: 0.7.15
platform: x86_64-pc-linux-gnu (64-bit)
running: Ubuntu 20.04.6 LTS
hostname: wpia-dide300
username: rfitzjoh
queue: rrq:7c911007:queue:default
wd: /home/rfitzjoh/Documents/src/rrq/vignettes_src
pid: 597597
redis_host: 127.0.0.1
redis_port: 6379
heartbeat_key:
```
Because one of the main effects of messages is to print to the
worker logfile, we'll print this fairly often.
## Messages and responses
1. The queue sends a message for one or more workers to process.
The message has an *identifier* that is derived from the current
time. Messages are written to a first-in-first-out queue, *per
worker*, and are processed independently by workers who do not
look to see if other workers have messages or are processing
them.
2. As soon as a worker has finished processing any current job it
will process the message (it must wait to finish a current job
but will not start any further jobs).
3. Once the message has been processed (see below) a response will
be written to a response list with the same identifier as the
message.
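The three steps above can be pictured with a toy model built from plain R lists. This is only an illustrative sketch: it does not use Redis, it is not how rrq is implemented, and the names `state`, `send()` and `process_next()` are hypothetical. It shows the two properties described above: each worker has its own first-in-first-out queue, and a response is stored under the same identifier as its message.

```r
# Toy model only: plain R lists stand in for the per-worker queues and
# response lists. None of these names come from rrq.
workers <- c("worker_a", "worker_b")
state <- list(
  queue = setNames(rep(list(list()), length(workers)), workers),
  response = setNames(rep(list(list()), length(workers)), workers))

# Step 1: the controller appends the message to every worker's own queue
send <- function(state, id, command) {
  for (w in names(state$queue)) {
    n <- length(state$queue[[w]])
    state$queue[[w]][[n + 1]] <- list(id = id, command = command)
  }
  state
}

# Steps 2 and 3: one worker takes its oldest message and stores a
# response under the same identifier as the message
process_next <- function(state, worker) {
  msg <- state$queue[[worker]][[1]]
  state$queue[[worker]] <- state$queue[[worker]][-1]
  state$response[[worker]][[msg$id]] <- paste("handled", msg$command)
  state
}

state <- send(state, "1001.5", "PING")
state <- send(state, "1002.5", "ECHO")
state <- process_next(state, "worker_a")

names(state$response$worker_a)
#> [1] "1001.5"
length(state$queue$worker_a)
#> [1] 1
length(state$queue$worker_b)
#> [1] 2
```

Only `worker_a` has processed anything; `worker_b` still holds both messages, because workers never look at each other's queues.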
Some messages interact with the worker timeout:
* `PING`, `ECHO`, `EVAL`, `INFO`, `PAUSE`, `RESUME` and `REFRESH` will reset the timer, as if a task had been run
* `TIMEOUT_SET` explicitly interacts with the timer
* `TIMEOUT_GET` does not reset the timer, reporting the remaining time
* `STOP` causes the worker to exit, so has no interaction with the timer
## `PING`
The `PING` message simply asks the worker to return `PONG`. It's
useful for diagnosing communication issues because it does so
little.
```r
message_id <- rrq_message_send("PING")
```
The message id is going to be useful for getting responses:
```r
message_id
#> [1] "1713860149.627246"
```
(This is derived from the current time according to Redis, which is
the central reference point of time for the whole system.)
```plain
[2024-04-23 09:15:49.627994] MESSAGE PING
PONG
[2024-04-23 09:15:49.628864] RESPONSE PING
```
The logfile prints:
1. the request for the `PING` (`MESSAGE PING`)
2. the value `PONG` to the R message stream
3. logging a response (`RESPONSE PING`), which means that something is written to the response stream.
We can access the same bits of information in the worker log:
```r
rrq_worker_log_tail(n = Inf)
#> worker_id child time command message
#> 1 boiling_cockatoo_1 NA 1713860149 ALIVE
#> 2 boiling_cockatoo_1 NA 1713860149 ENVIR new
#> 3 boiling_cockatoo_1 NA 1713860149 QUEUE default
#> 4 boiling_cockatoo_1 NA 1713860150 MESSAGE PING
#> 5 boiling_cockatoo_1 NA 1713860150 RESPONSE PING
```
This includes the `ALIVE` message as the worker comes up.
Inspecting the logs is fine for interactive use, but it will often
be more useful to poll for a response.
We already know that our worker has a response, but we can ask anyway:
```r
rrq_message_has_response(message_id)
#> boiling_cockatoo_1
#> TRUE
```
Or, inversely, we can ask which messages a given worker has responses for:
```r
rrq_message_response_ids(worker_id)
#> [1] "1713860149.627246"
```
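Because the identifier is derived from the Redis clock, it can be turned back into a timestamp. The sketch below assumes the identifier is a count of seconds since the Unix epoch, which is what its value suggests; it uses only base R.

```r
# Identifier copied from the output above
message_id <- "1713860149.627246"

# Assumption: the id is seconds since 1970-01-01 (Unix epoch)
sent_at <- as.POSIXct(as.numeric(message_id), origin = "1970-01-01", tz = "UTC")
format(sent_at, "%Y-%m-%d %H:%M:%S", tz = "UTC")
#> [1] "2024-04-23 08:15:49"
```

The worker log shows `09:15:49` for the same message, which is consistent with the log being written in a local time zone one hour ahead of UTC.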
To fetch the responses from all workers it was sent to (always
returning a named list):
```r
rrq_message_get_response(message_id)
#> $boiling_cockatoo_1
#> [1] "PONG"
```
or to fetch the response from a given worker:
```r
rrq_message_get_response(message_id, worker_id)
#> $boiling_cockatoo_1
#> [1] "PONG"
```
The response can be deleted by passing `delete = TRUE` to this function:
```r
rrq_message_get_response(message_id, worker_id, delete = TRUE)
#> $boiling_cockatoo_1
#> [1] "PONG"
```
after which fetching the response again will throw an error:
```r
rrq_message_get_response(message_id, worker_id)
#> Error in `rrq_message_get_response()`:
#> ! Response missing for worker: 'boiling_cockatoo_1'
```
There is also a `timeout` argument that lets you wait until a response is
ready (as in `rrq_task_wait()`).
```r
rrq_task_create_expr(Sys.sleep(2))
#> [1] "1a878e7cac11aa4e7fa234286676954c"
message_id <- rrq_message_send("PING")
rrq_message_get_response(
message_id, worker_id, delete = TRUE, timeout = 10)
#> $boiling_cockatoo_1
#> [1] "PONG"
```
Looking at the log will show what went on here:
```r
rrq_worker_log_tail(n = 4)
#> worker_id child time command
#> 1 boiling_cockatoo_1 NA 1713860150 TASK_START
#> 2 boiling_cockatoo_1 NA 1713860152 TASK_COMPLETE
#> 3 boiling_cockatoo_1 NA 1713860152 MESSAGE
#> 4 boiling_cockatoo_1 NA 1713860152 RESPONSE
#> message
#> 1 1a878e7cac11aa4e7fa234286676954c
#> 2 1a878e7cac11aa4e7fa234286676954c
#> 3 PING
#> 4 PING
```
1. A task is received
2. 2s later the task is completed
3. Then the message is received
4. Then, basically instantaneously, the message is responded to
However, because the message is only processed after the task is
completed, the response takes a while to come back. Equivalently,
from the worker log:
```plain
[2024-04-23 09:15:49.751482] TASK_START 1a878e7cac11aa4e7fa234286676954c
[2024-04-23 09:15:51.754822] TASK_COMPLETE 1a878e7cac11aa4e7fa234286676954c
[2024-04-23 09:15:51.755615] MESSAGE PING
PONG
[2024-04-23 09:15:51.756114] RESPONSE PING
```
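The delay can be read straight off the log timestamps. As a small worked example in base R, using the two timestamps copied from the log above (the time zone label is arbitrary here, since only the difference matters):

```r
# Timestamps copied from the TASK_START and RESPONSE lines of the log
task_start <- as.POSIXct("2024-04-23 09:15:49.751482", tz = "UTC")
response <- as.POSIXct("2024-04-23 09:15:51.756114", tz = "UTC")

# Seconds between the task starting and the PING being answered
latency <- as.numeric(difftime(response, task_start, units = "secs"))
round(latency, 3)
#> [1] 2.005
```

Almost all of that time is the 2s task; handling the message itself took about a millisecond.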
## `ECHO`
This is basically like `PING` and not very interesting; it prints
an arbitrary string to the log. It always returns `"OK"` as a
response.
```r
message_id <- rrq_message_send("ECHO", "hello world!")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $boiling_cockatoo_1
#> [1] "OK"
```
```plain
[2024-04-23 09:15:52.269914] MESSAGE ECHO
hello world!
[2024-04-23 09:15:52.27044] RESPONSE ECHO
```
## `INFO`
The `INFO` command refreshes and returns the worker information.
We already have a copy of the worker info; it was created when the
worker started up:
```r
rrq_worker_info()[[worker_id]]
#>
#> worker: boiling_cockatoo_1
#> config: localhost
#> rrq_version: 0.7.15
#> platform: x86_64-pc-linux-gnu (64-bit)
#> running: Ubuntu 20.04.6 LTS
#> hostname: wpia-dide300
#> username: rfitzjoh
#> queue: rrq:7c911007:queue:default
#> wd: /home/rfitzjoh/Documents/src/rrq/vignettes_src
#> pid: 597597
#> redis_host: 127.0.0.1
#> redis_port: 6379
```
We can force the worker to refresh:
```r
message_id <- rrq_message_send("INFO")
```
Here's the refreshed worker information, returned as a list with
one element per worker:
```r
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $boiling_cockatoo_1
#> $boiling_cockatoo_1$worker
#> [1] "boiling_cockatoo_1"
#>
#> $boiling_cockatoo_1$config
#> [1] "localhost"
#>
#> $boiling_cockatoo_1$rrq_version
#> [1] "0.7.15"
#>
#> $boiling_cockatoo_1$platform
#> [1] "x86_64-pc-linux-gnu (64-bit)"
#>
#> $boiling_cockatoo_1$running
#> [1] "Ubuntu 20.04.6 LTS"
#>
#> $boiling_cockatoo_1$hostname
#> [1] "wpia-dide300"
#>
#> $boiling_cockatoo_1$username
#> [1] "rfitzjoh"
#>
#> $boiling_cockatoo_1$queue
#> [1] "rrq:7c911007:queue:default"
#>
#> $boiling_cockatoo_1$wd
#> [1] "/home/rfitzjoh/Documents/src/rrq/vignettes_src"
#>
#> $boiling_cockatoo_1$pid
#> [1] 597597
#>
#> $boiling_cockatoo_1$redis_host
#> [1] "127.0.0.1"
#>
#> $boiling_cockatoo_1$redis_port
#> [1] 6379
```
## `EVAL`
Evaluate an arbitrary R expression, passed as a string (*not* as
any sort of unevaluated or quoted expression). This expression is
evaluated in the global environment, which is *not* the environment
in which queued code is evaluated.
```r
message_id <- rrq_message_send("EVAL", "1 + 1")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $boiling_cockatoo_1
#> [1] 2
```
This could be used to evaluate code that has side effects, such as
installing packages. However, due to limitations with how R loads
packages, the only way to update and reload a package is going to be
to restart the worker.
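Since `EVAL` needs a string, it can be convenient to write the code as an ordinary expression and convert it. The sketch below uses only base R; evaluating the string locally with `eval(parse(...))` is just a way to check it before sending, and it is an assumption that the worker does something similar.

```r
# Build the string from an ordinary quoted expression
expr <- quote(sqrt(16) + 1)
code <- paste(deparse(expr), collapse = "\n")
code
#> [1] "sqrt(16) + 1"

# Check locally that the string parses and evaluates as intended, using
# the local global environment to mirror the description above
eval(parse(text = code), envir = globalenv())
#> [1] 5

# The string could then be sent to the worker as in the example above:
# message_id <- rrq_message_send("EVAL", code)
```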
## `PAUSE` / `RESUME`
The `PAUSE` / `RESUME` messages can be used to prevent workers from picking up new work (and then allowing them to start again).
```r
rrq_worker_status()
#> boiling_cockatoo_1
#> "IDLE"
message_id <- rrq_message_send("PAUSE")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $boiling_cockatoo_1
#> [1] "OK"
rrq_worker_status()
#> boiling_cockatoo_1
#> "PAUSED"
```
Once paused, workers ignore tasks, which stay on the queue:
```r
t <- rrq_task_create_expr(runif(5))
rrq_task_status(t)
#> [1] "PENDING"
```
Sending a `RESUME` message unpauses the worker:
```r
message_id <- rrq_message_send("RESUME")
rrq_message_get_response(message_id, worker_id, timeout = 10)
#> $boiling_cockatoo_1
#> [1] "OK"
rrq_task_wait(t, 5)
#> [1] TRUE
```
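When pausing workers around some other operation, it is easy to forget the `RESUME`. One possible pattern is a small wrapper that resumes on exit, even if the code fails. The helper `with_workers_paused()` and its `send` argument are hypothetical and not part of rrq; `send` defaults to `rrq_message_send_and_wait()` but can be swapped for a stub so the sketch can be tried without a running queue.

```r
# Hypothetical helper, not part of rrq: pause workers, run some code,
# then resume them whether or not the code succeeded.
with_workers_paused <- function(code, worker_ids = NULL,
                                send = rrq::rrq_message_send_and_wait) {
  send("PAUSE", worker_ids = worker_ids)
  # Registered straight after pausing so that RESUME is always sent
  on.exit(send("RESUME", worker_ids = worker_ids), add = TRUE)
  # 'code' is evaluated lazily, so it only runs once workers are paused
  force(code)
}

# Trying it with a stub that records which messages would be sent
sent <- character()
stub <- function(command, ...) {
  sent <<- c(sent, command)
  "OK"
}
result <- with_workers_paused(1 + 1, send = stub)
result
#> [1] 2
sent
#> [1] "PAUSE"  "RESUME"
```

With a real queue, something like `with_workers_paused(rrq_task_create_expr(runif(5)))` would queue a task that only starts once the function returns.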
## `TIMEOUT_SET` / `TIMEOUT_GET`
Workers will quit after being left idle for more than a certain time; this is their timeout. Running a task resets the idle timer, and so do most messages (see the list above), but `TIMEOUT_GET` does not. You can query the timeout with `TIMEOUT_GET` and set it with `TIMEOUT_SET`. For our worker above the timeout is infinite; it will never quit:
```r
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $boiling_cockatoo_1
#> timeout_idle remaining
#> Inf Inf
```
We can set this to a finite value, in seconds:
```r
rrq_message_send_and_wait("TIMEOUT_SET", 600, worker_ids = worker_id)
#> $boiling_cockatoo_1
#> [1] "OK"
```
Here the timeout is set to 10 minutes (600s).
Once set, `TIMEOUT_GET` returns the length of time remaining before the worker exits:
```r
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $boiling_cockatoo_1
#> timeout_idle remaining
#> 600.0000 599.9353
Sys.sleep(5)
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $boiling_cockatoo_1
#> timeout_idle remaining
#> 600.000 594.872
```
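The two responses show that querying the timeout does not reset the timer. A short check of the arithmetic, using the values copied from the output above:

```r
# Values copied from the two TIMEOUT_GET responses above
first <- c(timeout_idle = 600, remaining = 599.9353)
second <- c(timeout_idle = 600, remaining = 594.872)

# Time that ran off the idle timer between the two queries
elapsed <- first[["remaining"]] - second[["remaining"]]
round(elapsed, 2)
#> [1] 5.06
```

That matches the 5s sleep plus a little messaging overhead; had the first query reset the timer, the second would have reported close to 595s measured from a fresh start rather than continuing the same countdown.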
One useful pattern is to send work to workers, then set the timeout to zero. This means that when work is complete they will exit (almost) immediately:
```r
ids <- rrq_task_create_bulk_call(function(x) {
Sys.sleep(0.5)
runif(x)
}, 1:5)
rrq_message_send("TIMEOUT_SET", 0, worker_id)
rrq_task_wait(ids)
#> [1] TRUE
rrq_task_results(ids)
#> [[1]]
#> [1] 0.6824953
#>
#> [[2]]
#> [1] 0.3917506 0.7700726
#>
#> [[3]]
#> [1] 0.61080994 0.47160999 0.06371166
#>
#> [[4]]
#> [1] 0.2314934 0.4231295 0.7559091 0.4027418
#>
#> [[5]]
#> [1] 0.6109091 0.8732575 0.4324995 0.2940482 0.2035300
```
The worker will remain idle for 60s (by default), which is the length of time that one poll for work lasts, and then it will exit.
```r
rrq_worker_status(worker_id)
#> boiling_cockatoo_1
#> "IDLE"
rrq_message_send_and_wait("TIMEOUT_GET", worker_ids = worker_id)
#> $boiling_cockatoo_1
#> timeout_idle remaining
#> 0 0
```
## Messages that are supported but used via wrappers
There are other messages that are typically sent via wrapper functions that use the [`rrq_controller`] object, rather than directly:
* `REFRESH`: requests that the worker refresh its evaluation environment. Typically used via `rrq_worker_envir_set()`
* `STOP`: sent with an informational message as an argument, requests that the worker stop. Typically used via `rrq_worker_stop()`