Class: Temporalio::Worker::WorkflowReplayer::ReplayWorker
- Inherits: Object
- Inheritance chain: Object → Temporalio::Worker::WorkflowReplayer::ReplayWorker
- Defined in:
- lib/temporalio/worker/workflow_replayer.rb
Overview
Replay worker that can be used to replay individual workflow runs. Only one call to #replay_workflow can be made at a time.
Instance Method Summary
-
#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult
Replay a workflow history.
Instance Method Details
#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult
Replay a workflow history.
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 |
# File 'lib/temporalio/worker/workflow_replayer.rb', line 255

# Replay a single workflow history and report whether the replay failed.
#
# Only one replay may be in flight at a time on this worker; a second call made while one is running raises.
#
# @param history [WorkflowHistory] History to replay.
# @param raise_on_replay_failure [Boolean] If true (the default), a replay failure is raised. If false, the
#   failure is instead returned on the result as `replay_failure`.
#
# @return [ReplayResult] Result holding the given history and the replay failure, if any.
#
# @raise [ArgumentError] If `history` is not a WorkflowHistory.
# @raise [RuntimeError] If a replay is already running or the replayer has been shut down.
# @raise [Workflow::NondeterminismError] If the workflow was removed for nondeterminism and
#   `raise_on_replay_failure` is true.
# @raise [Workflow::InvalidWorkflowStateError] If the workflow was removed for any reason other than
#   nondeterminism, a full cache, or a lang request, and `raise_on_replay_failure` is true.
def replay_workflow(history, raise_on_replay_failure: true)
  raise ArgumentError, 'Expected history as WorkflowHistory' unless history.is_a?(WorkflowHistory)
  # Due to our event processing model, only one can run at a time
  raise 'Already running' if @running
  raise 'Replayer shutdown' if @shutdown

  # Push history proto
  # TODO(cretz): Unset this
  # NOTE(review): the ensure below already resets @running; the TODO above may be stale - confirm.
  @running = true
  @last_workflow_remove_job = nil
  begin
    @bridge_replayer.push_history(
      history.workflow_id, Api::History::V1::History.new(events: history.events).to_proto
    )

    # Process events until workflow complete. @last_workflow_remove_job is not assigned in this method; it is
    # expected to be set elsewhere while the events below are being handled.
    until @last_workflow_remove_job
      event = @runner.next_event
      case event
      when Internal::Worker::MultiRunner::Event::PollSuccess
        @workflow_worker.handle_activation(
          runner: @runner,
          activation: Internal::Bridge::Api::WorkflowActivation::WorkflowActivation.decode(event.bytes),
          decoded: false
        )
      when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded
        @workflow_worker.handle_activation(runner: @runner, activation: event.activation, decoded: true)
      when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete
        @workflow_worker.handle_activation_complete(
          runner: @runner,
          activation_completion: event.activation_completion,
          encoded: event.encoded,
          completion_complete_queue: event.completion_complete_queue
        )
      when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete
        # Ignore
      else
        raise "Unexpected event: #{event}"
      end
    end

    # Create exception if removal is due to error. Removal because the cache was full or because lang asked
    # for it is not treated as a failure.
    remove_job = @last_workflow_remove_job
    err = case remove_job.reason
          when :NONDETERMINISM
            Workflow::NondeterminismError.new("#{remove_job.reason}: #{remove_job.message}")
          when :CACHE_FULL, :LANG_REQUESTED
            nil
          else
            Workflow::InvalidWorkflowStateError.new("#{remove_job.reason}: #{remove_job.message}")
          end

    # Raise if wanting to raise, otherwise return result
    raise err if raise_on_replay_failure && err

    ReplayResult.new(history:, replay_failure: err)
  ensure
    # Always release the single-run guard, whether the replay succeeded or raised
    @running = false
  end
end