Class: Temporalio::Worker::WorkflowReplayer::ReplayWorker
- Inherits:
-
Object
- Object
- Temporalio::Worker::WorkflowReplayer::ReplayWorker
- Defined in:
- lib/temporalio/worker/workflow_replayer.rb
Overview
Replay worker that can be used to replay individual workflow runs. Only one call to #replay_workflow can be made at a time.
Instance Method Summary
-
#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult
Replay a workflow history.
Instance Method Details
#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult
Replay a workflow history.
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 |
# File 'lib/temporalio/worker/workflow_replayer.rb', line 285

# Replay a single workflow history and report whether the replay failed.
#
# The history is pushed to the bridge replayer and runner events are then processed one at a time
# until a workflow-removal job has been recorded in +@last_workflow_remove_job+. That variable is
# only reset here; NOTE(review): it is presumably assigned by the workflow worker while it handles
# an activation - confirm against the rest of the file.
#
# @param history [WorkflowHistory] History to replay.
# @param raise_on_replay_failure [Boolean] If true (the default), a replay failure is raised
#   instead of being returned inside the result.
# @return [ReplayResult] Result holding the history and the replay failure (nil when the removal
#   reason was +CACHE_FULL+ or +LANG_REQUESTED+).
# @raise [ArgumentError] If +history+ is not a {WorkflowHistory}.
# @raise [RuntimeError] If a replay is already running, the replayer is shut down, or the runner
#   yields an event type this method does not handle.
# @raise [Workflow::NondeterminismError] If the removal reason is +NONDETERMINISM+ and
#   +raise_on_replay_failure+ is true.
# @raise [Workflow::InvalidWorkflowStateError] If the removal reason is any other non-benign
#   reason and +raise_on_replay_failure+ is true.
def replay_workflow(history, raise_on_replay_failure: true)
  raise ArgumentError, 'Expected history as WorkflowHistory' unless history.is_a?(WorkflowHistory)
  # Due to our event processing model, only one can run at a time
  raise 'Already running' if @running
  raise 'Replayer shutdown' if @shutdown

  # TODO(cretz): Unset this
  @running = true
  @last_workflow_remove_job = nil
  begin
    # Push history proto
    @bridge_replayer.push_history(
      history.workflow_id, Api::History::V1::History.new(events: history.events).to_proto
    )

    # Process events until workflow complete, i.e. until a removal job has been recorded
    until @last_workflow_remove_job
      event = @runner.next_event
      case event
      when Internal::Worker::MultiRunner::Event::PollSuccess
        # Raw activation bytes: decode before handing to the workflow worker
        @workflow_worker.handle_activation(
          runner: @runner,
          activation: Internal::Bridge::Api::WorkflowActivation::WorkflowActivation.decode(event.bytes),
          decoded: false
        )
      when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded
        @workflow_worker.handle_activation(runner: @runner, activation: event.activation, decoded: true)
      when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete
        @workflow_worker.handle_activation_complete(
          runner: @runner,
          activation_completion: event.activation_completion,
          encoded: event.encoded,
          completion_complete_queue: event.completion_complete_queue
        )
      when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete
        # Ignore
      else
        raise "Unexpected event: #{event}"
      end
    end

    # Create exception if removal is due to error. The text is "<reason>: <message>", both taken
    # from the removal job.
    remove_job = @last_workflow_remove_job
    err = if remove_job.reason == :NONDETERMINISM
            Workflow::NondeterminismError.new("#{remove_job.reason}: #{remove_job.message}")
          elsif !%i[CACHE_FULL LANG_REQUESTED].include?(remove_job.reason)
            Workflow::InvalidWorkflowStateError.new("#{remove_job.reason}: #{remove_job.message}")
          end

    # Raise if wanting to raise, otherwise return result
    raise err if raise_on_replay_failure && err

    ReplayResult.new(history:, replay_failure: err)
  ensure
    # Always clear the running flag so a later replay can start, even after a failure
    @running = false
  end
end