Class: Temporalio::Internal::Worker::WorkflowInstance
- Inherits: Object
- Object
- Temporalio::Internal::Worker::WorkflowInstance
- Defined in:
- lib/temporalio/internal/worker/workflow_instance.rb,
lib/temporalio/internal/worker/workflow_instance/context.rb,
lib/temporalio/internal/worker/workflow_instance/details.rb,
lib/temporalio/internal/worker/workflow_instance/scheduler.rb,
lib/temporalio/internal/worker/workflow_instance/handler_hash.rb,
lib/temporalio/internal/worker/workflow_instance/handler_execution.rb,
lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb,
lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb,
lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb,
lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb,
lib/temporalio/internal/worker/workflow_instance/inbound_implementation.rb,
lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb,
lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb,
lib/temporalio/internal/worker/workflow_instance/externally_immutable_hash.rb
Overview
Instance of a user workflow. This is the instance with all state needed to run the workflow and is expected to be cached by the worker for sticky execution.
Defined Under Namespace
Classes: ChildWorkflowHandle, Context, Details, ExternalWorkflowHandle, ExternallyImmutableHash, HandlerExecution, HandlerHash, IllegalCallTracer, InboundImplementation, OutboundImplementation, ReplaySafeLogger, ReplaySafeMetric, Scheduler
Instance Attribute Summary
-
#cancellation ⇒ Object
readonly
Returns the value of attribute cancellation.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#context_frozen ⇒ Object
readonly
Returns the value of attribute context_frozen.
-
#continue_as_new_suggested ⇒ Object
readonly
Returns the value of attribute continue_as_new_suggested.
-
#current_details ⇒ Object
Returns the value of attribute current_details.
-
#current_history_length ⇒ Object
readonly
Returns the value of attribute current_history_length.
-
#current_history_size ⇒ Object
readonly
Returns the value of attribute current_history_size.
-
#disable_eager_activity_execution ⇒ Object
readonly
Returns the value of attribute disable_eager_activity_execution.
-
#failure_converter ⇒ Object
readonly
Returns the value of attribute failure_converter.
-
#in_progress_handlers ⇒ Object
readonly
Returns the value of attribute in_progress_handlers.
-
#info ⇒ Object
readonly
Returns the value of attribute info.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#payload_converter ⇒ Object
readonly
Returns the value of attribute payload_converter.
-
#pending_activities ⇒ Object
readonly
Returns the value of attribute pending_activities.
-
#pending_child_workflow_starts ⇒ Object
readonly
Returns the value of attribute pending_child_workflow_starts.
-
#pending_child_workflows ⇒ Object
readonly
Returns the value of attribute pending_child_workflows.
-
#pending_external_cancels ⇒ Object
readonly
Returns the value of attribute pending_external_cancels.
-
#pending_external_signals ⇒ Object
readonly
Returns the value of attribute pending_external_signals.
-
#pending_timers ⇒ Object
readonly
Returns the value of attribute pending_timers.
-
#query_handlers ⇒ Object
readonly
Returns the value of attribute query_handlers.
-
#random ⇒ Object
readonly
Returns the value of attribute random.
-
#replaying ⇒ Object
readonly
Returns the value of attribute replaying.
-
#scheduler ⇒ Object
readonly
Returns the value of attribute scheduler.
-
#signal_handlers ⇒ Object
readonly
Returns the value of attribute signal_handlers.
-
#update_handlers ⇒ Object
readonly
Returns the value of attribute update_handlers.
Class Method Summary
- .new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
Instance Method Summary
- #activate(activation) ⇒ Object
- #add_command(command) ⇒ Object
- #illegal_call_tracing_disabled ⇒ Object
-
#initialize(details) ⇒ WorkflowInstance
constructor
A new instance of WorkflowInstance.
- #instance ⇒ Object
- #memo ⇒ Object
- #metric_meter ⇒ Object
- #now ⇒ Object
- #patch(patch_id:, deprecated:) ⇒ Object
- #search_attributes ⇒ Object
Constructor Details
#initialize(details) ⇒ WorkflowInstance
Returns a new instance of WorkflowInstance.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 62
def initialize(details)
  # Initialize general state
  @context = Context.new(self)
  if details.illegal_calls && !details.illegal_calls.empty?
    @tracer = IllegalCallTracer.new(details.illegal_calls)
  end
  @logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
  @logger.scoped_values_getter = proc { scoped_logger_info }
  @runtime_metric_meter = details.metric_meter
  @scheduler = Scheduler.new(self)
  @payload_converter = details.payload_converter
  @failure_converter = details.failure_converter
  @disable_eager_activity_execution = details.disable_eager_activity_execution
  @pending_activities = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_timers = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_child_workflow_starts = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_child_workflows = {} # Keyed by sequence, value is ChildWorkflowHandle to resolve with proto result
  @pending_external_signals = {} # Keyed by sequence, value is fiber to resume with proto result
  @pending_external_cancels = {} # Keyed by sequence, value is fiber to resume with proto result
  @buffered_signals = {} # Keyed by signal name, value is array of signal jobs
  # TODO(cretz): Should these be sets instead? Both should be fairly low counts.
  @in_progress_handlers = [] # Value is HandlerExecution
  @patches_notified = []
  @definition = details.definition
  @interceptors = details.interceptors
  @cancellation, @cancellation_proc = Cancellation.new
  @continue_as_new_suggested = false
  @current_history_length = 0
  @current_history_size = 0
  @replaying = false
  @failure_exception_types = details.workflow_failure_exception_types + @definition.failure_exception_types
  @signal_handlers = HandlerHash.new(
    details.definition.signals,
    Workflow::Definition::Signal
  ) do |defn|
    # New definition, drain buffer. If it's dynamic (i.e. no name) drain them all.
    to_drain = if defn.name.nil?
                 all_signals = @buffered_signals.values.flatten
                 @buffered_signals.clear
                 all_signals
               else
                 @buffered_signals.delete(defn.name)
               end
    to_drain&.each { |job| apply_signal(job) }
  end
  @query_handlers = HandlerHash.new(details.definition.queries, Workflow::Definition::Query)
  @update_handlers = HandlerHash.new(details.definition.updates, Workflow::Definition::Update)

  # Create all things needed from initial job
  @init_job = details.initial_activation.jobs.find { |j| !j.initialize_workflow.nil? }&.initialize_workflow
  raise 'Missing init job from first activation' unless @init_job

  illegal_call_tracing_disabled do
    @info = Workflow::Info.new(
      attempt: @init_job.attempt,
      continued_run_id: ProtoUtils.string_or(@init_job.continued_from_execution_run_id),
      cron_schedule: ProtoUtils.string_or(@init_job.cron_schedule),
      execution_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_execution_timeout),
      headers: ProtoUtils.headers_from_proto_map(@init_job.headers, @payload_converter) || {},
      last_failure: if @init_job.continued_failure
                      @failure_converter.from_failure(@init_job.continued_failure, @payload_converter)
                    end,
      last_result: if @init_job.last_completion_result
                     @payload_converter.from_payloads(@init_job.last_completion_result).first
                   end,
      namespace: details.namespace,
      parent: if @init_job.parent_workflow_info
                Workflow::Info::ParentInfo.new(
                  namespace: @init_job.parent_workflow_info.namespace,
                  run_id: @init_job.parent_workflow_info.run_id,
                  workflow_id: @init_job.parent_workflow_info.workflow_id
                )
              end,
      retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
      run_id: details.initial_activation.run_id,
      run_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_run_timeout),
      start_time: ProtoUtils.(details.initial_activation.) || raise,
      task_queue: details.task_queue,
      task_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_task_timeout) || raise,
      workflow_id: @init_job.workflow_id,
      workflow_type: @init_job.workflow_type
    ).freeze

    @random = Random.new(@init_job.randomness_seed)
  end
end
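The constructor buffers signals that arrive before a handler exists and drains them when a handler is registered. The following is a minimal sketch of that pattern only. `NotifyingRegistry` and `SignalDefinition` are hypothetical stand-ins for `HandlerHash` and `Workflow::Definition::Signal`, whose real interfaces are not shown on this page, and the string jobs stand in for signal jobs.

```ruby
# Hypothetical stand-in for HandlerHash: stores handler definitions and
# invokes a callback each time a definition is registered.
class NotifyingRegistry
  def initialize(&on_new_definition)
    @handlers = {}
    @on_new_definition = on_new_definition
  end

  def []=(name, definition)
    @handlers[name] = definition
    @on_new_definition&.call(definition)
  end

  def [](name)
    @handlers[name]
  end
end

# Hypothetical definition type; a nil name marks a dynamic handler.
SignalDefinition = Struct.new(:name)

# Signals received before any handler was registered, keyed by signal name.
buffered_signals = { 'approve' => %w[job1 job2], 'reject' => %w[job3] }
applied = []

handlers = NotifyingRegistry.new do |defn|
  # A dynamic handler drains every buffered signal; a named handler drains
  # only the signals buffered under its own name.
  to_drain = if defn.name.nil?
               all_signals = buffered_signals.values.flatten
               buffered_signals.clear
               all_signals
             else
               buffered_signals.delete(defn.name)
             end
  to_drain&.each { |job| applied << job }
end

handlers['approve'] = SignalDefinition.new('approve') # drains job1 and job2
handlers[nil] = SignalDefinition.new(nil)             # drains the remainder
```

The safe-navigation call on `to_drain` covers the case where nothing was buffered for a newly registered name, since `Hash#delete` returns `nil` for a missing key.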
Instance Attribute Details
#cancellation ⇒ Object (readonly)
Returns the value of attribute cancellation.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def cancellation
  @cancellation
end
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def context
  @context
end
#context_frozen ⇒ Object (readonly)
Returns the value of attribute context_frozen.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def context_frozen
  @context_frozen
end
#continue_as_new_suggested ⇒ Object (readonly)
Returns the value of attribute continue_as_new_suggested.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def continue_as_new_suggested
  @continue_as_new_suggested
end
#current_details ⇒ Object
Returns the value of attribute current_details.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 60
def current_details
  @current_details
end
#current_history_length ⇒ Object (readonly)
Returns the value of attribute current_history_length.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def current_history_length
  @current_history_length
end
#current_history_size ⇒ Object (readonly)
Returns the value of attribute current_history_size.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def current_history_size
  @current_history_size
end
#disable_eager_activity_execution ⇒ Object (readonly)
Returns the value of attribute disable_eager_activity_execution.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def disable_eager_activity_execution
  @disable_eager_activity_execution
end
#failure_converter ⇒ Object (readonly)
Returns the value of attribute failure_converter.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def failure_converter
  @failure_converter
end
#in_progress_handlers ⇒ Object (readonly)
Returns the value of attribute in_progress_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def in_progress_handlers
  @in_progress_handlers
end
#info ⇒ Object (readonly)
Returns the value of attribute info.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def info
  @info
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def logger
  @logger
end
#payload_converter ⇒ Object (readonly)
Returns the value of attribute payload_converter.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def payload_converter
  @payload_converter
end
#pending_activities ⇒ Object (readonly)
Returns the value of attribute pending_activities.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def pending_activities
  @pending_activities
end
#pending_child_workflow_starts ⇒ Object (readonly)
Returns the value of attribute pending_child_workflow_starts.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def pending_child_workflow_starts
  @pending_child_workflow_starts
end
#pending_child_workflows ⇒ Object (readonly)
Returns the value of attribute pending_child_workflows.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def pending_child_workflows
  @pending_child_workflows
end
#pending_external_cancels ⇒ Object (readonly)
Returns the value of attribute pending_external_cancels.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def pending_external_cancels
  @pending_external_cancels
end
#pending_external_signals ⇒ Object (readonly)
Returns the value of attribute pending_external_signals.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def pending_external_signals
  @pending_external_signals
end
#pending_timers ⇒ Object (readonly)
Returns the value of attribute pending_timers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def pending_timers
  @pending_timers
end
#query_handlers ⇒ Object (readonly)
Returns the value of attribute query_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def query_handlers
  @query_handlers
end
#random ⇒ Object (readonly)
Returns the value of attribute random.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def random
  @random
end
#replaying ⇒ Object (readonly)
Returns the value of attribute replaying.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def replaying
  @replaying
end
#scheduler ⇒ Object (readonly)
Returns the value of attribute scheduler.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def scheduler
  @scheduler
end
#signal_handlers ⇒ Object (readonly)
Returns the value of attribute signal_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def signal_handlers
  @signal_handlers
end
#update_handlers ⇒ Object (readonly)
Returns the value of attribute update_handlers.
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 54
def update_handlers
  @update_handlers
end
Class Method Details
.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 37
def self.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:)
  Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new(
    run_id: run_id,
    failed: Bridge::Api::WorkflowCompletion::Failure.new(
      failure: begin
        failure_converter.to_failure(error, payload_converter)
      rescue Exception => e # rubocop:disable Lint/RescueException
        Api::Failure::V1::Failure.new(
          message: "Failed converting error to failure: #{e.}, " \
                   "original error message: #{error.}",
          application_failure_info: Api::Failure::V1::ApplicationFailureInfo.new
        )
      end
    )
  )
end
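The method falls back to a generic failure when the failure converter itself raises. A minimal sketch of that fallback pattern follows. `SketchFailure`, `BrokenConverter`, and `convert_or_fallback` are hypothetical names, the converter here takes a single argument for brevity, and the use of `Exception#message` in the interpolations is an assumption, because those calls are truncated in the listing.

```ruby
# Hypothetical stand-in for the proto failure message.
SketchFailure = Struct.new(:message, keyword_init: true)

# Hypothetical converter that always fails, to exercise the fallback path.
class BrokenConverter
  def to_failure(_error)
    raise ArgumentError, 'cannot encode'
  end
end

# Try the converter first; if conversion itself raises, build a plain failure
# that records both the conversion error and the original error.
def convert_or_fallback(error, converter)
  converter.to_failure(error)
rescue Exception => e # rubocop:disable Lint/RescueException
  # Assumption: both interpolations use Exception#message.
  SketchFailure.new(
    message: "Failed converting error to failure: #{e.message}, " \
             "original error message: #{error.message}"
  )
end

result = convert_or_fallback(StandardError.new('boom'), BrokenConverter.new)
puts result.message
```

Rescuing `Exception` rather than `StandardError` is deliberate in this pattern: the caller must always be able to report a completion, whatever the converter raises.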
Instance Method Details
#activate(activation) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 149
def activate(activation)
  # Run inside of scheduler
  run_in_scheduler { activate_internal(activation) }
end
#add_command(command) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 154
def add_command(command)
  raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen

  @commands << command
end
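The guard in `add_command` rejects new commands while the context is frozen. A minimal sketch of that behavior, assuming a hypothetical `CommandBuffer` class and `InvalidStateError` in place of the instance itself and `Workflow::InvalidWorkflowStateError`; how and when the real instance sets `@context_frozen` is not shown on this page.

```ruby
# Hypothetical stand-in for Workflow::InvalidWorkflowStateError.
class InvalidStateError < StandardError; end

# Hypothetical holder of pending commands with a read-only switch.
class CommandBuffer
  attr_reader :commands
  attr_accessor :context_frozen

  def initialize
    @commands = []
    @context_frozen = false
  end

  def add_command(command)
    # Refuse to record side effects while the context is frozen.
    raise InvalidStateError, 'Cannot add commands in this context' if @context_frozen

    @commands << command
  end
end

buffer = CommandBuffer.new
buffer.add_command(:start_timer)

buffer.context_frozen = true
rejected = begin
  buffer.add_command(:start_activity)
  false
rescue InvalidStateError
  true
end
```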
#illegal_call_tracing_disabled ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 181
def illegal_call_tracing_disabled(&)
  @tracer.disable(&)
end
#instance ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 160
def instance
  @instance or raise 'Instance accessed before created'
end
#memo ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 171
def memo
  # Lazy on first access
  @memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
end
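`memo` decodes its value on first access and reuses it afterwards. A minimal sketch of that lazy pattern, under stated assumptions: `LazyMemo` and its `decode` step are hypothetical, and `Hash#freeze` is only a stand-in for `ExternallyImmutableHash`, whose actual behavior is not documented on this page.

```ruby
# Hypothetical holder that decodes a raw hash once, on first access.
class LazyMemo
  attr_reader :decode_count

  def initialize(raw)
    @raw = raw
    @decode_count = 0
  end

  def memo
    # ||= runs the right-hand side only while @memo is nil.
    @memo ||= decode(@raw).freeze
  end

  private

  # Stand-in for payload conversion; counts how often it runs.
  def decode(raw)
    @decode_count += 1
    raw.transform_values(&:upcase)
  end
end

holder = LazyMemo.new({ 'owner' => 'alice' })
first = holder.memo
second = holder.memo
```

Both calls return the same object, so callers that hold on to the result see one consistent, read-only view.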
#metric_meter ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 204
def metric_meter
  @metric_meter ||= ReplaySafeMetric::Meter.new(
    @runtime_metric_meter.with_additional_attributes(
      {
        namespace: info.namespace,
        task_queue: info.task_queue,
        workflow_type: info.workflow_type
      }
    )
  )
end
#now ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 176
def now
  # Create each time
  ProtoUtils.(@now_timestamp) or raise 'Time unexpectedly not present'
end
#patch(patch_id:, deprecated:) ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 185
def patch(patch_id:, deprecated:)
  # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
  # command.
  patch_id = patch_id.to_s
  @patches_memoized ||= {}
  @patches_memoized.fetch(patch_id) do
    patched = ! || @patches_notified.include?(patch_id)
    @patches_memoized[patch_id] = patched
    if patched
      add_command(
        Bridge::Api::WorkflowCommands::WorkflowCommand.new(
          set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
        )
      )
    end
    patched
  end
end
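`patch` memoizes its answer per patch ID, so a marker command is emitted at most once for each ID. The sketch below illustrates that memoization with a hypothetical `PatchTracker` class and plain hashes in place of bridge commands. The first operand of the `patched` condition is truncated in the listing; this sketch assumes a patch counts as applied when the workflow is not replaying or when the ID was already notified from history.

```ruby
# Hypothetical tracker that mirrors the memoization in #patch.
class PatchTracker
  attr_reader :commands

  def initialize(replaying:, patches_notified: [])
    @replaying = replaying
    @patches_notified = patches_notified
    @commands = []
  end

  def patch(patch_id:, deprecated: false)
    patch_id = patch_id.to_s
    @patches_memoized ||= {}
    # Hash#fetch runs the block only when the key is missing, so the
    # decision and the command are produced once per patch ID.
    @patches_memoized.fetch(patch_id) do
      # Assumption: applied when live, or when history already has the marker.
      patched = !@replaying || @patches_notified.include?(patch_id)
      @patches_memoized[patch_id] = patched
      @commands << { patch_id: patch_id, deprecated: deprecated } if patched
      patched
    end
  end
end

live = PatchTracker.new(replaying: false)
live_first = live.patch(patch_id: :v2)
live_second = live.patch(patch_id: 'v2') # memoized; no second command

replay = PatchTracker.new(replaying: true, patches_notified: ['v1'])
known = replay.patch(patch_id: 'v1')
unknown = replay.patch(patch_id: 'v3')
```

Using `fetch` with a block rather than `||=` matters here because a memoized `false` must also be returned without re-evaluating the condition.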
#search_attributes ⇒ Object
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 164
def search_attributes
  # Lazy on first access
  @search_attributes ||= SearchAttributes._from_proto(
    @init_job.search_attributes, disable_mutations: true, never_nil: true
  ) || raise
end