Class: Temporalio::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/thread_pool.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/workflow_executor.rb,
lib/temporalio/worker/workflow_replayer.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb,
lib/temporalio/worker/workflow_executor/thread_pool.rb

Overview

Worker for processing activities and workflows on a task queue.

Workers are created for a task queue and the items they can run. Then #run is used for running a single worker, or Worker.run_all is used for a collection of workers. These can wait until a block is complete or a Cancellation is canceled.

Defined Under Namespace

Modules: Interceptor Classes: ActivityExecutor, Options, ThreadPool, Tuner, WorkflowExecutor, WorkflowReplayer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker

Create a new worker. At least one activity or workflow must be present.

Parameters:

  • client (Client)

    Client for this worker.

  • task_queue (String)

    Task queue for this worker.

  • activities (Array<Activity::Definition, Class<Activity::Definition>, Activity::Definition::Info>) (defaults to: [])

    Activities for this worker.

  • workflows (Array<Class<Workflow::Definition>>) (defaults to: [])

    Workflows for this worker.

  • tuner (Tuner) (defaults to: Tuner.create_fixed)

    Tuner that controls the amount of concurrent activities/workflows that run at a time.

  • activity_executors (Hash<Symbol, Worker::ActivityExecutor>) (defaults to: ActivityExecutor.defaults)

    Executors that activities can run within.

  • workflow_executor (WorkflowExecutor) (defaults to: WorkflowExecutor::ThreadPool.default)

    Workflow executor that workflow tasks run within. This must be a Temporalio::Worker::WorkflowExecutor::ThreadPool currently.

  • interceptors (Array<Interceptor::Activity, Interceptor::Workflow>) (defaults to: [])

    Interceptors specific to this worker. Note, interceptors set on the client that include the Temporalio::Worker::Interceptor::Activity or Temporalio::Worker::Interceptor::Workflow module are automatically included here, so no need to specify them again.

  • build_id (String) (defaults to: Worker.default_build_id)

    Unique identifier for the current runtime. This is best set as a unique value representing all code and should change only when code does. This can be something like a git commit hash. If unset, default is hash of known Ruby code.

  • identity (String, nil) (defaults to: nil)

    Override the identity for this worker. If unset, client identity is used.

  • logger (Logger) (defaults to: client.options.logger)

    Logger to override client logger with. Default is the client logger.

  • max_cached_workflows (Integer) (defaults to: 1000)

    Number of workflows held in cache for use by sticky task queue. If set to 0, workflow caching and sticky queuing are disabled.

  • max_concurrent_workflow_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll workflow task requests we will perform at a time on this worker’s task queue.

  • nonsticky_to_sticky_poll_ratio (Float) (defaults to: 0.2)

    ‘max_concurrent_workflow_task_polls` * this number = the number of max pollers that will be allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky queue will allow 4 max pollers while the nonsticky queue will allow one. The minimum for either poller is 1, so if `max_concurrent_workflow_task_polls` is 1 and sticky queues are enabled, there will be 2 concurrent polls.

  • max_concurrent_activity_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll activity task requests we will perform at a time on this worker’s task queue.

  • no_remote_activities (Boolean) (defaults to: false)

    If true, this worker will only handle workflow tasks and local activities, it will not poll for activity tasks.

  • sticky_queue_schedule_to_start_timeout (Float) (defaults to: 10)

    How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.

  • max_heartbeat_throttle_interval (Float) (defaults to: 60)

    Longest interval for throttling activity heartbeats.

  • default_heartbeat_throttle_interval (Float) (defaults to: 30)

    Default interval for throttling activity heartbeats in case per-activity heartbeat timeout is unset. Otherwise, it’s the per-activity heartbeat timeout * 0.8.

  • max_activities_per_second (Float, nil) (defaults to: nil)

    Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit.

  • max_task_queue_activities_per_second (Float, nil) (defaults to: nil)

    Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.

  • graceful_shutdown_period (Float) (defaults to: 0)

    Amount of time after shutdown is called that activities are given to complete before their tasks are canceled.

  • use_worker_versioning (Boolean) (defaults to: false)

    If true, the ‘build_id` argument must be specified, and this worker opts into the worker versioning feature. This ensures it only receives workflow tasks for workflows which it claims to be compatible with. For more information, see docs.temporal.io/workers#worker-versioning.

  • disable_eager_activity_execution (Boolean) (defaults to: false)

    If true, disables eager activity execution. Eager activity execution is an optimization on some servers that sends activities back to the same worker as the calling workflow if they can run there. This should be set to true for ‘max_task_queue_activities_per_second` to work and in a future version of this API may be implied as such (i.e. this setting will be ignored if that setting is set).

  • illegal_workflow_calls (Hash<String, [:all, Array<Symbol>]>) (defaults to: Worker.default_illegal_workflow_calls)

    Set of illegal workflow calls that are considered unsafe/non-deterministic and will raise if seen. The key of the hash is the fully qualified string class name (no leading ‘::`). The value is either `:all` which means any use of the class, or an array of symbols for methods on the class that cannot be used. The methods refer to either instance or class methods, there is no way to differentiate at this time.

  • workflow_failure_exception_types (Array<Class<Exception>>) (defaults to: [])

    Workflow failure exception types. This is the set of exception types that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure. These are applied in addition to the ‘workflow_failure_exception_type` on the workflow definition class itself. If Exception is set, it effectively will fail a workflow/update in all user exception cases.

  • workflow_payload_codec_thread_pool (ThreadPool, nil) (defaults to: nil)

    Thread pool to run payload codec encode/decode within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially block execution which is why they need to be run in the background.

  • debug_mode (Boolean) (defaults to: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase))

    If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks if they block the thread for too long. This defaults to true if the ‘TEMPORAL_DEBUG` environment variable is `true` or `1`.

