Class: Temporalio::Client::WorkflowHandle

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/client/workflow_handle.rb

Overview

Handle for interacting with a workflow. This is usually created via #start_workflow or #workflow_handle.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#first_execution_run_idString? (readonly)

Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.

This can be set when using Temporalio::Client#workflow_handle. When Temporalio::Client#start_workflow is called without a start signal, this is set to the resulting run.

This cannot be mutated. If a different first execution run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    First execution run ID.



49
50
51
# File 'lib/temporalio/client/workflow_handle.rb', line 49

def first_execution_run_id
  @first_execution_run_id
end

#idString (readonly)

Returns ID for the workflow.

Returns:

  • (String)

    ID for the workflow.



17
18
19
# File 'lib/temporalio/client/workflow_handle.rb', line 17

def id
  @id
end

#result_run_idString? (readonly)

Run ID used for #result calls if present to ensure result is for a workflow starting from this run.

When this handle is created via Temporalio::Client#workflow_handle, this is the same as #run_id. When this handle is created via Temporalio::Client#start_workflow, this value will be the resulting run ID.

This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    Result run ID.



37
38
39
# File 'lib/temporalio/client/workflow_handle.rb', line 37

def result_run_id
  @result_run_id
end

#run_idString? (readonly)

Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.

This is only created via Temporalio::Client#workflow_handle. Temporalio::Client#start_workflow will not set this value.

This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    Run ID.



27
28
29
# File 'lib/temporalio/client/workflow_handle.rb', line 27

def run_id
  @run_id
end

Instance Method Details

#cancel(rpc_options: nil) ⇒ Object

Note:

Handles created as a result of signal with start will cancel the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Cancel the workflow. This will issue a cancellation for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.

Parameters:

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Raises:



356
357
358
359
360
361
362
363
# File 'lib/temporalio/client/workflow_handle.rb', line 356

def cancel(rpc_options: nil)
  @client._impl.cancel_workflow(Interceptor::CancelWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  first_execution_run_id:,
                                  rpc_options:
                                ))
end

#describe(rpc_options: nil) ⇒ WorkflowExecution::Description

Note:

Handles created as a result of Temporalio::Client#start_workflow will describe the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Get workflow details. This will get details for the #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

Raises:



147
148
149
150
151
152
153
# File 'lib/temporalio/client/workflow_handle.rb', line 147

def describe(rpc_options: nil)
  @client._impl.describe_workflow(Interceptor::DescribeWorkflowInput.new(
                                    workflow_id: id,
                                    run_id:,
                                    rpc_options:
                                  ))
end

#execute_update(update, *args, id: SecureRandom.uuid, rpc_options: nil) ⇒ Object?

Note:

Handles created as a result of Temporalio::Client#start_workflow will send updates the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send an update request to the workflow and wait for it to complete. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • update (Workflow::Definition::Update, Symbol, String)

    Update definition or name.

  • args (Array<Object>)

    Update arguments.

  • id (String) (defaults to: SecureRandom.uuid)

    ID of the update.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object, nil)

    Update result.

Raises:



319
320
321
322
323
324
325
326
327
# File 'lib/temporalio/client/workflow_handle.rb', line 319

def execute_update(update, *args, id: SecureRandom.uuid, rpc_options: nil)
  start_update(
    update,
    *args,
    wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
    id:,
    rpc_options:
  ).result
end

#fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_options: nil) ⇒ WorkflowHistory

Get workflow history. This is a helper on top of #fetch_history_events.

Parameters:

  • event_filter_type (Api::Enums::V1::HistoryEventFilterType) (defaults to: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)

    Types of events to fetch.

  • skip_archival (Boolean) (defaults to: false)

    Whether to skip archival.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

Raises:



164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/temporalio/client/workflow_handle.rb', line 164

def fetch_history(
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  rpc_options: nil
)
  WorkflowHistory.new(
    fetch_history_events(
      event_filter_type:,
      skip_archival:,
      rpc_options:
    ).to_a
  )
end

#fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_options: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>

Fetch an enumerator of history events for this workflow. Internally this is done in paginated form, but it is presented as an enumerator.

Parameters:

  • wait_new_event (Boolean) (defaults to: false)

    If true, when the end of the current set of events is reached but the workflow is not complete, this will wait for the next event. If false, the enumerable completes at the end of current history.

  • event_filter_type (Api::Enums::V1::HistoryEventFilterType) (defaults to: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)

    Types of events to fetch.

  • skip_archival (Boolean) (defaults to: false)

    Whether to skip archival.

  • specific_run_id (String, nil) (defaults to: run_id)

    Run ID to fetch events for. Default is the #run_id. Most users will not need to set this and instead use the one on the class.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

Raises:



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/temporalio/client/workflow_handle.rb', line 193

def fetch_history_events(
  wait_new_event: false,
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  specific_run_id: run_id,
  rpc_options: nil
)
  @client._impl.fetch_workflow_history_events(Interceptor::FetchWorkflowHistoryEventsInput.new(
                                                workflow_id: id,
                                                run_id: specific_run_id,
                                                wait_new_event:,
                                                event_filter_type:,
                                                skip_archival:,
                                                rpc_options:
                                              ))
end

#query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, rpc_options: nil) ⇒ Object?

Note:

Handles created as a result of Temporalio::Client#start_workflow will query the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Query the workflow. This will query for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • query (Workflow::Definition::Query, Symbol, String)

    Query definition or name.

  • args (Array<Object>)

    Query arguments.

  • reject_condition (WorkflowQueryRejectCondition, nil) (defaults to: @client.options.default_workflow_query_reject_condition)

    Condition for rejecting the query.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object, nil)

    Query result.

