Class: Temporalio::Internal::Worker::WorkflowInstance
- Inherits: Object
  - Object
  - Temporalio::Internal::Worker::WorkflowInstance
- Defined in:
  - lib/temporalio/internal/worker/workflow_instance.rb
  - lib/temporalio/internal/worker/workflow_instance/context.rb
  - lib/temporalio/internal/worker/workflow_instance/details.rb
  - lib/temporalio/internal/worker/workflow_instance/scheduler.rb
  - lib/temporalio/internal/worker/workflow_instance/handler_hash.rb
  - lib/temporalio/internal/worker/workflow_instance/handler_execution.rb
  - lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb
  - lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb
  - lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb
  - lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb
  - lib/temporalio/internal/worker/workflow_instance/inbound_implementation.rb
  - lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb
  - lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb
  - lib/temporalio/internal/worker/workflow_instance/externally_immutable_hash.rb
Overview
Instance of a user workflow. This is the instance with all state needed to run the workflow and is expected to be cached by the worker for sticky execution.
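To illustrate what "cached by the worker for sticky execution" could look like, here is a minimal sketch of a cache keyed by run ID. The `InstanceCache` class and its methods are hypothetical and are not part of the SDK; the only point is that a later activation for the same run can reuse the in-memory instance instead of rebuilding it.

```ruby
# Hypothetical sketch, not the SDK's worker code: a run-ID-keyed instance cache.
class InstanceCache
  def initialize
    @instances = {} # Keyed by run ID, value is the cached instance
  end

  # Returns the cached instance for run_id, building it with the block on a miss.
  def fetch_or_create(run_id)
    @instances[run_id] ||= yield
  end

  # Drops the instance, e.g. when the run is evicted from the cache.
  def evict(run_id)
    @instances.delete(run_id)
  end

  def size
    @instances.size
  end
end

cache = InstanceCache.new
first = cache.fetch_or_create('run-1') { Object.new }
again = cache.fetch_or_create('run-1') { Object.new } # block is not run on a hit
cache.evict('run-1')
```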
Defined Under Namespace
Classes: ChildWorkflowHandle, Context, Details, ExternalWorkflowHandle, ExternallyImmutableHash, HandlerExecution, HandlerHash, IllegalCallTracer, InboundImplementation, OutboundImplementation, ReplaySafeLogger, ReplaySafeMetric, Scheduler
Instance Attribute Summary
- #assert_valid_local_activity ⇒ Object (readonly): Returns the value of attribute assert_valid_local_activity.
- #cancellation ⇒ Object (readonly): Returns the value of attribute cancellation.
- #context ⇒ Object (readonly): Returns the value of attribute context.
- #context_frozen ⇒ Object (readonly): Returns the value of attribute context_frozen.
- #continue_as_new_suggested ⇒ Object (readonly): Returns the value of attribute continue_as_new_suggested.
- #current_deployment_version ⇒ Object (readonly): Returns the value of attribute current_deployment_version.
- #current_details ⇒ Object: Returns the value of attribute current_details.
- #current_history_length ⇒ Object (readonly): Returns the value of attribute current_history_length.
- #current_history_size ⇒ Object (readonly): Returns the value of attribute current_history_size.
- #disable_eager_activity_execution ⇒ Object (readonly): Returns the value of attribute disable_eager_activity_execution.
- #failure_converter ⇒ Object (readonly): Returns the value of attribute failure_converter.
- #in_progress_handlers ⇒ Object (readonly): Returns the value of attribute in_progress_handlers.
- #info ⇒ Object (readonly): Returns the value of attribute info.
- #io_enabled ⇒ Object: Returns the value of attribute io_enabled.
- #logger ⇒ Object (readonly): Returns the value of attribute logger.
- #payload_converter ⇒ Object (readonly): Returns the value of attribute payload_converter.
- #pending_activities ⇒ Object (readonly): Returns the value of attribute pending_activities.
- #pending_child_workflow_starts ⇒ Object (readonly): Returns the value of attribute pending_child_workflow_starts.
- #pending_child_workflows ⇒ Object (readonly): Returns the value of attribute pending_child_workflows.
- #pending_external_cancels ⇒ Object (readonly): Returns the value of attribute pending_external_cancels.
- #pending_external_signals ⇒ Object (readonly): Returns the value of attribute pending_external_signals.
- #pending_timers ⇒ Object (readonly): Returns the value of attribute pending_timers.
- #query_handlers ⇒ Object (readonly): Returns the value of attribute query_handlers.
- #random ⇒ Object (readonly): Returns the value of attribute random.
- #replaying ⇒ Object (readonly): Returns the value of attribute replaying.
- #scheduler ⇒ Object (readonly): Returns the value of attribute scheduler.
- #signal_handlers ⇒ Object (readonly): Returns the value of attribute signal_handlers.
- #update_handlers ⇒ Object (readonly): Returns the value of attribute update_handlers.
Class Method Summary
- .new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
Instance Method Summary
- #activate(activation) ⇒ Object
- #add_command(command) ⇒ Object
- #illegal_call_tracing_disabled ⇒ Object
- #initialize(details) ⇒ WorkflowInstance (constructor): A new instance of WorkflowInstance.
- #instance ⇒ Object
- #memo ⇒ Object
- #metric_meter ⇒ Object
- #now ⇒ Object
- #patch(patch_id:, deprecated:) ⇒ Object
- #search_attributes ⇒ Object
Constructor Details
#initialize(details) ⇒ WorkflowInstance
Returns a new instance of WorkflowInstance.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 63

def initialize(details)
  # Initialize general state
  @context = Context.new(self)
  if details.illegal_calls && !details.illegal_calls.empty?
    @tracer = IllegalCallTracer.new(details.illegal_calls)
  end
  @logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
  @logger.scoped_values_getter = proc { scoped_logger_info }
  @runtime_metric_meter = details.metric_meter
  @io_enabled = details.unsafe_workflow_io_enabled
  @scheduler = Scheduler.new(self)
  @payload_converter = details.payload_converter
  @failure_converter = details.failure_converter
  @disable_eager_activity_execution = details.disable_eager_activity_execution
  @pending_activities = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_timers = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_child_workflow_starts = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_child_workflows = {} # Keyed by sequence, value is ChildWorkflowHandle to resolve with proto result
  @pending_external_signals = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_external_cancels = {} # Keyed by sequence, value is fiber to resume with proto result
  @buffered_signals = {} # Keyed by signal name, value is array of signal jobs
  # TODO(cretz): Should these be sets instead? Both should be fairly low counts.
  @in_progress_handlers = [] # Value is HandlerExecution
  @patches_notified = []
  @definition = details.definition
  @interceptors = details.interceptors
  @cancellation, @cancellation_proc = Cancellation.new
  @continue_as_new_suggested = false
  @current_history_length = 0
  @current_history_size = 0
  @replaying = false
  @workflow_failure_exception_types = details.workflow_failure_exception_types
  @signal_handlers = HandlerHash.new(
    details.definition.signals,
    Workflow::Definition::Signal
  ) do |defn|
    # New definition, drain buffer. If it's dynamic (i.e. no name) drain them all.
    to_drain = if defn.name.nil?
                 all_signals = @buffered_signals.values.flatten
                 @buffered_signals.clear
                 all_signals
               else
                 @buffered_signals.delete(defn.name)
               end
    to_drain&.each { |job| apply_signal(job) }
  end
  @query_handlers = HandlerHash.new(details.definition.queries, Workflow::Definition::Query)
  @update_handlers = HandlerHash.new(details.definition.updates, Workflow::Definition::Update)
  @definition_options = Workflow::DefinitionOptions.new(
    failure_exception_types: details.definition.failure_exception_types,
    versioning_behavior: details.definition.versioning_behavior
  )

  @assert_valid_local_activity = details.assert_valid_local_activity

  # Create all things needed from initial job
  @init_job = details.initial_activation.jobs.find { |j| !j.initialize_workflow.nil? }&.initialize_workflow
  raise 'Missing init job from first activation' unless @init_job

  illegal_call_tracing_disabled do
    @info = Workflow::Info.new(
      attempt: @init_job.attempt,
      continued_run_id: ProtoUtils.string_or(@init_job.continued_from_execution_run_id),
      cron_schedule: ProtoUtils.string_or(@init_job.cron_schedule),
      execution_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_execution_timeout),
      headers: ProtoUtils.headers_from_proto_map(@init_job.headers, @payload_converter) || {},
      last_failure: if @init_job.continued_failure
                      @failure_converter.from_failure(@init_job.continued_failure, @payload_converter)
                    end,
      last_result: if @init_job.last_completion_result
                     @payload_converter.from_payloads(@init_job.last_completion_result).first
                   end,
      namespace: details.namespace,
      parent: if @init_job.parent_workflow_info
                Workflow::Info::ParentInfo.new(
                  namespace: @init_job.parent_workflow_info.namespace,
                  run_id: @init_job.parent_workflow_info.run_id,
                  workflow_id: @init_job.parent_workflow_info.workflow_id
                )
              end,
      priority: Priority._from_proto(@init_job.priority),
      retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
      root: if @init_job.root_workflow
              Workflow::Info::RootInfo.new(
                run_id: @init_job.root_workflow.run_id,
                workflow_id: @init_job.root_workflow.workflow_id
              )
            end,
      run_id: details.initial_activation.run_id,
      run_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_run_timeout),
      start_time: ProtoUtils.timestamp_to_time(@init_job.start_time) || raise,
      task_queue: details.task_queue,
      task_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_task_timeout) || raise,
      workflow_id: @init_job.workflow_id,
      workflow_type: @init_job.workflow_type
    ).freeze

    @random = Random.new(@init_job.randomness_seed)
  end
end
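The comments on the `@pending_*` hashes describe a pattern: each outstanding operation is keyed by a sequence number, and the value is the fiber to resume once the result arrives. The sketch below shows that pattern in isolation. `PendingOps`, `wait_for_result`, and `resolve` are hypothetical names chosen for illustration; the real scheduler and proto results are more involved.

```ruby
require 'fiber' # Needed for Fiber.current on older Rubies; harmless on newer ones

# Hypothetical sketch of the "keyed by sequence, value is fiber to resume" pattern.
class PendingOps
  def initialize
    @seq = 0
    @pending = {} # Keyed by sequence, value is the fiber waiting on the result
  end

  # Called from inside a fiber: registers it, then suspends until resolved.
  def wait_for_result
    seq = (@seq += 1)
    @pending[seq] = Fiber.current
    Fiber.yield(seq) # Returns whatever value the fiber is later resumed with
  end

  # Called by the driver when the result for a sequence number arrives.
  def resolve(seq, result)
    fiber = @pending.delete(seq) or raise "No pending operation for sequence #{seq}"
    fiber.resume(result)
  end
end

ops = PendingOps.new
results = []
fiber = Fiber.new { results << ops.wait_for_result }
seq = fiber.resume       # Runs the fiber until it suspends and yields its sequence
ops.resolve(seq, 'done') # Resumes the fiber; wait_for_result returns 'done'
```

Deleting the entry before resuming keeps the hash limited to operations that are still outstanding.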
Instance Attribute Details
#assert_valid_local_activity ⇒ Object (readonly)
Returns the value of attribute assert_valid_local_activity.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def assert_valid_local_activity
  @assert_valid_local_activity
end
#cancellation ⇒ Object (readonly)
Returns the value of attribute cancellation.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def cancellation
  @cancellation
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def context
  @context
end
#context_frozen ⇒ Object (readonly)
Returns the value of attribute context_frozen.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def context_frozen
  @context_frozen
end
#continue_as_new_suggested ⇒ Object (readonly)
Returns the value of attribute continue_as_new_suggested.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def continue_as_new_suggested
  @continue_as_new_suggested
end
#current_deployment_version ⇒ Object (readonly)
Returns the value of attribute current_deployment_version.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def current_deployment_version
  @current_deployment_version
end
#current_details ⇒ Object
Returns the value of attribute current_details.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 61

def current_details
  @current_details
end
#current_history_length ⇒ Object (readonly)
Returns the value of attribute current_history_length.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def current_history_length
  @current_history_length
end
#current_history_size ⇒ Object (readonly)
Returns the value of attribute current_history_size.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def current_history_size
  @current_history_size
end
#disable_eager_activity_execution ⇒ Object (readonly)
Returns the value of attribute disable_eager_activity_execution.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def disable_eager_activity_execution
  @disable_eager_activity_execution
end
#failure_converter ⇒ Object (readonly)
Returns the value of attribute failure_converter.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def failure_converter
  @failure_converter
end
#in_progress_handlers ⇒ Object (readonly)
Returns the value of attribute in_progress_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def in_progress_handlers
  @in_progress_handlers
end
#info ⇒ Object (readonly)
Returns the value of attribute info.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def info
  @info
end
#io_enabled ⇒ Object
Returns the value of attribute io_enabled.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 61

def io_enabled
  @io_enabled
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def logger
  @logger
end
#payload_converter ⇒ Object (readonly)
Returns the value of attribute payload_converter.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def payload_converter
  @payload_converter
end
#pending_activities ⇒ Object (readonly)
Returns the value of attribute pending_activities.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def pending_activities
  @pending_activities
end
#pending_child_workflow_starts ⇒ Object (readonly)
Returns the value of attribute pending_child_workflow_starts.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def pending_child_workflow_starts
  @pending_child_workflow_starts
end
#pending_child_workflows ⇒ Object (readonly)
Returns the value of attribute pending_child_workflows.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def pending_child_workflows
  @pending_child_workflows
end
#pending_external_cancels ⇒ Object (readonly)
Returns the value of attribute pending_external_cancels.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def pending_external_cancels
  @pending_external_cancels
end
#pending_external_signals ⇒ Object (readonly)
Returns the value of attribute pending_external_signals.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def pending_external_signals
  @pending_external_signals
end
#pending_timers ⇒ Object (readonly)
Returns the value of attribute pending_timers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def pending_timers
  @pending_timers
end
#query_handlers ⇒ Object (readonly)
Returns the value of attribute query_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def query_handlers
  @query_handlers
end
#random ⇒ Object (readonly)
Returns the value of attribute random.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def random
  @random
end
#replaying ⇒ Object (readonly)
Returns the value of attribute replaying.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def replaying
  @replaying
end
#scheduler ⇒ Object (readonly)
Returns the value of attribute scheduler.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def scheduler
  @scheduler
end
#signal_handlers ⇒ Object (readonly)
Returns the value of attribute signal_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def signal_handlers
  @signal_handlers
end
#update_handlers ⇒ Object (readonly)
Returns the value of attribute update_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 55

def update_handlers
  @update_handlers
end
Class Method Details
.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 38

def self.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:)
  Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new(
    run_id: run_id,
    failed: Bridge::Api::WorkflowCompletion::Failure.new(
      failure: begin
        failure_converter.to_failure(error, payload_converter)
      rescue Exception => e # rubocop:disable Lint/RescueException
        Api::Failure::V1::Failure.new(
          message: "Failed converting error to failure: #{e.message}, " \
                   "original error message: #{error.message}",
          application_failure_info: Api::Failure::V1::ApplicationFailureInfo.new
        )
      end
    )
  )
end
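The listing above falls back to a generic failure when the failure converter itself raises, so that a completion can still be reported. A minimal sketch of that fallback follows, with plain strings in place of the proto types. `BrokenConverter` and `failure_message_for` are hypothetical stand-ins, not SDK names.

```ruby
# Hypothetical converter that always fails, standing in for a misbehaving one.
class BrokenConverter
  def to_failure(_error)
    raise ArgumentError, 'cannot encode'
  end
end

# Sketch of the fallback: if conversion raises, describe both errors instead.
def failure_message_for(error, converter)
  converter.to_failure(error)
rescue Exception => e # rubocop:disable Lint/RescueException
  "Failed converting error to failure: #{e.message}, " \
    "original error message: #{error.message}"
end

msg = failure_message_for(RuntimeError.new('boom'), BrokenConverter.new)
```

Rescuing `Exception` rather than `StandardError` mirrors the listing: the fallback is meant to catch anything the converter might raise.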
Instance Method Details
#activate(activation) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 164

def activate(activation)
  # Run inside of scheduler
  run_in_scheduler { activate_internal(activation) }
end
#add_command(command) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 169

def add_command(command)
  raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen

  @commands << command
end
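`add_command` rejects new commands while the context is frozen. The sketch below shows the same guard on a standalone buffer. `CommandBuffer`, `with_frozen`, and the idea that freezing wraps a block of read-only code are assumptions made for illustration; this page does not show where `@context_frozen` is set.

```ruby
# Hypothetical sketch of a command buffer with a frozen-context guard.
class CommandBuffer
  class InvalidStateError < StandardError; end

  attr_reader :commands

  def initialize
    @commands = []
    @frozen = false
  end

  def add(command)
    raise InvalidStateError, 'Cannot add commands in this context' if @frozen

    @commands << command
  end

  # Runs the block with the buffer frozen, restoring the previous state after.
  def with_frozen
    previous = @frozen
    @frozen = true
    yield
  ensure
    @frozen = previous
  end
end

buffer = CommandBuffer.new
buffer.add(:start_timer)
begin
  buffer.with_frozen { buffer.add(:schedule_activity) }
rescue CommandBuffer::InvalidStateError => e
  rejected = e.message
end
```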
#illegal_call_tracing_disabled ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 196

def illegal_call_tracing_disabled(&)
  @tracer.disable(&)
end
#instance ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 175

def instance
  @instance or raise 'Instance accessed before created'
end
#memo ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 186

def memo
  # Lazy on first access
  @memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
end
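`memo` is decoded lazily: the conversion runs on first access and the result is reused afterwards. A minimal sketch of that `||=` pattern follows. `LazyMemo` is hypothetical, upcasing the values stands in for payload decoding, and a frozen `Hash` stands in for `ExternallyImmutableHash`.

```ruby
# Hypothetical sketch of lazy, one-time decoding with ||=.
class LazyMemo
  attr_reader :decode_count

  def initialize(raw)
    @raw = raw
    @decode_count = 0
  end

  def memo
    # Lazy on first access; later calls return the same frozen hash
    @memo ||= begin
      @decode_count += 1
      @raw.transform_values(&:upcase).freeze
    end
  end
end

lazy = LazyMemo.new({ 'owner' => 'alice' })
first = lazy.memo
second = lazy.memo # No second decode; same object as first
```

Because `||=` re-evaluates when the stored value is `nil` or `false`, the pattern works here only because the decoded value is always a hash.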
#metric_meter ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 219

def metric_meter
  @metric_meter ||= ReplaySafeMetric::Meter.new(
    @runtime_metric_meter.with_additional_attributes(
      {
        namespace: info.namespace,
        task_queue: info.task_queue,
        workflow_type: info.workflow_type
      }
    )
  )
end
#now ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 191

def now
  # Create each time
  ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
end
#patch(patch_id:, deprecated:) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 200

def patch(patch_id:, deprecated:)
  # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
  # command.
  patch_id = patch_id.to_s
  @patches_memoized ||= {}
  @patches_memoized.fetch(patch_id) do
    patched = !replaying || @patches_notified.include?(patch_id)
    @patches_memoized[patch_id] = patched
    if patched
      add_command(
        Bridge::Api::WorkflowCommands::WorkflowCommand.new(
          set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
        )
      )
    end
    patched
  end
end
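The memoization in `patch` relies on `Hash#fetch` with a block: the block runs only when the key is absent, so the decision and its command are produced at most once per patch ID. The sketch below isolates that behavior. `PatchTracker` is hypothetical, and it assumes the decision is "not replaying, or history already notified this patch".

```ruby
# Hypothetical sketch of per-ID memoization using Hash#fetch with a block.
class PatchTracker
  attr_reader :commands

  def initialize(replaying:, notified: [])
    @replaying = replaying
    @notified = notified # Patch IDs already seen in history (assumption)
    @memoized = {}
    @commands = []
  end

  def patch(patch_id)
    patch_id = patch_id.to_s
    # The block only runs on a cache miss, so each ID is decided once
    @memoized.fetch(patch_id) do
      patched = !@replaying || @notified.include?(patch_id)
      @memoized[patch_id] = patched
      @commands << [:set_patch_marker, patch_id] if patched
      patched
    end
  end
end

live = PatchTracker.new(replaying: false)
live.patch(:new_flow)
live.patch(:new_flow) # Memoized: no second command is recorded

replay = PatchTracker.new(replaying: true, notified: ['seen'])
replay.patch('seen')    # Patched, because history notified this ID
replay.patch('unseen')  # Not patched while replaying
```

`fetch` is used instead of `||=` because a memoized `false` must also count as a hit.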
#search_attributes ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 179

def search_attributes
  # Lazy on first access
  @search_attributes ||= SearchAttributes._from_proto(
    @init_job.search_attributes, disable_mutations: true, never_nil: true
  ) || raise
end