Class: Temporalio::Internal::Worker::WorkflowWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/internal/worker/workflow_worker.rb

Overview

Worker for handling workflow activations. Most activation work is delegated to the workflow executor.

Defined Under Namespace

Classes: State

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bridge_worker:, namespace:, task_queue:, workflow_definitions:, workflow_executor:, logger:, data_converter:, metric_meter:, workflow_interceptors:, disable_eager_activity_execution:, illegal_workflow_calls:, workflow_failure_exception_types:, workflow_payload_codec_thread_pool:, unsafe_workflow_io_enabled:, debug_mode:, assert_valid_local_activity:, on_eviction: nil) ⇒ WorkflowWorker

Returns a new instance of WorkflowWorker.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 62

def initialize(
  bridge_worker:,
  namespace:,
  task_queue:,
  workflow_definitions:,
  workflow_executor:,
  logger:,
  data_converter:,
  metric_meter:,
  workflow_interceptors:,
  disable_eager_activity_execution:,
  illegal_workflow_calls:,
  workflow_failure_exception_types:,
  workflow_payload_codec_thread_pool:,
  unsafe_workflow_io_enabled:,
  debug_mode:,
  assert_valid_local_activity:, on_eviction: nil
)
  @executor = workflow_executor

  payload_codec = data_converter.payload_codec
  @workflow_payload_codec_thread_pool = workflow_payload_codec_thread_pool
  if !Fiber.current_scheduler && payload_codec && !@workflow_payload_codec_thread_pool
    raise ArgumentError, 'Must have workflow payload codec thread pool if providing codec and not using fibers'
  end

  # If there is a payload codec, we need to build encoding and decoding visitors
  if payload_codec
    @payload_encoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads|
      apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.encode(payloads) }
    end
    @payload_decoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads|
      apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.decode(payloads) }
    end
  end

  @state = State.new(
    workflow_definitions:,
    bridge_worker:,
    logger:,
    metric_meter:,
    data_converter:,
    deadlock_timeout: debug_mode ? nil : 2.0,
    # TODO(cretz): Make this more performant for the default set?
    illegal_calls: WorkflowInstance::IllegalCallTracer.frozen_validated_illegal_calls(
      illegal_workflow_calls || {}
    ),
    namespace:,
    task_queue:,
    disable_eager_activity_execution:,
    workflow_interceptors:,
    workflow_failure_exception_types: workflow_failure_exception_types.map do |t|
      unless t.is_a?(Class) && t < Exception
        raise ArgumentError, 'All failure types must classes inheriting Exception'
      end

      t
    end.freeze,
    unsafe_workflow_io_enabled:,
    assert_valid_local_activity:
  )
  @state.on_eviction = on_eviction if on_eviction

  # Validate worker
  @executor._validate_worker(self, @state)
end

Class Method Details

.bridge_workflow_failure_exception_type_options(workflow_failure_exception_types:, workflow_definitions:) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 42

def self.bridge_workflow_failure_exception_type_options(
  workflow_failure_exception_types:,
  workflow_definitions:
)
  as_fail = workflow_failure_exception_types.any? do |t|
    t.is_a?(Class) && t >= Workflow::NondeterminismError
  end
  as_fail_for_types = workflow_definitions.values.map do |defn|
    next unless defn.failure_exception_types.any? { |t| t.is_a?(Class) && t >= Workflow::NondeterminismError }

    # If they tried to do this on a dynamic workflow and haven't already set worker-level option, warn
    unless defn.name || as_fail
      warn('Note, dynamic workflows cannot trap non-determinism errors, so worker-level ' \
           'workflow_failure_exception_types should be set to capture that if that is the intention')
    end
    defn.name
  end.compact
  [as_fail, as_fail_for_types]
end

.workflow_definitions(workflows, should_enforce_versioning_behavior:) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 16

def self.workflow_definitions(workflows, should_enforce_versioning_behavior:)
  workflows.each_with_object({}) do |workflow, hash|
    # Load definition
    defn = begin
      if workflow.is_a?(Workflow::Definition::Info)
        workflow
      else
        Workflow::Definition::Info.from_class(workflow)
      end
    rescue StandardError
      raise ArgumentError, "Failed loading workflow #{workflow}"
    end

    # Confirm name not in use
    raise ArgumentError, "Multiple workflows named #{defn.name || '<dynamic>'}" if hash.key?(defn.name)

    # Enforce versioning behavior is set when versioning is on
    if should_enforce_versioning_behavior &&
       defn.versioning_behavior == VersioningBehavior::UNSPECIFIED && !defn.dynamic_options_method
      raise ArgumentError, "Workflow #{defn.name} must specify a versioning behavior"
    end

    hash[defn.name] = defn
  end
end

Instance Method Details

#handle_activation(runner:, activation:, decoded:) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 129

def handle_activation(runner:, activation:, decoded:)
  # Encode in background if not encoded but it needs to be
  if @payload_encoding_visitor && !decoded
    if Fiber.current_scheduler
      Fiber.schedule { decode_activation(runner, activation) }
    else
      @workflow_payload_codec_thread_pool.execute { decode_activation(runner, activation) }
    end
  else
    @executor._activate(activation, @state) do |activation_completion|
      runner.apply_workflow_activation_complete(workflow_worker: self, activation_completion:, encoded: false)
    end
  end
rescue Exception => e # rubocop:disable Lint/RescueException
  # Should never happen, executors are expected to trap things
  @state.logger.error("Failed issuing activation on workflow run ID: #{activation.run_id}")
  @state.logger.error(e)
end

#handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 148

def handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:)
  if @payload_encoding_visitor && !encoded
    if Fiber.current_scheduler
      Fiber.schedule { encode_activation_completion(runner, activation_completion) }
    else
      @workflow_payload_codec_thread_pool.execute do
        encode_activation_completion(runner, activation_completion)
      end
    end
  else
    @state.bridge_worker.async_complete_workflow_activation(
      activation_completion.run_id, activation_completion.to_proto, completion_complete_queue
    )
  end
end

#on_shutdown_completeObject



164
165
166
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 164

def on_shutdown_complete
  @state.evict_all
end