Class: Temporalio::Cancellation
- Inherits:
-
Object
- Object
- Temporalio::Cancellation
- Defined in:
- lib/temporalio/cancellation.rb
Overview
Cancellation representation, often known as a “cancellation token”. This is used by clients, activities, and workflows to represent cancellation in a thread/fiber-safe way.
Instance Method Summary collapse
-
#add_cancel_callback { ... } ⇒ Object
Advanced call to invoke a proc or block on cancel.
-
#canceled? ⇒ Boolean
Whether this cancellation is canceled.
-
#canceled_reason ⇒ String?
Reason for cancellation.
-
#check!(err = Error::CanceledError.new('Canceled')) ⇒ Object
Raise an error if this cancellation is canceled.
-
#initialize(*parents) ⇒ Cancellation
constructor
Create a new cancellation.
-
#pending_canceled? ⇒ Boolean
Whether a cancel is pending but currently shielded.
-
#pending_canceled_reason ⇒ String?
Reason for pending cancellation.
-
#remove_cancel_callback(key) ⇒ Object
Remove a cancel callback using the key returned from #add_cancel_callback.
-
#shield { ... } ⇒ Object
Shield the given block from cancellation.
-
#to_ary ⇒ Array(Cancellation, Proc)
Self and a proc to call to cancel that accepts an optional string ‘reason` keyword argument.
-
#wait ⇒ Object
Wait on this to be canceled.
Constructor Details
#initialize(*parents) ⇒ Cancellation
Create a new cancellation.
This is usually created and destructured into a tuple with the second value being the proc to invoke to cancel. For example: ‘cancel, cancel_proc = Temporalio::Cancellation.new`. This is done via #to_ary which returns a proc to issue the cancellation in the second value of the array.
18 19 20 21 22 23 24 25 26 27 |
# File 'lib/temporalio/cancellation.rb', line 18 def initialize(*parents) @canceled = false @canceled_reason = nil @canceled_mutex = Mutex.new @canceled_cond_var = nil @cancel_callbacks = {} # Keyed by sentinel value, but value iteration still is deterministic @shield_depth = 0 @shield_pending_cancel = nil # When pending, set as single-reason array parents.each { |p| p.add_cancel_callback { on_cancel(reason: p.canceled_reason) } } end |
Instance Method Details
#add_cancel_callback { ... } ⇒ Object
WARNING: This is advanced API, users should use #wait or similar.
Advanced call to invoke a proc or block on cancel. The callback usually needs to be quick and thread-safe since it is called in the canceler’s thread. Usually the callback will just be something like pushing on a queue or signaling a condition variable. If the cancellation is already canceled, the callback is called inline before returning.
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/temporalio/cancellation.rb', line 120 def add_cancel_callback(&block) raise ArgumentError, 'Must provide block' unless block_given? callback_to_run_immediately, key = @canceled_mutex.synchronize do break [block, nil] if @canceled key = Object.new @cancel_callbacks[key] = block [nil, key] end callback_to_run_immediately&.call key end |
#canceled? ⇒ Boolean
Returns Whether this cancellation is canceled.
30 31 32 |
# File 'lib/temporalio/cancellation.rb', line 30 def canceled? @canceled_mutex.synchronize { @canceled } end |
#canceled_reason ⇒ String?
Returns Reason for cancellation. Can be nil if not canceled or no reason provided.
35 36 37 |
# File 'lib/temporalio/cancellation.rb', line 35 def canceled_reason @canceled_mutex.synchronize { @canceled_reason } end |
#check!(err = Error::CanceledError.new('Canceled')) ⇒ Object
Raise an error if this cancellation is canceled.
52 53 54 |
# File 'lib/temporalio/cancellation.rb', line 52 def check!(err = Error::CanceledError.new('Canceled')) raise err if canceled? end |
#pending_canceled? ⇒ Boolean
Returns Whether a cancel is pending but currently shielded.
40 41 42 |
# File 'lib/temporalio/cancellation.rb', line 40 def pending_canceled? @canceled_mutex.synchronize { !@shield_pending_cancel.nil? } end |
#pending_canceled_reason ⇒ String?
Returns Reason for pending cancellation. Can be nil if not pending canceled or no reason provided.
45 46 47 |
# File 'lib/temporalio/cancellation.rb', line 45 def pending_canceled_reason @canceled_mutex.synchronize { @shield_pending_cancel&.first } end |
#remove_cancel_callback(key) ⇒ Object
Remove a cancel callback using the key returned from #add_cancel_callback.
137 138 139 140 141 142 |
# File 'lib/temporalio/cancellation.rb', line 137 def remove_cancel_callback(key) @canceled_mutex.synchronize do @cancel_callbacks.delete(key) end nil end |
#shield { ... } ⇒ Object
Shield the given block from cancellation. This means any cancellation that occurs while shielded code is running will be set as “pending” and will not take effect until after the block completes. If shield calls are nested, the cancellation remains “pending” until the last shielded block ends.
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/temporalio/cancellation.rb', line 94 def shield raise ArgumentError, 'Block required' unless block_given? @canceled_mutex.synchronize { @shield_depth += 1 } yield ensure callbacks_to_run = @canceled_mutex.synchronize do @shield_depth -= 1 if @shield_depth.zero? && @shield_pending_cancel reason = @shield_pending_cancel.first @shield_pending_cancel = nil prepare_cancel(reason:) end end callbacks_to_run&.each(&:call) end |
#to_ary ⇒ Array(Cancellation, Proc)
Returns Self and a proc to call to cancel that accepts an optional string ‘reason` keyword argument. As a general practice, only the creator of the cancellation should be the one controlling its cancellation.
59 60 61 |
# File 'lib/temporalio/cancellation.rb', line 59 def to_ary [self, proc { |reason: nil| on_cancel(reason:) }] end |
#wait ⇒ Object
Wait on this to be canceled. This is backed by a ConditionVariable outside of workflows or Workflow.wait_condition inside of workflows.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/temporalio/cancellation.rb', line 65 def wait # If this is in a workflow, just wait for the canceled. This is because ConditionVariable does a no-duration # kernel_sleep to the fiber scheduler which ends up recursing back into this because the workflow implementation # of kernel_sleep by default relies on cancellation. if Workflow.in_workflow? Workflow.wait_condition(cancellation: nil) { @canceled } return end @canceled_mutex.synchronize do break if @canceled # Add cond var if not present if @canceled_cond_var.nil? @canceled_cond_var = ConditionVariable.new @cancel_callbacks[Object.new] = proc { @canceled_mutex.synchronize { @canceled_cond_var.broadcast } } end # Wait on it @canceled_cond_var.wait(@canceled_mutex) end end |