Raises:

  • (ArgumentError)


353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/temporalio/worker.rb', line 353

def initialize(
  client:,
  task_queue:,
  activities: [],
  workflows: [],
  tuner: Tuner.create_fixed,
  activity_executors: ActivityExecutor.defaults,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  interceptors: [],
  build_id: Worker.default_build_id,
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  use_worker_versioning: false,
  disable_eager_activity_execution: false,
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
  raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?

  Internal::ProtoUtils.assert_non_reserved_name(task_queue)

  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    workflows:,
    tuner:,
    activity_executors:,
    workflow_executor:,
    interceptors:,
    build_id:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    use_worker_versioning:,
    disable_eager_activity_execution:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    debug_mode:
  ).freeze

  # Preload workflow definitions and some workflow settings for the bridge
  workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows)
  nondeterminism_as_workflow_fail, nondeterminism_as_workflow_fail_for_types =
    Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options(
      workflow_failure_exception_types:, workflow_definitions:
    )

  # Create the bridge worker
  @bridge_worker = Internal::Bridge::Worker.new(
    client.connection._core_client,
    Internal::Bridge::Worker::Options.new(
      activity: !activities.empty?,
      workflow: !workflows.empty?,
      namespace: client.namespace,
      task_queue:,
      tuner: tuner._to_bridge_options,
      build_id:,
      identity_override: identity,
      max_cached_workflows:,
      max_concurrent_workflow_task_polls:,
      nonsticky_to_sticky_poll_ratio:,
      max_concurrent_activity_task_polls:,
      # For shutdown to work properly, we must disable remote activities
      # ourselves if there are no activities
      no_remote_activities: no_remote_activities || activities.empty?,
      sticky_queue_schedule_to_start_timeout:,
      max_heartbeat_throttle_interval:,
      default_heartbeat_throttle_interval:,
      max_worker_activities_per_second: max_activities_per_second,
      max_task_queue_activities_per_second:,
      graceful_shutdown_period:,
      use_worker_versioning:,
      nondeterminism_as_workflow_fail:,
      nondeterminism_as_workflow_fail_for_types:
    )
  )

  # Collect interceptors from client and params
  @activity_interceptors = (client.options.interceptors + interceptors).select do |i|
    i.is_a?(Interceptor::Activity)
  end
  @workflow_interceptors = (client.options.interceptors + interceptors).select do |i|
    i.is_a?(Interceptor::Workflow)
  end

  # Cancellation for the whole worker
  @worker_shutdown_cancellation = Cancellation.new

  # Create workers
  unless activities.empty?
    @activity_worker = Internal::Worker::ActivityWorker.new(worker: self,
                                                            bridge_worker: @bridge_worker)
  end
  unless workflows.empty?
    @workflow_worker = Internal::Worker::WorkflowWorker.new(
      bridge_worker: @bridge_worker,
      namespace: client.namespace,
      task_queue:,
      workflow_definitions:,
      workflow_executor:,
      logger:,
      data_converter: client.data_converter,
      metric_meter: client.connection.options.runtime.metric_meter,
      workflow_interceptors: @workflow_interceptors,
      disable_eager_activity_execution:,
      illegal_workflow_calls:,
      workflow_failure_exception_types:,
      workflow_payload_codec_thread_pool:,
      debug_mode:
    )
  end

  # Validate worker
  @bridge_worker.validate

  # Mutex needed for accessing and replacing a client
  @client_mutex = Mutex.new
end

Instance Attribute Details

#optionsOptions (readonly)

Returns Options for this worker which has the same attributes as #initialize.

Returns:



282
283
284
# File 'lib/temporalio/worker.rb', line 282

def options
  @options
