Class: Temporalio::Worker::ActivityExecutor::ThreadPool

Inherits:
Temporalio::Worker::ActivityExecutor show all
Defined in:
lib/temporalio/worker/activity_executor/thread_pool.rb

Overview

Activity executor for scheduling activities in their own thread using ThreadPool.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Temporalio::Worker::ActivityExecutor

defaults, #initialize_activity

Constructor Details

#initialize(thread_pool = Worker::ThreadPool.default) ⇒ ThreadPool

Create a new thread pool executor.

Parameters:

  • thread_pool (Worker::ThreadPool) (defaults to: Worker::ThreadPool.default)

    Thread pool to use.



18
19
20
# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 18

def initialize(thread_pool = Worker::ThreadPool.default) # rubocop:disable Lint/MissingSuper
  @thread_pool = thread_pool
end

Class Method Details

.defaultThreadPool

Returns Default/shared thread pool executor using default thread pool.

Returns:

  • (ThreadPool)

    Default/shared thread pool executor using default thread pool.



11
12
13
# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 11

def self.default
  @default ||= new
end

Instance Method Details

#activity_contextObject



28
29
30
# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 28

def activity_context
  Thread.current[:temporal_activity_context]
end

#execute_activity(_defn) ⇒ Object



23
24
25
# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 23

def execute_activity(_defn, &)
  @thread_pool.execute(&)
end

#set_activity_context(defn, context) ⇒ Object



33
34
35
36
37
38
39
40
41
42
# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 33

def set_activity_context(defn, context)
  Thread.current[:temporal_activity_context] = context
  # If they have opted in to raising on cancel, wire that up
  return unless defn.cancel_raise

  thread = Thread.current
  context&.cancellation&.add_cancel_callback do
    thread.raise(Error::CanceledError.new('Activity canceled')) if thread[:temporal_activity_context] == context
  end
end