Class: Temporalio::Worker
- Inherits: Object
- Inheritance chain: Object, Temporalio::Worker
- Defined in:
- lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/thread_pool.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/workflow_executor.rb,
lib/temporalio/worker/workflow_replayer.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb,
lib/temporalio/worker/workflow_executor/thread_pool.rb
Overview
Worker for processing activities and workflows on a task queue.
Workers are created for a task queue and the items they can run. Then #run is used for running a single worker, or Worker.run_all is used for a collection of workers. These can wait until a block is complete or a Cancellation is canceled.
Defined Under Namespace
Modules: Interceptor
Classes: ActivityExecutor, Options, ThreadPool, Tuner, WorkflowExecutor, WorkflowReplayer
Instance Attribute Summary
-
#options ⇒ Options
readonly
Options for this worker, which have the same attributes as the #initialize parameters.
Class Method Summary
-
.default_build_id ⇒ String
Memoized default build ID.
-
.default_illegal_workflow_calls ⇒ Hash<String, [:all, Array<Symbol>]>
Default, immutable set of illegal calls used for the `illegal_workflow_calls` worker option.
-
.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run all workers until cancellation or optional block completes.
Instance Method Summary
-
#client ⇒ Client
Client for this worker.
-
#client=(new_client) ⇒ Object
Replace the worker’s client.
-
#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker
constructor
Create a new worker.
-
#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run this worker until cancellation or optional block completes.
-
#task_queue ⇒ String
Task queue set on the worker options.
Constructor Details
#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker
Create a new worker. At least one activity or workflow must be present.
    # File 'lib/temporalio/worker.rb', line 353

    def initialize(
      client:,
      task_queue:,
      activities: [],
      workflows: [],
      tuner: Tuner.create_fixed,
      activity_executors: ActivityExecutor.defaults,
      workflow_executor: WorkflowExecutor::ThreadPool.default,
      interceptors: [],
      build_id: Worker.default_build_id,
      identity: nil,
      logger: client.options.logger,
      max_cached_workflows: 1000,
      max_concurrent_workflow_task_polls: 5,
      nonsticky_to_sticky_poll_ratio: 0.2,
      max_concurrent_activity_task_polls: 5,
      no_remote_activities: false,
      sticky_queue_schedule_to_start_timeout: 10,
      max_heartbeat_throttle_interval: 60,
      default_heartbeat_throttle_interval: 30,
      max_activities_per_second: nil,
      max_task_queue_activities_per_second: nil,
      graceful_shutdown_period: 0,
      use_worker_versioning: false,
      disable_eager_activity_execution: false,
      illegal_workflow_calls: Worker.default_illegal_workflow_calls,
      workflow_failure_exception_types: [],
      workflow_payload_codec_thread_pool: nil,
      debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
    )
      raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?

      Internal::ProtoUtils.assert_non_reserved_name(task_queue)
      @options = Options.new(
        client:,
        task_queue:,
        activities:,
        workflows:,
        tuner:,
        activity_executors:,
        workflow_executor:,
        interceptors:,
        build_id:,
        identity:,
        logger:,
        max_cached_workflows:,
        max_concurrent_workflow_task_polls:,
        nonsticky_to_sticky_poll_ratio:,
        max_concurrent_activity_task_polls:,
        no_remote_activities:,
        sticky_queue_schedule_to_start_timeout:,
        max_heartbeat_throttle_interval:,
        default_heartbeat_throttle_interval:,
        max_activities_per_second:,
        max_task_queue_activities_per_second:,
        graceful_shutdown_period:,
        use_worker_versioning:,
        disable_eager_activity_execution:,
        illegal_workflow_calls:,
        workflow_failure_exception_types:,
        workflow_payload_codec_thread_pool:,
        debug_mode:
      ).freeze

      # Preload workflow definitions and some workflow settings for the bridge
      workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows)
      # (The method name after "WorkflowWorker." is missing from this listing.)
      nondeterminism_as_workflow_fail, nondeterminism_as_workflow_fail_for_types =
        Internal::Worker::WorkflowWorker.(
          workflow_failure_exception_types:, workflow_definitions:
        )

      # Create the bridge worker
      @bridge_worker = Internal::Bridge::Worker.new(
        client.connection._core_client,
        Internal::Bridge::Worker::Options.new(
          activity: !activities.empty?,
          workflow: !workflows.empty?,
          namespace: client.namespace,
          task_queue:,
          # (The method name after "tuner." is missing from this listing.)
          tuner: tuner.,
          build_id:,
          identity_override: identity,
          max_cached_workflows:,
          max_concurrent_workflow_task_polls:,
          nonsticky_to_sticky_poll_ratio:,
          max_concurrent_activity_task_polls:,
          # For shutdown to work properly, we must disable remote activities
          # ourselves if there are no activities
          no_remote_activities: no_remote_activities || activities.empty?,
          sticky_queue_schedule_to_start_timeout:,
          max_heartbeat_throttle_interval:,
          default_heartbeat_throttle_interval:,
          max_worker_activities_per_second: max_activities_per_second,
          max_task_queue_activities_per_second:,
          graceful_shutdown_period:,
          use_worker_versioning:,
          nondeterminism_as_workflow_fail:,
          nondeterminism_as_workflow_fail_for_types:
        )
      )

      # Collect interceptors from client and params
      @activity_interceptors = (client.options.interceptors + interceptors).select do |i|
        i.is_a?(Interceptor::Activity)
      end
      @workflow_interceptors = (client.options.interceptors + interceptors).select do |i|
        i.is_a?(Interceptor::Workflow)
      end

      # Cancellation for the whole worker
      @worker_shutdown_cancellation = Cancellation.new

      # Create workers
      unless activities.empty?
        @activity_worker = Internal::Worker::ActivityWorker.new(worker: self,
                                                                bridge_worker: @bridge_worker)
      end
      unless workflows.empty?
        @workflow_worker = Internal::Worker::WorkflowWorker.new(
          bridge_worker: @bridge_worker,
          namespace: client.namespace,
          task_queue:,
          workflow_definitions:,
          workflow_executor:,
          logger:,
          data_converter: client.data_converter,
          metric_meter: client.connection.options.runtime.metric_meter,
          workflow_interceptors: @workflow_interceptors,
          disable_eager_activity_execution:,
          illegal_workflow_calls:,
          workflow_failure_exception_types:,
          workflow_payload_codec_thread_pool:,
          debug_mode:
        )
      end

      # Validate worker
      @bridge_worker.validate

      # Mutex needed for accessing and replacing a client
      @client_mutex = Mutex.new
    end
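The `debug_mode` default in the constructor signature is derived from the `TEMPORAL_DEBUG` environment variable. The sketch below wraps the same expression in a helper so its behavior for a few inputs is easy to see. The helper name `debug_mode_from` is hypothetical and is not part of the SDK.

```ruby
# Hypothetical helper wrapping the same expression used for the
# debug_mode default: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
def debug_mode_from(value)
  %w[true 1].include?(value.to_s.downcase)
end

debug_mode_from(nil)    # => false (variable unset, nil.to_s is "")
debug_mode_from('TRUE') # => true  (comparison is case-insensitive)
debug_mode_from('1')    # => true
debug_mode_from('yes')  # => false (only "true" and "1" are accepted)
```

Because the default is evaluated when the worker is constructed, passing `debug_mode:` explicitly takes precedence over the environment variable.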
Instance Attribute Details
#options ⇒ Options (readonly)
Returns the options for this worker, which have the same attributes as the #initialize parameters.

    # File 'lib/temporalio/worker.rb', line 282

    def options
      @options
    end
Class Method Details
.default_build_id ⇒ String
Returns the memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in `$LOADED_FEATURES`. Users may prefer to set the build ID to a better representation of the source.

    # File 'lib/temporalio/worker.rb', line 66

    def self.default_build_id
      @default_build_id ||= _load_default_build_id
    end
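The body of `_load_default_build_id` is not shown on this page. As a rough illustration of what "a checksum of all of the loaded Ruby source files" could look like, here is a minimal sketch using only the standard library. The helper names and the choice of SHA-256 are assumptions, not the SDK's actual algorithm.

```ruby
require 'digest'

# Hypothetical helper: hash the contents of every loaded .rb file.
# Illustration only; this is not the SDK's _load_default_build_id.
def sketch_build_id(paths = $LOADED_FEATURES)
  digest = Digest::SHA256.new
  # Sort so the result does not depend on load order
  paths.sort.each do |path|
    # Skip native extensions and anything that is not a readable .rb file
    next unless path.end_with?('.rb') && File.file?(path)

    digest.update(File.binread(path))
  end
  digest.hexdigest
end

# Memoize so the files are only read once per process
def memoized_build_id
  @memoized_build_id ||= sketch_build_id
end
```

A checksum like this changes whenever any loaded file changes, including gem files, which is one reason a deployment might prefer an explicit `build_id:` such as a Git commit SHA.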
.default_illegal_workflow_calls ⇒ Hash<String, [:all, Array<Symbol>]>
Returns the default, immutable set of illegal calls used for the `illegal_workflow_calls` worker option. See the documentation of that option for more details.

    # File 'lib/temporalio/worker.rb', line 236

    def self.default_illegal_workflow_calls
      @default_illegal_workflow_calls ||= begin
        hash = {
          'BasicSocket' => :all,
          'Date' => %i[initialize today],
          'DateTime' => %i[initialize now],
          'Dir' => :all,
          'Fiber' => [:set_scheduler],
          'File' => :all,
          'FileTest' => :all,
          'FileUtils' => :all,
          'Find' => :all,
          'GC' => :all,
          'IO' => [
            :read
            # Intentionally leaving out write so puts will work. We don't want to add heavy logic replacing stdout or
            # trying to derive whether it's file vs stdout write.
            #:write
          ],
          'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
                         spawn srand system test trap],
          'Net::HTTP' => :all,
          'Pathname' => :all,
          # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
          # test executing activities inside a timeout will fail if clock_gettime is blocked.
          'Process' => %i[abort argv0 daemon detach exec exit exit! fork kill setpriority setproctitle setrlimit setsid
                          spawn times wait wait2 waitall warmup],
          # TODO(cretz): Allow Ractor.current since exception formatting in error_highlight references it
          # 'Ractor' => :all,
          'Random::Base' => [:initialize],
          'Resolv' => :all,
          'SecureRandom' => :all,
          'Signal' => :all,
          'Socket' => :all,
          'Tempfile' => :all,
          'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
                         pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
                         terminate thread_variable_set wakeup],
          'Time' => %i[initialize now]
        } #: Hash[String, :all | Array[Symbol]]
        hash.each_value(&:freeze)
        hash.freeze
      end
    end
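The table maps a class name to either `:all` or a list of method names. The sketch below shows one way such a table could be consulted, using a small hand-copied subset of the entries above. The `illegal_call?` helper is hypothetical; how the SDK actually enforces the table is not shown on this page.

```ruby
# Small subset of the default table, frozen the same way as the original
ILLEGAL_CALLS_SUBSET = {
  'File' => :all,
  'Time' => %i[initialize now],
  'IO' => [:read]
}.each_value(&:freeze).freeze

# Hypothetical lookup helper (not part of the SDK)
def illegal_call?(table, class_name, method_name)
  entry = table[class_name]
  return false if entry.nil? # class not listed, nothing is blocked

  # :all blocks every method, otherwise only the listed ones
  entry == :all || entry.include?(method_name)
end

illegal_call?(ILLEGAL_CALLS_SUBSET, 'File', :read) # => true
illegal_call?(ILLEGAL_CALLS_SUBSET, 'Time', :now)  # => true
illegal_call?(ILLEGAL_CALLS_SUBSET, 'Time', :at)   # => false
illegal_call?(ILLEGAL_CALLS_SUBSET, 'IO', :write)  # => false

# Because the table is frozen, customizing means building a new hash
custom = ILLEGAL_CALLS_SUBSET.merge('Kernel' => %i[sleep])
```

Since `Hash#merge` returns a new, unfrozen hash, the same approach should work for deriving a custom `illegal_workflow_calls:` value from `Worker.default_illegal_workflow_calls` without mutating the shared default.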
.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the workers are shut down. This will return the block result if everything is successful or raise an error if not. See #run for details on how worker shutdown works.

    # File 'lib/temporalio/worker.rb', line 105

    def self.run_all(
      *workers,
      cancellation: Cancellation.new,
      shutdown_signals: [],
      raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
      wait_block_complete: true,
      &block
    )
      # Confirm there is at least one and they are all workers
      raise ArgumentError, 'At least one worker required' if workers.empty?
      raise ArgumentError, 'Not all parameters are workers' unless workers.all? { |w| w.is_a?(Worker) }

      Internal::Bridge.assert_fiber_compatibility!

      # Start the multi runner
      runner = Internal::Worker::MultiRunner.new(workers:, shutdown_signals:)

      # Apply block
      runner.apply_thread_or_fiber_block(&block)

      # Reuse first worker logger
      logger = workers.first&.options&.logger or raise # Never nil

      # On cancel, initiate shutdown
      cancellation.add_cancel_callback do
        logger.info('Cancel invoked, beginning worker shutdown')
        runner.initiate_shutdown
      end

      # Poller loop, run until all pollers shut down
      first_error = nil
      block_result = nil
      loop do
        event = runner.next_event
        # TODO(cretz): Consider improving performance instead of this case statement
        case event
        when Internal::Worker::MultiRunner::Event::PollSuccess
          # Successful poll
          event.worker #: Worker
               ._on_poll_bytes(runner, event.worker_type, event.bytes)
        when Internal::Worker::MultiRunner::Event::PollFailure
          # Poll failure, this causes shutdown of all workers
          logger.error('Poll failure (beginning worker shutdown if not already occurring)')
          logger.error(event.error)
          first_error ||= event.error
          runner.initiate_shutdown
        when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded
          # Came back from a codec as decoded
          event.workflow_worker.handle_activation(runner:, activation: event.activation, decoded: true)
        when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete
          # An activation is complete
          event.workflow_worker.handle_activation_complete(
            runner:,
            activation_completion: event.activation_completion,
            encoded: event.encoded,
            completion_complete_queue: event.completion_complete_queue
          )
        when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete
          # Completion complete, only need to log error if it occurs here
          if event.error
            logger.error("Activation completion failed to record on run ID #{event.run_id}")
            logger.error(event.error)
          end
        when Internal::Worker::MultiRunner::Event::PollerShutDown
          # Individual poller shut down. Nothing to do here until we support
          # worker status or something.
        when Internal::Worker::MultiRunner::Event::AllPollersShutDown
          # This is where we break the loop, no more polling can happen
          break
        when Internal::Worker::MultiRunner::Event::BlockSuccess
          logger.info('Block completed, beginning worker shutdown')
          block_result = event
          runner.initiate_shutdown
        when Internal::Worker::MultiRunner::Event::BlockFailure
          logger.error('Block failure (beginning worker shutdown)')
          logger.error(event.error)
          block_result = event
          first_error ||= event.error
          runner.initiate_shutdown
        when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
          logger.info('Signal received, beginning worker shutdown')
          runner.initiate_shutdown
        else
          raise "Unexpected event: #{event}"
        end
      end

      # Now that all pollers have stopped, let's wait for all to complete
      begin
        runner.wait_complete_and_finalize_shutdown
      rescue StandardError => e
        logger.warn('Failed waiting and finalizing')
        logger.warn(e)
      end

      # If there was a block but not a result yet, we want to raise if that is
      # wanted, and wait if that is wanted
      if block_given? && block_result.nil?
        runner.raise_in_thread_or_fiber_block(raise_in_block_on_shutdown) unless raise_in_block_on_shutdown.nil?
        if wait_block_complete
          event = runner.next_event
          case event
          when Internal::Worker::MultiRunner::Event::BlockSuccess
            logger.info('Block completed (after worker shutdown)')
            block_result = event
          when Internal::Worker::MultiRunner::Event::BlockFailure
            logger.error('Block failure (after worker shutdown)')
            logger.error(event.error)
            block_result = event
            first_error ||= event.error
          when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
            # Do nothing, waiting for block
          else
            raise "Unexpected event: #{event}"
          end
        end
      end

      # Notify each worker we're done with it
      workers.each(&:_on_shutdown_complete)

      # If there was a shutdown-causing error, we raise that
      if !first_error.nil?
        raise first_error
      elsif block_result.is_a?(Internal::Worker::MultiRunner::Event::BlockSuccess)
        block_result.result
      end
    end
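The core of the loop above is: drain events until all pollers have shut down, remember only the first error, then either raise that error or return the block result. The following is a heavily simplified model of that control flow using a plain `Queue`. The `SketchEvent` struct, the symbolic event kinds, and `drain_events` are all hypothetical stand-ins for the internal `MultiRunner` types.

```ruby
# Hypothetical event type standing in for the MultiRunner event classes
SketchEvent = Struct.new(:kind, :payload)

def drain_events(events)
  first_error = nil
  block_result = nil

  loop do
    event = events.pop
    case event.kind
    when :poll_failure
      # Only the first error is kept (real code would also initiate shutdown)
      first_error ||= event.payload
    when :block_success
      # Remember the block outcome (real code would also initiate shutdown)
      block_result = event
    when :all_pollers_shut_down
      # No more polling can happen, so leave the loop
      break
    else
      raise "Unexpected event: #{event.kind}"
    end
  end

  # An error that caused shutdown wins over a successful block result
  raise first_error unless first_error.nil?

  block_result&.payload
end

events = Queue.new
events << SketchEvent.new(:block_success, 42)
events << SketchEvent.new(:all_pollers_shut_down)
drain_events(events) # => 42
```

Keeping the first error with `||=` means later failures during shutdown are logged but do not replace the error that is ultimately raised to the caller.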
Instance Method Details
#client ⇒ Client
    # File 'lib/temporalio/worker.rb', line 504

    def client
      @client_mutex.synchronize { @options.client }
    end
#client=(new_client) ⇒ Object
Replace the worker’s client. When this is called, the client is replaced on the internal worker which means any new calls will be made on the new client (but existing calls will still complete on the previous one). This is commonly used for providing a new client with updated authentication credentials.
    # File 'lib/temporalio/worker.rb', line 513

    def client=(new_client)
      @client_mutex.synchronize do
        @bridge_worker.replace_client(new_client.connection._core_client)
        @options = @options.with(client: new_client)
        new_client
      end
    end
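The setter above never mutates the frozen options object; it swaps in a modified copy while holding a mutex. A minimal standard-library sketch of that pattern follows. `SketchOptions` and `SketchWorker` are hypothetical stand-ins, and the hand-written `with` method only imitates the `with` call used above.

```ruby
# Hypothetical stand-in for the worker's frozen options object
SketchOptions = Struct.new(:client, :task_queue, keyword_init: true) do
  # Return a frozen copy with some fields changed, leaving self untouched
  def with(**changes)
    self.class.new(**to_h.merge(changes)).freeze
  end
end

# Hypothetical stand-in for the worker, reduced to the client accessors
class SketchWorker
  attr_reader :options

  def initialize(client, task_queue)
    @options = SketchOptions.new(client: client, task_queue: task_queue).freeze
    @client_mutex = Mutex.new
  end

  def client
    @client_mutex.synchronize { @options.client }
  end

  def client=(new_client)
    # Replace the whole options object instead of mutating the frozen one
    @client_mutex.synchronize { @options = @options.with(client: new_client) }
  end
end

worker = SketchWorker.new('old-client', 'my-task-queue')
snapshot = worker.options
worker.client = 'new-client'
worker.client   # => "new-client"
snapshot.client # => "old-client"
```

Code that captured the old options object keeps seeing a consistent snapshot, which matches the documented behavior that existing calls complete on the previous client.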
#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the worker is shut down. This will return the block result if everything is successful or raise an error if not.
Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.
    # File 'lib/temporalio/worker.rb', line 537

    def run(
      cancellation: Cancellation.new,
      shutdown_signals: [],
      raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
      wait_block_complete: true,
      &block
    )
      Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
    end
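The `raise_in_block_on_shutdown` and `wait_block_complete` parameters describe what happens to a block that is still running when the worker shuts down: an error is raised inside it, and the caller optionally waits for it to finish. The sketch below models that interaction with a plain thread and `Thread#raise`. It assumes the block runs in a thread, and `SketchCanceledError` and `run_block_until_shutdown` are hypothetical names.

```ruby
# Hypothetical error class standing in for Error::CanceledError
class SketchCanceledError < StandardError; end

def run_block_until_shutdown(shutdown_error)
  started = Queue.new
  block_thread = Thread.new do
    begin
      started << true
      sleep # stands in for long-running user code inside the block
    rescue SketchCanceledError => e
      # The block gets a chance to clean up and still produce a value
      "block stopped: #{e.message}"
    end
  end

  started.pop                        # wait until the block is running
  block_thread.raise(shutdown_error) # analogous to raise_in_block_on_shutdown
  block_thread.value                 # analogous to wait_block_complete: true
end

run_block_until_shutdown(SketchCanceledError.new('Workers finished'))
# => "block stopped: Workers finished"
```

In this model, skipping the final `value` call corresponds to `wait_block_complete: false`, where the caller returns without waiting for the block to unwind.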
#task_queue ⇒ String
Returns the task queue set on the worker options.

    # File 'lib/temporalio/worker.rb', line 498

    def task_queue
      @options.task_queue
    end