end

Class Method Details

.default_build_idString

Returns Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in ‘$LOADED_FEATURES`. Users may prefer to set the build ID to a better representation of the source.

Returns:

  • (String)

    Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in ‘$LOADED_FEATURES`. Users may prefer to set the build ID to a better representation of the source.



66
67
68
# File 'lib/temporalio/worker.rb', line 66

def self.default_build_id
  @default_build_id ||= _load_default_build_id
end

.default_illegal_workflow_callsHash<String, [:all, Array<Symbol>]>

Returns Default, immutable set illegal calls used for the ‘illegal_workflow_calls` worker option. See the documentation of that option for more details.

Returns:

  • (Hash<String, [:all, Array<Symbol>]>)

    Default, immutable set illegal calls used for the ‘illegal_workflow_calls` worker option. See the documentation of that option for more details.



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/temporalio/worker.rb', line 236

def self.default_illegal_workflow_calls
  @default_illegal_workflow_calls ||= begin
    hash = {
      'BasicSocket' => :all,
      'Date' => %i[initialize today],
      'DateTime' => %i[initialize now],
      'Dir' => :all,
      'Fiber' => [:set_scheduler],
      'File' => :all,
      'FileTest' => :all,
      'FileUtils' => :all,
      'Find' => :all,
      'GC' => :all,
      'IO' => [
        :read
        # Intentionally leaving out write so puts will work. We don't want to add heavy logic replacing stdout or
        # trying to derive whether it's file vs stdout write.
        #:write
      ],
      'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
                     spawn srand system test trap],
      'Net::HTTP' => :all,
      'Pathname' => :all,
      # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
      # test executing activities inside a timeout will fail if clock_gettime is blocked.
      'Process' => %i[abort argv0 daemon detach exec exit exit! fork kill setpriority setproctitle setrlimit setsid
                      spawn times wait wait2 waitall warmup],
      # TODO(cretz): Allow Ractor.current since exception formatting in error_highlight references it
      # 'Ractor' => :all,
      'Random::Base' => [:initialize],
      'Resolv' => :all,
      'SecureRandom' => :all,
      'Signal' => :all,
      'Socket' => :all,
      'Tempfile' => :all,
      'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
                     pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
                     terminate thread_variable_set wakeup],
      'Time' => %i[initialize now]
    } #: Hash[String, :all | Array[Symbol]]
    hash.each_value(&:freeze)
    hash.freeze
  end
end

.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the workers are shut down. This will return the block result if everything successful or raise an error if not. See #run for details on how worker shutdown works.

