Class: Temporalio::Internal::Worker::WorkflowInstance
- Inherits:
-
Object
- Object
- Temporalio::Internal::Worker::WorkflowInstance
- Defined in:
- lib/temporalio/internal/worker/workflow_instance.rb,
lib/temporalio/internal/worker/workflow_instance/context.rb,
lib/temporalio/internal/worker/workflow_instance/details.rb,
lib/temporalio/internal/worker/workflow_instance/scheduler.rb,
lib/temporalio/internal/worker/workflow_instance/handler_hash.rb,
lib/temporalio/internal/worker/workflow_instance/handler_execution.rb,
lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb,
lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb,
lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb,
lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb,
lib/temporalio/internal/worker/workflow_instance/inbound_implementation.rb,
lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb,
lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb,
lib/temporalio/internal/worker/workflow_instance/externally_immutable_hash.rb
Overview
Instance of a user workflow. This is the instance with all state needed to run the workflow and is expected to be cached by the worker for sticky execution.
Defined Under Namespace
Classes: ChildWorkflowHandle, Context, Details, ExternalWorkflowHandle, ExternallyImmutableHash, HandlerExecution, HandlerHash, IllegalCallTracer, InboundImplementation, OutboundImplementation, ReplaySafeLogger, ReplaySafeMetric, Scheduler
Instance Attribute Summary collapse
-
#cancellation ⇒ Object
readonly
Returns the value of attribute cancellation.
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#context_frozen ⇒ Object
readonly
Returns the value of attribute context_frozen.
-
#continue_as_new_suggested ⇒ Object
readonly
Returns the value of attribute continue_as_new_suggested.
-
#current_details ⇒ Object
Returns the value of attribute current_details.
-
#current_history_length ⇒ Object
readonly
Returns the value of attribute current_history_length.
-
#current_history_size ⇒ Object
readonly
Returns the value of attribute current_history_size.
-
#disable_eager_activity_execution ⇒ Object
readonly
Returns the value of attribute disable_eager_activity_execution.
-
#failure_converter ⇒ Object
readonly
Returns the value of attribute failure_converter.
-
#in_progress_handlers ⇒ Object
readonly
Returns the value of attribute in_progress_handlers.
-
#info ⇒ Object
readonly
Returns the value of attribute info.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#payload_converter ⇒ Object
readonly
Returns the value of attribute payload_converter.
-
#pending_activities ⇒ Object
readonly
Returns the value of attribute pending_activities.
-
#pending_child_workflow_starts ⇒ Object
readonly
Returns the value of attribute pending_child_workflow_starts.
-
#pending_child_workflows ⇒ Object
readonly
Returns the value of attribute pending_child_workflows.
-
#pending_external_cancels ⇒ Object
readonly
Returns the value of attribute pending_external_cancels.
-
#pending_external_signals ⇒ Object
readonly
Returns the value of attribute pending_external_signals.
-
#pending_timers ⇒ Object
readonly
Returns the value of attribute pending_timers.
-
#query_handlers ⇒ Object
readonly
Returns the value of attribute query_handlers.
-
#random ⇒ Object
readonly
Returns the value of attribute random.
-
#replaying ⇒ Object
readonly
Returns the value of attribute replaying.
-
#scheduler ⇒ Object
readonly
Returns the value of attribute scheduler.
-
#signal_handlers ⇒ Object
readonly
Returns the value of attribute signal_handlers.
-
#update_handlers ⇒ Object
readonly
Returns the value of attribute update_handlers.
Class Method Summary collapse
Instance Method Summary collapse
- #activate(activation) ⇒ Object
- #add_command(command) ⇒ Object
- #illegal_call_tracing_disabled ⇒ Object
-
#initialize(details) ⇒ WorkflowInstance
constructor
A new instance of WorkflowInstance.
- #instance ⇒ Object
- #memo ⇒ Object
- #metric_meter ⇒ Object
- #now ⇒ Object
- #patch(patch_id:, deprecated:) ⇒ Object
- #search_attributes ⇒ Object
Constructor Details
#initialize(details) ⇒ WorkflowInstance
Returns a new instance of WorkflowInstance.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 61 def initialize(details) # Initialize general state @context = Context.new(self) if details.illegal_calls && !details.illegal_calls.empty? @tracer = IllegalCallTracer.new(details.illegal_calls) end @logger = ReplaySafeLogger.new(logger: details.logger, instance: self) @logger.scoped_values_getter = proc { scoped_logger_info } @runtime_metric_meter = details.metric_meter @scheduler = Scheduler.new(self) @payload_converter = details.payload_converter @failure_converter = details.failure_converter @disable_eager_activity_execution = details.disable_eager_activity_execution @pending_activities = {} # Keyed by sequence, value is fiber to resume with proto result @pending_timers = {} # Keyed by sequence, value is fiber to resume with proto result @pending_child_workflow_starts = {} # Keyed by sequence, value is fiber to resume with proto result @pending_child_workflows = {} # Keyed by sequence, value is ChildWorkflowHandle to resolve with proto result @pending_external_signals = {} # Keyed by sequence, value is fiber to resume with proto result @pending_external_cancels = {} # Keyed by sequence, value is fiber to resume with proto result @buffered_signals = {} # Keyed by signal name, value is array of signal jobs # TODO(cretz): Should these be sets instead? Both should be fairly low counts. @in_progress_handlers = [] # Value is HandlerExecution @patches_notified = [] @definition = details.definition @interceptors = details.interceptors @cancellation, @cancellation_proc = Cancellation.new @continue_as_new_suggested = false @current_history_length = 0 @current_history_size = 0 @replaying = false @failure_exception_types = details.workflow_failure_exception_types + @definition.failure_exception_types @signal_handlers = HandlerHash.new( details.definition.signals, Workflow::Definition::Signal ) do |defn| # New definition, drain buffer. If it's dynamic (i.e. no name) drain them all. to_drain = if defn.name.nil? all_signals = @buffered_signals.values.flatten @buffered_signals.clear all_signals else @buffered_signals.delete(defn.name) end to_drain&.each { |job| apply_signal(job) } end @query_handlers = HandlerHash.new(details.definition.queries, Workflow::Definition::Query) @update_handlers = HandlerHash.new(details.definition.updates, Workflow::Definition::Update) # Create all things needed from initial job @init_job = details.initial_activation.jobs.find { |j| !j.initialize_workflow.nil? }&.initialize_workflow raise 'Missing init job from first activation' unless @init_job illegal_call_tracing_disabled do @info = Workflow::Info.new( attempt: @init_job.attempt, continued_run_id: ProtoUtils.string_or(@init_job.continued_from_execution_run_id), cron_schedule: ProtoUtils.string_or(@init_job.cron_schedule), execution_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_execution_timeout), last_failure: if @init_job.continued_failure @failure_converter.from_failure(@init_job.continued_failure, @payload_converter) end, last_result: if @init_job.last_completion_result @payload_converter.from_payloads(@init_job.last_completion_result).first end, namespace: details.namespace, parent: if @init_job.parent_workflow_info Workflow::Info::ParentInfo.new( namespace: @init_job.parent_workflow_info.namespace, run_id: @init_job.parent_workflow_info.run_id, workflow_id: @init_job.parent_workflow_info.workflow_id ) end, retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy), run_id: details.initial_activation.run_id, run_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_run_timeout), start_time: ProtoUtils.(details.initial_activation.) || raise, task_queue: details.task_queue, task_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_task_timeout) || raise, workflow_id: @init_job.workflow_id, workflow_type: @init_job.workflow_type ).freeze @random = Random.new(@init_job.randomness_seed) end end |
Instance Attribute Details
#cancellation ⇒ Object (readonly)
Returns the value of attribute cancellation.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def cancellation @cancellation end |
#context ⇒ Object (readonly)
Returns the value of attribute context.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def context @context end |
#context_frozen ⇒ Object (readonly)
Returns the value of attribute context_frozen.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def context_frozen @context_frozen end |
#continue_as_new_suggested ⇒ Object (readonly)
Returns the value of attribute continue_as_new_suggested.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def continue_as_new_suggested @continue_as_new_suggested end |
#current_details ⇒ Object
Returns the value of attribute current_details.
59 60 61 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 59 def current_details @current_details end |
#current_history_length ⇒ Object (readonly)
Returns the value of attribute current_history_length.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def current_history_length @current_history_length end |
#current_history_size ⇒ Object (readonly)
Returns the value of attribute current_history_size.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def current_history_size @current_history_size end |
#disable_eager_activity_execution ⇒ Object (readonly)
Returns the value of attribute disable_eager_activity_execution.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def disable_eager_activity_execution @disable_eager_activity_execution end |
#failure_converter ⇒ Object (readonly)
Returns the value of attribute failure_converter.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def failure_converter @failure_converter end |
#in_progress_handlers ⇒ Object (readonly)
Returns the value of attribute in_progress_handlers.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def in_progress_handlers @in_progress_handlers end |
#info ⇒ Object (readonly)
Returns the value of attribute info.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def info @info end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def logger @logger end |
#payload_converter ⇒ Object (readonly)
Returns the value of attribute payload_converter.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def payload_converter @payload_converter end |
#pending_activities ⇒ Object (readonly)
Returns the value of attribute pending_activities.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def pending_activities @pending_activities end |
#pending_child_workflow_starts ⇒ Object (readonly)
Returns the value of attribute pending_child_workflow_starts.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def pending_child_workflow_starts @pending_child_workflow_starts end |
#pending_child_workflows ⇒ Object (readonly)
Returns the value of attribute pending_child_workflows.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def pending_child_workflows @pending_child_workflows end |
#pending_external_cancels ⇒ Object (readonly)
Returns the value of attribute pending_external_cancels.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def pending_external_cancels @pending_external_cancels end |
#pending_external_signals ⇒ Object (readonly)
Returns the value of attribute pending_external_signals.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def pending_external_signals @pending_external_signals end |
#pending_timers ⇒ Object (readonly)
Returns the value of attribute pending_timers.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def pending_timers @pending_timers end |
#query_handlers ⇒ Object (readonly)
Returns the value of attribute query_handlers.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def query_handlers @query_handlers end |
#random ⇒ Object (readonly)
Returns the value of attribute random.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def random @random end |
#replaying ⇒ Object (readonly)
Returns the value of attribute replaying.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def @replaying end |
#scheduler ⇒ Object (readonly)
Returns the value of attribute scheduler.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def scheduler @scheduler end |
#signal_handlers ⇒ Object (readonly)
Returns the value of attribute signal_handlers.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def signal_handlers @signal_handlers end |
#update_handlers ⇒ Object (readonly)
Returns the value of attribute update_handlers.
53 54 55 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 53 def update_handlers @update_handlers end |
Class Method Details
.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 36 def self.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:) Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new( run_id: run_id, failed: Bridge::Api::WorkflowCompletion::Failure.new( failure: begin failure_converter.to_failure(error, payload_converter) rescue Exception => e # rubocop:disable Lint/RescueException Api::Failure::V1::Failure.new( message: "Failed converting error to failure: #{e.}, " \ "original error message: #{error.}", application_failure_info: Api::Failure::V1::ApplicationFailureInfo.new ) end ) ) end |
Instance Method Details
#activate(activation) ⇒ Object
147 148 149 150 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 147 def activate(activation) # Run inside of scheduler run_in_scheduler { activate_internal(activation) } end |
#add_command(command) ⇒ Object
152 153 154 155 156 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 152 def add_command(command) raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen @commands << command end |
#illegal_call_tracing_disabled ⇒ Object
179 180 181 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 179 def illegal_call_tracing_disabled(&) @tracer.disable(&) end |
#instance ⇒ Object
158 159 160 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 158 def instance @instance or raise 'Instance accessed before created' end |
#memo ⇒ Object
169 170 171 172 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 169 def memo # Lazy on first access @memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {}) end |
#metric_meter ⇒ Object
202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 202 def metric_meter @metric_meter ||= ReplaySafeMetric::Meter.new( @runtime_metric_meter.with_additional_attributes( { namespace: info.namespace, task_queue: info.task_queue, workflow_type: info.workflow_type } ) ) end |
#now ⇒ Object
174 175 176 177 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 174 def now # Create each time ProtoUtils.(@now_timestamp) or raise 'Time unexpectedly not present' end |
#patch(patch_id:, deprecated:) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 183 def patch(patch_id:, deprecated:) # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the # command. patch_id = patch_id.to_s @patches_memoized ||= {} @patches_memoized.fetch(patch_id) do patched = ! || @patches_notified.include?(patch_id) @patches_memoized[patch_id] = patched if patched add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:) ) ) end patched end end |
#search_attributes ⇒ Object
162 163 164 165 166 167 |
# File 'lib/temporalio/internal/worker/workflow_instance.rb', line 162 def search_attributes # Lazy on first access @search_attributes ||= SearchAttributes._from_proto( @init_job.search_attributes, disable_mutations: true, never_nil: true ) || raise end |