Class: Temporalio::Cancellation

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/cancellation.rb

Overview

Cancellation representation, often known as a “cancellation token”. This is used by clients, activities, and workflows to represent cancellation in a thread/fiber-safe way.

Instance Method Summary collapse

Constructor Details

#initialize(*parents) ⇒ Cancellation

Create a new cancellation.

This is usually created and destructured into a tuple with the second value being the proc to invoke to cancel. For example: ‘cancel, cancel_proc = Temporalio::Cancellation.new`. This is done via #to_ary which returns a proc to issue the cancellation in the second value of the array.

Parameters:

  • parents (Array<Cancellation>)

    Parent cancellations to link this one to. This cancellation will be canceled when any parents are canceled.



18
19
20
21
22
23
24
25
26
27
# File 'lib/temporalio/cancellation.rb', line 18

def initialize(*parents)
  @canceled = false
  @canceled_reason = nil
  @canceled_mutex = Mutex.new
  @canceled_cond_var = nil
  @cancel_callbacks = {} # Keyed by sentinel value, but value iteration still is deterministic
  @shield_depth = 0
  @shield_pending_cancel = nil # When pending, set as single-reason array
  parents.each { |p| p.add_cancel_callback { on_cancel(reason: p.canceled_reason) } }
end

Instance Method Details

#add_cancel_callback { ... } ⇒ Object

Note:

WARNING: This is advanced API, users should use #wait or similar.

Advanced call to invoke a proc or block on cancel. The callback usually needs to be quick and thread-safe since it is called in the canceler’s thread. Usually the callback will just be something like pushing on a queue or signaling a condition variable. If the cancellation is already canceled, the callback is called inline before returning.

Yields:

  • Accepts block if not using ‘proc`.

Returns:

Raises:

  • (ArgumentError)


120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/temporalio/cancellation.rb', line 120

def add_cancel_callback(&block)
  raise ArgumentError, 'Must provide block' unless block_given?

  callback_to_run_immediately, key = @canceled_mutex.synchronize do
    break [block, nil] if @canceled

    key = Object.new
    @cancel_callbacks[key] = block
    [nil, key]
  end
  callback_to_run_immediately&.call
  key
end

#canceled?Boolean

Returns Whether this cancellation is canceled.

Returns:

  • (Boolean)

    Whether this cancellation is canceled.



30
31
32
# File 'lib/temporalio/cancellation.rb', line 30

def canceled?
  @canceled_mutex.synchronize { @canceled }
end

#canceled_reasonString?

Returns Reason for cancellation. Can be nil if not canceled or no reason provided.

Returns:

  • (String, nil)

    Reason for cancellation. Can be nil if not canceled or no reason provided.



35
36
37
# File 'lib/temporalio/cancellation.rb', line 35

def canceled_reason
  @canceled_mutex.synchronize { @canceled_reason }
end

#check!(err = Error::CanceledError.new('Canceled')) ⇒ Object

Raise an error if this cancellation is canceled.

Parameters:

  • err (Exception) (defaults to: Error::CanceledError.new('Canceled'))

    Error to raise.



52
53
54
# File 'lib/temporalio/cancellation.rb', line 52

def check!(err = Error::CanceledError.new('Canceled'))
  raise err if canceled?
end

#pending_canceled?Boolean

Returns Whether a cancel is pending but currently shielded.

Returns:

  • (Boolean)

    Whether a cancel is pending but currently shielded.



40
41
42
# File 'lib/temporalio/cancellation.rb', line 40

def pending_canceled?
  @canceled_mutex.synchronize { !@shield_pending_cancel.nil? }
end

#pending_canceled_reasonString?

Returns Reason for pending cancellation. Can be nil if not pending canceled or no reason provided.

Returns:

  • (String, nil)

    Reason for pending cancellation. Can be nil if not pending canceled or no reason provided.



45
46
47
# File 'lib/temporalio/cancellation.rb', line 45

def pending_canceled_reason
  @canceled_mutex.synchronize { @shield_pending_cancel&.first }
end

#remove_cancel_callback(key) ⇒ Object

Remove a cancel callback using the key returned from #add_cancel_callback.

Parameters:



137
138
139
140
141
142
# File 'lib/temporalio/cancellation.rb', line 137

def remove_cancel_callback(key)
  @canceled_mutex.synchronize do
    @cancel_callbacks.delete(key)
  end
  nil
end

#shield { ... } ⇒ Object

Shield the given block from cancellation. This means any cancellation that occurs while shielded code is running will be set as “pending” and will not take effect until after the block completes. If shield calls are nested, the cancellation remains “pending” until the last shielded block ends.

Yields:

  • Requires a block to run under shield.

Returns:

  • (Object)

    Result of the block.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/temporalio/cancellation.rb', line 94

def shield
  raise ArgumentError, 'Block required' unless block_given?

  @canceled_mutex.synchronize { @shield_depth += 1 }
  yield
ensure
  callbacks_to_run = @canceled_mutex.synchronize do
    @shield_depth -= 1
    if @shield_depth.zero? && @shield_pending_cancel
      reason = @shield_pending_cancel.first
      @shield_pending_cancel = nil
      prepare_cancel(reason:)
    end
  end
  callbacks_to_run&.each(&:call)
end

#to_aryArray(Cancellation, Proc)

Returns Self and a proc to call to cancel that accepts an optional string ‘reason` keyword argument. As a general practice, only the creator of the cancellation should be the one controlling its cancellation.

Returns:

  • (Array(Cancellation, Proc))

    Self and a proc to call to cancel that accepts an optional string ‘reason` keyword argument. As a general practice, only the creator of the cancellation should be the one controlling its cancellation.



59
60
61
# File 'lib/temporalio/cancellation.rb', line 59

def to_ary
  [self, proc { |reason: nil| on_cancel(reason:) }]
end

#waitObject

Wait on this to be canceled. This is backed by a ConditionVariable outside of workflows or Workflow.wait_condition inside of workflows.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/temporalio/cancellation.rb', line 65

def wait
  # If this is in a workflow, just wait for the canceled. This is because ConditionVariable does a no-duration
  # kernel_sleep to the fiber scheduler which ends up recursing back into this because the workflow implementation
  # of kernel_sleep by default relies on cancellation.
  if Workflow.in_workflow?
    Workflow.wait_condition(cancellation: nil) { @canceled }
    return
  end

  @canceled_mutex.synchronize do
    break if @canceled

    # Add cond var if not present
    if @canceled_cond_var.nil?
      @canceled_cond_var = ConditionVariable.new
      @cancel_callbacks[Object.new] = proc { @canceled_mutex.synchronize { @canceled_cond_var.broadcast } }
    end

    # Wait on it
    @canceled_cond_var.wait(@canceled_mutex)
  end
end