Class: Temporalio::Internal::Worker::WorkflowWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/internal/worker/workflow_worker.rb

Overview

Worker for handling workflow activations. Most activation work is delegated to the workflow executor.

Defined Under Namespace

Classes: State

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bridge_worker:, namespace:, task_queue:, workflow_definitions:, workflow_executor:, logger:, data_converter:, metric_meter:, workflow_interceptors:, disable_eager_activity_execution:, illegal_workflow_calls:, workflow_failure_exception_types:, workflow_payload_codec_thread_pool:, debug_mode:, on_eviction: nil) ⇒ WorkflowWorker

Returns a new instance of WorkflowWorker.

[View source]

56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 56

def initialize(
  bridge_worker:,
  namespace:,
  task_queue:,
  workflow_definitions:,
  workflow_executor:,
  logger:,
  data_converter:,
  metric_meter:,
  workflow_interceptors:,
  disable_eager_activity_execution:,
  illegal_workflow_calls:,
  workflow_failure_exception_types:,
  workflow_payload_codec_thread_pool:,
  debug_mode:,
  on_eviction: nil
)
  @executor = workflow_executor

  payload_codec = data_converter.payload_codec
  @workflow_payload_codec_thread_pool = workflow_payload_codec_thread_pool
  if !Fiber.current_scheduler && payload_codec && !@workflow_payload_codec_thread_pool
    raise ArgumentError, 'Must have workflow payload codec thread pool if providing codec and not using fibers'
  end

  # If there is a payload codec, we need to build encoding and decoding visitors
  if payload_codec
    @payload_encoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads|
      apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.encode(payloads) }
    end
    @payload_decoding_visitor = Api::PayloadVisitor.new(skip_search_attributes: true) do |payload_or_payloads|
      apply_codec_on_payload_visit(payload_or_payloads) { |payloads| payload_codec.decode(payloads) }
    end
  end

  @state = State.new(
    workflow_definitions:,
    bridge_worker:,
    logger:,
    metric_meter:,
    data_converter:,
    deadlock_timeout: debug_mode ? nil : 2.0,
    # TODO(cretz): Make this more performant for the default set?
    illegal_calls: WorkflowInstance::IllegalCallTracer.frozen_validated_illegal_calls(
      illegal_workflow_calls || {}
    ),
    namespace:,
    task_queue:,
    disable_eager_activity_execution:,
    workflow_interceptors:,
    workflow_failure_exception_types: workflow_failure_exception_types.map do |t|
      unless t.is_a?(Class) && t < Exception
        raise ArgumentError, 'All failure types must classes inheriting Exception'
      end

      t
    end.freeze
  )
  @state.on_eviction = on_eviction if on_eviction

  # Validate worker
  @executor._validate_worker(self, @state)
end

Class Method Details

.bridge_workflow_failure_exception_type_options(workflow_failure_exception_types:, workflow_definitions:) ⇒ Object

[View source]

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 36

def self.bridge_workflow_failure_exception_type_options(
  workflow_failure_exception_types:,
  workflow_definitions:
)
  as_fail = workflow_failure_exception_types.any? do |t|
    t.is_a?(Class) && t >= Workflow::NondeterminismError
  end
  as_fail_for_types = workflow_definitions.values.map do |defn|
    next unless defn.failure_exception_types.any? { |t| t.is_a?(Class) && t >= Workflow::NondeterminismError }

    # If they tried to do this on a dynamic workflow and haven't already set worker-level option, warn
    unless defn.name || as_fail
      warn('Note, dynamic workflows cannot trap non-determinism errors, so worker-level ' \
           'workflow_failure_exception_types should be set to capture that if that is the intention')
    end
    defn.name
  end.compact
  [as_fail, as_fail_for_types]
end

.workflow_definitions(workflows) ⇒ Object

[View source]

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 16

def self.workflow_definitions(workflows)
  workflows.each_with_object({}) do |workflow, hash|
    # Load definition
    defn = begin
      if workflow.is_a?(Workflow::Definition::Info)
        workflow
      else
        Workflow::Definition::Info.from_class(workflow)
      end
    rescue StandardError
      raise ArgumentError, "Failed loading workflow #{workflow}"
    end

    # Confirm name not in use
    raise ArgumentError, "Multiple workflows named #{defn.name || '<dynamic>'}" if hash.key?(defn.name)

    hash[defn.name] = defn
  end
end

Instance Method Details

#handle_activation(runner:, activation:, decoded:) ⇒ Object

[View source]

120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 120

def handle_activation(runner:, activation:, decoded:)
  # Encode in background if not encoded but it needs to be
  if @payload_encoding_visitor && !decoded
    if Fiber.current_scheduler
      Fiber.schedule { decode_activation(runner, activation) }
    else
      @workflow_payload_codec_thread_pool.execute { decode_activation(runner, activation) }
    end
  else
    @executor._activate(activation, @state) do |activation_completion|
      runner.apply_workflow_activation_complete(workflow_worker: self, activation_completion:, encoded: false)
    end
  end
rescue Exception => e # rubocop:disable Lint/RescueException
  # Should never happen, executors are expected to trap things
  @state.logger.error("Failed issuing activation on workflow run ID: #{activation.run_id}")
  @state.logger.error(e)
end

#handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:) ⇒ Object

[View source]

139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 139

def handle_activation_complete(runner:, activation_completion:, encoded:, completion_complete_queue:)
  if @payload_encoding_visitor && !encoded
    if Fiber.current_scheduler
      Fiber.schedule { encode_activation_completion(runner, activation_completion) }
    else
      @workflow_payload_codec_thread_pool.execute do
        encode_activation_completion(runner, activation_completion)
      end
    end
  else
    @state.bridge_worker.async_complete_workflow_activation(
      activation_completion.run_id, activation_completion.to_proto, completion_complete_queue
    )
  end
end

#on_shutdown_completeObject

[View source]

155
156
157
# File 'lib/temporalio/internal/worker/workflow_worker.rb', line 155

def on_shutdown_complete
  @state.evict_all
end