Class: Temporalio::Internal::Worker::WorkflowInstance
Inherits: Object
- Object
- Temporalio::Internal::Worker::WorkflowInstance
Defined in:
lib/temporalio/internal/worker/workflow_instance.rb,
lib/temporalio/internal/worker/workflow_instance/context.rb,
lib/temporalio/internal/worker/workflow_instance/details.rb,
lib/temporalio/internal/worker/workflow_instance/scheduler.rb,
lib/temporalio/internal/worker/workflow_instance/handler_hash.rb,
lib/temporalio/internal/worker/workflow_instance/handler_execution.rb,
lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb,
lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb,
lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb,
lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb,
lib/temporalio/internal/worker/workflow_instance/inbound_implementation.rb,
lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb,
lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb,
lib/temporalio/internal/worker/workflow_instance/externally_immutable_hash.rb
Overview
Instance of a user workflow. This is the instance with all state needed to run the workflow and is expected to be cached by the worker for sticky execution.
Defined Under Namespace
Classes: ChildWorkflowHandle, Context, Details, ExternalWorkflowHandle, ExternallyImmutableHash, HandlerExecution, HandlerHash, IllegalCallTracer, InboundImplementation, OutboundImplementation, ReplaySafeLogger, ReplaySafeMetric, Scheduler
Instance Attribute Summary
-
#assert_valid_local_activity ⇒ Object
readonly
Returns the value of attribute assert_valid_local_activity.
-
#cancellation ⇒ Object
readonly
Returns the value of attribute cancellation.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#context_frozen ⇒ Object
readonly
Returns the value of attribute context_frozen.
-
#continue_as_new_suggested ⇒ Object
readonly
Returns the value of attribute continue_as_new_suggested.
-
#current_deployment_version ⇒ Object
readonly
Returns the value of attribute current_deployment_version.
-
#current_details ⇒ Object
Returns the value of attribute current_details.
-
#current_history_length ⇒ Object
readonly
Returns the value of attribute current_history_length.
-
#current_history_size ⇒ Object
readonly
Returns the value of attribute current_history_size.
-
#disable_eager_activity_execution ⇒ Object
readonly
Returns the value of attribute disable_eager_activity_execution.
-
#failure_converter ⇒ Object
readonly
Returns the value of attribute failure_converter.
-
#in_progress_handlers ⇒ Object
readonly
Returns the value of attribute in_progress_handlers.
-
#info ⇒ Object
readonly
Returns the value of attribute info.
-
#io_enabled ⇒ Object
Returns the value of attribute io_enabled.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#payload_converter ⇒ Object
readonly
Returns the value of attribute payload_converter.
-
#pending_activities ⇒ Object
readonly
Returns the value of attribute pending_activities.
-
#pending_child_workflow_starts ⇒ Object
readonly
Returns the value of attribute pending_child_workflow_starts.
-
#pending_child_workflows ⇒ Object
readonly
Returns the value of attribute pending_child_workflows.
-
#pending_external_cancels ⇒ Object
readonly
Returns the value of attribute pending_external_cancels.
-
#pending_external_signals ⇒ Object
readonly
Returns the value of attribute pending_external_signals.
-
#pending_timers ⇒ Object
readonly
Returns the value of attribute pending_timers.
-
#query_handlers ⇒ Object
readonly
Returns the value of attribute query_handlers.
-
#random ⇒ Object
readonly
Returns the value of attribute random.
-
#replaying ⇒ Object
readonly
Returns the value of attribute replaying.
-
#scheduler ⇒ Object
readonly
Returns the value of attribute scheduler.
-
#signal_handlers ⇒ Object
readonly
Returns the value of attribute signal_handlers.
-
#update_handlers ⇒ Object
readonly
Returns the value of attribute update_handlers.
Class Method Summary
- .new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
Instance Method Summary
- #activate(activation) ⇒ Object
- #add_command(command) ⇒ Object
- #illegal_call_tracing_disabled ⇒ Object
-
#initialize(details) ⇒ WorkflowInstance
constructor
A new instance of WorkflowInstance.
- #instance ⇒ Object
- #memo ⇒ Object
- #metric_meter ⇒ Object
- #now ⇒ Object
- #patch(patch_id:, deprecated:) ⇒ Object
- #search_attributes ⇒ Object
Constructor Details
#initialize(details) ⇒ WorkflowInstance
Returns a new instance of WorkflowInstance.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 63
def initialize(details)
  # Initialize general state
  @context = Context.new(self)
  if details.illegal_calls && !details.illegal_calls.empty?
    @tracer = IllegalCallTracer.new(details.illegal_calls)
  end
  @logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
  @logger.scoped_values_getter = proc { scoped_logger_info }
  @runtime_metric_meter = details.metric_meter
  @io_enabled = details.unsafe_workflow_io_enabled
  @scheduler = Scheduler.new(self)
  @payload_converter = details.payload_converter
  @failure_converter = details.failure_converter
  @disable_eager_activity_execution = details.disable_eager_activity_execution
  @pending_activities = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_timers = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_child_workflow_starts = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_child_workflows = {} # Keyed by sequence, value is ChildWorkflowHandle to resolve with proto result
  @pending_external_signals = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_external_cancels = {} # Keyed by sequence, value is fiber to resume with proto result
  @buffered_signals = {} # Keyed by signal name, value is array of signal jobs
  # TODO(cretz): Should these be sets instead? Both should be fairly low counts.
  @in_progress_handlers = [] # Value is HandlerExecution
  @patches_notified = []
  @definition = details.definition
  @interceptors = details.interceptors
  @cancellation, @cancellation_proc = Cancellation.new
  @continue_as_new_suggested = false
  @current_history_length = 0
  @current_history_size = 0
  @replaying = false
  @workflow_failure_exception_types = details.workflow_failure_exception_types
  @signal_handlers = HandlerHash.new(
    details.definition.signals,
    Workflow::Definition::Signal
  ) do |defn|
    # New definition, drain buffer. If it's dynamic (i.e. no name) drain them all.
    to_drain = if defn.name.nil?
                 all_signals = @buffered_signals.values.flatten
                 @buffered_signals.clear
                 all_signals
               else
                 @buffered_signals.delete(defn.name)
               end
    to_drain&.each { |job| apply_signal(job) }
  end
  @query_handlers = HandlerHash.new(details.definition.queries, Workflow::Definition::Query)
  @update_handlers = HandlerHash.new(details.definition.updates, Workflow::Definition::Update)
  @definition_options = Workflow::DefinitionOptions.new(
    failure_exception_types: details.definition.failure_exception_types,
    versioning_behavior: details.definition.versioning_behavior
  )
  @assert_valid_local_activity = details.assert_valid_local_activity

  # Create all things needed from initial job
  @init_job = details.initial_activation.jobs.find { |j| !j.initialize_workflow.nil? }&.initialize_workflow
  raise 'Missing init job from first activation' unless @init_job

  illegal_call_tracing_disabled do
    @info = Workflow::Info.new(
      attempt: @init_job.attempt,
      continued_run_id: ProtoUtils.string_or(@init_job.continued_from_execution_run_id),
      cron_schedule: ProtoUtils.string_or(@init_job.cron_schedule),
      execution_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_execution_timeout),
      first_execution_run_id: @init_job.first_execution_run_id,
      headers: ProtoUtils.headers_from_proto_map(@init_job.headers, @payload_converter) || {},
      last_failure: if @init_job.continued_failure
                      @failure_converter.from_failure(@init_job.continued_failure, @payload_converter)
                    end,
      last_result: if @init_job.last_completion_result
                     @payload_converter.from_payloads(@init_job.last_completion_result).first
                   end,
      namespace: details.namespace,
      parent: if @init_job.parent_workflow_info
                Workflow::Info::ParentInfo.new(
                  namespace: @init_job.parent_workflow_info.namespace,
                  run_id: @init_job.parent_workflow_info.run_id,
                  workflow_id: @init_job.parent_workflow_info.workflow_id
                )
              end,
      priority: Priority._from_proto(@init_job.priority),
      retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
      root: if @init_job.root_workflow
              Workflow::Info::RootInfo.new(
                run_id: @init_job.root_workflow.run_id,
                workflow_id: @init_job.root_workflow.workflow_id
              )
            end,
      run_id: details.initial_activation.run_id,
      run_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_run_timeout),
      start_time: ProtoUtils.(@init_job.start_time) || raise,
      task_queue: details.task_queue,
      task_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_task_timeout) || raise,
      workflow_id: @init_job.workflow_id,
      workflow_type: @init_job.workflow_type
    ).freeze

    @random = Random.new(@init_job.randomness_seed)
  end
end
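The block passed to HandlerHash.new in the constructor drains buffered signals when a handler is registered later. The following is a minimal, self-contained sketch of that buffer-then-drain idea only. BufferingSignalRegistry, its methods, and the [name, payload] pairs are hypothetical stand-ins, not the SDK's HandlerHash or its signal jobs.

```ruby
# Hypothetical stand-in for illustration; not the SDK's HandlerHash.
class BufferingSignalRegistry
  def initialize
    @handlers = {} # signal name (nil for a dynamic handler) => callable
    @buffered = {} # signal name => array of [name, payload] pairs
  end

  # Deliver immediately if a named or dynamic handler exists, else buffer by name.
  def signal(name, payload)
    handler = @handlers[name] || @handlers[nil]
    if handler
      handler.call(name, payload)
    else
      (@buffered[name] ||= []) << [name, payload]
    end
  end

  # Register a handler, then drain what was buffered for it.
  # A nil name models a dynamic handler, which drains every buffered signal.
  def register(name, &handler)
    @handlers[name] = handler
    to_drain = if name.nil?
                 all_signals = @buffered.values.flatten(1)
                 @buffered.clear
                 all_signals
               else
                 @buffered.delete(name)
               end
    to_drain&.each { |sig_name, payload| handler.call(sig_name, payload) }
  end
end

seen = []
registry = BufferingSignalRegistry.new
registry.signal('approve', 1) # no handler yet, buffered
registry.signal('reject', 2)  # no handler yet, buffered
registry.register('approve') { |name, payload| seen << [name, payload] }
registry.register(nil) { |name, payload| seen << [name, payload] }
```

Deleting the per-name entry (rather than reading it) means each buffered signal is handed to at most one handler, which mirrors the delete and clear calls in the constructor.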
Instance Attribute Details
#assert_valid_local_activity ⇒ Object (readonly)
Returns the value of attribute assert_valid_local_activity.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def assert_valid_local_activity
  @assert_valid_local_activity
end
#cancellation ⇒ Object (readonly)
Returns the value of attribute cancellation.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def cancellation
  @cancellation
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def context
  @context
end
#context_frozen ⇒ Object (readonly)
Returns the value of attribute context_frozen.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def context_frozen
  @context_frozen
end
#continue_as_new_suggested ⇒ Object (readonly)
Returns the value of attribute continue_as_new_suggested.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def continue_as_new_suggested
  @continue_as_new_suggested
end
#current_deployment_version ⇒ Object (readonly)
Returns the value of attribute current_deployment_version.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def current_deployment_version
  @current_deployment_version
end
#current_details ⇒ Object
Returns the value of attribute current_details.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 61
def current_details
  @current_details
end
#current_history_length ⇒ Object (readonly)
Returns the value of attribute current_history_length.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def current_history_length
  @current_history_length
end
#current_history_size ⇒ Object (readonly)
Returns the value of attribute current_history_size.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def current_history_size
  @current_history_size
end
#disable_eager_activity_execution ⇒ Object (readonly)
Returns the value of attribute disable_eager_activity_execution.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def disable_eager_activity_execution
  @disable_eager_activity_execution
end
#failure_converter ⇒ Object (readonly)
Returns the value of attribute failure_converter.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def failure_converter
  @failure_converter
end
#in_progress_handlers ⇒ Object (readonly)
Returns the value of attribute in_progress_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def in_progress_handlers
  @in_progress_handlers
end
#info ⇒ Object (readonly)
Returns the value of attribute info.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def info
  @info
end
#io_enabled ⇒ Object
Returns the value of attribute io_enabled.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 61
def io_enabled
  @io_enabled
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def logger
  @logger
end
#payload_converter ⇒ Object (readonly)
Returns the value of attribute payload_converter.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def payload_converter
  @payload_converter
end
#pending_activities ⇒ Object (readonly)
Returns the value of attribute pending_activities.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def pending_activities
  @pending_activities
end
#pending_child_workflow_starts ⇒ Object (readonly)
Returns the value of attribute pending_child_workflow_starts.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def pending_child_workflow_starts
  @pending_child_workflow_starts
end
#pending_child_workflows ⇒ Object (readonly)
Returns the value of attribute pending_child_workflows.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def pending_child_workflows
  @pending_child_workflows
end
#pending_external_cancels ⇒ Object (readonly)
Returns the value of attribute pending_external_cancels.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def pending_external_cancels
  @pending_external_cancels
end
#pending_external_signals ⇒ Object (readonly)
Returns the value of attribute pending_external_signals.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def pending_external_signals
  @pending_external_signals
end
#pending_timers ⇒ Object (readonly)
Returns the value of attribute pending_timers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def pending_timers
  @pending_timers
end
#query_handlers ⇒ Object (readonly)
Returns the value of attribute query_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def query_handlers
  @query_handlers
end
#random ⇒ Object (readonly)
Returns the value of attribute random.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def random
  @random
end
#replaying ⇒ Object (readonly)
Returns the value of attribute replaying.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def replaying
  @replaying
end
#scheduler ⇒ Object (readonly)
Returns the value of attribute scheduler.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def scheduler
  @scheduler
end
#signal_handlers ⇒ Object (readonly)
Returns the value of attribute signal_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def signal_handlers
  @signal_handlers
end
#update_handlers ⇒ Object (readonly)
Returns the value of attribute update_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55
def update_handlers
  @update_handlers
end
Class Method Details
.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 38
def self.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:)
  Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new(
    run_id: run_id,
    failed: Bridge::Api::WorkflowCompletion::Failure.new(
      failure: begin
        failure_converter.to_failure(error, payload_converter)
      rescue Exception => e # rubocop:disable Lint/RescueException
        Api::Failure::V1::Failure.new(
          message: "Failed converting error to failure: #{e.}, " \
                   "original error message: #{error.}",
          application_failure_info: Api::Failure::V1::ApplicationFailureInfo.new
        )
      end
    )
  )
end
Instance Method Details
#activate(activation) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 165
def activate(activation)
  # Run inside of scheduler (removed on ensure)
  Fiber.set_scheduler(@scheduler)

  # Reset some activation state
  @commands = []
  @current_activation_error = nil
  @continue_as_new_suggested = activation.continue_as_new_suggested
  @current_deployment_version = WorkerDeploymentVersion._from_bridge(
    activation.deployment_version_for_current_task
  )
  @current_history_length = activation.history_length
  @current_history_size = activation.history_size_bytes
  @replaying = activation.
  @now_timestamp = activation.

  # Apply jobs and run event loop
  begin
    # Create instance if it doesn't already exist
    @instance ||= with_context_frozen { create_instance }

    # Apply jobs
    activation.jobs.each { |job| apply(job) }

    # Schedule primary 'execute' if not already running (i.e. this is
    # the first activation)
    @primary_fiber ||= schedule(top_level: true) { run_workflow }

    # Run the event loop in the tracer if it exists
    if @tracer
      @tracer.enable { @scheduler.run_until_all_yielded }
    else
      @scheduler.run_until_all_yielded
    end
  rescue Exception => e # rubocop:disable Lint/RescueException
    on_top_level_exception(e)
  end

  # If we are not replaying and workflow is complete but not a
  # failure (i.e. success, continue as new, or cancel), we warn for
  # any unfinished handlers.
  if !@replaying && @commands.any? do |c|
       !c.complete_workflow_execution.nil? ||
       !c.continue_as_new_workflow_execution.nil? ||
       !c.cancel_workflow_execution.nil?
     end
    warn_on_any_unfinished_handlers
  end

  # Return success or failure
  if @current_activation_error
    @logger.replay_safety_disabled do
      @logger.warn('Failed activation')
      @logger.warn(@current_activation_error)
    end
    WorkflowInstance.new_completion_with_failure(
      run_id: activation.run_id,
      error: @current_activation_error,
      failure_converter: @failure_converter,
      payload_converter: @payload_converter
    )
  else
    Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new(
      run_id: activation.run_id,
      successful: Bridge::Api::WorkflowCompletion::Success.new(
        commands: @commands,
        versioning_behavior: @definition_options.versioning_behavior
      )
    )
  end
ensure
  @commands = nil
  @current_activation_error = nil
  Fiber.set_scheduler(nil)
end
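The overall shape of #activate (reset per-activation state, run jobs inside begin/rescue, return a success or failure result, clear state in ensure) can be sketched without any SDK types. MiniInstance, its hash results, and the lambda jobs below are hypothetical simplifications; the real method works with bridge protobuf objects and a fiber scheduler, which this sketch omits.

```ruby
# Hypothetical, heavily simplified model of the activation lifecycle.
class MiniInstance
  def initialize
    @commands = nil
    @error = nil
  end

  def activate(jobs)
    # Reset per-activation state
    @commands = []
    @error = nil

    begin
      jobs.each { |job| job.call(self) }
    rescue Exception => e # rubocop:disable Lint/RescueException
      # Record the failure instead of letting it escape the activation
      @error = e
    end

    # The value of this if/else is the method's return value; the ensure
    # clause below runs afterwards and does not change it.
    if @error
      { failed: @error.message }
    else
      { successful: @commands }
    end
  ensure
    @commands = nil
    @error = nil
  end

  def add_command(command)
    @commands << command
  end

  def idle?
    @commands.nil? && @error.nil?
  end
end

instance = MiniInstance.new
ok = instance.activate([->(i) { i.add_command(:start_timer) }])
bad = instance.activate([->(_i) { raise 'boom' }])
```

The result hash keeps its own reference to the command array, so setting @commands to nil in ensure does not empty the returned result.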
#add_command(command) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 240
def add_command(command)
  raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen

  @commands << command
end
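#add_command refuses to queue commands while @context_frozen is set, and #activate wraps instance creation in with_context_frozen. The body of with_context_frozen is not shown on this page, so the version below is an assumption (set a flag around a block and restore it afterwards). CommandBuffer and InvalidStateError are hypothetical names used only for this sketch.

```ruby
# Hypothetical error class standing in for Workflow::InvalidWorkflowStateError.
class InvalidStateError < StandardError; end

class CommandBuffer
  attr_reader :commands

  def initialize
    @commands = []
    @context_frozen = false
  end

  # Assumed behavior: mark the context frozen for the duration of the block,
  # restoring the previous value even if the block raises.
  def with_context_frozen
    previous = @context_frozen
    @context_frozen = true
    yield
  ensure
    @context_frozen = previous
  end

  def add_command(command)
    raise InvalidStateError, 'Cannot add commands in this context' if @context_frozen

    @commands << command
  end
end

buffer = CommandBuffer.new
buffer.add_command(:schedule_activity)

rejected = false
begin
  buffer.with_context_frozen { buffer.add_command(:start_timer) }
rescue InvalidStateError
  rejected = true
end

buffer.add_command(:complete_workflow)
```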
#illegal_call_tracing_disabled ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 267
def illegal_call_tracing_disabled(&)
  if @tracer
    @tracer.disable_temporarily(&)
  else
    yield
  end
end
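#illegal_call_tracing_disabled uses anonymous block forwarding (a bare & parameter), which requires Ruby 3.1 or later. The sketch below shows the same forward-or-yield shape with hypothetical ToyTracer and Host classes; the real IllegalCallTracer#disable_temporarily is not shown on this page, so its behavior here is assumed.

```ruby
# Hypothetical tracer: a boolean flag stands in for real call tracing.
class ToyTracer
  attr_reader :enabled

  def initialize
    @enabled = true
  end

  # Assumed behavior: turn tracing off for the block, then restore it.
  def disable_temporarily
    previous = @enabled
    @enabled = false
    yield
  ensure
    @enabled = previous
  end
end

class Host
  def initialize(tracer = nil)
    @tracer = tracer
  end

  # The anonymous block parameter (&) is forwarded as-is when a tracer
  # exists; otherwise the block is simply yielded to.
  def tracing_disabled(&)
    if @tracer
      @tracer.disable_temporarily(&)
    else
      yield
    end
  end
end

tracer = ToyTracer.new
during = Host.new(tracer).tracing_disabled { tracer.enabled }
without_tracer = Host.new.tracing_disabled { :ran }
```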
#instance ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 246
def instance
  @instance or raise 'Instance accessed before created'
end
#memo ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 257
def memo
  # Lazy on first access
  @memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
end
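#memo decodes lazily with ||= so the conversion runs once, on first access. A minimal sketch of that idiom follows, assuming a plain frozen Hash in place of ExternallyImmutableHash and a hypothetical decode step (stringifying keys) in place of ProtoUtils.memo_from_proto.

```ruby
# Hypothetical holder; the decode step is a stand-in for proto conversion.
class LazyMemo
  attr_reader :decode_count

  def initialize(raw)
    @raw = raw
    @decode_count = 0
  end

  # Decode on first access only; later calls reuse the cached frozen hash.
  def memo
    @memo ||= begin
      @decode_count += 1
      (@raw || {}).transform_keys(&:to_s).freeze
    end
  end
end

holder = LazyMemo.new({ region: 'eu' })
first = holder.memo
second = holder.memo
empty = LazyMemo.new(nil).memo
```

Because ||= re-evaluates when the cached value is nil or false, falling back to an empty hash keeps the cache effective even when no memo was supplied.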
#metric_meter ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 294
def metric_meter
  @metric_meter ||= ReplaySafeMetric::Meter.new(
    @runtime_metric_meter.with_additional_attributes(
      {
        namespace: info.namespace,
        task_queue: info.task_queue,
        workflow_type: info.workflow_type
      }
    )
  )
end
#now ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 262
def now
  # Create each time
  ProtoUtils.(@now_timestamp) or raise 'Time unexpectedly not present'
end
#patch(patch_id:, deprecated:) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 275
def patch(patch_id:, deprecated:)
  # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
  # command.
  patch_id = patch_id.to_s
  @patches_memoized ||= {}
  @patches_memoized.fetch(patch_id) do
    patched = ! || @patches_notified.include?(patch_id)
    @patches_memoized[patch_id] = patched
    if patched
      add_command(
        Bridge::Api::WorkflowCommands::WorkflowCommand.new(
          set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
        )
      )
    end
    patched
  end
end
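#patch memoizes its decision per patch ID with Hash#fetch and a block, so the marker command is added at most once. The sketch below illustrates only that memoization. PatchTracker is hypothetical, and the decision rule it uses (patched when not replaying, or when the ID was already notified) is an assumption, since the first operand is elided in the listing above.

```ruby
# Hypothetical model of per-ID patch memoization.
class PatchTracker
  attr_reader :commands

  def initialize(replaying:, notified: [])
    @replaying = replaying
    @notified = notified
    @commands = []
  end

  def patch(patch_id)
    patch_id = patch_id.to_s
    @memoized ||= {}
    # fetch only runs the block when the key is absent, so a memoized
    # false is returned as-is rather than recomputed.
    @memoized.fetch(patch_id) do
      # Assumed rule: see the lead-in above.
      patched = !@replaying || @notified.include?(patch_id)
      @memoized[patch_id] = patched
      @commands << [:set_patch_marker, patch_id] if patched
      patched
    end
  end
end

live = PatchTracker.new(replaying: false)
live_results = [live.patch(:fix), live.patch('fix')]

replay = PatchTracker.new(replaying: true, notified: ['old'])
replay_results = [replay.patch('old'), replay.patch('new'), replay.patch('new')]
```

Using fetch with a block instead of ||= matters here: ||= would recompute whenever the stored value is false, while fetch distinguishes "absent" from "false".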
#search_attributes ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 250
def search_attributes
  # Lazy on first access
  @search_attributes ||= SearchAttributes._from_proto(
    @init_job.search_attributes, disable_mutations: true, never_nil: true
  ) || raise
end