Class: Temporalio::Internal::Worker::WorkflowInstance::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/internal/worker/workflow_instance/scheduler.rb

Overview

Deterministic Fiber::Scheduler implementation.

Instance Method Summary collapse

Constructor Details

#initialize(instance) ⇒ Scheduler

Returns a new instance of Scheduler.



16
17
18
19
20
21
22
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 16

# Creates a scheduler with empty bookkeeping state.
#
# @param instance [Object] Object stored as @instance; other methods on this scheduler query it
#   (e.g. for its context and whether that context is frozen).
def initialize(instance)
  # Wait-condition registry plus the counter used to hand out its keys.
  @wait_condition_counter = 0
  @wait_conditions = {}
  # Fibers queued to be resumed, and every fiber this scheduler is tracking.
  @ready = []
  @fibers = []
  @instance = instance
end

Instance Method Details

#block(_blocker, timeout = nil) ⇒ Object

Fiber::Scheduler methods

Note: many Fiber::Scheduler methods, such as io_read, are intentionally not implemented here. It might seem sensible to implement them and raise, but we want the default blocking behavior that applies when they are absent, so that more advanced features such as logging still work inside workflows. We therefore implement only the bare minimum.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 103

# Fiber::Scheduler hook that parks the current fiber until something resumes it, optionally giving up
# after a timeout.
#
# @param _blocker [Object] Ignored.
# @param timeout [Numeric, nil] When truthy, passed to Timeout.timeout to bound the wait.
# @return [Boolean] true if the fiber was resumed, false if Timeout::Error was raised while waiting.
def block(_blocker, timeout = nil)
  # TODO(cretz): Make the blocker visible in the stack trace?

  # No timeout requested: simply yield, since unblock is what resumes this fiber.
  unless timeout
    Fiber.yield
    return true
  end

  # Timed wait: the same yield, wrapped so that an expired timeout is reported as false. The rescue is
  # deliberately limited to this branch so the untimed path never swallows a Timeout::Error.
  begin
    Timeout.timeout(timeout) { Fiber.yield }
    true
  rescue Timeout::Error
    false
  end
end

#close ⇒ Object



120
121
122
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 120

# Fiber::Scheduler hook invoked when the scheduler is closed. Deliberately a no-op.
#
# @return [nil]
def close
  # The instance controls the lifetime of this scheduler, so there is nothing to tear down here.
  nil
end

#context ⇒ Object



24
25
26
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 24

# Convenience accessor that delegates to the instance's context.
#
# @return [Object] Whatever @instance.context returns.
def context = @instance.context

#fiber(&block) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 124

# Fiber::Scheduler hook that creates a fiber for the given block and queues it to run.
#
# The new fiber is not resumed here; it is appended to @ready so it runs when the ready queue is drained.
#
# @yield Code to run inside the new fiber.
# @return [Fiber] The newly created fiber.
# @raise [Workflow::InvalidWorkflowStateError] If the instance reports its context as frozen.
def fiber(&block)
  raise Workflow::InvalidWorkflowStateError, 'Cannot schedule fibers in this context' if @instance.context_frozen

  scheduled = Fiber.new do
    block.call # steep:ignore
  ensure
    # Stop tracking the fiber once its body finishes, whether normally or via an exception
    @fibers.delete(Fiber.current)
  end
  # Track it first, then mark it runnable
  [@fibers, @ready].each { |list| list << scheduled }
  scheduled
end

#io_wait(io, events, timeout) ⇒ Object

Raises:

  • (NotImplementedError)


139
140
141
142
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 139

# Fiber::Scheduler hook for waiting on IO readiness. Not supported: always raises.
#
# The arguments are accepted only to satisfy the hook's signature and are not used.
#
# @param io [Object] Unused.
# @param events [Object] Unused.
# @param timeout [Object] Unused.
# @raise [NotImplementedError] Always.
def io_wait(io, events, timeout)
  # TODO(cretz): This in a blocking fashion?
  # Use a descriptive message (in the same spirit as process_wait) rather than a bare placeholder so the
  # failure is understandable to whoever hits it.
  raise NotImplementedError, 'Waiting on IO is not currently supported in workflows'
end

#kernel_sleep(duration = nil) ⇒ Object



144
145
146
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 144

# Fiber::Scheduler hook for sleep; hands the request straight to Workflow.sleep.
#
# @param duration [Object, nil] Forwarded unchanged to Workflow.sleep.
# @return [Object] Whatever Workflow.sleep returns.
def kernel_sleep(duration = nil) = Workflow.sleep(duration)

#process_wait(pid, flags) ⇒ Object

Raises:

  • (NotImplementedError)


148
149
150
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 148

# Fiber::Scheduler hook for waiting on a child process. Not supported: always raises.
#
# @param pid [Object] Unused.
# @param flags [Object] Unused.
# @raise [NotImplementedError] Always.
def process_wait(pid, flags)
  reason = 'Cannot wait on other processes in workflows'
  raise NotImplementedError, reason
end

#run_until_all_yielded ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 28

