Class: Temporalio::Internal::Worker::WorkflowWorker
- Inherits:
-
Object
- Object
- Temporalio::Internal::Worker::WorkflowWorker
- Defined in:
- lib/temporalio/internal/worker/workflow_worker.rb
Overview
Worker for handling workflow activations. Most activation work is delegated to the workflow executor.
Defined Under Namespace
Classes: State
Class Method Summary collapse
- .bridge_workflow_failure_exception_type_options(workflow_failure_exception_types:, workflow_definitions:) ⇒ Object
- .workflow_definitions(workflows) ⇒ Object
Instance Method Summary collapse
- #handle_activation(runner:, activation:, decoded:) ⇒ Object
- #handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:) ⇒ Object
-
#initialize(bridge_worker:, namespace:, task_queue:, workflow_definitions:, workflow_executor:, logger:, data_converter:, metric_meter:, workflow_interceptors:, disable_eager_activity_execution:, illegal_workflow_calls:, workflow_failure_exception_types:, workflow_payload_codec_thread_pool:, debug_mode:, on_eviction: nil) ⇒ WorkflowWorker
constructor
A new instance of WorkflowWorker.
- #on_shutdown_complete ⇒ Object
Constructor Details
#initialize(bridge_worker:, namespace:, task_queue:, workflow_definitions:, workflow_executor:, logger:, data_converter:, metric_meter:, workflow_interceptors:, disable_eager_activity_execution:, illegal_workflow_calls:, workflow_failure_exception_types:, workflow_payload_codec_thread_pool:, debug_mode:, on_eviction: nil) ⇒ WorkflowWorker
Returns a new instance of WorkflowWorker.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 56 def initialize( bridge_worker:, namespace:, task_queue:, workflow_definitions:, workflow_executor:, logger:, data_converter:, metric_meter:, workflow_interceptors:, disable_eager_activity_execution:, illegal_workflow_calls:, workflow_failure_exception_types:, workflow_payload_codec_thread_pool:, debug_mode:, on_eviction: nil ) @executor = workflow_executor payload_codec = data_converter.payload_codec @workflow_payload_codec_thread_pool = workflow_payload_codec_thread_pool if !Fiber.current_scheduler && payload_codec && !@workflow_payload_codec_thread_pool raise ArgumentError, 'Must have workflow payload codec thread pool if providing codec and not using fibers' end # If there is a payload codec, we need to build encoding and decoding visitors if payload_codec @payload_encoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads| apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.encode(payloads) } end @payload_decoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads| apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.decode(payloads) } end end @state = State.new( workflow_definitions:, bridge_worker:, logger:, metric_meter:, data_converter:, deadlock_timeout: debug_mode ? nil : 2.0, # TODO(cretz): Make this more performant for the default set? illegal_calls: WorkflowInstance::IllegalCallTracer.frozen_validated_illegal_calls( illegal_workflow_calls || {} ), namespace:, task_queue:, disable_eager_activity_execution:, workflow_interceptors:, workflow_failure_exception_types: workflow_failure_exception_types.map do |t| unless t.is_a?(Class) && t < Exception raise ArgumentError, 'All failure types must classes inheriting Exception' end t end.freeze ) @state.on_eviction = on_eviction if on_eviction # Validate worker @executor._validate_worker(self, @state) end |
Class Method Details
.bridge_workflow_failure_exception_type_options(workflow_failure_exception_types:, workflow_definitions:) ⇒ Object
[View source]
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 36 def self.( workflow_failure_exception_types:, workflow_definitions: ) as_fail = workflow_failure_exception_types.any? do |t| t.is_a?(Class) && t >= Workflow::NondeterminismError end as_fail_for_types = workflow_definitions.values.map do |defn| next unless defn.failure_exception_types.any? { |t| t.is_a?(Class) && t >= Workflow::NondeterminismError } # If they tried to do this on a dynamic workflow and haven't already set worker-level option, warn unless defn.name || as_fail warn('Note, dynamic workflows cannot trap non-determinism errors, so worker-level ' \ 'workflow_failure_exception_types should be set to capture that if that is the intention') end defn.name end.compact [as_fail, as_fail_for_types] end |
.workflow_definitions(workflows) ⇒ Object
[View source]
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 16 def self.workflow_definitions(workflows) workflows.each_with_object({}) do |workflow, hash| # Load definition defn = begin if workflow.is_a?(Workflow::Definition::Info) workflow else Workflow::Definition::Info.from_class(workflow) end rescue StandardError raise ArgumentError, "Failed loading workflow #{workflow}" end # Confirm name not in use raise ArgumentError, "Multiple workflows named #{defn.name || '<dynamic>'}" if hash.key?(defn.name) hash[defn.name] = defn end end |
Instance Method Details
#handle_activation(runner:, activation:, decoded:) ⇒ Object
[View source]
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 120 def handle_activation(runner:, activation:, decoded:) # Encode in background if not encoded but it needs to be if @payload_encoding_visitor && !decoded if Fiber.current_scheduler Fiber.schedule { decode_activation(runner, activation) } else @workflow_payload_codec_thread_pool.execute { decode_activation(runner, activation) } end else @executor._activate(activation, @state) do |activation_completion| runner.apply_workflow_activation_complete(workflow_worker: self, activation_completion:, encoded: false) end end rescue Exception => e # rubocop:disable Lint/RescueException # Should never happen, executors are expected to trap things @state.logger.error("Failed issuing activation on workflow run ID: #{activation.run_id}") @state.logger.error(e) end |
#handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:) ⇒ Object
[View source]
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 139 def handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:) if @payload_encoding_visitor && !encoded if Fiber.current_scheduler Fiber.schedule { encode_activation_completion(runner, activation_completion) } else @workflow_payload_codec_thread_pool.execute do encode_activation_completion(runner, activation_completion) end end else @state.bridge_worker.async_complete_workflow_activation( activation_completion.run_id, activation_completion.to_proto, completion_complete_queue ) end end |
#on_shutdown_complete ⇒ Object
[View source]
155 156 157 |
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 155 def on_shutdown_complete @state.evict_all end |