Class: Temporalio::Internal::Worker::WorkflowWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/internal/worker/workflow_worker.rb

Overview

Worker for handling workflow activations. Most activation work is delegated to the workflow executor.

Defined Under Namespace

Classes: State

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bridge_worker:, namespace:, task_queue:, workflow_definitions:, workflow_executor:, logger:, data_converter:, metric_meter:, workflow_interceptors:, disable_eager_activity_execution:, illegal_workflow_calls:, workflow_failure_exception_types:, workflow_payload_codec_thread_pool:, debug_mode:, on_eviction: nil) ⇒ WorkflowWorker

Returns a new instance of WorkflowWorker.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 56

# Build the workflow worker: set up optional payload codec visitors, create the shared {State} and ask the
# executor to validate this worker.
#
# @param bridge_worker Bridge worker, stored on the state.
# @param namespace Namespace, stored on the state.
# @param task_queue Task queue, stored on the state.
# @param workflow_definitions Workflow definitions, stored on the state.
# @param workflow_executor Executor that validates this worker and later receives activations.
# @param logger Logger, stored on the state.
# @param data_converter Data converter; its +payload_codec+ (may be nil) decides whether codec visitors are built.
# @param metric_meter Metric meter, stored on the state.
# @param workflow_interceptors Workflow interceptors, stored on the state.
# @param disable_eager_activity_execution Flag passed through to the state.
# @param illegal_workflow_calls Illegal call set; nil is treated as an empty hash before validation.
# @param workflow_failure_exception_types Enumerable of exception classes; validated, then frozen.
# @param workflow_payload_codec_thread_pool Pool used to run codec work when no fiber scheduler is active.
# @param debug_mode When truthy the state's deadlock timeout is nil, otherwise it is 2.0.
# @param on_eviction Optional callback assigned to the state's +on_eviction+ when given.
# @raise [ArgumentError] If a payload codec is configured, no fiber scheduler is active and no codec thread
#   pool was supplied.
# @raise [ArgumentError] If any failure exception type is not a class that is a subclass of Exception.
def initialize(
  bridge_worker:,
  namespace:,
  task_queue:,
  workflow_definitions:,
  workflow_executor:,
  logger:,
  data_converter:,
  metric_meter:,
  workflow_interceptors:,
  disable_eager_activity_execution:,
  illegal_workflow_calls:,
  workflow_failure_exception_types:,
  workflow_payload_codec_thread_pool:,
  debug_mode:,
  on_eviction: nil
)
  @executor = workflow_executor

  # Codec work must run somewhere other than inline: either on a fiber (scheduler present) or on the pool
  payload_codec = data_converter.payload_codec
  @workflow_payload_codec_thread_pool = workflow_payload_codec_thread_pool
  if !Fiber.current_scheduler && payload_codec && !@workflow_payload_codec_thread_pool
    raise ArgumentError, 'Must have workflow payload codec thread pool if providing codec and not using fibers'
  end

  # If there is a payload codec, we need to build encoding and decoding visitors. Both are always created
  # together, so the presence of one implies the presence of the other.
  if payload_codec
    @payload_encoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads|
      apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.encode(payloads) }
    end
    @payload_decoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads|
      apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.decode(payloads) }
    end
  end

  @state = State.new(
    workflow_definitions:,
    bridge_worker:,
    logger:,
    metric_meter:,
    data_converter:,
    # No deadlock timeout in debug mode (presumably so a paused debugger is not reported as a deadlock)
    deadlock_timeout: debug_mode ? nil : 2.0,
    # TODO(cretz): Make this more performant for the default set?
    illegal_calls: WorkflowInstance::IllegalCallTracer.frozen_validated_illegal_calls(
      illegal_workflow_calls || {}
    ),
    namespace:,
    task_queue:,
    disable_eager_activity_execution:,
    workflow_interceptors:,
    # NOTE(review): `t < Exception` is a strict subclass check, so `Exception` itself is rejected here --
    # confirm that is intended.
    workflow_failure_exception_types: workflow_failure_exception_types.map do |t|
      unless t.is_a?(Class) && t < Exception
        raise ArgumentError, 'All failure types must be classes inheriting Exception'
      end

      t
    end.freeze
  )
  @state.on_eviction = on_eviction if on_eviction

  # Validate worker
  @executor._validate_worker(self, @state)
end

Class Method Details

.bridge_workflow_failure_exception_type_options(workflow_failure_exception_types:, workflow_definitions:) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 36