Parameters:

  • workers (Array<Worker>)

    Workers to run.

  • cancellation (Cancellation) (defaults to: Cancellation.new)

    Cancellation that can be canceled to shut down all workers.

  • shutdown_signals (Array) (defaults to: [])

    Signals to trap and cause worker shutdown.

  • raise_in_block_on_shutdown (Exception, nil) (defaults to: Error::CanceledError.new('Workers finished'))

    Exception to Thread.raise or Fiber.raise if a block is present and still running on shutdown. If nil, ‘raise` is not used.

  • wait_block_complete (Boolean) (defaults to: true)

    If block given and shutdown caused by something else (e.g. cancellation canceled), whether to wait on the block to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Workers will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.

Returns:

  • (Object)

    Return value of the block or nil of no block given.

Raises:

  • (ArgumentError)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/temporalio/worker.rb', line 105

def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # Confirm there is at least one and they are all workers
  raise ArgumentError, 'At least one worker required' if workers.empty?
  raise ArgumentError, 'Not all parameters are workers' unless workers.all? { |w| w.is_a?(Worker) }

  Internal::Bridge.assert_fiber_compatibility!

  # Start the multi runner
  runner = Internal::Worker::MultiRunner.new(workers:, shutdown_signals:)

  # Apply block
  runner.apply_thread_or_fiber_block(&block)

  # Reuse first worker logger
  logger = workers.first&.options&.logger or raise # Never nil

  # On cancel, initiate shutdown
  cancellation.add_cancel_callback do
    logger.info('Cancel invoked, beginning worker shutdown')
    runner.initiate_shutdown
  end

  # Poller loop, run until all pollers shut down
  first_error = nil
  block_result = nil
  loop do
    event = runner.next_event
    # TODO(cretz): Consider improving performance instead of this case statement
    case event
    when Internal::Worker::MultiRunner::Event::PollSuccess
      # Successful poll
      event.worker #: Worker
           ._on_poll_bytes(runner, event.worker_type, event.bytes)
    when Internal::Worker::MultiRunner::Event::PollFailure
      # Poll failure, this causes shutdown of all workers
      logger.error('Poll failure (beginning worker shutdown if not already occurring)')
      logger.error(event.error)
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded
      # Came back from a codec as decoded
      event.workflow_worker.handle_activation(runner:, activation: event.activation, decoded: true)
    when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete
      # An activation is complete
      event.workflow_worker.handle_activation_complete(
        runner:,
        activation_completion: event.activation_completion,
        encoded: event.encoded,
        completion_complete_queue: event.completion_complete_queue
      )
    when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete
      # Completion complete, only need to log error if it occurs here
      if event.error
        logger.error("Activation completion failed to record on run ID #{event.run_id}")
        logger.error(event.error)
      end
    when Internal::Worker::MultiRunner::Event::PollerShutDown
      # Individual poller shut down. Nothing to do here until we support
      # worker status or something.
    when Internal::Worker::MultiRunner::Event::AllPollersShutDown
      # This is where we break the loop, no more polling can happen
      break
    when Internal::Worker::MultiRunner::Event::BlockSuccess
      logger.info('Block completed, beginning worker shutdown')
      block_result = event
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::BlockFailure
      logger.error('Block failure (beginning worker shutdown)')
      logger.error(event.error)
      block_result = event
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
      logger.info('Signal received, beginning worker shutdown')
      runner.initiate_shutdown
    else
      raise "Unexpected event: #{event}"
    end
  end

  # Now that all pollers have stopped, let's wait for all to complete
  begin
    runner.wait_complete_and_finalize_shutdown
  rescue StandardError => e
    logger.warn('Failed waiting and finalizing')
    logger.warn(e)
  end

  # If there was a block but not a result yet, we want to raise if that is
  # wanted, and wait if that is wanted
  if block_given? && block_result.nil?
    runner.raise_in_thread_or_fiber_block(raise_in_block_on_shutdown) unless raise_in_block_on_shutdown.nil?
    if wait_block_complete
      event = runner.next_event
      case event
      when Internal::Worker::MultiRunner::Event::BlockSuccess
        logger.info('Block completed (after worker shutdown)')
        block_result = event
      when Internal::Worker::MultiRunner::Event::BlockFailure
        logger.error('Block failure (after worker shutdown)')
        logger.error(event.error)
        block_result = event
        first_error ||= event.error
      when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
        # Do nothing, waiting for block
      else
        raise "Unexpected event: #{event}"
      end
    end
  end

  # Notify each worker we're done with it
  workers.each(&:_on_shutdown_complete)

  # If there was an shutdown-causing error, we raise that
  if !first_error.nil?
    raise first_error
  elsif block_result.is_a?(Internal::Worker::MultiRunner::Event::BlockSuccess)
    block_result.result
  end
end

Instance Method Details

#clientClient

Returns Client for this worker. This is the same as #client in #options, but surrounded by a mutex to be safe for client replacement in #client=.

Returns:

  • (Client)

    Client for this worker. This is the same as #client in #options, but surrounded by a mutex to be safe for client replacement in #client=.



504
505
506
# File 'lib/temporalio/worker.rb', line 504

def client
  @client_mutex.synchronize { @options.client }
end

#client=(new_client) ⇒ Object

Replace the worker’s client. When this is called, the client is replaced on the internal worker which means any new calls will be made on the new client (but existing calls will still complete on the previous one). This is commonly used for providing a new client with updated authentication credentials.

Parameters:

  • new_client (Client)

    New client to use for new calls.



513
514
515
516
517
518
519
# File 'lib/temporalio/worker.rb', line 513

def client=(new_client)
  @client_mutex.synchronize do
    @bridge_worker.replace_client(new_client.connection._core_client)
    @options = @options.with(client: new_client)
    new_client
  end
end

#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the worker is shut down. This will return the block result if everything successful or raise an error if not.

Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.

Parameters:

  • cancellation (Cancellation) (defaults to: Cancellation.new)

    Cancellation that can be canceled to shut down this worker.

  • shutdown_signals (Array) (defaults to: [])

    Signals to trap and cause worker shutdown.

  • raise_in_block_on_shutdown (Exception, nil) (defaults to: Error::CanceledError.new('Workers finished'))

    Exception to Thread.raise or Fiber.raise if a block is present and still running on shutdown. If nil, ‘raise` is not used.

  • wait_block_complete (Boolean) (defaults to: true)

    If block given and shutdown caused by something else (e.g. cancellation canceled), whether to wait on the block to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Worker will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.

Returns:

  • (Object)

    Return value of the block or nil of no block given.



537
538
539
540
541
542
543
544
545
# File 'lib/temporalio/worker.rb', line 537

def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
end

#task_queueString

Returns Task queue set on the worker options.

Returns:

  • (String)

    Task queue set on the worker options.



498
499
500
# File 'lib/temporalio/worker.rb', line 498

def task_queue
  @options.task_queue
end