# Drives the scheduler until no fiber is runnable and no registered wait condition is satisfied.
#
# Each pass drains the ready queue, then resolves at most one wait condition before starting over.
#
# @return [nil]
def run_until_all_yielded
  loop do
    # Resume every queued fiber (including ones queued while draining) until the queue is empty
    while (runnable = @ready.shift)
      runnable.resume
    end

    # Pick only the _first_ wait condition whose block returns truthy. Fibers must be fully settled _before_
    # this scan, and only a _single_ condition may be satisfied per pass, so that the code immediately after
    # a wait condition can rely on the condition still holding.
    # @type var cond_fiber: Fiber?
    cond_fiber = nil
    cond_result = nil
    @wait_conditions.each do |seq, (predicate, waiter)|
      cond_result = predicate.call
      next unless cond_result

      # Satisfied: unregister it and stop scanning
      @wait_conditions.delete(seq)
      cond_fiber = waiter
      break
    end
    # Nothing left to run and nothing satisfied, so we are done
    return unless cond_fiber

    # Hand the condition's truthy result to the waiting fiber, then loop to settle fibers again
    cond_fiber.resume(cond_result)
  end
end

#stack_trace ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 84

# Builds a combined, human-readable stack trace for all tracked fibers.
#
# Frames that start with the directory containing Temporalio._root_file_path are dropped, fibers with
# nothing left are skipped, and the remaining per-fiber traces are separated by a blank line.
#
# @return [String] The joined backtraces; empty if no fiber has any remaining frames.
def stack_trace
  # Resolve the directory prefix inside the instance's illegal_call_tracing_disabled block
  dir_path = @instance.illegal_call_tracing_disabled { File.dirname(Temporalio._root_file_path) }
  traces = @fibers.filter_map do |tracked|
    text = tracked.backtrace.reject { |frame| frame.start_with?(dir_path) }.join("\n")
    text unless text.empty?
  end
  traces.join("\n\n")
end

#timeout_after(duration, exception_class, *exception_arguments) ⇒ Object



152
153
154
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 152

# Fiber::Scheduler hook for timeouts; forwards everything to the context's timeout with a fixed summary.
#
# @param duration [Object] Forwarded unchanged as the timeout duration.
# @param exception_class [Class] Forwarded unchanged as the exception class.
# @param exception_arguments [Array<Object>] Forwarded unchanged as extra exception arguments.
# @yield Block forwarded to the context's timeout.
# @return [Object] Whatever the context's timeout returns.
def timeout_after(duration, exception_class, *exception_arguments, &block)
  context.timeout(
    duration,
    exception_class,
    *exception_arguments,
    summary: 'Timeout timer',
    &block
  )
end

#unblock(_blocker, fiber) ⇒ Object



156
157
158
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 156

# Fiber::Scheduler hook that marks a fiber as runnable again by appending it to the ready queue.
#
# The fiber is not resumed here; it runs when the ready queue is next drained.
#
# @param _blocker [Object] Ignored.
# @param fiber [Fiber] Fiber to queue.
# @return [Array] The ready queue.
def unblock(_blocker, fiber)
  @ready.push(fiber)
end

#wait_condition(cancellation:, &block) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/temporalio/internal/worker/workflow_instance/scheduler.rb', line 55

# Suspends the current fiber until it is resumed for the given condition, or until the wait is canceled.
#
# The condition block and the current fiber are registered in @wait_conditions under a fresh sequence
# number, then the fiber yields. The registration is always removed when the fiber leaves the yield, even
# if it is woken by an exception raised into it from elsewhere, so a stale entry can never cause a dead or
# unrelated fiber to be resumed later.
#
# @param cancellation [Object, nil] Optional cancellation. When given, it must respond to canceled?,
#   canceled_reason, add_cancel_callback and remove_cancel_callback.
# @yield Condition to wait on.
# @return [Object] The value this fiber is resumed with.
# @raise [Workflow::InvalidWorkflowStateError] If the instance reports its context as frozen.
# @raise [Error::CanceledError] If the cancellation is already canceled, or is canceled while waiting.
def wait_condition(cancellation:, &block)
  raise Workflow::InvalidWorkflowStateError, 'Cannot wait in this context' if @instance.context_frozen

  if cancellation&.canceled?
    raise Error::CanceledError,
          cancellation.canceled_reason || 'Wait condition canceled before started'
  end

  seq = (@wait_condition_counter += 1)
  @wait_conditions[seq] = [block, Fiber.current]

  # Add a cancellation callback
  cancel_callback_key = cancellation&.add_cancel_callback do
    # Only if the condition is still present
    cond = @wait_conditions.delete(seq)
    if cond&.last&.alive?
      cond&.last&.raise(Error::CanceledError.new(cancellation&.canceled_reason || 'Wait condition canceled'))
    end
  end

  begin
    # This blocks until a resume is called on this fiber
    result = Fiber.yield
  ensure
    # Always drop the registration. This is a no-op when it was already removed before the resume or by the
    # cancel callback. It matters when an exception is raised into this fiber by something else: without it
    # the entry would linger and its fiber could be resumed later. Any cancel callback left behind is
    # harmless, because it only acts when it can still delete the registration.
    @wait_conditions.delete(seq)
  end

  # Remove cancellation callback (only needed on success)
  cancellation&.remove_cancel_callback(cancel_callback_key) if cancel_callback_key

  result
end