# Work out which non-determinism failure options apply, at worker level and per workflow.
#
# A set of exception types "covers" non-determinism when it contains a class that is
# Workflow::NondeterminismError or one of its ancestors.
#
# @param workflow_failure_exception_types Worker-level failure exception types.
# @param workflow_definitions Hash whose values respond to +name+ and +failure_exception_types+.
# @return [Array] Two elements: whether the worker-level types cover non-determinism, and the names of the
#   named workflows whose own failure types cover it.
def self.bridge_workflow_failure_exception_type_options(
  workflow_failure_exception_types:,
  workflow_definitions:
)
  covers_nondeterminism = lambda do |types|
    types.any? { |type| type.is_a?(Class) && type >= Workflow::NondeterminismError }
  end

  worker_level = covers_nondeterminism.call(workflow_failure_exception_types)

  per_workflow = workflow_definitions.values.filter_map do |definition|
    next unless covers_nondeterminism.call(definition.failure_exception_types)

    # A definition without a name is a dynamic workflow; it is never listed, and the user is warned unless
    # the worker-level option already covers non-determinism
    if !definition.name && !worker_level
      warn('Note, dynamic workflows cannot trap non-determinism errors, so worker-level ' \
           'workflow_failure_exception_types should be set to capture that if that is the intention')
    end
    definition.name
  end

  [worker_level, per_workflow]
end

.workflow_definitions(workflows) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 16

# Build a name-to-definition hash from a list of workflows.
#
# @param workflows Enumerable whose items are either Workflow::Definition::Info instances (used as-is) or
#   values that Workflow::Definition::Info.from_class can convert.
# @return [Hash] Definitions keyed by their name; a definition whose name is nil is keyed by nil.
# @raise [ArgumentError] If a definition cannot be loaded or two definitions share a name.
def self.workflow_definitions(workflows)
  definitions = {}

  workflows.each do |candidate|
    # Resolve the definition, converting any StandardError while loading into an ArgumentError
    info =
      begin
        candidate.is_a?(Workflow::Definition::Info) ? candidate : Workflow::Definition::Info.from_class(candidate)
      rescue StandardError
        raise ArgumentError, "Failed loading workflow #{candidate}"
      end

    # Names must be unique; a nil name is reported as <dynamic>
    name = info.name
    raise ArgumentError, "Multiple workflows named #{name || '<dynamic>'}" if definitions.key?(name)

    definitions[name] = info
  end

  definitions
end

Instance Method Details

#handle_activation(runner:, activation:, decoded:) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 120

# Dispatch a workflow activation, decoding its payloads in the background first when that is required.
#
# If codec visitors exist and the activation has not been decoded yet, +decode_activation+ is scheduled on a
# fiber when a fiber scheduler is active, otherwise on the codec thread pool. In every other case the
# activation is handed to the executor, and each completion the executor yields is passed back to the runner
# marked as not yet encoded.
#
# Any exception raised while dispatching is logged on the state's logger and not re-raised.
#
# @param runner Runner that receives activation completions (and is passed on to the decode step).
# @param activation Activation to handle; must respond to +run_id+ for error logging.
# @param decoded Whether the activation payloads have already been decoded.
def handle_activation(runner:, activation:, decoded:)
  if decoded || !@payload_encoding_visitor
    # Nothing to decode (or already decoded), so activate right away
    @executor._activate(activation, @state) do |completion|
      runner.apply_workflow_activation_complete(workflow_worker: self, activation_completion: completion,
                                                encoded: false)
    end
  elsif Fiber.current_scheduler
    Fiber.schedule { decode_activation(runner, activation) }
  else
    @workflow_payload_codec_thread_pool.execute { decode_activation(runner, activation) }
  end
rescue Exception => error # rubocop:disable Lint/RescueException
  # Should never happen, executors are expected to trap things
  @state.logger.error("Failed issuing activation on workflow run ID: #{activation.run_id}")
  @state.logger.error(error)
end

#handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 139

# Deliver an activation completion to the bridge worker, encoding its payloads in the background first when
# that is required.
#
# If codec visitors exist and the completion has not been encoded yet, +encode_activation_completion+ is
# scheduled on a fiber when a fiber scheduler is active, otherwise on the codec thread pool. In every other
# case the completion is converted to its proto form and submitted to the bridge worker.
#
# @param runner Runner passed on to the encode step.
# @param activation_completion Completion to deliver; must respond to +run_id+ and +to_proto+.
# @param encoded Whether the completion payloads have already been encoded.
# @param completion_complete_queue Queue handed to the bridge worker along with the completion.
def handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:)
  if encoded || !@payload_encoding_visitor
    # Nothing to encode (or already encoded), so submit right away
    bridge_worker = @state.bridge_worker
    bridge_worker.async_complete_workflow_activation(
      activation_completion.run_id,
      activation_completion.to_proto,
      completion_complete_queue
    )
  elsif Fiber.current_scheduler
    Fiber.schedule { encode_activation_completion(runner, activation_completion) }
  else
    @workflow_payload_codec_thread_pool.execute do
      encode_activation_completion(runner, activation_completion)
    end
  end
end

#on_shutdown_complete ⇒ Object



155
156
157
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 155

# Shutdown-complete hook: asks the shared state to evict everything it holds via +evict_all+.
def on_shutdown_complete = @state.evict_all