Module: Temporalio::Workflow
Defined in:
- lib/temporalio/workflow.rb
- lib/temporalio/workflow/info.rb
- lib/temporalio/workflow/future.rb
- lib/temporalio/workflow/definition.rb
- lib/temporalio/workflow/update_info.rb
- lib/temporalio/workflow/parent_close_policy.rb
- lib/temporalio/workflow/child_workflow_handle.rb
- lib/temporalio/workflow/external_workflow_handle.rb
- lib/temporalio/workflow/handler_unfinished_policy.rb
- lib/temporalio/workflow/activity_cancellation_type.rb
- lib/temporalio/workflow/child_workflow_cancellation_type.rb
Overview
Module containing all class methods that can be called from a workflow. Methods on this module cannot be used outside of a workflow, with the obvious exception of Workflow.in_workflow?. This module is not meant to be included or mixed in.
Defined Under Namespace
Modules: ActivityCancellationType, ChildWorkflowCancellationType, HandlerUnfinishedPolicy, ParentClosePolicy, Unsafe
Classes: ChildWorkflowHandle, ContinueAsNewError, Definition, ExternalWorkflowHandle, Future, Info, InvalidWorkflowStateError, NondeterminismError, UpdateInfo
Class Method Summary
-
.all_handlers_finished? ⇒ Boolean
Whether all update and signal handlers have finished executing.
-
.cancellation ⇒ Cancellation
Cancellation for the workflow.
-
.continue_as_new_suggested ⇒ Boolean
Whether continue as new is suggested.
-
.current_details ⇒ String
Get current details for this workflow that may appear in UI/CLI.
-
.current_details=(details) ⇒ Object
Set current details for this workflow that may appear in UI/CLI.
-
.current_history_length ⇒ Integer
Current number of events in history.
-
.current_history_size ⇒ Integer
Current history size in bytes.
-
.current_update_info ⇒ UpdateInfo
Current update info if this code is running inside an update.
-
.deprecate_patch(patch_id) ⇒ Object
Mark a patch as deprecated.
-
.execute_activity(activity, *args, task_queue: info.task_queue, summary: nil, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, heartbeat_timeout: nil, retry_policy: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil, disable_eager_execution: false) ⇒ Object
Execute an activity and return its result.
-
.execute_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, static_summary: nil, static_details: nil, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ Object
Shortcut for Workflow.start_child_workflow + ChildWorkflowHandle#result.
-
.execute_local_activity(activity, *args, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, retry_policy: nil, local_retry_threshold: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil) ⇒ Object
Execute an activity locally in this same workflow task and return its result.
-
.external_workflow_handle(workflow_id, run_id: nil) ⇒ ExternalWorkflowHandle
Get a handle to an external workflow for canceling and issuing signals.
-
.in_workflow? ⇒ Boolean
Whether the current code is executing in a workflow.
-
.info ⇒ Info
Information about the current workflow.
-
.instance ⇒ Definition?
Workflow class instance.
-
.logger ⇒ Logger
Logger for the workflow.
-
.memo ⇒ Hash{String, Symbol => Object}
Memo for the workflow.
-
.metric_meter ⇒ Metric::Meter
Metric meter to create metrics on.
-
.now ⇒ Time
Current UTC time for this workflow.
-
.patched(patch_id) ⇒ Boolean
Patch a workflow.
-
.payload_converter ⇒ Converters::PayloadConverter
Payload converter for the workflow.
-
.query_handlers ⇒ Hash<String, Definition::Query>
Query handlers for this workflow.
-
.random ⇒ Random
Deterministic instance of Random for use in a workflow.
-
.search_attributes ⇒ SearchAttributes
Search attributes for the workflow.
-
.signal_handlers ⇒ Hash<String, Definition::Signal>
Signal handlers for this workflow.
-
.sleep(duration, summary: nil, cancellation: Workflow.cancellation) ⇒ Object
Sleep in a workflow for the given time.
-
.start_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, static_summary: nil, static_details: nil, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ ChildWorkflowHandle
Start a child workflow and return the handle.
-
.timeout(duration, exception_class = Timeout::Error, message = 'execution expired', summary: 'Timeout timer') { ... } ⇒ Object
Run the block until the timeout is reached.
-
.update_handlers ⇒ Hash<String, Definition::Update>
Update handlers for this workflow.
-
.upsert_memo(hash) ⇒ Object
Issue updates to the workflow memo.
-
.upsert_search_attributes(*updates) ⇒ Object
Issue updates to the workflow search attributes.
-
.wait_condition(cancellation: Workflow.cancellation) { ... } ⇒ Object
Wait for the given block to return a “truthy” value (i.e. any value other than `false` or `nil`).
Class Method Details
.all_handlers_finished? ⇒ Boolean
Whether all update and signal handlers have finished executing. Consider waiting on this condition before workflow return or continue-as-new, to prevent interruption of in-progress handlers by workflow return: `Temporalio::Workflow.wait_condition { Temporalio::Workflow.all_handlers_finished? }`
# File 'lib/temporalio/workflow.rb', line 24
def self.all_handlers_finished?
  _current.all_handlers_finished?
end
.cancellation ⇒ Cancellation
Returns the Cancellation for the workflow. It is canceled when a workflow cancellation request is received, and it is the default cancellation for most workflow calls.
# File 'lib/temporalio/workflow.rb', line 30
def self.cancellation
  _current.cancellation
end
.continue_as_new_suggested ⇒ Boolean
Returns whether continue-as-new is suggested. This value is the current continue-as-new suggestion up until the current task. Note that this value may not be up to date when accessed in a query. When continue-as-new is suggested depends on server-side configuration.
# File 'lib/temporalio/workflow.rb', line 37
def self.continue_as_new_suggested
  _current.continue_as_new_suggested
end
.current_details ⇒ String
Get current details for this workflow that may appear in UI/CLI. Unlike static details set at start, this value can be updated throughout the life of the workflow. This can be in Temporal markdown format and can span multiple lines. This is currently experimental.
# File 'lib/temporalio/workflow.rb', line 46
def self.current_details
  _current.current_details
end
.current_details=(details) ⇒ Object
Set current details for this workflow that may appear in UI/CLI. Unlike static details set at start, this value can be updated throughout the life of the workflow. This can be in Temporal markdown format and can span multiple lines. This is currently experimental.
# File 'lib/temporalio/workflow.rb', line 55
def self.current_details=(details)
  _current.current_details = details
end
.current_history_length ⇒ Integer
Returns the current number of events in history. This value is the current history event count up until the current task. Note that this value may not be up to date when accessed in a query.
# File 'lib/temporalio/workflow.rb', line 61
def self.current_history_length
  _current.current_history_length
end
.current_history_size ⇒ Integer
Returns the current history size in bytes. This value is the current history size up until the current task. Note that this value may not be up to date when accessed in a query.
# File 'lib/temporalio/workflow.rb', line 67
def self.current_history_size
  _current.current_history_size
end
.current_update_info ⇒ UpdateInfo
Returns the current update info if this code is running inside an update. This is set via Fiber-local storage, so it is only visible to the current handler fiber.
# File 'lib/temporalio/workflow.rb', line 73
def self.current_update_info
  _current.current_update_info
end
.deprecate_patch(patch_id) ⇒ Object
Mark a patch as deprecated.
This marks a patch made in a previous version of the code as no longer applicable, because all workflows that use the old code path are done and will never be queried again. The old code path can therefore be removed as well.
# File 'lib/temporalio/workflow.rb', line 84
def self.deprecate_patch(patch_id)
  _current.deprecate_patch(patch_id)
end
.execute_activity(activity, *args, task_queue: info.task_queue, summary: nil, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, heartbeat_timeout: nil, retry_policy: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil, disable_eager_execution: false) ⇒ Object
Note: Using an already-canceled cancellation may raise a different exception than canceling after the activity has started. Use Error.canceled? to check whether the exception is a cancellation either way.
Execute an activity and return its result. Either `start_to_close_timeout` or `schedule_to_close_timeout` must be set. The `heartbeat_timeout` should be set for any non-immediately-completing activity so it can receive cancellation. To run an activity in the background, use a Future.
# File 'lib/temporalio/workflow.rb', line 126
def self.execute_activity(
  activity,
  *args,
  task_queue: info.task_queue,
  summary: nil,
  schedule_to_close_timeout: nil,
  schedule_to_start_timeout: nil,
  start_to_close_timeout: nil,
  heartbeat_timeout: nil,
  retry_policy: nil,
  cancellation: Workflow.cancellation,
  cancellation_type: ActivityCancellationType::TRY_CANCEL,
  activity_id: nil,
  disable_eager_execution: false
)
  _current.execute_activity(
    activity, *args,
    task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
    heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:
  )
end
.execute_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, static_summary: nil, static_details: nil, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ Object
Shortcut for start_child_workflow + Temporalio::Workflow::ChildWorkflowHandle#result. See those two calls for more details.
# File 'lib/temporalio/workflow.rb', line 149
def self.execute_child_workflow(
  workflow,
  *args,
  id: random.uuid,
  task_queue: info.task_queue,
  static_summary: nil,
  static_details: nil,
  cancellation: Workflow.cancellation,
  cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED,
  parent_close_policy: ParentClosePolicy::TERMINATE,
  execution_timeout: nil,
  run_timeout: nil,
  task_timeout: nil,
  id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
  retry_policy: nil,
  cron_schedule: nil,
  memo: nil,
  search_attributes: nil
)
  start_child_workflow(
    workflow, *args,
    id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
    parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
    retry_policy:, cron_schedule:, memo:, search_attributes:
  ).result
end
.execute_local_activity(activity, *args, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, retry_policy: nil, local_retry_threshold: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil) ⇒ Object
Note: Using an already-canceled cancellation may raise a different exception than canceling after the activity has started. Use Error.canceled? to check whether the exception is a cancellation either way.
Execute an activity locally in this same workflow task and return its result. This should usually only be used for short/simple activities where the result performance matters. Either `start_to_close_timeout` or `schedule_to_close_timeout` must be set. To run an activity in the background, use a Future.
# File 'lib/temporalio/workflow.rb', line 208
def self.execute_local_activity(
  activity,
  *args,
  schedule_to_close_timeout: nil,
  schedule_to_start_timeout: nil,
  start_to_close_timeout: nil,
  retry_policy: nil,
  local_retry_threshold: nil,
  cancellation: Workflow.cancellation,
  cancellation_type: ActivityCancellationType::TRY_CANCEL,
  activity_id: nil
)
  _current.execute_local_activity(
    activity, *args,
    schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
    retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id:
  )
end
.external_workflow_handle(workflow_id, run_id: nil) ⇒ ExternalWorkflowHandle
Get a handle to an external workflow for canceling and issuing signals.
# File 'lib/temporalio/workflow.rb', line 233
def self.external_workflow_handle(workflow_id, run_id: nil)
  _current.external_workflow_handle(workflow_id, run_id:)
end
.in_workflow? ⇒ Boolean
Returns whether the current code is executing in a workflow.
# File 'lib/temporalio/workflow.rb', line 238
def self.in_workflow?
  _current_or_nil != nil
end
.info ⇒ Info
Returns information about the current workflow.
# File 'lib/temporalio/workflow.rb', line 243
def self.info
  _current.info
end
.instance ⇒ Definition?
Returns the workflow class instance. This should always be present except in Temporalio::Worker::Interceptor::Workflow::Inbound#init, where it will be nil.
# File 'lib/temporalio/workflow.rb', line 249
def self.instance
  _current.instance
end
.logger ⇒ Logger
Returns the logger for the workflow. This is a scoped logger that automatically appends workflow details to every log and takes care not to log during replay.
# File 'lib/temporalio/workflow.rb', line 255
def self.logger
  _current.logger
end
.memo ⇒ Hash{String, Symbol => Object}
Returns the memo for the workflow. This is a read-only view of the memo. To update the memo, use upsert_memo. This always returns the same instance, and updates are reflected on the returned instance, so it is not technically frozen.
# File 'lib/temporalio/workflow.rb', line 262
def self.memo
  _current.memo
end
.metric_meter ⇒ Metric::Meter
Returns the metric meter to create metrics on. This metric meter already contains some workflow-specific attributes and takes care not to apply metrics during replay.
# File 'lib/temporalio/workflow.rb', line 268
def self.metric_meter
  _current.metric_meter
end
.now ⇒ Time
Returns the current UTC time for this workflow. This creates and returns a new Time instance every time it is invoked; it is not the same instance continually mutated.
# File 'lib/temporalio/workflow.rb', line 274
def self.now
  _current.now
end
.patched(patch_id) ⇒ Boolean
Patch a workflow.
When called, this returns true only if the code should take the newer path, which means the workflow is either not replaying, or is replaying and has seen this patch before. Results of successive calls to this function for the same ID and workflow are memoized. Use deprecate_patch when all workflows are done and will never be queried again. The old code path can be removed at that time too.
# File 'lib/temporalio/workflow.rb', line 287
def self.patched(patch_id)
  _current.patched(patch_id)
end
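The rule above (true when not replaying, or when replaying and the patch was seen before, with the answer memoized per ID) can be sketched in plain Ruby. `PatchTracker`, its constructor arguments, and `charge_path` are hypothetical stand-ins for illustration; this is not the SDK's implementation, and it records nothing in workflow history.

```ruby
# Hypothetical stand-in for the decision described above; not the SDK's code.
class PatchTracker
  # replaying: whether recorded history is being replayed (assumed input).
  # recorded_patches: patch IDs found in that history (assumed input).
  def initialize(replaying:, recorded_patches: [])
    @replaying = replaying
    @recorded_patches = recorded_patches
    @memo = {}
  end

  # True when the caller should take the newer code path.
  def patched(patch_id)
    # fetch checks key presence, so a memoized false is returned as-is.
    @memo.fetch(patch_id) do
      @memo[patch_id] = !@replaying || @recorded_patches.include?(patch_id)
    end
  end
end

# Typical call site: branch between the new and the old code path.
def charge_path(tracker)
  tracker.patched('use-new-billing') ? :new_billing : :old_billing
end
```

In this sketch, a fresh execution and a replay whose history contains the patch both take `:new_billing`, while a replay of history written before the patch existed takes `:old_billing`.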
.payload_converter ⇒ Converters::PayloadConverter
Returns the payload converter for the workflow.
# File 'lib/temporalio/workflow.rb', line 292
def self.payload_converter
  _current.payload_converter
end
.query_handlers ⇒ Hash<String, Definition::Query>
Returns the query handlers for this workflow. This hash is mostly immutable except for `[]=` (and `store`), which can be used to set a new handler, or can be set with `nil` to remove a handler. For most use cases, defining a handler as a `workflow_query` method is best.
# File 'lib/temporalio/workflow.rb', line 299
def self.query_handlers
  _current.query_handlers
end
.random ⇒ Random
Returns a deterministic instance of Random for use in a workflow. This instance should be accessed each time it is needed, not stored. This instance may be recreated with a different seed in special cases (e.g. workflow reset). Do not use any other randomization inside workflow code.
# File 'lib/temporalio/workflow.rb', line 306
def self.random
  _current.random
end
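Why a seeded generator suits replay can be shown with Ruby's standard `Random` alone: two instances built from the same seed yield the same sequence. The `draws_for` helper and the seed value below are illustrative assumptions; how the SDK chooses and stores its seed is not covered here.

```ruby
# Plain-Ruby illustration; draws_for and the seed value are hypothetical.
def draws_for(seed, count: 5)
  rng = Random.new(seed) # the same seed always yields the same sequence
  Array.new(count) { rng.rand(1_000) }
end

first_run = draws_for(1234)
replayed = draws_for(1234) # stands in for a replay that uses the same seed
puts first_run == replayed
```

By contrast, an unseeded `Kernel#rand` or `SecureRandom` call produces different values on each run, which is why the text above rules out other randomization in workflow code.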
.search_attributes ⇒ SearchAttributes
Returns the search attributes for the workflow. This is a read-only view of the attributes. To update the attributes, use upsert_search_attributes. This always returns the same instance, and updates are reflected on the returned instance, so it is not technically frozen.
# File 'lib/temporalio/workflow.rb', line 313
def self.search_attributes
  _current.search_attributes
end
.signal_handlers ⇒ Hash<String, Definition::Signal>
Returns the signal handlers for this workflow. This hash is mostly immutable except for `[]=` (and `store`), which can be used to set a new handler, or can be set with `nil` to remove a handler. For most use cases, defining a handler as a `workflow_signal` method is best.
# File 'lib/temporalio/workflow.rb', line 320
def self.signal_handlers
  _current.signal_handlers
end
.sleep(duration, summary: nil, cancellation: Workflow.cancellation) ⇒ Object
Sleep in a workflow for the given time.
# File 'lib/temporalio/workflow.rb', line 334
def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation)
  _current.sleep(duration, summary:, cancellation:)
end
.start_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, static_summary: nil, static_details: nil, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ ChildWorkflowHandle
Start a child workflow and return the handle.
# File 'lib/temporalio/workflow.rb', line 367
def self.start_child_workflow(
  workflow,
  *args,
  id: random.uuid,
  task_queue: info.task_queue,
  static_summary: nil,
  static_details: nil,
  cancellation: Workflow.cancellation,
  cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED,
  parent_close_policy: ParentClosePolicy::TERMINATE,
  execution_timeout: nil,
  run_timeout: nil,
  task_timeout: nil,
  id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
  retry_policy: nil,
  cron_schedule: nil,
  memo: nil,
  search_attributes: nil
)
  _current.start_child_workflow(
    workflow, *args,
    id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
    parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
    retry_policy:, cron_schedule:, memo:, search_attributes:
  )
end
.timeout(duration, exception_class = Timeout::Error, message = 'execution expired', summary: 'Timeout timer') { ... } ⇒ Object
Run the block until the timeout is reached. This is backed by sleep. This does not accept cancellation because it is expected the block within will properly handle/bubble cancellation.
# File 'lib/temporalio/workflow.rb', line 409
def self.timeout(
  duration,
  exception_class = Timeout::Error,
  message = 'execution expired',
  summary: 'Timeout timer',
  &
)
  _current.timeout(duration, exception_class, message, summary:, &)
end
.update_handlers ⇒ Hash<String, Definition::Update>
Returns the update handlers for this workflow. This hash is mostly immutable except for `[]=` (and `store`), which can be used to set a new handler, or can be set with `nil` to remove a handler. For most use cases, defining a handler as a `workflow_update` method is best.
# File 'lib/temporalio/workflow.rb', line 422
def self.update_handlers
  _current.update_handlers
end
.upsert_memo(hash) ⇒ Object
Issue updates to the workflow memo.
# File 'lib/temporalio/workflow.rb', line 430
def self.upsert_memo(hash)
  _current.upsert_memo(hash)
end
.upsert_search_attributes(*updates) ⇒ Object
Issue updates to the workflow search attributes.
# File 'lib/temporalio/workflow.rb', line 438
def self.upsert_search_attributes(*updates)
  _current.upsert_search_attributes(*updates)
end
.wait_condition(cancellation: Workflow.cancellation) { ... } ⇒ Object
Wait for the given block to return a “truthy” value (i.e. any value other than `false` or `nil`). The block must be side-effect free since it may be invoked frequently during event loop iteration. To time out a wait, timeout can be used. This cannot be used in side-effect-free contexts such as `initialize`, queries, or update validators.
This is very commonly used to wait on a value to be set by a handler, e.g. `Temporalio::Workflow.wait_condition { @some_value }`. Special care was taken to only wake up a single wait condition when it evaluates to true. Therefore, if multiple wait conditions are waiting on the same thing, only one is awoken at a time, which means the code immediately following that wait condition can change the variable before other wait conditions are evaluated. This is a useful property for building mutexes/semaphores.
# File 'lib/temporalio/workflow.rb', line 459
def self.wait_condition(cancellation: Workflow.cancellation, &)
  raise 'Block required' unless block_given?

  _current.wait_condition(cancellation:, &)
end
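The single-wakeup behavior described above can be sketched with plain fibers. `TinyLoop` and `acquire_in_turn` are hypothetical stand-ins written for this illustration; they are not the SDK's scheduler, and they ignore cancellation.

```ruby
# Hypothetical miniature event loop; not the SDK's scheduler.
class TinyLoop
  def initialize
    @waiters = [] # [fiber, condition] pairs in arrival order
  end

  # Start a task and run it until it finishes or parks in wait_condition.
  def spawn(&body)
    resume(Fiber.new(&body))
  end

  # Park the running task until the block returns a truthy value.
  def wait_condition(&condition)
    @waiters << [@running, condition]
    Fiber.yield
  end

  # Wake one satisfied waiter, then re-check every remaining condition.
  def run
    loop do
      index = @waiters.index { |(_fiber, condition)| condition.call }
      break if index.nil?

      fiber, _condition = @waiters.delete_at(index)
      resume(fiber)
    end
  end

  private

  # Track which fiber is running so wait_condition can park it.
  def resume(fiber)
    @running = fiber
    fiber.resume
  end
end

# Three tasks compete for one lock; each release lets exactly one through.
def acquire_in_turn
  event_loop = TinyLoop.new
  locked = true
  holders = []
  3.times do |task|
    event_loop.spawn do
      event_loop.wait_condition { !locked }
      locked = true # runs before any other waiter is re-evaluated
      holders << task
    end
  end
  snapshots = []
  3.times do
    locked = false # release the lock
    event_loop.run
    snapshots << holders.dup
  end
  snapshots
end
```

Calling `acquire_in_turn` returns `[[0], [0, 1], [0, 1, 2]]`: each release admits exactly one task, because the woken task re-locks before the loop checks the other conditions.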