Title: | A Distributed Worker Launcher Framework |
---|---|
Description: | In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The 'NNG'-powered 'mirai' R package by Gao (2023) <doi:10.5281/zenodo.7912722> is a sleek and sophisticated scheduler that efficiently processes these intense workloads. The 'crew' package extends 'mirai' with a unifying interface for third-party worker launchers. Inspiration also comes from packages 'future' by Bengtsson (2021) <doi:10.32614/RJ-2021-048>, 'rrq' by FitzJohn and Ashton (2023) <https://github.com/mrc-ide/rrq>, 'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>, and 'batchtools' by Lang, Bischl, and Surmann (2017) <doi:10.21105/joss.00135>. |
Authors: | William Michael Landau [aut, cre] (ORCID: <https://orcid.org/0000-0003-1878-3253>), Daniel Woodie [ctb], Eli Lilly and Company [cph, fnd] |
Maintainer: | William Michael Landau <will.landau.oss@gmail.com> |
License: | MIT + file LICENSE |
Version: | 1.3.0 |
Built: | 2025-09-15 12:46:39 UTC |
Source: | https://github.com/wlandau/crew |
In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The NNG-powered mirai R package is a sleek and sophisticated scheduler that efficiently processes these intense workloads. The crew package extends mirai with a unifying interface for third-party worker launchers. Inspiration also comes from packages future, rrq, clustermq, and batchtools.
Assert that a condition is true.
crew_assert(value = NULL, ..., message = NULL, envir = parent.frame())
value
An object or condition.
...
Conditions that use the . symbol to refer to value.
message
Optional message to print on error.
envir
Environment in which to evaluate the conditions.
NULL (invisibly). Throws an error if the condition is not true.
Other utility: crew_clean(), crew_deprecate(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
crew_assert(1 < 2)
crew_assert("object", !anyNA(.), nzchar(.))
tryCatch(
  crew_assert(2 < 1),
  crew_error = function(condition) message("false")
)
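The documented semantics of crew_assert() can be sketched in base R. Each condition in ... is evaluated with . bound to value. If no conditions are supplied, value itself must be TRUE. The name assert_sketch() is hypothetical, and this is not crew's source code. Signaling a condition of class crew_error is an assumption taken from the tryCatch() handler in the example above.

```r
# Hypothetical sketch of the crew_assert() semantics; not crew's source.
assert_sketch <- function(value = NULL, ..., message = NULL, envir = parent.frame()) {
  # Capture the unevaluated conditions passed through `...`.
  conditions <- as.list(substitute(list(...)))[-1L]
  # With no conditions, the value itself must be TRUE.
  if (length(conditions) == 0L) {
    conditions <- list(quote(.))
  }
  # Evaluate each condition with `.` bound to `value`, falling back to `envir`.
  ok <- vapply(
    conditions,
    function(expr) isTRUE(all(eval(expr, envir = list(. = value), enclos = envir))),
    logical(1L)
  )
  if (!all(ok)) {
    text <- if (is.null(message)) "condition is not true" else message
    # Assumed condition class, mirroring the crew_error handler in the example.
    stop(structure(
      list(message = text, call = NULL),
      class = c("crew_error", "error", "condition")
    ))
  }
  invisible(NULL)
}

assert_sketch(1 < 2)
assert_sketch("object", !anyNA(.), nzchar(.))
tryCatch(assert_sketch(2 < 1), crew_error = function(condition) message("false"))
```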
R6 client class.
R6 class for mirai clients.
See crew_client().
host
See crew_client().
port
See crew_client().
tls
See crew_client().
serialization
See crew_client().
profile
Compute profile of the client.
seconds_interval
See crew_client().
seconds_timeout
See crew_client().
relay
Relay object for event-driven programming on a downstream condition variable.
started
Whether the client is started.
url
Client websocket URL.
new()
mirai client constructor.
crew_class_client$new( host = NULL, port = NULL, tls = NULL, serialization = NULL, profile = NULL, seconds_interval = NULL, seconds_timeout = NULL, relay = NULL )
host
Argument passed from crew_client().
port
Argument passed from crew_client().
tls
Argument passed from crew_client().
serialization
Argument passed from crew_client().
profile
Argument passed from crew_client().
seconds_interval
Argument passed from crew_client().
seconds_timeout
Argument passed from crew_client().
relay
Argument passed from crew_client().
An R6 object with the client.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  client$status()
  client$terminate()
}
validate()
Validate the client.
crew_class_client$validate()
NULL (invisibly).
set_started()
Register the client as started.
crew_class_client$set_started()
Exported to implement the sequential controller. Only meant to be called manually inside the client or the sequential controller.
NULL (invisibly).
start()
Start listening for workers on the available sockets.
crew_class_client$start()
NULL (invisibly).
terminate()
Stop the mirai client and disconnect from the worker websockets.
crew_class_client$terminate()
NULL (invisibly).
status()
Get the counters from mirai::info().
crew_class_client$status()
A named integer vector of task counts (awaiting, executing, completed) as well as the number of worker connections.
pids()
Deprecated on 2025-08-26 in crew version 1.2.1.9005.
crew_class_client$pids()
The integer process ID of the current process.
Other client:
crew_client()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  client$status()
  client$terminate()
}

## ------------------------------------------------
## Method `crew_class_client$new`
## ------------------------------------------------

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  client$status()
  client$terminate()
}
R6 class for controllers.
See crew_controller().
profile
Character string, compute profile of the controller.
client
Client object.
launcher
Launcher object.
tasks
A list of mirai::mirai() task objects. The list of tasks is dynamically generated from an internal dictionary, so it is not as fast as a simple lookup.
reset_globals
See crew_controller().
reset_packages
See crew_controller().
reset_options
See crew_controller().
garbage_collection
See crew_controller().
crashes_max
See crew_controller().
backup
See crew_controller().
error
Tibble of task results (with one result per row) from the last call to map(error = "stop").
loop
later loop if asynchronous auto-scaling is running, NULL otherwise.
queue_resolved
Queue of resolved tasks.
queue_backlog
Queue of explicitly backlogged tasks.
new()
mirai controller constructor.
crew_class_controller$new( client = NULL, launcher = NULL, reset_globals = NULL, reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, crashes_max = NULL, backup = NULL )
client
Client object. See crew_controller().
launcher
Launcher object. See crew_controller().
reset_globals
See crew_controller().
reset_packages
See crew_controller().
reset_options
See crew_controller().
garbage_collection
See crew_controller().
crashes_max
See crew_controller().
backup
See crew_controller().
An R6 controller object.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}
validate()
Validate the controller.
crew_class_controller$validate()
NULL (invisibly).
size()
Number of tasks in the controller.
crew_class_controller$size(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Non-negative integer, number of tasks in the controller.
empty()
Check if the controller is empty.
crew_class_controller$empty(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A controller is empty if it has no mirai::mirai() task objects in the controller. There may still be other tasks running on the workers of an empty controller, but those tasks were not submitted with push(), walk(), or map(), and they are not part of the controller task queue.
TRUE if the controller is empty, FALSE otherwise.
nonempty()
Check if the controller is nonempty.
crew_class_controller$nonempty(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A controller is empty if it has no mirai::mirai() task objects in the controller. There may still be other tasks running on the workers of an empty controller, but those tasks were not submitted with push(), walk(), or map(), and they are not part of the controller task queue.
TRUE if the controller is nonempty, FALSE otherwise.
resolved()
Cumulative number of resolved tasks.
crew_class_controller$resolved(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
resolved() is cumulative: it counts all the resolved tasks over the entire lifetime of the controller session.
Non-negative integer of length 1, number of resolved tasks. The return value is 0 if the condition variable does not exist (i.e. if the client is not running).
unresolved()
Number of unresolved tasks.
crew_class_controller$unresolved(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Non-negative integer of length 1, number of unresolved tasks.
saturated()
Check if the controller is saturated.
crew_class_controller$saturated( collect = NULL, throttle = NULL, controller = NULL )
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A controller is saturated if the number of uncollected tasks is greater than or equal to the maximum number of workers. You can still push tasks to a saturated controller, but tools that use crew, such as targets, may choose not to (for performance and user-friendliness).
TRUE if the controller is saturated, FALSE otherwise.
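The saturation rule above reduces to a single comparison. The helper is_saturated() below is hypothetical and only restates the documented rule. In practice, call controller$saturated(), which looks up the counts itself.

```r
# Hypothetical helper restating the documented rule: a controller is
# saturated when the number of uncollected tasks >= the maximum number of workers.
is_saturated <- function(uncollected, workers) {
  stopifnot(uncollected >= 0, workers >= 1)
  uncollected >= workers
}

is_saturated(uncollected = 3, workers = 4) # FALSE: one worker slot is still free
is_saturated(uncollected = 4, workers = 4) # TRUE
```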
start()
Start the controller if it is not already started.
crew_class_controller$start(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Register the mirai client and register worker websockets with the launcher.
NULL (invisibly).
started()
Check whether the controller is started.
crew_class_controller$started(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Actually checks whether the client is started.
TRUE if the controller is started, FALSE otherwise.
launch()
Launch one or more workers.
crew_class_controller$launch(n = 1L, controllers = NULL)
n
Number of workers to launch.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
scale()
Auto-scale workers out to meet the demand of tasks.
crew_class_controller$scale(throttle = TRUE, controllers = NULL)
throttle
TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
The scale() method launches new workers to run tasks if needed.
Invisibly returns TRUE if auto-scaling was attempted (throttling can skip it) and there was any relevant auto-scaling activity (new worker launches or worker connection/disconnection events). FALSE otherwise.
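The throttle argument can be illustrated with a small closure. It records the time of the last auto-scaling attempt and skips new attempts that fall inside the polling interval, unless throttle = FALSE. The constructor new_throttle_sketch() and its now argument are hypothetical. The now argument can be supplied directly, so the logic can be tested without sleeping. Unlike scale(), the returned function only reports whether an attempt was made, not whether any worker activity followed.

```r
# Hypothetical throttle: remembers when auto-scaling last ran and skips
# attempts that fall inside the polling interval unless throttle = FALSE.
new_throttle_sketch <- function(seconds_interval) {
  last <- -Inf
  function(throttle = TRUE, now = as.numeric(Sys.time())) {
    if (throttle && (now - last) < seconds_interval) {
      return(FALSE) # skipped: auto-scaling happened too recently
    }
    last <<- now
    # A real scale() would launch workers here if tasks need them.
    TRUE # attempted
  }
}

scale_now <- new_throttle_sketch(seconds_interval = 0.5)
scale_now(now = 0)                     # TRUE: first attempt
scale_now(now = 0.2)                   # FALSE: inside the interval
scale_now(throttle = FALSE, now = 0.3) # TRUE: throttling disabled
```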
autoscale()
Run worker auto-scaling in a later loop in polling intervals determined by exponential backoff.
crew_class_controller$autoscale( loop = later::current_loop(), controllers = NULL )
loop
A later loop to run auto-scaling.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Call controller$descale() to terminate the auto-scaling loop.
NULL (invisibly).
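autoscale() polls at intervals determined by exponential backoff. The sketch below shows one common form of backoff. After each idle poll, the interval grows by a factor of base, up to a cap of seconds_max. After a poll with activity, the interval drops back to seconds_min. Of these names, only seconds_max appears in this documentation, as an argument of crew_throttle(). The function backoff_sketch(), the parameters seconds_min and base, and the reset-on-activity rule are all assumptions. crew_throttle() may adjust the interval differently.

```r
# Hypothetical backoff schedule. activity[i] says whether poll i saw any
# auto-scaling activity; the result is the interval in effect at each poll.
backoff_sketch <- function(activity, seconds_min = 0.01, seconds_max = 0.5, base = 2) {
  waits <- numeric(length(activity))
  wait <- seconds_min
  for (i in seq_along(activity)) {
    waits[i] <- wait
    wait <- if (activity[[i]]) {
      seconds_min # activity: poll quickly again
    } else {
      min(seconds_max, wait * base) # idle: back off, up to the cap
    }
  }
  waits
}

backoff_sketch(c(FALSE, FALSE, FALSE, TRUE, FALSE), seconds_min = 1, seconds_max = 5)
# 1 2 4 5 1
```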
descale()
Terminate the auto-scaling loop started by controller$autoscale().
crew_class_controller$descale(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
crashes()
Report the number of consecutive crashes of a task.
crew_class_controller$crashes(name, controllers = NULL)
name
Character string, name of the task to check.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
See the crashes_max argument of crew_controller().
Non-negative integer, number of consecutive times the task crashed.
push()
Push a task to the head of the task list.
crew_class_controller$push( command, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, scale = TRUE, throttle = TRUE, name = NULL, save_command = NULL, controller = NULL )
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character string of length 1, name of the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale
Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Also see the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
name
Character string, name of the task. If NULL, then a random name is generated automatically. The name of the task must not conflict with the name of another task pushed to the controller. Any previous task with the same name must first be popped before a new task with that name can be pushed.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004) and no longer used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Invisibly return the mirai object of the pushed task. This allows you to interact with the task directly, e.g. to create a promise object with promises::as.promise().
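The difference between substitute = TRUE and substitute = FALSE comes from plain base R behavior. The hypothetical capture_command() below mimics how a command could be captured under each setting. It also shows why programs that build a command in a variable need substitute = FALSE: with substitute = TRUE, only the variable name is captured.

```r
# Hypothetical helper mimicking the documented substitute argument.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    command <- base::substitute(command) # quote the expression as written
  }
  stopifnot(is.language(command))
  command
}

# Interactive style: the call is captured literally.
capture_command(sqrt(4))
# Programmatic style: the command is built first, then passed by value.
cmd <- quote(sqrt(4))
capture_command(cmd, substitute = FALSE)
# Pitfall: with substitute = TRUE, only the symbol `cmd` is captured.
capture_command(cmd)
```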
walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
crew_class_controller$walk( command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL )
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character string of length 1, name of the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004). The command is always saved now.
verbose
Logical of length 1, whether to print a progress bar when pushing tasks.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
In contrast to walk(), map() blocks the local R session and waits for all tasks to complete.
Invisibly returns a list of mirai task objects for the newly created tasks. The order of tasks in the list matches the order of data in the iterate argument.
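The iterate rule described above can be sketched locally with eval(): one call per position, all elements of equal length, and iterate overriding data on name conflicts. evaluate_iterations() is a hypothetical, sequential stand-in that runs everything in the current R session. walk() and map() instead submit each call as a separate task to the workers.

```r
# Hypothetical, sequential stand-in for the iterate semantics.
evaluate_iterations <- function(command, iterate, data = list()) {
  caller <- parent.frame() # so functions like f() resolve where they are defined
  n <- unique(lengths(iterate))
  stopifnot(length(n) == 1L) # all elements of iterate must have the same length
  lapply(seq_len(n), function(i) {
    row <- lapply(iterate, `[[`, i) # i-th element of each iterate entry
    # Constant data first, minus any names that iterate overrides.
    keep <- !(names(data) %in% names(row))
    values <- c(data[keep], row)
    eval(command, envir = list2env(values, parent = caller))
  })
}

f <- function(x, y) paste0(x, y)
evaluate_iterations(quote(f(x, y)), iterate = list(x = c(1, 2), y = c("a", "b")))
# iterate wins over data for x; z comes from data.
evaluate_iterations(quote(x + z), iterate = list(x = 1:3), data = list(x = 100, z = 10))
```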
map()
Apply a single command to multiple inputs, wait for all tasks to complete, and return the results of all tasks.
crew_class_controller$map( command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_interval = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, error = "stop", warnings = TRUE, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL )
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over.
For example, to run function calls
f(x = 1, y = "a")
and f(x = 2, y = "b")
,
set command
to f(x, y)
, and set iterate
to
list(x = c(1, 2), y = c("a", "b"))
. The individual
function calls are evaluated as
f(x = iterate$x[[1]], y = iterate$y[[1]])
and
f(x = iterate$x[[2]], y = iterate$y[[2]])
.
All the elements of iterate
must have the same length.
If there are any name conflicts between iterate
and data
,
iterate
takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily
assign to the global environment for each task. This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals
argument of crew_controller_local()
.
Objects in this list are treated as single
values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call
base::substitute()
on the supplied value of the
command
argument. If TRUE
(default) then command
is quoted
literally as you write it, e.g.
push(command = your_function_call())
. If FALSE
, then crew
assumes command
is a language object and you are passing its
value, e.g. push(command = quote(your_function_call()))
.
substitute = TRUE
is appropriate for interactive use,
whereas substitute = FALSE
is meant for automated R programs
that invoke crew
controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character string of length 1, name of the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_interval
Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character string, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004). The command is always saved now.
error
Character of length 1, choice of action if a task was not successful. Possible values:
"stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
"warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
"silent": do nothing special.
NOTE: the only kinds of errors considered here are errors at the R level. A crashed task will return a status of "crash" in the output and not trigger an error in map() unless crashes_max is reached.
warnings
Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose
Logical of length 1, whether to print a progress bar as tasks resolve.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
map() cannot be used unless all prior tasks are completed and popped. You may need to wait and then pop them manually. Alternatively, you can start over: either call terminate() on the current controller object to reset it, or create a new controller object entirely.
A tibble of results and metadata: one row per task and columns corresponding to the output of pop().
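The three error modes of map() can be illustrated on a plain data frame with the name and status columns described under pop(). react_to_errors() is hypothetical and not crew's implementation. In line with the NOTE above, it treats only status "error" as an error, so a "crash" row passes through.

```r
# Hypothetical sketch of the error = "stop" / "warn" / "silent" choices.
react_to_errors <- function(results, error = c("stop", "warn", "silent")) {
  error <- match.arg(error)
  # Only R-level errors count; "crash" and "cancel" rows are ignored here.
  failed <- results$name[results$status == "error"]
  if (length(failed) > 0L && error != "silent") {
    text <- sprintf("%d task(s) failed: %s", length(failed), paste(failed, collapse = ", "))
    if (identical(error, "stop")) {
      stop(text, call. = FALSE)
    }
    warning(text, call. = FALSE)
  }
  results
}

results <- data.frame(name = c("a", "b", "c"), status = c("success", "error", "crash"))
react_to_errors(results, error = "silent") # returned unchanged
tryCatch(react_to_errors(results, error = "stop"), error = conditionMessage)
```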
pop()
Pop a completed task from the results data frame.
crew_class_controller$pop( scale = TRUE, collect = NULL, throttle = TRUE, error = NULL, controllers = NULL )
scale
Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Scaling up on pop() may be important for transient or nearly transient workers that tend to drop off quickly after doing little work. See also the throttle argument.
collect
Deprecated in version 0.5.0.9003 (2023-10-02).
throttle
TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
"stop": throw an error in the main R session instead of returning a value.
"warn": throw a warning.
NULL or "silent": do not react to errors.
NOTE: the only kinds of errors considered here are errors at the R level. A crashed task will return a status of "crash" in the output and not trigger an error in pop() unless crashes_max is reached.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
If no task is currently completed, pop() will attempt to auto-scale workers as needed.
If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the following columns.
name: the task name.
command: a character string with the R command.
result: a list containing the return value of the R command. NA if the task failed.
status: a character string. "success" if the task succeeded, "cancel" if the task was canceled with the cancel() controller method, "crash" if the worker running the task exited before it could complete the task, or "error" for any other kind of error.
error: the first 2048 characters of the error message if the task status is not "success", NA otherwise. Messages for crashes and cancellations are captured here alongside ordinary R-level errors.
code: an integer code denoting the specific exit status: 0 for successful tasks, -1 for tasks with an error in the R command of the task, and another positive integer with an NNG status code if there is an error at the NNG/nanonext level. nanonext::nng_error() can interpret these codes.
trace: the first 2048 characters of the text of the traceback if the task threw an error, NA otherwise.
warnings: the first 2048 characters of the text of warning messages that the task may have generated, NA otherwise.
seconds: number of seconds that the task ran.
seed: the single integer originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.
algorithm: name of the pseudo-random number generator algorithm originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.
controller: name of the crew controller where the task ran.
worker: name of the crew worker that ran the task.
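The seed and algorithm columns are ordinary arguments to set.seed(), so restoring the generator state for a task takes two lines of base R. In the sketch below, the values 123L and "L'Ecuyer-CMRG" are placeholders for the seed and algorithm columns of a popped task. The draws are generated locally to show that the state is reproduced.

```r
old_kind <- RNGkind()[[1L]] # remember the session's generator

# Placeholders for the seed and algorithm columns of a popped task.
seed <- 123L
algorithm <- "L'Ecuyer-CMRG"

set.seed(seed = seed, kind = algorithm)
first <- runif(3)
set.seed(seed = seed, kind = algorithm) # restore the same state
second <- runif(3)
identical(first, second) # TRUE

RNGkind(kind = old_kind) # put the session's generator back
```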
collect()
Pop all available task results and return them in a tidy tibble.
crew_class_controller$collect( scale = TRUE, throttle = TRUE, error = NULL, controllers = NULL )
scale
Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.
throttle
TRUE to skip auto-scaling if it already happened within the last polling interval. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
* "stop": throw an error in the main R session instead of returning a value.
* "warn": throw a warning.
* NULL or "silent": do not react to errors.
NOTE: the only kinds of errors considered here are errors at the R level. A crashed task will return a status of "crash" in the output and not trigger an error in collect() unless crashes_max is reached.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A tibble of results and metadata of all resolved tasks, with one row per task. Returns NULL if there are no tasks to collect. See pop() for details on the columns of the returned tibble.
wait()
Wait for tasks.
crew_class_controller$wait( mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL )
mode
Character string, name of the waiting condition. wait(mode = "all") waits until all tasks in the mirai compute profile resolve, and wait(mode = "one") waits until at least one task is available to pop() or collect() from the controller. The former still works if the controller is not the only means of submitting tasks to the compute profile, whereas the latter assumes only the controller submits tasks.
seconds_interval
Deprecated on 2025-01-17 (crew
version
0.10.2.9003). Instead, the seconds_interval
argument passed
to crew_controller_group()
is used as seconds_max
in a crew_throttle()
object which orchestrates exponential
backoff.
seconds_timeout
Timeout length in seconds waiting for tasks.
scale
Logical, whether to automatically call scale()
to auto-scale workers to meet the demand of the task load.
See also the throttle
argument.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
The wait()
method blocks the calling R session
until the condition in the mode
argument is met.
During the wait, wait()
iteratively auto-scales the workers.
A logical of length 1, invisibly.
wait(mode = "all")
returns TRUE
if all tasks in the mirai
compute profile have resolved (FALSE
otherwise).
wait(mode = "one")
returns TRUE
if the controller is ready
to pop or collect at least one resolved task (FALSE
otherwise).
wait(mode = "one")
assumes all
tasks were submitted through the controller and not by other means.
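The sketch below is a minimal base-R illustration of how wait() blocks: it polls a condition until the condition holds or a timeout expires. It is not crew's implementation. The helper wait_until() and the condition function are hypothetical, and the fixed polling interval stands in for the exponential backoff that crew_throttle() provides. Like wait(), it returns a logical invisibly: TRUE if the condition was met and FALSE on timeout.

```r
# Hypothetical sketch of a blocking wait with a timeout (not crew's code).
# `is_ready` stands in for the condition that wait() checks, e.g.
# "all tasks resolved" or "at least one task is ready to pop".
wait_until <- function(is_ready, seconds_timeout = Inf, seconds_interval = 0.01) {
  start <- proc.time()[["elapsed"]]
  repeat {
    if (isTRUE(is_ready())) {
      return(invisible(TRUE))
    }
    if (proc.time()[["elapsed"]] - start >= seconds_timeout) {
      return(invisible(FALSE))
    }
    # Fixed pause between polls; crew uses exponential backoff instead.
    Sys.sleep(seconds_interval)
  }
}

# Example condition that becomes TRUE on the third poll.
polls <- 0L
ready_on_third_poll <- function() {
  polls <<- polls + 1L
  polls >= 3L
}

succeeded <- wait_until(ready_on_third_poll, seconds_timeout = 5)
timed_out <- wait_until(function() FALSE, seconds_timeout = 0.05)
```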
push_backlog()
Push the name of a task to the backlog.
crew_class_controller$push_backlog(name, controller = NULL)
name
Character of length 1 with the task name to push to the backlog.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
pop_backlog()
pops the tasks that can be pushed
without saturating the controller.
NULL
(invisibly).
pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
crew_class_controller$pop_backlog(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Character vector of task names which can be pushed to the
controller without saturating it. If the controller is saturated,
character(0L)
is returned.
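The backlog pattern takes only a few lines of base R to sketch. This is a hypothetical illustration, not crew's internal code. new_backlog() is an invented helper, and it assumes a controller has room for as many new tasks as it has workers minus unresolved tasks. Under that assumption a saturated controller yields character(0L).

```r
# Hypothetical backlog of task names (not crew's internal implementation).
# Assumption: free capacity = workers - unresolved tasks.
new_backlog <- function() {
  names_waiting <- character(0L)
  list(
    push = function(name) {
      names_waiting <<- c(names_waiting, name)
      invisible(NULL)
    },
    pop = function(workers, unresolved) {
      capacity <- max(0L, workers - unresolved)
      n <- min(capacity, length(names_waiting))
      if (n == 0L) {
        return(character(0L))
      }
      popped <- names_waiting[seq_len(n)]  # take names from the head
      names_waiting <<- names_waiting[-seq_len(n)]
      popped
    }
  )
}

queue <- new_backlog()
for (task in c("a", "b", "c")) queue$push(task)
first <- queue$pop(workers = 2L, unresolved = 0L)   # room for two tasks
second <- queue$pop(workers = 2L, unresolved = 2L)  # saturated: nothing popped
third <- queue$pop(workers = 2L, unresolved = 1L)   # room for one more
```

A typical loop pushes every task name to the backlog, then repeatedly calls pop_backlog() and calls push() only for the names it returns.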
summary()
Summarize the collected tasks of the controller.
crew_class_controller$summary(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
A data frame of cumulative summary statistics on the tasks
collected through pop()
and collect()
.
It has one row and the following columns:
controller
: name of the controller.
seconds
: total runtime in seconds.
tasks
: total number of tasks collected.
success
: total number of collected tasks that did not crash
or error.
error
: total number of tasks with errors, either in the R code
of the task or an NNG-level error that is not a cancellation
or crash.
crash
: total number of crashed tasks (where the worker exited
unexpectedly while it was running the task).
cancel
: total number of tasks interrupted with the cancel()
controller method.
warning
: total number of tasks with one or more warnings.
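The summary columns are simple aggregates over per-task results. The sketch below assumes a hypothetical results table with seconds, status, and warnings columns. The status labels "success", "error", "crash", and "cancel" are assumptions chosen for illustration. The code computes one summary row in the same shape as summary().

```r
# Hypothetical per-task results; column names and status labels are
# assumptions for illustration, not crew's exact output.
results <- data.frame(
  seconds = c(1.5, 0.2, 3.0, 0.1),
  status = c("success", "error", "success", "crash"),
  warnings = c(NA, NA, "NaNs produced", NA),
  stringsAsFactors = FALSE
)

# One row of cumulative statistics, mirroring the columns of summary().
summarize_tasks <- function(results, controller = "example") {
  data.frame(
    controller = controller,
    seconds = sum(results$seconds),
    tasks = nrow(results),
    success = sum(results$status == "success"),
    error = sum(results$status == "error"),
    crash = sum(results$status == "crash"),
    cancel = sum(results$status == "cancel"),
    warning = sum(!is.na(results$warnings)),
    stringsAsFactors = FALSE
  )
}

summary_row <- summarize_tasks(results)
```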
cancel()
Cancel one or more tasks.
crew_class_controller$cancel(names = character(0L), all = FALSE)
names
Character vector of names of tasks to cancel.
Those names must have been manually supplied by push()
.
all
TRUE
to cancel all tasks, FALSE
otherwise.
all = TRUE
supersedes the names
argument.
NULL
(invisibly).
pids()
Deprecated on 2025-08-26 in crew
version 1.2.1.9005.
crew_class_controller$pids(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
The integer process ID of the current process.
terminate()
Terminate the workers and the mirai
client.
crew_class_controller$terminate(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL
(invisibly).
Other controller:
crew_controller()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}
R6
class for controller groups.
controllers
List of R6
controller objects.
relay
Relay object for event-driven programming on a downstream condition variable.
new()
Multi-controller constructor.
crew_class_controller_group$new(controllers = NULL, relay = NULL)
controllers
List of R6
controller objects.
relay
Relay object for event-driven programming on a downstream condition variable.
An R6
object with the controller group object.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  group$terminate()
}
validate()
Validate the client.
crew_class_controller_group$validate()
NULL
(invisibly).
size()
Number of tasks in the selected controllers.
crew_class_controller_group$size(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
Non-negative integer, total number of tasks in the selected controllers.
empty()
See if the controllers are empty.
crew_class_controller_group$empty(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
A controller is empty if it has no running tasks
or completed tasks waiting to be retrieved with pop()
or collect()
.
TRUE
if all the selected controllers are empty,
FALSE
otherwise.
nonempty()
Check if the controller group is nonempty.
crew_class_controller_group$nonempty(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
A controller is empty if it has no running tasks
or completed tasks waiting to be retrieved with pop()
or collect()
.
TRUE
if at least one of the selected controllers is nonempty, FALSE
otherwise.
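For a controller group, empty() and nonempty() aggregate per-controller states. The following is a minimal sketch that assumes hypothetical per-controller counts of uncollected tasks. Under this reading, the group is empty only if every selected controller is empty, and nonempty() is the logical negation of that.

```r
# Hypothetical counts of uncollected tasks (running, or resolved but not
# yet popped or collected) in each controller of a group.
uncollected <- c(persistent = 0L, transient = 2L)

# The group is empty only if every selected controller is empty.
group_empty <- function(uncollected, controllers = NULL) {
  if (is.null(controllers)) {
    controllers <- names(uncollected)
  }
  all(uncollected[controllers] == 0L)
}

# nonempty() is the negation: TRUE if any selected controller has tasks.
group_nonempty <- function(uncollected, controllers = NULL) {
  !group_empty(uncollected, controllers)
}

all_empty <- group_empty(uncollected)
persistent_empty <- group_empty(uncollected, controllers = "persistent")
any_nonempty <- group_nonempty(uncollected)
```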
resolved()
Number of resolved mirai()
tasks.
crew_class_controller_group$resolved(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
resolved()
is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
Non-negative integer of length 1,
number of resolved mirai()
tasks.
The return value is 0 if the condition variable does not exist
(i.e. if the client is not running).
unresolved()
Number of unresolved mirai()
tasks.
crew_class_controller_group$unresolved(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
Non-negative integer of length 1,
number of unresolved mirai()
tasks.
saturated()
Check if a controller is saturated.
crew_class_controller_group$saturated( collect = NULL, throttle = NULL, controller = NULL )
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller
Character vector of length 1 with the controller name.
Set to NULL
to select the default controller that push()
would choose.
A controller is saturated if the number of uncollected tasks
is greater than or equal to the maximum number of workers.
You can still push tasks to a saturated controller, but
tools that use crew
such as targets
may choose not to
(for performance and user-friendliness).
TRUE
if the selected controller is saturated,
FALSE
otherwise.
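The saturation rule described above comes down to one comparison. The hypothetical helper is_saturated() applies it to named vectors of uncollected tasks and maximum workers. When controller = NULL it falls back to the first controller, on the assumption that this matches the default push() uses ("the first controller in the list").

```r
# Hypothetical saturation check for one controller in a group.
# A NULL controller falls back to the first controller, assumed to be
# the same default that push() uses.
is_saturated <- function(uncollected, workers, controller = NULL) {
  if (is.null(controller)) {
    controller <- names(workers)[1L]
  }
  uncollected[[controller]] >= workers[[controller]]
}

workers <- c(persistent = 2L, transient = 1L)
uncollected <- c(persistent = 1L, transient = 1L)

default_saturated <- is_saturated(uncollected, workers)
transient_saturated <- is_saturated(uncollected, workers, controller = "transient")
```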
start()
Start one or more controllers.
crew_class_controller_group$start(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
NULL
(invisibly).
started()
Check whether all the given controllers are started.
crew_class_controller_group$started(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
Actually checks whether all the given clients are started.
TRUE
if the controllers are started, FALSE
if any are not.
launch()
Launch one or more workers on one or more controllers.
crew_class_controller_group$launch(n = 1L, controllers = NULL)
n
Number of workers to launch in each controller selected.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
NULL
(invisibly).
scale()
Automatically scale up the number of workers if needed in one or more controller objects.
crew_class_controller_group$scale(throttle = TRUE, controllers = NULL)
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
See the scale()
method in individual controller classes.
Invisibly returns TRUE
if there was any relevant
auto-scaling activity (new worker launches or worker
connection/disconnection events) (FALSE
otherwise).
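The throttle argument trades responsiveness for lower overhead. The sketch below is a simplified, hypothetical throttle with a fixed interval. crew's crew_throttle() uses exponential backoff, so treat this only as an illustration of the skip-if-recent logic. With throttle = FALSE, the check is bypassed and scaling runs every time.

```r
# Hypothetical fixed-interval throttle: returns TRUE (go ahead) at most
# once per `seconds_interval`. crew_throttle() uses exponential backoff.
new_throttle <- function(seconds_interval) {
  last_run <- -Inf
  function() {
    now <- proc.time()[["elapsed"]]
    if (now - last_run < seconds_interval) {
      return(FALSE)
    }
    last_run <<- now
    TRUE
  }
}

# With throttle = FALSE the check is skipped and scaling always runs.
should_scale <- function(throttle, ready) {
  !throttle || ready()
}

ready <- new_throttle(seconds_interval = 0.5)
first <- should_scale(TRUE, ready)    # TRUE: nothing has run yet
second <- should_scale(TRUE, ready)   # FALSE: still inside the interval
forced <- should_scale(FALSE, ready)  # TRUE: throttling disabled
Sys.sleep(0.6)
third <- should_scale(TRUE, ready)    # TRUE: the interval has elapsed
```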
autoscale()
Run worker auto-scaling in a later
loop.
crew_class_controller_group$autoscale( loop = later::current_loop(), controllers = NULL )
loop
A later
loop to run auto-scaling.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
NULL
(invisibly).
descale()
Terminate the auto-scaling loop started by
controller$autoscale()
.
crew_class_controller_group$descale(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
NULL
(invisibly).
crashes()
Report the number of consecutive crashes of a task, summed over all selected controllers in the group.
crew_class_controller_group$crashes(name, controllers = NULL)
name
Character string, name of the task to check.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
See the crashes_max
argument of crew_controller()
.
Number of consecutive crashes of the named task, summed over all the controllers in the group.
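A small closure can sketch how consecutive crashes per task might be counted and summed over controllers. Everything here is hypothetical. new_crash_tracker() is invented for illustration, and it assumes that a crash increments a task's counter while any other outcome resets it to zero, which is one reading of "consecutive".

```r
# Hypothetical crash tracker: one counter per (controller, task) pair.
# Assumption: a crash increments the counter, any other outcome resets it.
new_crash_tracker <- function() {
  counts <- list()
  get_count <- function(key) {
    if (is.null(counts[[key]])) 0L else counts[[key]]
  }
  list(
    record = function(controller, name, crashed) {
      key <- paste(controller, name, sep = "/")
      counts[[key]] <<- if (crashed) get_count(key) + 1L else 0L
      invisible(NULL)
    },
    # Sum the counters of one task across the selected controllers.
    crashes = function(name, controllers) {
      keys <- paste(controllers, name, sep = "/")
      sum(vapply(keys, get_count, integer(1L)))
    }
  )
}

tracker <- new_crash_tracker()
tracker$record("persistent", "task", crashed = TRUE)
tracker$record("transient", "task", crashed = TRUE)
tracker$record("transient", "task", crashed = TRUE)
total <- tracker$crashes("task", controllers = c("persistent", "transient"))
tracker$record("transient", "task", crashed = FALSE)
after_success <- tracker$crashes("task", controllers = c("persistent", "transient"))
```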
push()
Push a task to the head of the task list.
crew_class_controller_group$push( command, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, scale = TRUE, throttle = TRUE, name = NULL, save_command = NULL, controller = NULL )
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the
global environment for the task. See the reset_globals
argument of crew_controller_local()
.
substitute
Logical of length 1, whether to call
base::substitute()
on the supplied value of the
command
argument. If TRUE
(default) then command
is quoted
literally as you write it, e.g.
push(command = your_function_call())
. If FALSE
, then crew
assumes command
is a language object and you are passing its
value, e.g. push(command = quote(your_function_call()))
.
substitute = TRUE
is appropriate for interactive use,
whereas substitute = FALSE
is meant for automated R programs
that invoke crew
controllers.
seed
Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed
argument of set.seed()
if not NULL
.
If algorithm
and seed
are both NULL
,
then the random number generator defaults to the
widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream()
.
See vignette("parallel", package = "parallel")
for details.
algorithm
Character of length 1 with the name of the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind
argument of RNGkind()
if not NULL
.
If algorithm
and seed
are both NULL
,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream()
.
See vignette("parallel", package = "parallel")
for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc
argument of require()
.
seconds_timeout
Optional task timeout passed to the .timeout
argument of mirai::mirai()
(after converting to milliseconds).
scale
Logical, whether to automatically scale workers to meet
demand. See the scale
argument of the push()
method of
ordinary single controllers.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
name
Character string, name of the task. If NULL
,
a random name is automatically generated.
The task name must not conflict with an existing task
in the controller where it is submitted.
To reuse the name, wait for the existing task
to finish, then either pop()
or collect()
it
to remove it from its controller.
save_command
Deprecated on 2025-01-22
(crew
version 0.10.2.9004).
controller
Character of length 1,
name of the controller to submit the task.
If NULL
, the controller defaults to the
first controller in the list.
Invisibly return the mirai
object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise()
.
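The substitute argument controls whether command is captured as written or taken as an already-quoted language object. The base-R sketch below imitates that behavior with a hypothetical capture_command() helper. The wrapper calls show why substitute = FALSE suits automated code. When a wrapper function forwards its own argument, substitute() captures the wrapper's parameter name rather than the caller's expression. Quoting in the wrapper and passing substitute = FALSE fixes this.

```r
# Hypothetical helper mirroring the `substitute` argument of push().
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    base::substitute(command)  # quote the expression exactly as written
  } else {
    command                    # caller already passed a language object
  }
}

interactive_style <- capture_command(sqrt(x))
programmatic_style <- capture_command(quote(sqrt(x)), substitute = FALSE)

# A wrapper that forwards its argument: substitute() now sees the
# wrapper's parameter name `expr`, not the caller's sqrt(x).
wrapper <- function(expr) capture_command(expr)
wrapped <- wrapper(sqrt(x))

# The fix: quote in the wrapper, then pass the language object through.
fixed_wrapper <- function(expr) {
  capture_command(substitute(expr), substitute = FALSE)
}
fixed <- fixed_wrapper(sqrt(x))
```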
walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
crew_class_controller_group$walk( command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL )
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over.
For example, to run function calls
f(x = 1, y = "a")
and f(x = 2, y = "b")
,
set command
to f(x, y)
, and set iterate
to
list(x = c(1, 2), y = c("a", "b"))
. The individual
function calls are evaluated as
f(x = iterate$x[[1]], y = iterate$y[[1]])
and
f(x = iterate$x[[2]], y = iterate$y[[2]])
.
All the elements of iterate
must have the same length.
If there are any name conflicts between iterate
and data
,
iterate
takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily
assign to the global environment for each task. This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals
argument of crew_controller_local()
.
Objects in this list are treated as single
values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call
base::substitute()
on the supplied value of the
command
argument. If TRUE
(default) then command
is quoted
literally as you write it, e.g.
push(command = your_function_call())
. If FALSE
, then crew
assumes command
is a language object and you are passing its
value, e.g. push(command = quote(your_function_call()))
.
substitute = TRUE
is appropriate for interactive use,
whereas substitute = FALSE
is meant for automated R programs
that invoke crew
controllers.
seed
Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed
argument of set.seed()
if not NULL
.
If algorithm
and seed
are both NULL
,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream()
.
See vignette("parallel", package = "parallel")
for details.
algorithm
Character of length 1 with the name of the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind
argument of RNGkind()
if not NULL
.
If algorithm
and seed
are both NULL
,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream()
.
See vignette("parallel", package = "parallel")
for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc
argument of require()
.
seconds_timeout
Optional task timeout passed to the .timeout
argument of mirai::mirai()
(after converting to milliseconds).
names
Optional character of length 1, name of the element of
iterate
with names for the tasks. If names
is supplied,
then iterate[[names]]
must be a character vector.
save_command
Deprecated on 2025-01-22
(crew
version 0.10.2.9004).
verbose
Logical of length 1, whether to print a progress bar when pushing tasks.
scale
Logical, whether to automatically scale workers to meet
demand. See also the throttle
argument.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
controller
Character of length 1,
name of the controller to submit the tasks.
If NULL
, the controller defaults to the
first controller in the list.
In contrast to walk()
, map()
blocks the local R session
and waits for all tasks to complete.
Invisibly returns a list of mirai
task objects for the
newly created tasks. The order of tasks in the list matches the
order of data in the iterate
argument.
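When seed and algorithm are both NULL, tasks draw from widely spaced L'Ecuyer-CMRG streams. The sketch below uses only base R's parallel package to show what such streams are. nextRNGStream() advances a seed far enough that two streams do not overlap in practice, and reinstalling the same stream reproduces the same draws. The sketch also shows that the kind argument of RNGkind() takes a character string. Assigning .Random.seed directly is for illustration only. The documentation above attributes worker stream assignment to mirai::nextstream().

```r
library(parallel)

# Widely spaced L'Ecuyer-CMRG streams, one per hypothetical worker.
RNGkind(kind = "L'Ecuyer-CMRG")  # `kind` takes a character string
set.seed(123L)
stream_1 <- .Random.seed
stream_2 <- nextRNGStream(stream_1)

# Illustration only: install a stream as the current RNG state, then draw.
draw_from_stream <- function(stream, n = 3L) {
  assign(".Random.seed", stream, envir = globalenv())
  runif(n)
}

draws_1 <- draw_from_stream(stream_1)
draws_2 <- draw_from_stream(stream_2)
draws_1_again <- draw_from_stream(stream_1)
```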
map()
Apply a single command to multiple inputs.
crew_class_controller_group$map( command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_interval = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, error = "stop", warnings = TRUE, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL )
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over.
For example, to run function calls
f(x = 1, y = "a")
and f(x = 2, y = "b")
,
set command
to f(x, y)
, and set iterate
to
list(x = c(1, 2), y = c("a", "b"))
. The individual
function calls are evaluated as
f(x = iterate$x[[1]], y = iterate$y[[1]])
and
f(x = iterate$x[[2]], y = iterate$y[[2]])
.
All the elements of iterate
must have the same length.
If there are any name conflicts between iterate
and data
,
iterate
takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily
assign to the global environment for each task. This list should
include any functions you previously defined in the global
environment which are required to run tasks.
See the reset_globals
argument of crew_controller_local()
.
Objects in this list are treated as single
values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call
base::substitute()
on the supplied value of the
command
argument. If TRUE
(default) then command
is quoted
literally as you write it, e.g.
push(command = your_function_call())
. If FALSE
, then crew
assumes command
is a language object and you are passing its
value, e.g. push(command = quote(your_function_call()))
.
substitute = TRUE
is appropriate for interactive use,
whereas substitute = FALSE
is meant for automated R programs
that invoke crew
controllers.
seed
Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
seed
argument of set.seed()
if not NULL
.
If algorithm
and seed
are both NULL
,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream()
.
See vignette("parallel", package = "parallel")
for details.
algorithm
Character of length 1 with the name of the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the kind
argument of RNGkind()
if not NULL
.
If algorithm
and seed
are both NULL
,
then the random number generator defaults to the
recommended widely spaced worker-specific
L'Ecuyer streams as supported by mirai::nextstream()
.
See vignette("parallel", package = "parallel")
for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc
argument of require()
.
seconds_interval
Deprecated on 2025-01-17 (crew
version
0.10.2.9003). Instead, the seconds_interval
argument passed
to crew_controller_group()
is used as seconds_max
in a crew_throttle()
object which orchestrates exponential
backoff.
seconds_timeout
Optional task timeout passed to the .timeout
argument of mirai::mirai()
(after converting to milliseconds).
names
Optional character of length 1, name of the element of
iterate
with names for the tasks. If names
is supplied,
then iterate[[names]]
must be a character vector.
save_command
Deprecated on 2025-01-22
(crew
version 0.10.2.9004).
error
Character vector of length 1, choice of action if a task has an error. Possible values:
"stop"
: throw an error in the main R session instead of returning
a value. In case of an error, the results from the last errored
map()
are in the error
field
of the controller, e.g. controller_object$error
. To reduce
memory consumption, set controller_object$error <- NULL
after
you are finished troubleshooting.
"warn"
: throw a warning. This allows the return value with
all the error messages and tracebacks to be generated.
"silent"
: do nothing special.
warnings
Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters a warning.
verbose
Logical of length 1, whether to print progress messages.
scale
Logical, whether to automatically scale workers to meet
demand. See also the throttle
argument.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
controller
Character of length 1,
name of the controller to submit the tasks.
If NULL
, the controller defaults to the
first controller in the list.
The idea comes from functional programming: for example,
the map()
function from the purrr
package.
A tibble
of results and metadata: one row per task and
columns corresponding to the output of pop()
.
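The iterate semantics of map() and walk() can be emulated sequentially in base R. The hypothetical run_map_locally() below evaluates command once for each position in iterate. On name conflicts, iterate takes precedence over data, and name_field plays the role of the names argument. Unlike crew, this sketch runs everything in the local session and returns a plain named list instead of a tibble.

```r
# Hypothetical sequential emulation of map() (no workers, no tibble).
run_map_locally <- function(command, iterate, data = list(), name_field = NULL) {
  n <- unique(lengths(iterate))
  if (length(n) != 1L) {
    stop("All elements of iterate must have the same length.")
  }
  caller <- parent.frame()  # where functions like f() are defined
  results <- lapply(seq_len(n), function(i) {
    task_data <- data
    # iterate overrides data when names conflict.
    task_data[names(iterate)] <- lapply(iterate, function(values) values[[i]])
    eval(command, envir = list2env(task_data, parent = caller))
  })
  if (!is.null(name_field)) {
    names(results) <- iterate[[name_field]]
  }
  results
}

f <- function(x, y) paste0(y, x)
results <- run_map_locally(
  command = quote(f(x, y)),
  iterate = list(x = c(1, 2), y = c("a", "b"), task = c("first", "second")),
  data = list(y = "shadowed by iterate"),
  name_field = "task"
)
```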
pop()
Pop a completed task from the results data frame.
crew_class_controller_group$pop( scale = TRUE, collect = NULL, throttle = TRUE, error = NULL, controllers = NULL )
scale
Logical, whether to automatically scale workers to meet
demand. See the scale
argument of the pop()
method of
ordinary single controllers.
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
error
NULL
or character of length 1, choice of action if
the popped task threw an error. Possible values:
"stop"
: throw an error in the main R session instead of returning
a value.
"warn"
: throw a warning.
NULL
or "silent"
: do not react to errors.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
If there is no task to collect, return NULL
. Otherwise,
return a one-row tibble
with the same columns as pop()
for ordinary controllers.
collect()
Pop all available task results and return them in a tidy
tibble
.
crew_class_controller_group$collect( scale = TRUE, throttle = TRUE, error = NULL, controllers = NULL )
scale
Logical of length 1,
whether to automatically call scale()
to auto-scale workers to meet the demand of the task load.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
error
NULL
or character of length 1, choice of action if
the popped task threw an error. Possible values:
"stop"
: throw an error in the main R session instead of returning
a value.
"warn"
: throw a warning.
NULL
or "silent"
: do not react to errors.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
A tibble
of results and metadata of all resolved tasks,
with one row per task. Returns NULL
if there are no available
results.
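The error argument of pop() and collect() decides how to react when a task fails. The following is a hypothetical base-R sketch. It assumes a results table with name and error columns, where error is NA for tasks that succeeded. As documented, NULL behaves like "silent".

```r
# Hypothetical reaction to task errors; `results` is assumed to have
# `name` and `error` columns, with NA errors for successful tasks.
react_to_errors <- function(results, error = NULL) {
  if (is.null(error)) {
    error <- "silent"
  }
  error <- match.arg(error, c("stop", "warn", "silent"))
  failed <- results$name[!is.na(results$error)]
  if (length(failed) > 0L && error != "silent") {
    text <- paste("Task(s) with errors:", paste(failed, collapse = ", "))
    if (identical(error, "stop")) {
      stop(text, call. = FALSE)
    }
    warning(text, call. = FALSE)
  }
  invisible(results)
}

results <- data.frame(
  name = c("ok", "bad"),
  error = c(NA, "object 'z' not found"),
  stringsAsFactors = FALSE
)
stopped <- tryCatch(react_to_errors(results, "stop"), error = conditionMessage)
warned <- tryCatch(react_to_errors(results, "warn"), warning = conditionMessage)
silent <- react_to_errors(results, NULL)
```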
wait()
Wait for tasks.
crew_class_controller_group$wait( mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL )
mode
Character string, name of the waiting condition.
wait(mode = "all")
waits until all tasks in the mirai
compute profile resolve, and
wait(mode = "one")
waits until at least one task is available
to pop()
or collect()
from the controller.
The former still works if the controller is not the only
means of submitting tasks to the compute profile,
whereas the latter assumes only the controller submits tasks.
seconds_interval
Deprecated on 2025-01-17 (crew
version
0.10.2.9003). Instead, the seconds_interval
argument passed
to crew_controller_group()
is used as seconds_max
in a crew_throttle()
object which orchestrates exponential
backoff.
seconds_timeout
Timeout length in seconds waiting for results to become available.
scale
Logical of length 1, whether to automatically call scale()
on each selected controller to auto-scale workers.
See the scale
argument of the wait()
method of
ordinary single controllers.
throttle
TRUE
to skip auto-scaling if it already happened
within the last polling interval. FALSE
to auto-scale
every time scale()
is called. Throttling avoids
overburdening the mirai
dispatcher and other resources.
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
The wait()
method blocks the calling R session
until the condition in the mode
argument is met.
During the wait, wait()
iteratively auto-scales the workers.
A logical of length 1, invisibly.
wait(mode = "all")
returns TRUE
if all tasks in the mirai
compute profile have resolved (FALSE
otherwise).
wait(mode = "one")
returns TRUE
if the controller is ready
to pop or collect at least one resolved task (FALSE
otherwise).
wait(mode = "one")
assumes all
tasks were submitted through the controller and not by other means.
push_backlog()
Push the name of a task to the backlog.
crew_class_controller_group$push_backlog(name, controller = NULL)
name
Character of length 1 with the task name to push to the backlog.
controller
Character vector of length 1 with the controller name.
Set to NULL
to select the default controller that push_backlog()
would choose.
pop_backlog()
pops the tasks that can be pushed
without saturating the controller.
NULL
(invisibly).
pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
crew_class_controller_group$pop_backlog(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
Character vector of task names which can be pushed to the
controller without saturating it. If the controller is saturated,
character(0L)
is returned.
summary()
Summarize the collected tasks of one or more controllers.
crew_class_controller_group$summary(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
A data frame of cumulative task summary statistics
of all the selected controllers, with one row per controller.
See the documentation of the summary()
method of the controller
class for specific information about the columns in the output.
pids()
Deprecated on 2025-08-26 in crew
version 1.2.1.9005.
crew_class_controller_group$pids(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
The integer process ID of the current process.
terminate()
Terminate the workers and disconnect the client for one or more controllers.
crew_class_controller_group$terminate(controllers = NULL)
controllers
Character vector of controller names.
Set to NULL
to select all controllers.
NULL
(invisibly).
Other controller_group:
crew_controller_group()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  group$terminate()
}
R6
class for sequential controllers.
See crew_controller_sequential()
.
crew::crew_class_controller
-> crew_class_controller_sequential
crew::crew_class_controller$collect()
crew::crew_class_controller$crashes()
crew::crew_class_controller$empty()
crew::crew_class_controller$initialize()
crew::crew_class_controller$map()
crew::crew_class_controller$nonempty()
crew::crew_class_controller$pids()
crew::crew_class_controller$pop()
crew::crew_class_controller$size()
crew::crew_class_controller$started()
crew::crew_class_controller$summary()
crew::crew_class_controller$validate()
crew::crew_class_controller$walk()
resolved()
Number of resolved tasks.
crew_class_controller_sequential$resolved(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
resolved()
is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
For the sequential controller, tasks are resolved as soon as they
are pushed.
Non-negative integer of length 1, number of resolved tasks. The return value is 0 if the condition variable does not exist (i.e. if the client is not running).
unresolved()
Number of unresolved tasks.
crew_class_controller_sequential$unresolved(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns 0 always because the sequential controller resolves tasks as soon as they are pushed.
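Here is a short usage sketch of the sequential controller. It uses only methods documented in this section and for crew_class_controller. It assumes crew is installed (the block is skipped otherwise) and that pop() returns a tibble whose result column is a list, as with ordinary controllers. Because tasks run in the local process at push() time, unresolved() should be 0 immediately after the push.

```r
# Requires the crew package; the block is skipped if crew is missing.
if (requireNamespace("crew", quietly = TRUE)) {
  controller <- crew::crew_controller_sequential()
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  pending <- controller$unresolved()  # documented to always be 0 here
  task <- controller$pop()            # assumed one-row tibble of results
  controller$terminate()
}
```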
saturated()
Check if the controller is saturated.
crew_class_controller_sequential$saturated( collect = NULL, throttle = NULL, controller = NULL )
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Always returns FALSE for the sequential controller because tasks run immediately on the local process and there are no workers.
start()
Start the controller if it is not already started.
crew_class_controller_sequential$start(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
For the sequential controller, there is nothing to do except register the client as started.
NULL (invisibly).
launch()
Does nothing for the sequential controller.
crew_class_controller_sequential$launch(n = 1L, controllers = NULL)
n
Number of workers to launch.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
scale()
Does nothing for the sequential controller.
crew_class_controller_sequential$scale(throttle = TRUE, controllers = NULL)
throttle
Not applicable to the sequential controller.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Invisibly returns FALSE.
autoscale()
Not applicable to the sequential controller.
crew_class_controller_sequential$autoscale(loop = NULL, controllers = NULL)
loop
Not used by sequential controllers. Included to ensure the signature is compatible with the analogous method of controller groups.
controllers
Not used by sequential controllers. Included to ensure the signature is compatible with the analogous method of controller groups.
descale()
Not applicable to the sequential controller.
crew_class_controller_sequential$descale(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
push()
Push a task to the head of the task list.
crew_class_controller_sequential$push( command, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, scale = TRUE, throttle = TRUE, name = NULL, save_command = NULL, controller = NULL )
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task.
This list should include any functions you previously defined in the global environment which are required to run tasks.
See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument.
If TRUE (default), then command is quoted literally as you write it, e.g. push(command = your_function_call()).
If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())).
substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task.
Passed to the seed argument of set.seed() if not NULL.
If algorithm and seed are both NULL for the sequential controller, then the random number generator defaults to the current RNG of the local R session where the sequential controller lives.
algorithm
Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task.
Passed to the kind argument of RNGkind() if not NULL.
If algorithm and seed are both NULL for the sequential controller, then the random number generator defaults to the current RNG of the local R session where the sequential controller lives.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Not used in the sequential controller.
scale
Not used in the sequential controller.
throttle
Not used in the sequential controller.
name
Character string, name of the task. If NULL, then a random name is generated automatically.
The name of the task must not conflict with the name of another task pushed to the controller.
Any previous task with the same name must first be popped before a new task with that name can be pushed.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004) and no longer used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
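The arguments above can be pictured with a small base-R sketch. The function run_task_sketch() is hypothetical and is not crew's implementation; it assumes the task runs in the current R session, as with the sequential controller. It shows how substitute decides whether command is captured as written or taken as a value, how data becomes the local evaluation environment, how globals are assigned temporarily and then cleaned up, and how algorithm and seed feed RNGkind() and set.seed(). In base R, the kind argument of RNGkind() is a character string such as "Mersenne-Twister", so the sketch uses one.

```r
# Hypothetical sketch, not crew's implementation: evaluate one task in the
# current R session the way the push() arguments above describe.
run_task_sketch <- function(command, data = list(), globals = list(),
                            substitute = TRUE, seed = NULL, algorithm = NULL) {
  # substitute = TRUE captures the expression as written;
  # substitute = FALSE expects an already-quoted language object.
  if (substitute) {
    command <- substitute(command)
  }
  # Temporarily assign globals, then restore (or remove) them on exit.
  env_global <- globalenv()
  backup <- new.env()
  for (name in names(globals)) {
    if (exists(name, envir = env_global, inherits = FALSE)) {
      assign(name, get(name, envir = env_global, inherits = FALSE), envir = backup)
    }
    assign(name, globals[[name]], envir = env_global)
  }
  on.exit({
    for (name in names(globals)) {
      if (exists(name, envir = backup, inherits = FALSE)) {
        assign(name, get(name, envir = backup, inherits = FALSE), envir = env_global)
      } else {
        rm(list = name, envir = env_global)
      }
    }
  }, add = TRUE)
  # Set the RNG algorithm first (changing it re-seeds), then the seed.
  if (!is.null(algorithm)) {
    old_kind <- RNGkind()[1L]
    RNGkind(kind = algorithm)
    on.exit(RNGkind(kind = old_kind), add = TRUE)
  }
  if (!is.null(seed)) {
    set.seed(seed)
  }
  # data becomes the local evaluation environment of the task.
  eval(command, envir = list2env(data, parent = env_global))
}

run_task_sketch(x + 1, data = list(x = 1))                             # 2
run_task_sketch(quote(x * 2), data = list(x = 3), substitute = FALSE)  # 6
run_task_sketch(square(4), globals = list(square = function(x) x^2))   # 16
run_task_sketch(runif(1), seed = 1, algorithm = "Mersenne-Twister")
```

After the third call, square no longer exists in the global environment because the sketch removes globals it added.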
Invisibly returns a mirai-like list where the data element is the result of the task.
wait()
Not applicable to the sequential controller.
crew_class_controller_sequential$wait( mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL )
mode
Not applicable to the sequential controller.
seconds_interval
Not applicable to the sequential controller.
seconds_timeout
Not applicable to the sequential controller.
scale
Not applicable to the sequential controller.
throttle
Not applicable to the sequential controller.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Always returns TRUE (invisibly) for the sequential controller.
push_backlog()
Not applicable to the sequential controller.
crew_class_controller_sequential$push_backlog(name, controller = NULL)
name
Character of length 1 with the task name to push to the backlog.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
pop_backlog()
Not applicable to the sequential controller.
crew_class_controller_sequential$pop_backlog(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Always character(0L) for the sequential controller.
cancel()
Not applicable to the sequential controller.
crew_class_controller_sequential$cancel(names = character(0L), all = FALSE)
names
Not applicable to the sequential controller.
all
Not applicable to the sequential controller.
terminate()
Terminate the controller.
crew_class_controller_sequential$terminate(controllers = NULL)
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
NULL (invisibly).
Other sequential controllers:
crew_controller_sequential()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  controller <- crew_controller_sequential()
  controller$push(name = "task", command = sqrt(4))
  controller$pop()
}
R6 abstract class to build other subclasses which launch and manage workers.
async
Deprecated on 2025-08-27 (crew version 1.2.1.9009).
name
See crew_launcher().
workers
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
tls
See crew_launcher().
r_arguments
See crew_launcher().
options_metrics
See crew_launcher().
url
Websocket URL for worker connections.
profile
mirai compute profile of the launcher.
launches
Data frame tracking worker launches with one row per launch. Each launch may create more than one worker. Old superfluous rows are periodically discarded for efficiency.
throttle
A crew_throttle() object to throttle scaling.
failed
Number of failed worker launches (launches that exceed seconds_launch seconds to dial in).
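A launch counts toward failed once it has gone longer than seconds_launch seconds without dialing in. The sketch below assumes a hypothetical launches table with a start column (launch time in seconds) and an online column (whether the worker connected). crew's real launches data frame may use different columns, so treat this only as an illustration of the rule.

```r
# Hypothetical sketch: count launches that never dialed in within
# seconds_launch seconds. The columns "start" and "online" are assumptions
# for illustration, not crew's internal launches data frame.
count_failed_launches <- function(launches, now, seconds_launch) {
  expired <- (now - launches$start) > seconds_launch
  sum(expired & !launches$online)
}

launches <- data.frame(
  start = c(0, 10, 50),          # seconds since some reference time
  online = c(TRUE, FALSE, FALSE) # did the worker connect?
)
count_failed_launches(launches, now = 60, seconds_launch = 30)  # 1
```

At now = 60, only the launch at time 10 has both exceeded the 30-second window and failed to connect; the launch at time 50 is still within its startup window.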
new()
Launcher constructor.
crew_class_launcher$new( name = NULL, workers = NULL, seconds_interval = NULL, seconds_timeout = NULL, seconds_launch = NULL, seconds_idle = NULL, seconds_wall = NULL, seconds_exit = NULL, tasks_max = NULL, tasks_timers = NULL, reset_globals = NULL, reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, crashes_error = NULL, launch_max = NULL, tls = NULL, processes = NULL, r_arguments = NULL, options_metrics = NULL )
name
See crew_launcher().
workers
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
Deprecated. See crew_launcher().
reset_packages
Deprecated. See crew_launcher().
reset_options
Deprecated. See crew_launcher().
garbage_collection
Deprecated. See crew_launcher().
crashes_error
See crew_launcher().
launch_max
Deprecated.
tls
See crew_launcher().
processes
Deprecated on 2025-08-27 (crew version 1.2.1.9009).
r_arguments
See crew_launcher().
options_metrics
See crew_launcher().
An R6 object with the launcher.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local()
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
validate()
Validate the launcher.
crew_class_launcher$validate()
NULL (invisibly).
poll()
Poll the throttle.
crew_class_launcher$poll()
TRUE to run whatever work comes next, FALSE to skip until the appropriate time.
settings()
List of arguments for mirai::daemon().
crew_class_launcher$settings()
List of arguments for mirai::daemon().
call()
Create a call to crew_worker() to help create custom launchers.
crew_class_launcher$call(worker = NULL)
worker
Deprecated on 2025-08-28 (crew version 1.2.1.9009).
Character string with a call to crew_worker().
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "profile")
launcher$call()
launcher$terminate()
start()
Start the launcher.
crew_class_launcher$start(url = NULL, profile = NULL, sockets = NULL)
url
Character string, websocket URL for worker connections.
profile
Character string, mirai compute profile.
sockets
Deprecated on 2025-01-28 (crew version 1.0.0).
NULL (invisibly).
terminate()
Terminate the whole launcher, including all workers.
crew_class_launcher$terminate()
NULL (invisibly).
launch()
Launch a worker.
crew_class_launcher$launch(n = 1L)
n
Positive integer, number of workers to launch.
Handle of the launched worker.
launch_worker()
Abstract worker launch method.
crew_class_launcher$launch_worker(call)
call
Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.
Launcher plugins will override this method.
A handle to mock the worker launch.
launch_workers()
Launch multiple workers.
crew_class_launcher$launch_workers(call, n)
call
Character of length 1 with a namespaced call to crew_worker() which will run in each worker and accept tasks.
n
Positive integer, number of workers to launch.
Launcher plugins may override this method to launch multiple workers from a single system call.
A handle to mock the worker launch.
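One plausible relationship between launch_worker() and launch_workers(): a generic method repeats the single-worker launch n times, while a plugin backed by a job scheduler may override it to submit all n workers at once. The closures below are hypothetical stand-ins for R6 methods, the counter system_calls stands in for real job submissions, and the worker call string is a placeholder rather than a real crew_worker() call.

```r
# Hypothetical sketch, not crew's R6 classes: closures stand in for the
# launch_worker() and launch_workers() methods.
system_calls <- 0L  # counts simulated job submissions

make_launcher_sketch <- function(launch_worker) {
  # Generic behavior: repeat the single-worker launch n times.
  launch_workers <- function(worker_call, n) {
    lapply(seq_len(n), function(index) launch_worker(worker_call))
  }
  list(launch_worker = launch_worker, launch_workers = launch_workers)
}

one_by_one <- make_launcher_sketch(function(worker_call) {
  system_calls <<- system_calls + 1L  # one submission per worker
  list(call = worker_call)            # placeholder worker handle
})

# A plugin-style override that submits all n workers in one batch.
batched <- one_by_one
batched$launch_workers <- function(worker_call, n) {
  system_calls <<- system_calls + 1L  # a single submission for n workers
  list(call = worker_call, n = n)
}

worker_call <- "placeholder for a crew_worker() call"
handles <- one_by_one$launch_workers(worker_call, n = 3L)
length(handles)  # 3
system_calls     # 3
batch_handle <- batched$launch_workers(worker_call, n = 3L)
system_calls     # 4
```

The batched override is the reason a plugin would bother overriding launch_workers(): on a scheduler, one array job is usually cheaper than n separate submissions.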
scale()
Auto-scale workers out to meet the demand of tasks.
crew_class_launcher$scale(status, throttle = NULL)
status
A mirai status list with worker and task information.
throttle
Deprecated, only used in the controller as of 2025-01-16 (crew version 0.10.2.9003).
Invisibly returns TRUE if there was any relevant auto-scaling activity (new worker launches or worker connection/disconnection events), FALSE otherwise.
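The exact scaling rules live inside crew. As a hypothetical simplification, a scale-out step can compare demand (unresolved tasks) with capacity (active workers, here assumed to include launches that have not yet dialed in) and launch the difference, capped by the maximum number of workers.

```r
# Hypothetical scale-out rule (a simplification, not crew's algorithm).
# unresolved:  tasks waiting for or currently running on a worker
# active:      workers connected or still within their launch window
# workers_max: the maximum number of concurrent workers
workers_to_launch <- function(unresolved, active, workers_max) {
  stopifnot(unresolved >= 0, active >= 0, workers_max >= 0)
  wanted <- min(unresolved, workers_max)
  max(0, wanted - active)
}

workers_to_launch(unresolved = 5, active = 1, workers_max = 3)  # 2
workers_to_launch(unresolved = 0, active = 1, workers_max = 3)  # 0
workers_to_launch(unresolved = 2, active = 2, workers_max = 3)  # 0
```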
terminate_workers()
Deprecated on 2025-08-26 (crew version 1.2.1.9004).
crew_class_launcher$terminate_workers()
NULL (invisibly).
crashes()
Deprecated on 2025-01-28 (crew version 1.0.0).
crew_class_launcher$crashes(index = NULL)
index
Unused argument.
The integer 1, for compatibility.
set_name()
Deprecated on 2025-01-28 (crew version 1.0.0).
crew_class_launcher$set_name(name)
name
Name to set for the launcher.
NULL (invisibly).
clone()
The objects of this class are cloneable with this method.
crew_class_launcher$clone(deep = FALSE)
deep
Whether to make a deep clone.
Other launcher:
crew_launcher()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local()
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$new`
## ------------------------------------------------
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local()
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$call`
## ------------------------------------------------
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "profile")
launcher$call()
launcher$terminate()
R6 class to launch and manage local process workers.
crew::crew_class_launcher -> crew_class_launcher_local
options_local
crew::crew_class_launcher$call()
crew::crew_class_launcher$crashes()
crew::crew_class_launcher$launch()
crew::crew_class_launcher$launch_workers()
crew::crew_class_launcher$poll()
crew::crew_class_launcher$scale()
crew::crew_class_launcher$set_name()
crew::crew_class_launcher$settings()
crew::crew_class_launcher$start()
crew::crew_class_launcher$terminate()
crew::crew_class_launcher$terminate_workers()
new()
Local launcher constructor.
crew_class_launcher_local$new( name = NULL, workers = NULL, seconds_interval = NULL, seconds_timeout = NULL, seconds_launch = NULL, seconds_idle = NULL, seconds_wall = NULL, seconds_exit = NULL, tasks_max = NULL, tasks_timers = NULL, crashes_error = NULL, tls = NULL, processes = NULL, r_arguments = NULL, options_metrics = NULL, options_local = NULL )
name
See crew_launcher().
workers
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
crashes_error
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
r_arguments
See crew_launcher().
options_metrics
options_local
reset_globals
Deprecated. See crew_launcher().
reset_packages
Deprecated. See crew_launcher().
reset_options
Deprecated. See crew_launcher().
garbage_collection
Deprecated. See crew_launcher().
An R6 object with the local launcher.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
validate()
Validate the local launcher.
crew_class_launcher_local$validate()
NULL (invisibly).
launch_worker()
Launch a local process worker which will dial into a socket.
crew_class_launcher_local$launch_worker(call)
call
Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.
The call argument is R code that will run to initiate the worker.
Together, the launcher, worker, and instance arguments are useful for constructing informative job names.
A handle object to allow the termination of the worker later on.
clone()
The objects of this class are cloneable with this method.
crew_class_launcher_local$clone(deep = FALSE)
deep
Whether to make a deep clone.
Other plugin_local:
crew_controller_local(), crew_launcher_local()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher_local$new`
## ------------------------------------------------
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
Local monitor R6 class.
See crew_monitor_local().
dispatchers()
List the process IDs of the running mirai dispatcher processes.
crew_class_monitor_local$dispatchers(user = ps::ps_username())
user
Character of length 1, user ID to filter on. NULL to list processes of all users (not recommended).
Integer vector of process IDs of the running mirai dispatcher processes.
daemons()
List the process IDs of the locally running mirai daemon processes which are not crew workers.
crew_class_monitor_local$daemons(user = ps::ps_username())
user
Character of length 1, user ID to filter on. NULL to list processes of all users (not recommended).
Integer vector of process IDs of the locally running mirai daemon processes which are not crew workers.
workers()
List the process IDs of locally running crew workers launched by the local controller (crew_controller_local()).
crew_class_monitor_local$workers(user = ps::ps_username())
user
Character of length 1, user ID to filter on. NULL to list processes of all users (not recommended).
Only the workers running on your local computer are listed.
Unlisted workers include jobs on job schedulers such as SLURM and jobs on cloud services such as AWS Batch.
To monitor those worker processes, please consult the monitor objects in the relevant third-party launcher plugins such as crew.cluster and crew.aws.batch.
Integer vector of process IDs of locally running crew workers launched by the local controller (crew_controller_local()).
terminate()
Terminate the given process IDs.
crew_class_monitor_local$terminate(pids)
pids
Integer vector of process IDs of local processes to terminate.
Termination happens with the operating system signal given by crew_terminate_signal().
NULL (invisibly).
Other monitor:
crew_monitor_local()
R6 relay class.
R6 class for relay configuration. See crew_relay().
condition
Main condition variable.
from
Condition variable to relay from.
to
Condition variable to relay to.
throttle
A crew_throttle() object for wait().
new()
Relay constructor.
crew_class_relay$new(throttle)
throttle
A crew_throttle() object.
A crew_relay() object.
validate()
Validate the object.
crew_class_relay$validate()
NULL (invisibly).
start()
Start the relay object.
crew_class_relay$start()
NULL (invisibly).
terminate()
Terminate the relay object.
crew_class_relay$terminate()
NULL (invisibly).
set_from()
Set the condition variable to relay from.
crew_class_relay$set_from(from)
from
Condition variable to relay from.
NULL (invisibly).
set_to()
Set the condition variable to relay to.
crew_class_relay$set_to(to)
to
Condition variable to relay to.
NULL (invisibly).
wait()
Wait until an unobserved task resolves or the timeout is reached. Use the throttle to determine the waiting time.
crew_class_relay$wait()
NULL (invisibly).
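crew's relay waits on condition variables. As a rough base-R analogue only, the hypothetical wait_with_backoff() below polls a check function and lets the waiting interval grow by a factor of base up to seconds_max, which mirrors how a throttle spaces out repeated checks until a result arrives or the timeout passes. Unlike the relay's wait(), it returns TRUE or FALSE so the outcome is visible.

```r
# Hypothetical base-R analogue of a throttled wait; crew's relay uses
# condition variables instead of polling a function.
wait_with_backoff <- function(check, seconds_start = 0.01, seconds_max = 0.5,
                              base = 2, seconds_timeout = 5) {
  interval <- seconds_start
  deadline <- Sys.time() + seconds_timeout
  repeat {
    if (check()) return(TRUE)
    if (Sys.time() >= deadline) return(FALSE)
    Sys.sleep(interval)
    interval <- min(interval * base, seconds_max)  # decelerate the polling
  }
}

attempts <- 0L
ready <- function() {
  attempts <<- attempts + 1L
  attempts >= 3L  # pretend the result arrives on the third check
}
wait_with_backoff(ready)                                    # TRUE
wait_with_backoff(function() FALSE, seconds_timeout = 0.05) # FALSE
```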
Other relay:
crew_relay()
crew_relay()
R6 throttle class.
R6 class for throttle configuration. See crew_throttle().
seconds_max
See crew_throttle().
seconds_min
See crew_throttle().
seconds_start
See crew_throttle().
base
See crew_throttle().
seconds_interval
Current wait time interval.
polled
Positive numeric of length 1, millisecond timestamp of the last time poll() returned TRUE.
NULL if poll() was never called on the current object.
new()
Throttle constructor.
crew_class_throttle$new( seconds_max = NULL, seconds_min = NULL, seconds_start = NULL, base = NULL )
seconds_max
See crew_throttle().
seconds_min
See crew_throttle().
seconds_start
See crew_throttle().
base
See crew_throttle().
An R6 object with throttle configuration.
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
validate()
Validate the object.
crew_class_throttle$validate()
NULL (invisibly).
poll()
Poll the throttler.
crew_class_throttle$poll()
TRUE if poll() did not return TRUE in the last max seconds, FALSE otherwise.
accelerate()
Divide seconds_interval by base.
crew_class_throttle$accelerate()
NULL (invisibly). Called for its side effects.
decelerate()
Multiply seconds_interval by base.
crew_class_throttle$decelerate()
NULL (invisibly). Called for its side effects.
reset()
Reset the throttle object so the next poll() returns TRUE, and reset the wait time interval to its initial value.
crew_class_throttle$reset()
NULL (invisibly).
update()
Reset the throttle when there is activity and decelerate it gradually when there is no activity.
crew_class_throttle$update(activity)
activity
TRUE if there is activity, FALSE otherwise.
NULL (invisibly).
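The throttle methods can be summarized with a hypothetical base-R closure. The sketch assumes that poll() compares elapsed time with the current seconds_interval, that accelerate() and decelerate() keep the interval within seconds_min and seconds_max, and that update(TRUE) simply calls reset(). These are assumptions for illustration, not crew's code, and the sketch stores polled in seconds rather than milliseconds.

```r
# Hypothetical sketch, not crew's implementation. Assumptions: poll()
# compares elapsed time with the current interval, accelerate() and
# decelerate() clamp the interval to [seconds_min, seconds_max], and
# update(TRUE) calls reset(). polled is kept in seconds here.
throttle_sketch <- function(seconds_max = 1, seconds_min = 0.001,
                            seconds_start = seconds_min, base = 2) {
  seconds_interval <- seconds_start
  polled <- NULL
  self <- list()
  self$interval <- function() seconds_interval
  self$poll <- function() {
    now <- as.numeric(Sys.time())
    if (is.null(polled) || now - polled > seconds_interval) {
      polled <<- now
      return(TRUE)
    }
    FALSE
  }
  self$accelerate <- function() {
    seconds_interval <<- max(seconds_interval / base, seconds_min)
    invisible(NULL)
  }
  self$decelerate <- function() {
    seconds_interval <<- min(seconds_interval * base, seconds_max)
    invisible(NULL)
  }
  self$reset <- function() {
    seconds_interval <<- seconds_start
    polled <<- NULL
    invisible(NULL)
  }
  self$update <- function(activity) {
    if (activity) self$reset() else self$decelerate()
    invisible(NULL)
  }
  self
}

th <- throttle_sketch(seconds_max = 10, seconds_min = 1, seconds_start = 1, base = 2)
th$poll()      # TRUE
th$poll()      # FALSE: less than seconds_interval has elapsed
for (i in 1:4) th$decelerate()
th$interval()  # 10: 1 -> 2 -> 4 -> 8 -> 10 (capped at seconds_max)
th$update(activity = TRUE)
th$interval()  # 1
```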
Other throttle:
crew_throttle()
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
## ------------------------------------------------
## Method `crew_class_throttle$new`
## ------------------------------------------------
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
R6 TLS class.
R6 class for TLS configuration. See crew_tls().
mode
See crew_tls().
key
See crew_tls().
password
See crew_tls().
certificates
See crew_tls().
new()
TLS configuration constructor.
crew_class_tls$new( mode = NULL, key = NULL, password = NULL, certificates = NULL )
mode
Argument passed from crew_tls().
key
Argument passed from crew_tls().
password
Argument passed from crew_tls().
certificates
Argument passed from crew_tls().
An R6 object with TLS configuration.
crew_tls(mode = "automatic")
validate()
Validate the object.
crew_class_tls$validate(test = TRUE)
test
Logical of length 1, whether to test the TLS configuration with nanonext::tls_config().
NULL (invisibly).
client()
TLS credentials for the crew client.
crew_class_tls$client()
NULL or character vector, depending on the mode.
worker()
TLS credentials for crew workers.
crew_class_tls$worker(profile)
profile
Character of length 1 with the mirai compute profile.
NULL or character vector, depending on the mode.
url()
Form the URL for crew worker connections.
crew_class_tls$url(host, port)
host
Character string with the host name or IP address.
port
Non-negative integer with the port number (0 to let NNG select a random ephemeral port).
Character string with the URL.
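A hypothetical URL helper shows the shape of a worker connection URL. It assumes the "tcp://" scheme without TLS and mirai's "tls+tcp://" scheme with TLS, and it wraps IPv6 literal hosts in brackets, as NNG URLs require. It is a sketch under those assumptions, not crew_class_tls$url().

```r
# Hypothetical helper, not crew's implementation: form a worker
# connection URL from a host and port. Port 0 asks NNG to choose a
# random ephemeral port when the client starts listening.
form_url_sketch <- function(host, port, tls = FALSE) {
  stopifnot(is.character(host), length(host) == 1L)
  stopifnot(is.numeric(port), length(port) == 1L, port >= 0, port <= 65535)
  scheme <- if (tls) "tls+tcp" else "tcp"
  # IPv6 literals need brackets inside a URL.
  if (grepl(":", host, fixed = TRUE)) {
    host <- paste0("[", host, "]")
  }
  sprintf("%s://%s:%d", scheme, host, as.integer(port))
}

form_url_sketch("127.0.0.1", 57000)         # "tcp://127.0.0.1:57000"
form_url_sketch("10.0.0.5", 0, tls = TRUE)  # "tls+tcp://10.0.0.5:0"
form_url_sketch("::1", 5555)                # "tcp://[::1]:5555"
```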
Other tls:
crew_tls()
crew_tls(mode = "automatic")
## ------------------------------------------------
## Method `crew_class_tls$new`
## ------------------------------------------------
crew_tls(mode = "automatic")
Deprecated on 2025-08-26 in crew version 1.2.1.9006.
Please use crew_monitor_local() instead.
crew_clean( dispatchers = TRUE, workers = TRUE, user = ps::ps_username(), seconds_interval = 0.25, seconds_timeout = 60, verbose = TRUE )
dispatchers |
Logical of length 1, whether to terminate dispatchers. |
workers |
Logical of length 1, whether to terminate workers. |
user |
Character of length 1. Terminate dispatchers and/or workers associated with this user name. |
seconds_interval |
Seconds to wait between polling intervals waiting for a process to exit. |
seconds_timeout |
Seconds to wait for a process to exit. |
verbose |
Logical of length 1, whether to print an informative message every time a process is terminated. |
Behind the scenes, mirai uses an external R process called a "dispatcher" to send tasks to crew workers.
This dispatcher usually shuts down when you terminate the controller or quit your R session, but sometimes it lingers.
Likewise, crew workers sometimes fail to shut down on their own.
The crew_clean() function searches the process table on your local machine and manually terminates any mirai dispatchers and crew workers associated with your user name (or the user name you select in the user argument).
Unfortunately, it cannot reach remote workers such as those launched by a crew.cluster controller.
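The selection step of such a cleanup can be sketched without touching real processes. The hypothetical select_crew_processes() below filters a toy process table by user name and by command-line patterns. The column names and the patterns "mirai::dispatcher" and "crew::crew_worker" are assumptions for illustration, and termination itself, which crew_clean() performs and crew_monitor_local() now covers, is out of scope.

```r
# Hypothetical sketch of the filtering step only. Column names and
# command-line patterns are illustrative assumptions.
select_crew_processes <- function(processes, user,
                                  dispatchers = TRUE, workers = TRUE) {
  patterns <- c(
    if (dispatchers) "mirai::dispatcher",
    if (workers) "crew::crew_worker"
  )
  if (length(patterns) == 0L) {
    return(integer(0))
  }
  mine <- processes$username == user
  matches <- Reduce(`|`, lapply(patterns, grepl, x = processes$cmdline, fixed = TRUE))
  processes$pid[mine & matches]
}

# Toy process table (made up for illustration).
processes <- data.frame(
  pid = c(101L, 102L, 103L, 104L),
  username = c("alice", "alice", "bob", "alice"),
  cmdline = c(
    "R -e mirai::dispatcher(...)",
    "R -e crew::crew_worker(...)",
    "R -e crew::crew_worker(...)",
    "R --no-save"
  ),
  stringsAsFactors = FALSE
)
select_crew_processes(processes, user = "alice")  # 101 102
```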
NULL (invisibly). If verbose is TRUE, it prints a message for every terminated process.
Other utility:
crew_assert(), crew_deprecate(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  crew_clean()
}
Create an R6 wrapper object to manage the mirai client.
crew_client( name = NULL, workers = NULL, host = NULL, port = NULL, serialization = NULL, profile = crew::crew_random_name(), tls = crew::crew_tls(), tls_enable = NULL, tls_config = NULL, seconds_interval = 0.25, seconds_timeout = 60, retry_tasks = NULL )
name |
Deprecated on 2025-01-14 ( |
workers |
Deprecated on 2025-01-13 ( |
host |
IP address of the |
port |
TCP port to listen for the workers. If |
serialization |
Either |
profile |
Character string, compute profile for |
tls |
A TLS configuration object from |
tls_enable |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
tls_config |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete,
such as checking |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
retry_tasks |
Deprecated on 2025-01-13 ( |
Other client:
crew_class_client
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  client$summary()
  client$terminate()
}
This function is for developers of crew launcher plugins.
Users should use a specific controller helper such as crew_controller_local().
crew_controller( client, launcher, reset_globals = TRUE, reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, crashes_max = 5L, backup = NULL, auto_scale = NULL )
client |
An |
launcher |
An |
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
crashes_max |
In rare cases, a worker may exit unexpectedly
before it completes its current task. If this happens,
|
backup |
An optional Limitations of |
auto_scale |
Deprecated. Use the |
Other controller:
crew_class_controller
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}
Create an R6 object to submit tasks and launch workers through multiple crew controllers.
crew_controller_group(..., seconds_interval = 0.25)
... |
|
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete,
such as checking |
Other controller_group:
crew_class_controller_group
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  persistent <- crew_controller_local(name = "persistent")
  transient <- crew_controller_local(
    name = "transient",
    tasks_max = 1L
  )
  group <- crew_controller_group(persistent, transient)
  group$start()
  group$push(name = "task", command = sqrt(4), controller = "transient")
  group$wait()
  group$pop()
  group$terminate()
}
Create an R6 object to submit tasks and launch workers on local processes.
crew_controller_local( name = NULL, workers = 1L, host = "127.0.0.1", port = NULL, tls = crew::crew_tls(), tls_enable = NULL, tls_config = NULL, serialization = NULL, profile = crew::crew_random_name(), seconds_interval = 0.25, seconds_timeout = 60, seconds_launch = 30, seconds_idle = 300, seconds_wall = Inf, seconds_exit = NULL, retry_tasks = NULL, tasks_max = Inf, tasks_timers = 0L, reset_globals = TRUE, reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE, crashes_error = NULL, launch_max = NULL, r_arguments = c("--no-save", "--no-restore"), crashes_max = 5L, backup = NULL, options_metrics = crew::crew_options_metrics(), options_local = crew::crew_options_local(), local_log_directory = NULL, local_log_join = NULL )
name |
Character string, name of the launcher. If the name is
|
workers |
Maximum number of workers to run concurrently
when auto-scaling, excluding task retries and manual calls to |
host |
IP address of the |
port |
TCP port to listen for the workers. If |
tls |
A TLS configuration object from |
tls_enable |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
tls_config |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
serialization |
Either |
profile |
Character string, compute profile for |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete. In certain cases, exponential
backoff is used with this argument passed to |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
seconds_launch |
Seconds of startup time to allow.
A worker is unconditionally assumed to be alive
from the moment of its launch until |
seconds_idle |
Maximum number of seconds that a worker can idle
since the completion of the last task. If exceeded, the worker exits.
But the timer does not launch until |
seconds_wall |
Soft wall time in seconds.
The timer does not launch until |
seconds_exit |
Deprecated on 2023-09-21 in version 0.5.0.9002. No longer necessary. |
retry_tasks |
Deprecated on 2025-01-13 ( |
tasks_max |
Maximum number of tasks that a worker will do before exiting. Also determines how often the controller auto-scales. See the Auto-scaling section for details. |
tasks_timers |
Number of tasks to do before activating
the timers for |
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
crashes_error |
Deprecated on 2025-01-13 ( |
launch_max |
Deprecated on 2024-11-04 ( |
r_arguments |
Optional character vector of command line arguments
to pass to |
crashes_max |
In rare cases, a worker may exit unexpectedly
before it completes its current task. If this happens,
|
backup |
An optional Limitations of |
options_metrics |
Either |
options_local |
An object generated by |
local_log_directory |
Deprecated on 2024-10-08. Use
|
local_log_join |
Deprecated on 2024-10-08. Use
|
Other plugin_local:
crew_class_launcher_local, crew_launcher_local()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  controller <- crew_controller_local()
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}
The sequential controller runs tasks on the same R process where the controller object exists. Tasks run sequentially rather than in parallel.
crew_controller_sequential( reset_globals = TRUE, reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE )
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
Other sequential controllers:
crew_class_controller_sequential
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  controller <- crew_controller_sequential()
  controller$push(name = "task", command = sqrt(4))
  controller$pop()
}
Deprecate a crew feature.
Show an informative warning when a crew feature is deprecated.
crew_deprecate( name, date, version, alternative, condition = "warning", value = "x", skip_cran = FALSE, frequency = "always" )
name |
Name of the feature (function or argument) to deprecate. |
date |
Date of deprecation. |
version |
Package version when deprecation was instated. |
alternative |
Message about an alternative. |
condition |
Either "warning" or "message" to indicate the type of condition thrown on deprecation. |
value |
Value of the object. Deprecation is skipped
if |
skip_cran |
Logical of length 1, whether to skip the deprecation warning or message on CRAN. |
frequency |
Character of length 1, passed to the |
NULL (invisibly). Throws a warning or message (depending on condition)
if a feature is deprecated.
Other utility:
crew_assert(), crew_clean(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
suppressWarnings( crew_deprecate( name = "auto_scale", date = "2023-05-18", version = "0.2.0", alternative = "use the scale argument of push(), pop(), and wait()." ) )
Not a user-side function. Do not call directly.
crew_eval( command, name, data = list(), globals = list(), seed = NULL, algorithm = NULL, packages = character(0), library = NULL, reset_globals = TRUE, reset_packages = FALSE, reset_options = FALSE, garbage_collection = FALSE )
command |
Language object with R code to run. |
name |
Character of length 1, name of the task. |
data |
Named list of local data objects in the evaluation environment. |
globals |
Named list of objects to temporarily assign to the global environment for the task. |
seed |
Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
|
algorithm |
Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the |
packages |
Character vector of packages to load for the task. |
library |
Library path to load the packages. See the |
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
The crew_eval() function evaluates an R expression
in an encapsulated environment and returns a monad with the results,
including warnings and error messages if applicable.
The random number generator seed, globals, and global options
are restored to their original values on exit.
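The encapsulation described above can be approximated in base R. The sketch below is a hypothetical illustration, not crew's implementation: eval_sketch() and its output fields are made-up names, it only restores the RNG state and global options (not globals), and it assumes the "monad" can be modeled as a plain list holding the result, any collected warnings, and any error message.

```r
# Hypothetical sketch of the encapsulation crew_eval() describes.
# Not crew's implementation: eval_sketch() and its fields are made up.
eval_sketch <- function(command, name, seed = NULL) {
  # Record the global RNG state (if any) and the current global options.
  old_options <- options()
  had_seed <- exists(".Random.seed", envir = globalenv(), inherits = FALSE)
  if (had_seed) old_seed <- get(".Random.seed", envir = globalenv())
  on.exit({
    # Remove options the task added, then restore the original values.
    added <- setdiff(names(options()), names(old_options))
    if (length(added)) {
      options(stats::setNames(vector("list", length(added)), added))
    }
    options(old_options)
    # Put the RNG state back exactly as it was before the task.
    if (had_seed) {
      assign(".Random.seed", old_seed, envir = globalenv())
    } else if (exists(".Random.seed", envir = globalenv(), inherits = FALSE)) {
      rm(".Random.seed", envir = globalenv())
    }
  }, add = TRUE)
  if (!is.null(seed)) set.seed(seed)
  warning_messages <- character(0)
  error_message <- NA_character_
  result <- withCallingHandlers(
    tryCatch(
      # Evaluate in a fresh environment whose parent is the global environment.
      eval(command, envir = new.env(parent = globalenv())),
      error = function(condition) {
        error_message <<- conditionMessage(condition)
        NULL
      }
    ),
    warning = function(condition) {
      # Collect the warning message and let the command keep running.
      warning_messages <<- c(warning_messages, conditionMessage(condition))
      invokeRestart("muffleWarning")
    }
  )
  # The "monad": one list with the result plus metadata.
  list(
    name = name,
    result = list(result),
    warnings = warning_messages,
    error = error_message
  )
}

out <- eval_sketch(quote({ warning("careful"); 1 + 1 }), name = "task_name")
out$result[[1]]
out$warnings
```

Catching errors with tryCatch() but warnings with withCallingHandlers() is a deliberate choice: an error has to end the task, whereas a warning only needs to be recorded before evaluation continues.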
A monad object with results and metadata.
Other utility:
crew_assert(), crew_clean(), crew_deprecate(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
crew_eval(quote(1 + 1), name = "task_name")
This function is useful for inheriting argument documentation
in functions that create custom third-party launchers. See
@inheritParams crew::crew_launcher in the source code file of
crew_launcher_local().
crew_launcher( name = NULL, workers = 1L, seconds_interval = 0.25, seconds_timeout = 60, seconds_launch = 30, seconds_idle = 300, seconds_wall = Inf, seconds_exit = NULL, tasks_max = Inf, tasks_timers = 0L, reset_globals = NULL, reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, crashes_error = NULL, launch_max = NULL, tls = crew::crew_tls(), processes = NULL, r_arguments = c("--no-save", "--no-restore"), options_metrics = crew::crew_options_metrics() )
name |
Character string, name of the launcher. If the name is
|
workers |
Maximum number of workers to run concurrently
when auto-scaling, excluding task retries and manual calls to |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete. In certain cases, exponential
backoff is used with this argument passed to |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
seconds_launch |
Seconds of startup time to allow.
A worker is unconditionally assumed to be alive
from the moment of its launch until |
seconds_idle |
Maximum number of seconds that a worker can idle
since the completion of the last task. If exceeded, the worker exits.
But the timer does not launch until |
seconds_wall |
Soft wall time in seconds.
The timer does not launch until |
seconds_exit |
Deprecated on 2023-09-21 in version 0.5.0.9002. No longer necessary. |
tasks_max |
Maximum number of tasks that a worker will do before exiting. Also determines how often the controller auto-scales. See the Auto-scaling section for details. |
tasks_timers |
Number of tasks to do before activating
the timers for |
reset_globals |
Deprecated on 2025-05-30 ( |
reset_packages |
Deprecated on 2025-05-30 ( |
reset_options |
Deprecated on 2025-05-30 ( |
garbage_collection |
Deprecated on 2025-05-30
( |
crashes_error |
Deprecated on 2025-01-13 ( |
launch_max |
Deprecated on 2024-11-04 ( |
tls |
A TLS configuration object from |
processes |
Deprecated on 2025-08-27 ( |
r_arguments |
Optional character vector of command line arguments
to pass to |
options_metrics |
Either |
Auto-scaling
crew launchers implement auto-scaling in the scale() method.
When the task load increases, the number of workers increases in response
to demand. When the task load decreases, the workers start to exit.
This behavior happens dynamically over the course of a workflow,
and it can be tuned with arguments seconds_interval, seconds_wall,
and tasks_max.
tasks_max is special: it determines not only the number of tasks
a worker runs before exiting, but also how often auto-scaling runs.
If tasks_max is finite, then crew uses an aggressive
deterministic exponential backoff algorithm to determine how frequently
to auto-scale (see crew_throttle()).
But if tasks_max is Inf, then crew only scales at equally spaced
time intervals of seconds_interval to allow enough pending
tasks to accumulate for job arrays.
This last part is important because auto-scaling
too frequently could lead to hundreds of separate job arrays with only
one job per array (as opposed to the desired outcome of 1 or 2 arrays
with many jobs each).
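The job-array argument above can be checked with a toy simulation in base R. This is a hypothetical model, not crew code: array_sizes() is a made-up helper, and it assumes that every scaling event submits one job array containing all tasks that arrived since the previous event.

```r
# Hypothetical simulation, not crew code. Assumption: every scaling event
# submits one job array containing all tasks that arrived since the last event.
array_sizes <- function(arrivals, scale_times) {
  sizes <- integer(0)
  previous <- -Inf
  for (time in scale_times) {
    # Tasks that became pending since the previous scaling event.
    pending <- sum(arrivals > previous & arrivals <= time)
    if (pending > 0L) sizes <- c(sizes, pending)
    previous <- time
  }
  sizes
}

# 100 tasks arrive every 10 ms over one second (times in milliseconds).
arrivals <- seq(10, 1000, by = 10)
# Scaling immediately after every arrival: one job per array.
eager <- array_sizes(arrivals, scale_times = arrivals)
# Scaling at fixed 250 ms intervals (the default seconds_interval = 0.25).
spaced <- array_sizes(arrivals, scale_times = seq(250, 1000, by = 250))
length(eager)
spaced
```

Under these assumptions, eager scaling yields 100 arrays of one job each, while fixed 250 ms intervals yield four arrays of 25 jobs.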
Other launcher:
crew_class_launcher
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local()
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
Create an R6 object to launch and maintain
local process workers.
crew_launcher_local( name = NULL, workers = 1L, seconds_interval = 0.25, seconds_timeout = 60, seconds_launch = 30, seconds_idle = Inf, seconds_wall = Inf, seconds_exit = NULL, tasks_max = Inf, tasks_timers = 0L, reset_globals = NULL, reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, crashes_error = NULL, launch_max = NULL, tls = crew::crew_tls(), r_arguments = c("--no-save", "--no-restore"), options_metrics = crew::crew_options_metrics(), options_local = crew::crew_options_local(), local_log_directory = NULL, local_log_join = NULL )
name |
Character string, name of the launcher. If the name is
|
workers |
Maximum number of workers to run concurrently
when auto-scaling, excluding task retries and manual calls to |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete. In certain cases, exponential
backoff is used with this argument passed to |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
seconds_launch |
Seconds of startup time to allow.
A worker is unconditionally assumed to be alive
from the moment of its launch until |
seconds_idle |
Maximum number of seconds that a worker can idle
since the completion of the last task. If exceeded, the worker exits.
But the timer does not launch until |
seconds_wall |
Soft wall time in seconds.
The timer does not launch until |
seconds_exit |
Deprecated on 2023-09-21 in version 0.5.0.9002. No longer necessary. |
tasks_max |
Maximum number of tasks that a worker will do before exiting. Also determines how often the controller auto-scales. See the Auto-scaling section for details. |
tasks_timers |
Number of tasks to do before activating
the timers for |
reset_globals |
Deprecated on 2025-05-30 ( |
reset_packages |
Deprecated on 2025-05-30 ( |
reset_options |
Deprecated on 2025-05-30 ( |
garbage_collection |
Deprecated on 2025-05-30
( |
crashes_error |
Deprecated on 2025-01-13 ( |
launch_max |
Deprecated on 2024-11-04 ( |
tls |
A TLS configuration object from |
r_arguments |
Optional character vector of command line arguments
to pass to |
options_metrics |
Either |
options_local |
An object generated by |
local_log_directory |
Deprecated on 2024-10-08. Use
|
local_log_join |
Deprecated on 2024-10-08. Use
|
Other plugin_local:
crew_class_launcher_local, crew_controller_local()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  launcher <- crew_launcher_local(name = client$name)
  launcher$start(url = client$url, profile = client$profile)
  launcher$launch()
  task <- mirai::mirai("result", .compute = client$profile)
  mirai::call_mirai(task)
  task$data
  client$terminate()
}
Create an R6 object to monitor local processes created by
crew and mirai.
crew_monitor_local()
Other monitor:
crew_class_monitor_local
Local crew launcher options.
Options for the local crew launcher.
crew_options_local(log_directory = NULL, log_join = TRUE)
log_directory |
Either |
log_join |
Logical of length 1. If |
A classed list of options for the local launcher.
Other options:
crew_options_metrics()
crew_options_local()
crew_options_metrics() configures crew to record resource usage metrics
(such as CPU and memory usage) for each running worker.
To activate resource usage logging, the autometric R package
version 0.1.0 or higher must be installed.
Logging happens in the background (through a detached POSIX thread)
so as not to disrupt the R session. On Unix-like systems,
crew_options_metrics() can specify /dev/stdout or /dev/stderr
as the log files, which redirects output to existing logs you are already using.
autometric::log_read() and autometric::log_plot() can read and
visualize resource usage data from multiple log files, even
if those files are mixed with other messages.
crew_options_metrics(path = NULL, seconds_interval = 5)
path |
Where to write resource metric log entries for workers.
After running enough tasks in |
seconds_interval |
Positive number, seconds between resource metric
log entries written to |
A classed list of options for logging resource usage metrics.
Other options:
crew_options_local()
crew_options_metrics()
Generate a random string that can be used as a name for a worker or task.
crew_random_name(n = 8L)
n |
Number of bytes of information in the random string
hashed to generate the name. Larger |
The randomness is not reproducible and cannot be set with,
e.g., set.seed() in R.
A random character string.
Other utility:
crew_assert(), crew_clean(), crew_deprecate(), crew_eval(), crew_retry(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
crew_random_name()
Create a crew relay object.
Create an R6 crew relay object.
crew_relay(throttle = crew_throttle())
throttle |
A |
A crew relay object keeps the signaling relationships
among condition variables.
An R6 crew relay object.
Other relay:
crew_class_relay
crew_relay()
Repeatedly retry a function while it keeps returning FALSE,
and exit the loop when it returns TRUE.
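The polling loop described above can be sketched in base R. This is a hypothetical illustration, not crew's code: retry_sketch() is a made-up name that mirrors the documented arguments, and unlike crew_retry() (which returns NULL) it returns TRUE or FALSE invisibly so the outcome is easy to check.

```r
# Hypothetical sketch of the polling loop crew_retry() describes; not crew's code.
# Unlike crew_retry(), it returns TRUE or FALSE invisibly instead of NULL.
retry_sketch <- function(fun, args = list(), seconds_interval = 0.25,
                         seconds_timeout = 60, max_tries = Inf,
                         error = TRUE, message = character(0)) {
  start <- proc.time()[["elapsed"]]
  tries <- 0
  repeat {
    tries <- tries + 1
    # Stop as soon as fun() returns TRUE.
    if (isTRUE(do.call(fun, args))) return(invisible(TRUE))
    elapsed <- proc.time()[["elapsed"]] - start
    # Give up after the timeout or the maximum number of tries.
    if (elapsed > seconds_timeout || tries >= max_tries) {
      if (error) {
        stop(
          paste(c("timed out waiting for a TRUE result", message), collapse = ": "),
          call. = FALSE
        )
      }
      return(invisible(FALSE))
    }
    # Wait before checking again.
    Sys.sleep(seconds_interval)
  }
}

# Example: a condition that becomes TRUE on the third check.
checks <- 0
ready <- function() {
  checks <<- checks + 1
  checks >= 3
}
retry_sketch(ready, seconds_interval = 0.01)
checks
```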
crew_retry( fun, args = list(), seconds_interval = 0.25, seconds_timeout = 60, max_tries = Inf, error = TRUE, message = character(0), envir = parent.frame(), assertions = TRUE )
fun |
Function that returns |
args |
A named list of arguments to |
seconds_interval |
Nonnegative numeric of length 1,
number of seconds to wait between calls to |
seconds_timeout |
Nonnegative numeric of length 1, number of seconds to loop before timing out. |
max_tries |
Maximum number of calls to |
error |
Whether to throw an error on a timeout or max tries. |
message |
Character of length 1, optional error message if the wait times out. |
envir |
Environment to evaluate |
assertions |
|
NULL (invisibly).
Other utility:
crew_assert(), crew_clean(), crew_deprecate(), crew_eval(), crew_random_name(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
crew_retry(fun = function() TRUE)
Manually terminate a local process.
crew_terminate_process(pid)
pid |
Integer of length 1, process ID to terminate. |
NULL (invisibly).
Other utility:
crew_assert(), crew_clean(), crew_deprecate(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_signal(), crew_worker()
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  process <- processx::process$new("sleep", "60")
  process$is_alive()
  crew_terminate_process(pid = process$get_pid())
  process$is_alive()
}
Get a supported operating system signal for terminating a local process.
crew_terminate_signal()
An integer of length 1: tools::SIGTERM if your platform
supports SIGTERM. If not, then crew_terminate_signal() checks
SIGQUIT, then SIGINT, then SIGKILL, and then returns the first
signal it finds that your operating system can use.
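The fallback order above can be expressed in a few lines of base R. This is a hypothetical sketch, not crew's implementation: pick_terminate_signal() is a made-up name, and it relies on the documented behavior that the signal constants in the tools package (tools::SIGTERM and friends) are NA on platforms that do not support the corresponding signal.

```r
# Hypothetical base-R sketch of the fallback order described above;
# not crew's implementation. tools::SIGTERM and the other constants are
# NA on platforms that do not support the corresponding signal.
pick_terminate_signal <- function() {
  candidates <- c(tools::SIGTERM, tools::SIGQUIT, tools::SIGINT, tools::SIGKILL)
  supported <- candidates[!is.na(candidates)]
  if (length(supported) == 0L) {
    stop("no supported termination signal found", call. = FALSE)
  }
  # The first supported candidate wins.
  supported[[1L]]
}

signal <- pick_terminate_signal()
signal
```

The result could then be passed to a process-killing function such as tools::pskill(pid, signal).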
Other utility:
crew_assert(), crew_clean(), crew_deprecate(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_worker()
crew_terminate_signal()
Create an R6 object for throttling.
crew_throttle( seconds_max = 1, seconds_min = 1e-06, seconds_start = seconds_min, base = 2 )
seconds_max |
Positive numeric scalar, maximum throttling interval. |
seconds_min |
Positive numeric scalar, minimum throttling interval. |
seconds_start |
Positive numeric scalar,
the initial wait time interval in seconds.
The default is |
base |
Numeric scalar greater than 1, base of the exponential
backoff algorithm. |
Throttling is a technique that limits how often a function is
called in a given period of time. crew_throttle() objects support
the throttle argument of controller methods, which ensures auto-scaling
does not induce superfluous overhead.
The throttle uses a deterministic exponential backoff algorithm
(https://en.wikipedia.org/wiki/Exponential_backoff), which
increases wait times when there is nothing to do and decreases
wait times when there is something to do. The controller decreases
or increases the wait time with the accelerate() and decelerate() methods
of the throttle object, respectively, by dividing or multiplying by base
(but keeping the wait time between seconds_min and seconds_max).
In practice, crew calls reset() instead of update()
in order to respond more quickly to surges of activity (see the
update() method).
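The accelerate()/decelerate() rule described above can be illustrated with a small closure in base R. This is a hypothetical sketch, not crew's R6 throttle class: backoff_sketch() and its wait() accessor are made-up names, and the sketch omits poll(), reset(), and update().

```r
# Hypothetical closure-based sketch, not crew's R6 throttle class:
# deterministic exponential backoff clamped to [seconds_min, seconds_max].
backoff_sketch <- function(seconds_max = 1, seconds_min = 1e-06,
                           seconds_start = seconds_min, base = 2) {
  seconds_wait <- seconds_start
  list(
    # Current wait time in seconds.
    wait = function() seconds_wait,
    # Something to do: shorten the wait by dividing by base.
    accelerate = function() {
      seconds_wait <<- max(seconds_wait / base, seconds_min)
      invisible(seconds_wait)
    },
    # Nothing to do: lengthen the wait by multiplying by base.
    decelerate = function() {
      seconds_wait <<- min(seconds_wait * base, seconds_max)
      invisible(seconds_wait)
    }
  )
}

throttle <- backoff_sketch(seconds_max = 1, seconds_min = 0.125)
for (i in 1:5) throttle$decelerate()
throttle$wait()
throttle$accelerate()
throttle$wait()
```

Starting at 0.125 seconds, five calls to decelerate() double the wait three times and then cap it at seconds_max = 1; a single accelerate() halves it back to 0.5.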
An R6 object with throttle configuration settings and methods.
Other throttle:
crew_class_throttle
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
Create an R6 object with transport layer security (TLS)
configuration for crew.
crew_tls( mode = "none", key = NULL, password = NULL, certificates = NULL, validate = TRUE )
mode |
Character of length 1. Must be one of the following:
|
key |
If |
password |
If |
certificates |
If |
validate |
Logical of length 1, whether to validate the configuration
object on creation. If |
crew_tls() objects are input to the tls argument of
crew_client(), crew_controller_local(), etc.
See https://wlandau.github.io/crew/articles/risks.html for details.
An R6 object with TLS configuration settings and methods.
Other tls:
crew_class_tls
crew_tls(mode = "automatic")
Launches a crew worker that runs a mirai daemon.
Not a user-side function. Users should not call crew_worker()
directly. See launcher plugins like crew_launcher_local()
for examples.
crew_worker( settings, controller, options_metrics = crew::crew_options_metrics() )
settings |
Named list of arguments to |
controller |
Character string, name of the controller. |
options_metrics |
Either |
NULL (invisibly).
Other utility:
crew_assert(), crew_clean(), crew_deprecate(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_terminate_signal()