Class: Temporalio::Client::WorkflowHandle
- Inherits: Object
- Defined in: lib/temporalio/client/workflow_handle.rb
Overview
Handle for interacting with a workflow. This is usually created via Temporalio::Client#start_workflow or Temporalio::Client#workflow_handle.
Instance Attribute Summary
- #first_execution_run_id ⇒ String? (readonly)
  Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.
- #id ⇒ String (readonly)
  ID for the workflow.
- #result_hint ⇒ Object? (readonly)
  Result hint for the result of this workflow.
- #result_run_id ⇒ String? (readonly)
  Run ID used for #result calls if present to ensure result is for a workflow starting from this run.
- #run_id ⇒ String? (readonly)
  Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.
Instance Method Summary
- #cancel(rpc_options: nil) ⇒ Object
  Cancel the workflow.
- #describe(rpc_options: nil) ⇒ WorkflowExecution::Description
  Get workflow details.
- #execute_update(update, *args, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ Object?
  Send an update request to the workflow and wait for it to complete.
- #fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_options: nil) ⇒ WorkflowHistory
  Get workflow history.
- #fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_options: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>
  Fetch an enumerator of history events for this workflow.
- #query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ Object?
  Query the workflow.
- #result(follow_runs: true, result_hint: nil, rpc_options: nil) ⇒ Object
  Wait for the result of the workflow.
- #signal(signal, *args, arg_hints: nil, rpc_options: nil) ⇒ Object
  Send a signal to the workflow.
- #start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ WorkflowUpdateHandle
  Send an update request to the workflow and return a handle to it.
- #terminate(reason = nil, details: [], rpc_options: nil) ⇒ Object
  Terminate the workflow.
- #update_handle(id, specific_run_id: run_id, result_hint: nil) ⇒ WorkflowUpdateHandle
  Get a handle for an update.
Instance Attribute Details
#first_execution_run_id ⇒ String? (readonly)
Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.
This can be set when using Temporalio::Client#workflow_handle. When Temporalio::Client#start_workflow is called without a start signal, this is set to the resulting run.
This cannot be mutated. If a different first execution run ID is needed, Temporalio::Client#workflow_handle must be used instead.
# File 'lib/temporalio/client/workflow_handle.rb', line 49
def first_execution_run_id
  @first_execution_run_id
end
#id ⇒ String (readonly)
Returns ID for the workflow.
# File 'lib/temporalio/client/workflow_handle.rb', line 17
def id
  @id
end
#result_hint ⇒ Object? (readonly)
Result hint for the result of this workflow. If this handle was created via Temporalio::Client#start_workflow, this is set from there (either via result hint on that call or workflow definition’s result hint). Otherwise, the result hint is set by the creator of the handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 56
def result_hint
  @result_hint
end
#result_run_id ⇒ String? (readonly)
Run ID used for #result calls if present to ensure result is for a workflow starting from this run.
When this handle is created via Temporalio::Client#workflow_handle, this is the same as #run_id. When this handle is created via Temporalio::Client#start_workflow, this value will be the resulting run ID.
This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.
# File 'lib/temporalio/client/workflow_handle.rb', line 37
def result_run_id
  @result_run_id
end
#run_id ⇒ String? (readonly)
Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.
This is only set via Temporalio::Client#workflow_handle. Temporalio::Client#start_workflow will not set this value.
This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.
# File 'lib/temporalio/client/workflow_handle.rb', line 27
def run_id
  @run_id
end
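The difference between a handle with and without a #run_id can be illustrated with a small in-memory model. This is only a sketch: RUNS and Handle are hypothetical stand-ins invented for the example, not part of the Temporalio API, and the "latest run" behavior is modeled on the notes in the method details below.

```ruby
# Hypothetical in-memory stand-in for the server: workflow ID => ordered runs.
RUNS = {
  'order-42' => [
    { run_id: 'run-a', signals: [] },
    { run_id: 'run-b', signals: [] } # latest run for this workflow ID
  ]
}

# Minimal model of a handle: a workflow ID plus an optional pinned run ID.
Handle = Struct.new(:id, :run_id) do
  # Records the signal on the targeted run and returns that run's ID.
  def signal(name)
    runs = RUNS.fetch(id)
    # Without a run ID the latest run is targeted; otherwise the exact run.
    target = run_id ? runs.find { |r| r[:run_id] == run_id } : runs.last
    raise KeyError, "run not found: #{run_id}" unless target

    target[:signals] << name
    target[:run_id]
  end
end

unpinned = Handle.new('order-42', nil)     # like a handle from start_workflow
pinned = Handle.new('order-42', 'run-a')   # like a handle with run_id set

unpinned_target = unpinned.signal('approve')
pinned_target = pinned.signal('approve')
```

In this model the unpinned handle reaches whichever run is newest for the workflow ID, which is why a handle with an explicit run ID is needed when the exact run matters.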
Instance Method Details
#cancel(rpc_options: nil) ⇒ Object
Handles created as a result of signal with start will cancel the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Cancel the workflow. This will issue a cancellation for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 396
def cancel(rpc_options: nil)
  @client._impl.cancel_workflow(Interceptor::CancelWorkflowInput.new(
    workflow_id: id,
    run_id:,
    first_execution_run_id:,
    rpc_options:
  ))
end
#describe(rpc_options: nil) ⇒ WorkflowExecution::Description
Handles created as a result of Temporalio::Client#start_workflow will describe the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Get workflow details. This will get details for the #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 157
def describe(rpc_options: nil)
  @client._impl.describe_workflow(Interceptor::DescribeWorkflowInput.new(
    workflow_id: id,
    run_id:,
    rpc_options:
  ))
end
#execute_update(update, *args, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ Object?
Handles created as a result of Temporalio::Client#start_workflow will send updates to the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Send an update request to the workflow and wait for it to complete. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 355
def execute_update(update, *args, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil)
  start_update(
    update,
    *args,
    wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
    id:,
    arg_hints:,
    result_hint:,
    rpc_options:
  ).result
end
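As the listing shows, #execute_update is a thin wrapper: it starts the update with the completed wait stage and immediately asks the returned handle for its result. The delegation pattern can be sketched in isolation as follows. FakeWorkflowHandle, UpdateHandle, and the :completed/:accepted symbols are hypothetical stand-ins, and the sketch computes a string locally where a real handle would issue an RPC and wait for the outcome.

```ruby
require 'securerandom'

# Hypothetical update handle: holds the update ID and an already-known outcome.
UpdateHandle = Struct.new(:id, :outcome) do
  # A real handle would wait for the outcome; this sketch just returns it.
  def result
    outcome
  end
end

# Hypothetical workflow handle that computes outcomes locally.
class FakeWorkflowHandle
  # Returns a handle; the outcome is only known when waiting for completion.
  def start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid)
    outcome = wait_for_stage == :completed ? "#{update}(#{args.join(', ')})" : nil
    UpdateHandle.new(id, outcome)
  end

  # Same delegation as the listing above: start with the completed stage,
  # then return the handle's result.
  def execute_update(update, *args, id: SecureRandom.uuid)
    start_update(update, *args, wait_for_stage: :completed, id: id).result
  end
end

handle = FakeWorkflowHandle.new
executed = handle.execute_update('add', 1, 2)
started = handle.start_update('add', 1, wait_for_stage: :accepted, id: 'u-1')
```

Passing an explicit id, as in the last call, makes it possible to look the update up again later, which is what #update_handle is for.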
#fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_options: nil) ⇒ WorkflowHistory
Get workflow history. This is a helper on top of #fetch_history_events.
# File 'lib/temporalio/client/workflow_handle.rb', line 174
def fetch_history(
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  rpc_options: nil
)
  WorkflowHistory.new(
    fetch_history_events(
      event_filter_type:,
      skip_archival:,
      rpc_options:
    ).to_a
  )
end
#fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_options: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>
Fetch an enumerator of history events for this workflow. Internally this is done in paginated form, but it is presented as an enumerator.
# File 'lib/temporalio/client/workflow_handle.rb', line 203
def fetch_history_events(
  wait_new_event: false,
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  specific_run_id: run_id,
  rpc_options: nil
)
  @client._impl.fetch_workflow_history_events(Interceptor::FetchWorkflowHistoryEventsInput.new(
    workflow_id: id,
    run_id: specific_run_id,
    wait_new_event:,
    event_filter_type:,
    skip_archival:,
    rpc_options:
  ))
end
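The "paginated internally, enumerator externally" idea can be sketched with Ruby's standard Enumerator. The pagination code of the library itself is not shown on this page, so everything here is an assumption made for illustration: PAGES, fetch_page, and each_event are hypothetical names, and the events are plain strings rather than HistoryEvent objects.

```ruby
# Hypothetical paginated backend: page token => [events, next page token].
PAGES = {
  nil => [%w[started task_scheduled], 'p2'],
  'p2' => [%w[task_completed], 'p3'],
  'p3' => [%w[completed], nil]
}.freeze

# Stand-in for one paginated RPC call.
def fetch_page(token)
  PAGES.fetch(token)
end

# Wraps page-by-page fetching in an Enumerator, so callers see a flat
# stream of events and pages are only fetched as they are consumed.
def each_event
  Enumerator.new do |yielder|
    token = nil
    loop do
      events, token = fetch_page(token)
      events.each { |event| yielder << event }
      break if token.nil?
    end
  end
end

first_event = each_event.next # fetches only as much as is needed
all_events = each_event.to_a  # drains every page
```

Calling to_a on the enumerator, as #fetch_history does above, forces every page to be fetched, while next only pulls what it needs.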
#query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ Object?
Handles created as a result of Temporalio::Client#start_workflow will query the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Query the workflow. This will query for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 266
def query(
  query,
  *args,
  reject_condition: @client.options.default_workflow_query_reject_condition,
  arg_hints: nil,
  result_hint: nil,
  rpc_options: nil
)
  query, defn_arg_hints, defn_result_hint = Workflow::Definition::Query._name_and_hints_from_parameter(query)
  @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new(
    workflow_id: id,
    run_id:,
    query:,
    args:,
    reject_condition:,
    arg_hints: arg_hints || defn_arg_hints,
    result_hint: result_hint || defn_result_hint,
    headers: {},
    rpc_options:
  ))
end
#result(follow_runs: true, result_hint: nil, rpc_options: nil) ⇒ Object
Wait for the result of the workflow.
This will use #result_run_id if present to base the result on. To use another run ID, a new handle must be created via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 84
def result(follow_runs: true, result_hint: nil, rpc_options: nil)
  # Wait on the close event, following as needed
  hist_run_id = result_run_id
  loop do
    # Get close event
    event = fetch_history_events(
      wait_new_event: true,
      event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
      skip_archival: true,
      specific_run_id: hist_run_id,
      rpc_options:
    ).next
    # Check each close type
    case event.event_type
    when :EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
      attrs = event.workflow_execution_completed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?
      return @client.data_converter.from_payloads(attrs.result,
                                                  hints: Array(@result_hint || result_hint)).first
    when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
      attrs = event.workflow_execution_failed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?
      raise Error::WorkflowFailedError.new, cause: @client.data_converter.from_failure(attrs.failure)
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED
      attrs = event.workflow_execution_canceled_event_attributes
      raise Error::WorkflowFailedError.new, 'Workflow execution canceled',
            cause: Error::CanceledError.new(
              'Workflow execution canceled',
              details: @client.data_converter.from_payloads(attrs&.details)
            )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED
      attrs = event.workflow_execution_terminated_event_attributes
      raise Error::WorkflowFailedError.new, 'Workflow execution terminated',
            cause: Error::TerminatedError.new(
              Internal::ProtoUtils.string_or(attrs.reason, 'Workflow execution terminated'),
              details: @client.data_converter.from_payloads(attrs&.details)
            )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
      attrs = event.workflow_execution_timed_out_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?
      raise Error::WorkflowFailedError.new, 'Workflow execution timed out',
            cause: Error::TimeoutError.new(
              'Workflow execution timed out',
              type: Api::Enums::V1::TimeoutType::TIMEOUT_TYPE_START_TO_CLOSE,
              last_heartbeat_details: []
            )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
      attrs = event.workflow_execution_continued_as_new_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?
      # TODO: Use more specific error and decode failure
      raise Error::WorkflowContinuedAsNewError.new(new_run_id: attrs.new_execution_run_id)
    else
      raise Error, "Unknown close event type: #{event.event_type}"
    end
  end
end
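The run-following loop in #result can be reduced to a small self-contained model. This is a simplified sketch rather than the library's code: CloseEvent, CLOSE_EVENTS, ContinuedAsNew, and wait_result are hypothetical names, only two close types are modeled, and close events are looked up in a hash where the real method waits on workflow history.

```ruby
# Hypothetical close event: its type, a result, and the run that follows it.
CloseEvent = Struct.new(:type, :result, :new_run_id)

# Hypothetical history: run-1 continued as new into run-2, which completed.
CLOSE_EVENTS = {
  'run-1' => CloseEvent.new(:continued_as_new, nil, 'run-2'),
  'run-2' => CloseEvent.new(:completed, 'done', '')
}.freeze

# Hypothetical error for a run that continued as new and was not followed.
class ContinuedAsNew < StandardError; end

# Returns the final result, following the chain of runs when follow_runs is true.
def wait_result(run_id, follow_runs: true)
  loop do
    event = CLOSE_EVENTS.fetch(run_id)
    next_run = event.new_run_id
    # If the run was succeeded by another one, move on to it.
    if follow_runs && next_run && !next_run.empty?
      run_id = next_run
      next
    end

    case event.type
    when :completed then return event.result
    when :continued_as_new then raise ContinuedAsNew, "continued as #{next_run}"
    else raise "Unknown close event type: #{event.type}"
    end
  end
end

followed = wait_result('run-1')
not_followed =
  begin
    wait_result('run-1', follow_runs: false)
  rescue ContinuedAsNew => e
    e
  end
```

With follow_runs disabled, the caller sees the close event of the first run only, which in this model surfaces as an error for a continued-as-new run.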
#signal(signal, *args, arg_hints: nil, rpc_options: nil) ⇒ Object
Handles created as a result of Temporalio::Client#start_workflow will signal the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Send a signal to the workflow. This will signal for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 233
def signal(signal, *args, arg_hints: nil, rpc_options: nil)
  signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal)
  @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new(
    workflow_id: id,
    run_id:,
    signal:,
    args:,
    arg_hints: arg_hints || defn_arg_hints,
    headers: {},
    rpc_options:
  ))
end
#start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ WorkflowUpdateHandle
Handles created as a result of Temporalio::Client#start_workflow will send updates to the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Send an update request to the workflow and return a handle to it. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 310
def start_update(
  update,
  *args,
  wait_for_stage:,
  id: SecureRandom.uuid,
  arg_hints: nil,
  result_hint: nil,
  rpc_options: nil
)
  update, defn_arg_hints, defn_result_hint = Workflow::Definition::Update._name_and_hints_from_parameter(update)
  @client._impl.start_workflow_update(Interceptor::StartWorkflowUpdateInput.new(
    workflow_id: self.id,
    run_id:,
    update_id: id,
    update:,
    args:,
    wait_for_stage:,
    arg_hints: arg_hints || defn_arg_hints,
    result_hint: result_hint || defn_result_hint,
    headers: {},
    rpc_options:
  ))
end
#terminate(reason = nil, details: [], rpc_options: nil) ⇒ Object
Handles created as a result of signal with start will terminate the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Terminate the workflow. This will issue a termination for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 417
def terminate(reason = nil, details: [], rpc_options: nil)
  @client._impl.terminate_workflow(Interceptor::TerminateWorkflowInput.new(
    workflow_id: id,
    run_id:,
    first_execution_run_id:,
    reason:,
    details:,
    rpc_options:
  ))
end
#update_handle(id, specific_run_id: run_id, result_hint: nil) ⇒ WorkflowUpdateHandle
Get a handle for an update. The handle can be used to wait on the update result.
# File 'lib/temporalio/client/workflow_handle.rb', line 375
def update_handle(id, specific_run_id: run_id, result_hint: nil)
  WorkflowUpdateHandle.new(
    client: @client,
    id:,
    workflow_id: self.id,
    workflow_run_id: specific_run_id,
    known_outcome: nil,
    result_hint:
  )
end