Class: Temporalio::Worker::ThreadPool

Inherits:
Object
Defined in:
lib/temporalio/worker/thread_pool.rb

Overview

Implementation of a thread pool. This implementation is a stripped-down form of Concurrent Ruby's `CachedThreadPool`.

Constructor Details

#initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool

Create a new thread pool that creates threads as needed.

Parameters:

  • max_threads (Integer, nil) (defaults to: nil)

    Maximum number of thread workers to create, or nil for no limit.

  • idle_timeout (Float) (defaults to: 20)

    Number of seconds after which a thread worker with no work should be stopped. Note that idle workers are only checked during a call to #execute, so an idle worker is not stopped until the next task is submitted.



# File 'lib/temporalio/worker/thread_pool.rb', line 28

def initialize(max_threads: nil, idle_timeout: 20)
  @max_threads = max_threads
  @idle_timeout = idle_timeout

  @mutex = Mutex.new
  @pool = []
  @ready = []
  @queue = []
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0
  @workers_counter = 0
  @prune_interval = @idle_timeout / 2
  @next_prune_time = ThreadPool._monotonic_time + @prune_interval
end
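
The last two assignments in the constructor decide when idle workers are next checked. The following is a minimal sketch of that arithmetic, not the library's code. It assumes that `ThreadPool._monotonic_time` wraps `Process.clock_gettime(Process::CLOCK_MONOTONIC)`, since its body is not shown on this page. The names `monotonic_now`, `prune_interval` and `prune_due?` are hypothetical and exist only for illustration.

```ruby
# Hypothetical stand-in for ThreadPool._monotonic_time (assumed implementation).
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Mirrors `@prune_interval = @idle_timeout / 2` from the constructor.
def prune_interval(idle_timeout)
  idle_timeout / 2
end

# Mirrors the comparison in #execute: prune only once the deadline has passed.
def prune_due?(next_prune_time, now = monotonic_now)
  next_prune_time < now
end

interval_default = prune_interval(20)   # Integer default: 10
interval_odd_int = prune_interval(5)    # Integer division truncates: 2
interval_float   = prune_interval(5.0)  # Float keeps the fraction: 2.5

start = monotonic_now
next_prune = start + interval_default
due_now   = prune_due?(next_prune, start)         # false, deadline not reached
due_later = prune_due?(next_prune, start + 10.5)  # true, deadline has passed
```

Because the division follows Ruby's numeric rules, passing a Float such as `idle_timeout: 5.0` avoids the truncation seen with odd Integer values.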

Class Method Details

.default ⇒ ThreadPool

Returns the default, shared thread pool instance, which has no limit on the number of threads.

Returns:

  • (ThreadPool)

    Default/shared thread pool instance with unlimited max threads.



# File 'lib/temporalio/worker/thread_pool.rb', line 14

def self.default
  @default ||= new
end
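
The `@default ||= new` line memoizes one instance in a class-level instance variable. The sketch below shows that pattern on a hypothetical `ExamplePool` class so that it runs without the gem; only the body of `default` is taken from the source above.

```ruby
# Hypothetical class; only the `default` pattern comes from the source above.
class ExamplePool
  def self.default
    # Class-level instance variable: assigned on the first call, reused afterwards.
    @default ||= new
  end
end

first  = ExamplePool.default
second = ExamplePool.default
same_instance = first.equal?(second)   # true: both calls return the memoized object

fresh = ExamplePool.new
distinct = !fresh.equal?(first)        # true: `new` always builds a separate pool
```

Every caller of `default` therefore shares the same threads, whereas each call to `new` yields an independent pool with its own limits.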

Instance Method Details

#active_count ⇒ Integer

Returns the number of threads that are actively executing tasks.

Returns:

  • (Integer)

    The number of threads that are actively executing tasks.



# File 'lib/temporalio/worker/thread_pool.rb', line 71

def active_count
  @mutex.synchronize { @pool.length - @ready.length }
end

#completed_task_count ⇒ Integer

Returns the number of tasks that have been completed by the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been completed by the pool since construction.



# File 'lib/temporalio/worker/thread_pool.rb', line 66

def completed_task_count
  @mutex.synchronize { @completed_task_count }
end

#execute { ... } ⇒ Object

Execute the given block in a thread. The block must be written so that it never raises, and it must take no arguments.

Yields:

  • Block to execute.



# File 'lib/temporalio/worker/thread_pool.rb', line 47

def execute(&block)
  @mutex.synchronize do
    locked_assign_worker(&block) || locked_enqueue(&block)
    @scheduled_task_count += 1
    locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
  end
end
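
Since the block is expected never to raise, one option is to rescue inside the block and report the error through a channel of your own. The sketch below runs on a plain `Thread` so that it works without the gem. `safe_task` and the `errors` queue are hypothetical names, and with a real pool the same proc would presumably be passed as `pool.execute(&task)`.

```ruby
# Wraps arbitrary work in a zero-argument proc that never lets an exception escape.
def safe_task(errors, &work)
  proc do
    begin
      work.call
    rescue StandardError => e
      errors << e # hand the error to the caller instead of raising in the thread
    end
  end
end

errors  = Queue.new
results = Queue.new

ok_task  = safe_task(errors) { results << (1 + 1) }
bad_task = safe_task(errors) { raise ArgumentError, 'boom' }

# Plain threads stand in for the pool here.
[ok_task, bad_task].map { |task| Thread.new(&task) }.each(&:join)

captured = errors.pop  # the ArgumentError raised by bad_task
computed = results.pop # 2
```

Rescuing `StandardError` rather than `Exception` is a design choice of this sketch; it leaves signals and exit requests untouched.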

#kill ⇒ Object

Kill each thread. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).



# File 'lib/temporalio/worker/thread_pool.rb', line 97

def kill
  @mutex.synchronize do
    # Kill all workers
    @pool.each(&:kill)
    @pool.clear
    @ready.clear
  end
end

#largest_length ⇒ Integer

Returns the largest number of threads that have been created in the pool since construction.

Returns:

  • (Integer)

    The largest number of threads that have been created in the pool since construction.



# File 'lib/temporalio/worker/thread_pool.rb', line 56

def largest_length
  @mutex.synchronize { @largest_length }
end

#length ⇒ Integer

Returns the number of threads currently in the pool.

Returns:

  • (Integer)

    The number of threads currently in the pool.



# File 'lib/temporalio/worker/thread_pool.rb', line 76

def length
  @mutex.synchronize { @pool.length }
end

#queue_length ⇒ Integer

Returns the number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    The number of tasks in the queue awaiting execution.



# File 'lib/temporalio/worker/thread_pool.rb', line 81

def queue_length
  @mutex.synchronize { @queue.length }
end

#scheduled_task_count ⇒ Integer

Returns the number of tasks that have been scheduled for execution on the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been scheduled for execution on the pool since construction.



# File 'lib/temporalio/worker/thread_pool.rb', line 61

def scheduled_task_count
  @mutex.synchronize { @scheduled_task_count }
end
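
The counters documented above can be gathered into one Hash for logging or metrics. This is a sketch only: `pool_stats`, `POOL_STAT_READERS` and `FakePool` are hypothetical names, and `FakePool` is a stand-in object so the example runs without the gem. Each reader takes the pool's mutex on its own call, so the values are read one after another and may not describe a single instant.

```ruby
# Reader names taken from the instance methods documented on this page.
POOL_STAT_READERS = %i[
  length active_count queue_length
  largest_length scheduled_task_count completed_task_count
].freeze

# Hypothetical helper: calls each reader in turn and collects the results.
def pool_stats(pool)
  POOL_STAT_READERS.map { |name| [name, pool.public_send(name)] }.to_h
end

# Stand-in object that answers the same readers from a fixed Hash.
class FakePool
  def initialize(values)
    @values = values
  end

  POOL_STAT_READERS.each do |name|
    define_method(name) { @values.fetch(name) }
  end
end

fake = FakePool.new(length: 3, active_count: 2, queue_length: 0,
                    largest_length: 4, scheduled_task_count: 10,
                    completed_task_count: 8)
stats = pool_stats(fake)

# active_count is `@pool.length - @ready.length`, so this difference is the
# number of workers that are ready and waiting for work.
idle = stats[:length] - stats[:active_count] # 1
```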

#shutdown ⇒ Object

Gracefully shut down each thread once it is done with its current task. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).



# File 'lib/temporalio/worker/thread_pool.rb', line 88

def shutdown
  @mutex.synchronize do
    # Stop all workers
    @pool.each(&:stop)
  end
end
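
The difference between #shutdown and #kill comes down to what `stop` and `kill` do on each worker, and that worker class is not shown on this page. The sketch below illustrates the general contrast with a hypothetical `SketchWorker` built on a plain `Thread` and `Queue`. It is an assumption-laden illustration, not the library's worker.

```ruby
# Hypothetical worker, NOT the library's internal worker class.
class SketchWorker
  STOP = Object.new.freeze

  attr_reader :completed

  def initialize
    @tasks = Queue.new
    @completed = 0
    @thread = Thread.new do
      loop do
        task = @tasks.pop
        break if task.equal?(STOP) # leave the loop cleanly
        task.call
        @completed += 1
      end
    end
  end

  def <<(task)
    @tasks << task
    self
  end

  # Graceful: the stop marker is seen only after the task handed over earlier.
  def stop
    @tasks << STOP
  end

  # Abrupt: the thread is terminated wherever it currently is.
  def kill
    @thread.kill
  end

  def join
    @thread.join
  end
end

graceful = SketchWorker.new
graceful << -> { sleep 0.01 }
graceful.stop
graceful.join
# graceful.completed is 1: the task finished before the thread exited.

abrupt = SketchWorker.new
abrupt << -> { sleep 5 }
abrupt.kill
abrupt.join
# abrupt.completed is 0: the long task was cut off or never started.
```

In this sketch, `stop` lets in-flight work finish while `kill` can interrupt a task halfway, which is why the documentation advises calling either only after all workers using the pool are complete.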