Class: Temporalio::Client::WorkflowHandle

Inherits:
Object
  • Object
Defined in:
lib/temporalio/client/workflow_handle.rb

Overview

Handle for interacting with a workflow. This is usually created via Temporalio::Client#start_workflow or Temporalio::Client#workflow_handle.

Instance Attribute Details

#first_execution_run_id ⇒ String? (readonly)

Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.

This can be set when using Temporalio::Client#workflow_handle. When Temporalio::Client#start_workflow is called without a start signal, this is set to the resulting run.

This cannot be mutated. If a different first execution run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    First execution run ID.



# File 'lib/temporalio/client/workflow_handle.rb', line 49

def first_execution_run_id
  @first_execution_run_id
end

#id ⇒ String (readonly)

Returns ID for the workflow.

Returns:

  • (String)

    ID for the workflow.



# File 'lib/temporalio/client/workflow_handle.rb', line 17

def id
  @id
end

#result_hint ⇒ Object? (readonly)

Result hint for the result of this workflow. If this handle was created via Temporalio::Client#start_workflow, this is set from there (either via result hint on that call or workflow definition’s result hint). Otherwise, the result hint is set by the creator of the handle.

Returns:

  • (Object, nil)

    Result hint.



# File 'lib/temporalio/client/workflow_handle.rb', line 56

def result_hint
  @result_hint
end
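
The fallback order between a per-call hint and the hint stored on the handle (see the result_hint parameter of #result) can be illustrated with a minimal sketch. The helper name `effective_result_hints` is hypothetical and not part of the SDK; it only models the "explicit value first, handle value second" rule in plain Ruby.

```ruby
# Hypothetical helper (not an SDK method): picks the hint list that would be
# handed to a data converter. A per-call hint wins; otherwise the hint stored
# on the handle is used.
def effective_result_hints(call_hint, handle_hint)
  # Array(nil) is [], so "no hint anywhere" becomes an empty list.
  Array(call_hint || handle_hint)
end

effective_result_hints(nil, :handle_hint)        # handle hint is used
effective_result_hints(:call_hint, :handle_hint) # per-call hint overrides
effective_result_hints(nil, nil)                 # no hints at all
```

Wrapping with `Array(...)` keeps the caller from having to distinguish between "one hint" and "no hint".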

#result_run_id ⇒ String? (readonly)

Run ID used for #result calls if present to ensure result is for a workflow starting from this run.

When this handle is created via Temporalio::Client#workflow_handle, this is the same as #run_id. When this handle is created via Temporalio::Client#start_workflow, this value will be the resulting run ID.

This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    Result run ID.



# File 'lib/temporalio/client/workflow_handle.rb', line 37

def result_run_id
  @result_run_id
end

#run_id ⇒ String? (readonly)

Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.

This is only set when the handle is created via Temporalio::Client#workflow_handle. Temporalio::Client#start_workflow will not set this value.

This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    Run ID.



# File 'lib/temporalio/client/workflow_handle.rb', line 27

def run_id
  @run_id
end
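
To make the run ID attributes easier to compare, here is a minimal sketch that models which run ID each kind of call would pin to. `HandleSketch` and `target_for` are hypothetical stand-ins, not SDK classes or methods; the mapping simply follows the attribute descriptions above, and the IDs are made up.

```ruby
# Hypothetical stand-in for the handle's run ID attributes (not the SDK class).
HandleSketch = Struct.new(:id, :run_id, :result_run_id, :first_execution_run_id, keyword_init: true) do
  # Returns the run ID a given kind of call would pin to, or nil for "latest run".
  def target_for(call)
    case call
    when :signal, :query, :update, :describe, :cancel, :terminate then run_id
    when :result then result_run_id
    else raise ArgumentError, "unknown call: #{call}"
    end
  end
end

# Shape described for a handle from Temporalio::Client#start_workflow:
# run_id stays nil, the other two are set to the resulting run.
started = HandleSketch.new(id: 'order-42', run_id: nil,
                           result_run_id: 'run-a', first_execution_run_id: 'run-a')

# Shape described for a handle from Temporalio::Client#workflow_handle with an explicit run ID.
pinned = HandleSketch.new(id: 'order-42', run_id: 'run-b',
                          result_run_id: 'run-b', first_execution_run_id: nil)

started.target_for(:signal) # nil, so the latest run for the workflow ID is targeted
started.target_for(:result) # "run-a"
pinned.target_for(:query)   # "run-b"
```

This is why the notes on the methods below warn that a handle from Temporalio::Client#start_workflow may reach an unrelated later run with the same workflow ID.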

Instance Method Details

#cancel(rpc_options: nil) ⇒ Object

Note:

Handles created as a result of signal with start will cancel the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Cancel the workflow. This will issue a cancellation for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.

Parameters:

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 396

def cancel(rpc_options: nil)
  @client._impl.cancel_workflow(Interceptor::CancelWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  first_execution_run_id:,
                                  rpc_options:
                                ))
end

#describe(rpc_options: nil) ⇒ WorkflowExecution::Description

Note:

Handles created as a result of Temporalio::Client#start_workflow will describe the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Get workflow details. This will get details for the #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (WorkflowExecution::Description)

    Workflow details.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 157

def describe(rpc_options: nil)
  @client._impl.describe_workflow(Interceptor::DescribeWorkflowInput.new(
                                    workflow_id: id,
                                    run_id:,
                                    rpc_options:
                                  ))
end

#execute_update(update, *args, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ Object?

Note:

Handles created as a result of Temporalio::Client#start_workflow will send updates to the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send an update request to the workflow and wait for it to complete. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • update (Workflow::Definition::Update, Symbol, String)

    Update definition or name.

  • args (Array<Object>)

    Update arguments.

  • id (String) (defaults to: SecureRandom.uuid)

    ID of the update.

  • arg_hints (Array<Object>, nil) (defaults to: nil)

    Update argument hints. If unset/nil and an update definition is passed, uses the ones on the update definition if present.

  • result_hint (Object, nil) (defaults to: nil)

    Update result hint. If unset/nil and an update definition is passed, uses the one on the update definition if present.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object, nil)

    Update result.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 355

def execute_update(update, *args, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil)
  start_update(
    update,
    *args,
    wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
    id:,
    arg_hints:,
    result_hint:,
    rpc_options:
  ).result
end
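
The relationship between #execute_update and #start_update can be sketched without a server. The classes below are hypothetical stand-ins (`WorkflowHandleSketch`, `UpdateHandleSketch`) that only model the delegation: start the update, wait for completion, then unwrap the result. The `:completed` symbol stands in for WorkflowUpdateWaitStage::COMPLETED.

```ruby
require 'securerandom'

# Hypothetical stand-in for an update handle; the real one talks to a server.
UpdateHandleSketch = Struct.new(:id, :stage, :outcome) do
  # Models waiting for and returning the update outcome.
  def result
    outcome
  end
end

# Hypothetical stand-in for the workflow handle.
class WorkflowHandleSketch
  # Records the request and returns a handle, like a "start" style call.
  def start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid)
    UpdateHandleSketch.new(id, wait_for_stage, "#{update}(#{args.join(', ')})")
  end

  # Convenience wrapper: start, wait for completion, unwrap the result.
  def execute_update(update, *args, id: SecureRandom.uuid)
    start_update(update, *args, wait_for_stage: :completed, id: id).result
  end
end

handle = WorkflowHandleSketch.new
handle.execute_update(:add_item, 'sku-1', 2) # => "add_item(sku-1, 2)"
```

Passing an explicit `id:` instead of relying on the random default is what would let a caller look the same update up again later via #update_handle.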

#fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_options: nil) ⇒ WorkflowHistory

Get workflow history. This is a helper on top of #fetch_history_events.

Parameters:

  • event_filter_type (Api::Enums::V1::HistoryEventFilterType) (defaults to: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)

    Types of events to fetch.

  • skip_archival (Boolean) (defaults to: false)

    Whether to skip archival.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (WorkflowHistory)

    Workflow history.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 174

def fetch_history(
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  rpc_options: nil
)
  WorkflowHistory.new(
    fetch_history_events(
      event_filter_type:,
      skip_archival:,
      rpc_options:
    ).to_a
  )
end

#fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_options: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>

Fetch an enumerator of history events for this workflow. Internally this is done in paginated form, but it is presented as an enumerator.

Parameters:

  • wait_new_event (Boolean) (defaults to: false)

    If true, when the end of the current set of events is reached but the workflow is not complete, this will wait for the next event. If false, the enumerator completes at the end of the current history.

  • event_filter_type (Api::Enums::V1::HistoryEventFilterType) (defaults to: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)

    Types of events to fetch.

  • skip_archival (Boolean) (defaults to: false)

    Whether to skip archival.

  • specific_run_id (String, nil) (defaults to: run_id)

    Run ID to fetch events for. Default is the #run_id. Most users will not need to set this and instead use the one on the class.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Enumerator<Api::History::V1::HistoryEvent>)

    Enumerator of history events.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 203

def fetch_history_events(
  wait_new_event: false,
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  specific_run_id: run_id,
  rpc_options: nil
)
  @client._impl.fetch_workflow_history_events(Interceptor::FetchWorkflowHistoryEventsInput.new(
                                                workflow_id: id,
                                                run_id: specific_run_id,
                                                wait_new_event:,
                                                event_filter_type:,
                                                skip_archival:,
                                                rpc_options:
                                              ))
end
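
The "paginated internally, presented as an enumerator" idea can be sketched in plain Ruby. Everything here is hypothetical: `PAGES` is an in-memory stand-in for a server, and `each_event` is not an SDK method. It only shows how page-by-page fetching can be hidden behind an Enumerator so that pages are requested lazily.

```ruby
# Hypothetical in-memory "server": maps a page token to [events, next_token].
PAGES = {
  nil => [%w[started task_scheduled], 'p2'],
  'p2' => [%w[task_completed], 'p3'],
  'p3' => [%w[completed], nil]
}.freeze

# Wraps page-by-page fetching in an Enumerator so callers see a flat stream.
# fetch_page is an assumed callable taking a token and returning [events, next_token].
def each_event(fetch_page)
  Enumerator.new do |yielder|
    token = nil
    loop do
      events, token = fetch_page.call(token)
      events.each { |event| yielder << event }
      break if token.nil?
    end
  end
end

fetched_tokens = []
events = each_event(lambda { |token|
  fetched_tokens << token
  PAGES.fetch(token)
})

events.next # => "started"; only the first page has been requested so far
events.to_a # => all four events in order
```

Because the stream is lazy, a caller that only needs the first matching event, as #result does with `.next`, avoids fetching later pages.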

#query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ Object?

Note:

Handles created as a result of Temporalio::Client#start_workflow will query the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Query the workflow. This will query for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • query (Workflow::Definition::Query, Symbol, String)

    Query definition or name.

  • args (Array<Object>)

    Query arguments.

  • reject_condition (WorkflowQueryRejectCondition, nil) (defaults to: @client.options.default_workflow_query_reject_condition)

    Condition for rejecting the query.

  • arg_hints (Array<Object>, nil) (defaults to: nil)

    Query argument hints. If unset/nil and a query definition is passed, uses the ones on the query definition if present.

  • result_hint (Object, nil) (defaults to: nil)

    Query result hint. If unset/nil and a query definition is passed, uses the one on the query definition if present.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object, nil)

    Query result.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 266

def query(
  query,
  *args,
  reject_condition: @client.options.default_workflow_query_reject_condition,
  arg_hints: nil,
  result_hint: nil,
  rpc_options: nil
)
  query, defn_arg_hints, defn_result_hint = Workflow::Definition::Query._name_and_hints_from_parameter(query)
  @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new(
                                 workflow_id: id,
                                 run_id:,
                                 query:,
                                 args:,
                                 reject_condition:,
                                 arg_hints: arg_hints || defn_arg_hints,
                                 result_hint: result_hint || defn_result_hint,
                                 headers: {},
                                 rpc_options:
                               ))
end
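
The "definition or name" parameter and the hint fallback used by #query can be modeled as follows. `QueryDefinitionSketch`, `name_and_hints`, and `resolve_query` are hypothetical names for illustration; they are not the SDK's internal helpers, and only mirror the `explicit || definition` pattern visible in the source above.

```ruby
# Hypothetical stand-in for a query definition carrying a name and hints.
QueryDefinitionSketch = Struct.new(:name, :arg_hints, :result_hint, keyword_init: true)

# Normalizes "definition or name" into [name, arg_hints, result_hint].
# A bare Symbol or String carries no hints of its own.
def name_and_hints(query)
  case query
  when QueryDefinitionSketch then [query.name, query.arg_hints, query.result_hint]
  when Symbol, String then [query.to_s, nil, nil]
  else raise ArgumentError, "unsupported query parameter: #{query.inspect}"
  end
end

# Explicit keyword arguments win; definition hints are only a fallback.
def resolve_query(query, arg_hints: nil, result_hint: nil)
  name, defn_arg_hints, defn_result_hint = name_and_hints(query)
  { query: name, arg_hints: arg_hints || defn_arg_hints, result_hint: result_hint || defn_result_hint }
end

defn = QueryDefinitionSketch.new(name: 'status', arg_hints: [String], result_hint: Hash)
resolve_query(defn)                       # hints come from the definition
resolve_query(defn, result_hint: Integer) # explicit result hint overrides
resolve_query(:status)                    # name only, no hints
```

The same shape applies to #signal and #start_update, which accept a definition or a name in the same way.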

#result(follow_runs: true, result_hint: nil, rpc_options: nil) ⇒ Object

Wait for the result of the workflow.

This will use #result_run_id if present to base the result on. To use another run ID, a new handle must be created via Temporalio::Client#workflow_handle.

Parameters:

  • follow_runs (Boolean) (defaults to: true)

    If true, the run chain is followed across retries and continue-as-new until the latest run is found, and that run's outcome is used. If false, the outcome of the first run is used.

  • result_hint (Object, nil) (defaults to: nil)

    Override the result hint for the result. If unset/nil, uses one on the handle itself.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object)

    Result of the workflow after being converted by the data converter.

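Raises:

  • (Error::WorkflowFailedError)

    If the workflow failed, was canceled, was terminated, or timed out.

  • (Error::WorkflowContinuedAsNewError)

    If the workflow continued as new and follow_runs is false.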



# File 'lib/temporalio/client/workflow_handle.rb', line 84

def result(follow_runs: true, result_hint: nil, rpc_options: nil)
  # Wait on the close event, following as needed
  hist_run_id = result_run_id
  loop do
    # Get close event
    event = fetch_history_events(
      wait_new_event: true,
      event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
      skip_archival: true,
      specific_run_id: hist_run_id,
      rpc_options:
    ).next

    # Check each close type
    case event.event_type
    when :EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
      attrs = event.workflow_execution_completed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      return @client.data_converter.from_payloads(attrs.result, hints: Array(result_hint || @result_hint)).first
    when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
      attrs = event.workflow_execution_failed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, cause: @client.data_converter.from_failure(attrs.failure)
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED
      attrs = event.workflow_execution_canceled_event_attributes
      raise Error::WorkflowFailedError.new, 'Workflow execution canceled', cause: Error::CanceledError.new(
        'Workflow execution canceled',
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED
      attrs = event.workflow_execution_terminated_event_attributes
      raise Error::WorkflowFailedError.new, 'Workflow execution terminated', cause: Error::TerminatedError.new(
        Internal::ProtoUtils.string_or(attrs.reason, 'Workflow execution terminated'),
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
      attrs = event.workflow_execution_timed_out_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, 'Workflow execution timed out', cause: Error::TimeoutError.new(
        'Workflow execution timed out',
        type: Api::Enums::V1::TimeoutType::TIMEOUT_TYPE_START_TO_CLOSE,
        last_heartbeat_details: []
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
      attrs = event.workflow_execution_continued_as_new_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      # TODO: Use more specific error and decode failure
      raise Error::WorkflowContinuedAsNewError.new(new_run_id: attrs.new_execution_run_id)
    else
      raise Error, "Unknown close event type: #{event.event_type}"
    end
  end
end
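
The run-following loop in #result can be modeled with in-memory data. This is a sketch under stated assumptions: `CloseEventSketch`, `CHAIN`, and `final_close_event` are hypothetical, the run IDs are made up, and the real method fetches close events from the server instead of a hash.

```ruby
# Hypothetical close events keyed by run ID. new_run_id is set when the run
# was followed by another one (for example continue-as-new or a retry).
CloseEventSketch = Struct.new(:type, :result, :new_run_id, keyword_init: true)

CHAIN = {
  'run-1' => CloseEventSketch.new(type: :continued_as_new, new_run_id: 'run-2'),
  'run-2' => CloseEventSketch.new(type: :failed, new_run_id: 'run-3'),
  'run-3' => CloseEventSketch.new(type: :completed, result: 'done', new_run_id: '')
}.freeze

# Walks the chain from start_run_id and returns [run_id, close_event] for the
# run whose outcome would be reported.
def final_close_event(chain, start_run_id, follow_runs: true)
  run_id = start_run_id
  loop do
    event = chain.fetch(run_id)
    next_run_id = event.new_run_id
    # An empty or missing next run ID means this run ends the chain.
    return [run_id, event] unless follow_runs && next_run_id && !next_run_id.empty?

    run_id = next_run_id
  end
end

final_close_event(CHAIN, 'run-1')                     # ends at run-3, :completed
final_close_event(CHAIN, 'run-1', follow_runs: false) # stops at run-1, :continued_as_new
```

In the source above, stopping on a continued-as-new event with follow_runs set to false surfaces as Error::WorkflowContinuedAsNewError rather than a returned value.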

#signal(signal, *args, arg_hints: nil, rpc_options: nil) ⇒ Object

Note:

Handles created as a result of Temporalio::Client#start_workflow will signal the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send a signal to the workflow. This will signal for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • signal (Workflow::Definition::Signal, Symbol, String)

    Signal definition or name.

  • args (Array<Object>)

    Signal arguments.

  • arg_hints (Array<Object>, nil) (defaults to: nil)

    Signal argument hints. If unset/nil and a signal definition is passed, uses the ones on the signal definition if present.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 233

def signal(signal, *args, arg_hints: nil, rpc_options: nil)
  signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal)
  @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  signal:,
                                  args:,
                                  arg_hints: arg_hints || defn_arg_hints,
                                  headers: {},
                                  rpc_options:
                                ))
end

#start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) ⇒ WorkflowUpdateHandle

Note:

Handles created as a result of Temporalio::Client#start_workflow will send updates to the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send an update request to the workflow and return a handle to it. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • update (Workflow::Definition::Update, Symbol, String)

    Update definition or name.

  • args (Array<Object>)

    Update arguments.

  • wait_for_stage (WorkflowUpdateWaitStage)

    Required stage to wait until returning. ADMITTED is not currently supported. See docs.temporal.io/workflows#update for more details.

  • id (String) (defaults to: SecureRandom.uuid)

    ID of the update.

  • arg_hints (Array<Object>, nil) (defaults to: nil)

    Update argument hints. If unset/nil and an update definition is passed, uses the ones on the update definition if present.

  • result_hint (Object, nil) (defaults to: nil)

    Update result hint. If unset/nil and an update definition is passed, uses the one on the update definition if present.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (WorkflowUpdateHandle)

    Handle to the update.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 310

def start_update(
  update,
  *args,
  wait_for_stage:,
  id: SecureRandom.uuid,
  arg_hints: nil,
  result_hint: nil,
  rpc_options: nil
)
  update, defn_arg_hints, defn_result_hint = Workflow::Definition::Update._name_and_hints_from_parameter(update)
  @client._impl.start_workflow_update(Interceptor::StartWorkflowUpdateInput.new(
                                        workflow_id: self.id,
                                        run_id:,
                                        update_id: id,
                                        update:,
                                        args:,
                                        wait_for_stage:,
                                        arg_hints: arg_hints || defn_arg_hints,
                                        result_hint: result_hint || defn_result_hint,
                                        headers: {},
                                        rpc_options:
                                      ))
end

#terminate(reason = nil, details: [], rpc_options: nil) ⇒ Object

Note:

Handles created as a result of signal with start will terminate the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Terminate the workflow. This will issue a termination for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.

Parameters:

  • reason (String, nil) (defaults to: nil)

    Reason for the termination.

  • details (Array<Object>) (defaults to: [])

    Details to store on the termination.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Raises:



# File 'lib/temporalio/client/workflow_handle.rb', line 417

def terminate(reason = nil, details: [], rpc_options: nil)
  @client._impl.terminate_workflow(Interceptor::TerminateWorkflowInput.new(
                                     workflow_id: id,
                                     run_id:,
                                     first_execution_run_id:,
                                     reason:,
                                     details:,
                                     rpc_options:
                                   ))
end

#update_handle(id, specific_run_id: run_id, result_hint: nil) ⇒ WorkflowUpdateHandle

Get a handle for an update. The handle can be used to wait on the update result.

Parameters:

  • id (String)

    ID of the update.

  • specific_run_id (String, nil) (defaults to: run_id)

    Workflow run ID to get update handle for. Default is the #run_id. Most users will not need to set this and instead use the one on the class.

  • result_hint (Object, nil) (defaults to: nil)

    Result hint for the update result to set on the handle.

Returns:

  • (WorkflowUpdateHandle)

    Handle for the update.



# File 'lib/temporalio/client/workflow_handle.rb', line 375

def update_handle(id, specific_run_id: run_id, result_hint: nil)
  WorkflowUpdateHandle.new(
    client: @client,
    id:,
    workflow_id: self.id,
    workflow_run_id: specific_run_id,
    known_outcome: nil,
    result_hint:
  )
end