Class: Temporalio::Worker::WorkflowReplayer

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/worker/workflow_replayer.rb

Overview

Replayer to replay workflows from existing history.

Defined Under Namespace

Classes: Options, ReplayResult, ReplayWorker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflows:, namespace: 'ReplayNamespace', task_queue: 'ReplayTaskQueue', data_converter: Converters::DataConverter.default, workflow_executor: WorkflowExecutor::ThreadPool.default, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: Logger.new($stdout, level: Logger::WARN), illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase), runtime: Runtime.default) { ... } ⇒ WorkflowReplayer

Create a new replayer. This combines some options from both Temporalio::Worker#initialize and Client#initialize.

Parameters:

  • workflows (Array<Class<Workflow::Definition>>)

    Workflows for this replayer.

  • namespace (String) (defaults to: 'ReplayNamespace')

    Namespace as set in the workflow info.

  • task_queue (String) (defaults to: 'ReplayTaskQueue')

    Task queue as set in the workflow info.

  • data_converter (Converters::DataConverter) (defaults to: Converters::DataConverter.default)

    Data converter to use for all data conversions to/from payloads.

  • workflow_executor (WorkflowExecutor) (defaults to: WorkflowExecutor::ThreadPool.default)

    Workflow executor that workflow tasks run within. This must be a Temporalio::Worker::WorkflowExecutor::ThreadPool currently.

  • interceptors (Array<Interceptor::Workflow>) (defaults to: [])

    Workflow interceptors.

  • build_id (String) (defaults to: Worker.default_build_id)

    Unique identifier for the current runtime. This is best set as a unique value representing all code and should change only when code does. This can be something like a git commit hash. If unset, default is hash of known Ruby code.

  • identity (String, nil) (defaults to: nil)

    Override the identity for this replater.

  • logger (Logger) (defaults to: Logger.new($stdout, level: Logger::WARN))

    Logger to use. Defaults to stdout with warn level. Callers setting this logger are responsible for closing it.

  • illegal_workflow_calls (Hash<String, [:all, Array<Symbol>]>) (defaults to: Worker.default_illegal_workflow_calls)

    Set of illegal workflow calls that are considered unsafe/non-deterministic and will raise if seen. The key of the hash is the fully qualified string class name (no leading ‘::`). The value is either `:all` which means any use of the class, or an array of symbols for methods on the class that cannot be used. The methods refer to either instance or class methods, there is no way to differentiate at this time.

  • workflow_failure_exception_types (Array<Class<Exception>>) (defaults to: [])

    Workflow failure exception types. This is the set of exception types that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure. These are applied in addition to the ‘workflow_failure_exception_type` on the workflow definition class itself. If Exception is set, it effectively will fail a workflow/update in all user exception cases.

  • workflow_payload_codec_thread_pool (ThreadPool, nil) (defaults to: nil)

    Thread pool to run payload codec encode/decode within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially block execution which is why they need to be run in the background.

  • debug_mode (Boolean) (defaults to: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase))

    If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks if they block the thread for too long. This defaults to true if the ‘TEMPORAL_DEBUG` environment variable is `true` or `1`.

  • runtime (Runtime) (defaults to: Runtime.default)

    Runtime for this replayer.

Yields:

  • If a block is present, this is the equivalent of calling #with_replay_worker with the block and discarding the result.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/temporalio/worker/workflow_replayer.rb', line 79

def initialize(
  workflows:,
  namespace: 'ReplayNamespace',
  task_queue: 'ReplayTaskQueue',
  data_converter: Converters::DataConverter.default,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  interceptors: [],
  build_id: Worker.default_build_id,
  identity: nil,
  logger: Logger.new($stdout, level: Logger::WARN),
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase),
  runtime: Runtime.default,
  &
)
  @options = Options.new(
    workflows:,
    namespace:,
    task_queue:,
    data_converter:,
    workflow_executor:,
    interceptors:,
    build_id:,
    identity:,
    logger:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    debug_mode:,
    runtime:
  ).freeze
  # Preload definitions and other settings
  @workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows)
  @nondeterminism_as_workflow_fail, @nondeterminism_as_workflow_fail_for_types =
    Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options(
      workflow_failure_exception_types:, workflow_definitions: @workflow_definitions
    )
  # If there is a block, we'll go ahead and assume it's for with_replay_worker
  with_replay_worker(&) if block_given? # steep:ignore
end

Instance Attribute Details

#optionsOptions (readonly)

Returns Options for this replayer which has the same attributes as #initialize.

Returns:



41
42
43
# File 'lib/temporalio/worker/workflow_replayer.rb', line 41

def options
  @options
end

Instance Method Details

#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult

Replay a workflow history.

If doing multiple histories, it is better to use #replay_workflows or #with_replay_worker since they create a replay worker just once instead of each time like this call does.

Parameters:

Returns:



132
133
134
# File 'lib/temporalio/worker/workflow_replayer.rb', line 132

def replay_workflow(history, raise_on_replay_failure: true)
  with_replay_worker { |worker| worker.replay_workflow(history, raise_on_replay_failure:) }
end

#replay_workflows(histories, raise_on_replay_failure: false) ⇒ Array<ReplayResult>

Replay multiple workflow histories.

Parameters:

Returns:



143
144
145
146
147
# File 'lib/temporalio/worker/workflow_replayer.rb', line 143

def replay_workflows(histories, raise_on_replay_failure: false)
  with_replay_worker do |worker|
    histories.map { |h| worker.replay_workflow(h, raise_on_replay_failure:) }
  end
end

#with_replay_worker {|Worker| ... } ⇒ Object

Run a block of code with a ReplayWorker to execute replays.

Yields:

  • Block of code to run with a replay worker.

Yield Parameters:

  • Worker (ReplayWorker)

    to run replays on. Note, only one workflow can replay at a time.

Yield Returns:

  • (Object)

    Result of the block.



154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/temporalio/worker/workflow_replayer.rb', line 154

def with_replay_worker(&)
  worker = ReplayWorker.new(
    options:,
    workflow_definitions: @workflow_definitions,
    nondeterminism_as_workflow_fail: @nondeterminism_as_workflow_fail,
    nondeterminism_as_workflow_fail_for_types: @nondeterminism_as_workflow_fail_for_types
  )
  begin
    yield worker
  ensure
    worker._shutdown
  end
end