Raises:



248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/temporalio/client/workflow_handle.rb', line 248

def query(
  query,
  *args,
  reject_condition: @client.options.default_workflow_query_reject_condition,
  rpc_options: nil
)
  @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new(
                                 workflow_id: id,
                                 run_id:,
                                 query:,
                                 args:,
                                 reject_condition:,
                                 headers: {},
                                 rpc_options:
                               ))
end

#result(follow_runs: true, rpc_options: nil) ⇒ Object

Wait for the result of the workflow.

This will use #result_run_id if present to base the result on. To use another run ID, a new handle must be created via Temporalio::Client#workflow_handle.

Parameters:

  • follow_runs (Boolean) (defaults to: true)

    If true, workflow runs will be continually fetched across retries and continue as new until the latest one is found. If false, the first result is used.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Returns:

  • (Object)

    Result of the workflow after being converted by the data converter.

Raises:



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/temporalio/client/workflow_handle.rb', line 74

def result(follow_runs: true, rpc_options: nil)
  # Wait on the close event, following as needed
  hist_run_id = result_run_id
  loop do
    # Get close event
    event = fetch_history_events(
      wait_new_event: true,
      event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
      skip_archival: true,
      specific_run_id: hist_run_id,
      rpc_options:
    ).next

    # Check each close type'
    case event.event_type
    when :EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
      attrs = event.workflow_execution_completed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      return @client.data_converter.from_payloads(attrs.result).first
    when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
      attrs = event.workflow_execution_failed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, cause: @client.data_converter.from_failure(attrs.failure)
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED
      attrs = event.workflow_execution_canceled_event_attributes
      raise Error::WorkflowFailedError.new, 'Workflow execution canceled', cause: Error::CanceledError.new(
        'Workflow execution canceled',
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED
      attrs = event.workflow_execution_terminated_event_attributes
      raise Error::WorkflowFailedError.new, 'Workflow execution terminated', cause: Error::TerminatedError.new(
        Internal::ProtoUtils.string_or(attrs.reason, 'Workflow execution terminated'),
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
      attrs = event.workflow_execution_timed_out_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, 'Workflow execution timed out', cause: Error::TimeoutError.new(
        'Workflow execution timed out',
        type: Api::Enums::V1::TimeoutType::TIMEOUT_TYPE_START_TO_CLOSE,
        last_heartbeat_details: []
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
      attrs = event.workflow_execution_continued_as_new_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      # TODO: Use more specific error and decode failure
      raise Error::WorkflowContinuedAsNewError.new(new_run_id: attrs.new_execution_run_id)
    else
      raise Error, "Unknown close event type: #{event.event_type}"
    end
  end
end

#signal(signal, *args, rpc_options: nil) ⇒ Object

Note:

Handles created as a result of Temporalio::Client#start_workflow will signal the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send a signal to the workflow. This will signal for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • signal (Workflow::Definition::Signal, Symbol, String)

    Signal definition or name.

  • args (Array<Object>)

    Signal arguments.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Raises:



221
222
223
224
225
226
227
228
229
230
# File 'lib/temporalio/client/workflow_handle.rb', line 221

def signal(signal, *args, rpc_options: nil)
  @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  signal:,
                                  args:,
                                  headers: {},
                                  rpc_options:
                                ))
end

#start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, rpc_options: nil) ⇒ WorkflowUpdateHandle

Note:

Handles created as a result of Temporalio::Client#start_workflow will send updates the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send an update request to the workflow and return a handle to it. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

Returns:

Raises:



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/temporalio/client/workflow_handle.rb', line 283

def start_update(
  update,
  *args,
  wait_for_stage:,
  id: SecureRandom.uuid,
  rpc_options: nil
)
  @client._impl.start_workflow_update(Interceptor::StartWorkflowUpdateInput.new(
                                        workflow_id: self.id,
                                        run_id:,
                                        update_id: id,
                                        update:,
                                        args:,
                                        wait_for_stage:,
                                        headers: {},
                                        rpc_options:
                                      ))
end

#terminate(reason = nil, details: [], rpc_options: nil) ⇒ Object

Note:

Handles created as a result of signal with start will terminate the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Terminate the workflow. This will issue a termination for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.

Parameters:

  • reason (String, nil) (defaults to: nil)

    Reason for the termination.

  • details (Array<Object>) (defaults to: [])

    Details to store on the termination.

  • rpc_options (RPCOptions, nil) (defaults to: nil)

    Advanced RPC options.

Raises:



377
378
379
380
381
382
383
384
385
386
# File 'lib/temporalio/client/workflow_handle.rb', line 377

def terminate(reason = nil, details: [], rpc_options: nil)
  @client._impl.terminate_workflow(Interceptor::TerminateWorkflowInput.new(
                                     workflow_id: id,
                                     run_id:,
                                     first_execution_run_id:,
                                     reason:,
                                     details:,
                                     rpc_options:
                                   ))
end

#update_handle(id, specific_run_id: run_id) ⇒ WorkflowUpdateHandle

Get a handle for an update. The handle can be used to wait on the update result.

Parameters:

  • id (String)

    ID of the update.

  • specific_run_id (String, nil) (defaults to: run_id)

    Workflow run ID to get update handle for. Default is the #run_id. Most users will not need to set this and instead use the one on the class.

Returns:



336
337
338
339
340
341
342
343
344
# File 'lib/temporalio/client/workflow_handle.rb', line 336

def update_handle(id, specific_run_id: run_id)
  WorkflowUpdateHandle.new(
    client: @client,
    id:,
    workflow_id: self.id,
    workflow_run_id: specific_run_id,
    known_outcome: nil
  )
end