Class: Temporalio::Worker::ThreadPool
- Inherits: Object
- Defined in: lib/temporalio/worker/thread_pool.rb
Overview
Implementation of a thread pool. This implementation is a stripped-down form of Concurrent Ruby's `CachedThreadPool`.
Class Method Summary
-
.default ⇒ ThreadPool
Default/shared thread pool instance with unlimited max threads.
Instance Method Summary
-
#active_count ⇒ Integer
The number of threads that are actively executing tasks.
-
#completed_task_count ⇒ Integer
The number of tasks that have been completed by the pool since construction.
-
#execute { ... } ⇒ Object
Execute the given block in a thread.
-
#initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool
constructor
Create a new thread pool that creates threads as needed.
-
#kill ⇒ Object
Kill each thread.
-
#largest_length ⇒ Integer
The largest number of threads that have been created in the pool since construction.
-
#length ⇒ Integer
The number of threads currently in the pool.
-
#queue_length ⇒ Integer
The number of tasks in the queue awaiting execution.
-
#scheduled_task_count ⇒ Integer
The number of tasks that have been scheduled for execution on the pool since construction.
-
#shutdown ⇒ Object
Gracefully shut down each thread when it is done with its current task.
Constructor Details
#initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool
Create a new thread pool that creates threads as needed.
# File 'lib/temporalio/worker/thread_pool.rb', line 28
def initialize(max_threads: nil, idle_timeout: 20)
  @max_threads = max_threads
  @idle_timeout = idle_timeout
  @mutex = Mutex.new
  @pool = []
  @ready = []
  @queue = []
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length = 0
  @workers_counter = 0
  @prune_interval = @idle_timeout / 2
  @next_prune_time = ThreadPool._monotonic_time + @prune_interval
end
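The constructor derives its prune schedule from `idle_timeout`. The following is a minimal sketch of that arithmetic in plain Ruby. The `monotonic_time` helper is a hypothetical stand-in for `ThreadPool._monotonic_time`, whose body is not shown on this page; it is assumed here to read the monotonic clock.

```ruby
# Hypothetical stand-in for ThreadPool._monotonic_time (assumption:
# the real helper reads a monotonic clock in seconds).
def monotonic_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

idle_timeout = 20                                  # the constructor's default
prune_interval = idle_timeout / 2                  # Integer division: 20 / 2 == 10
next_prune_time = monotonic_time + prune_interval  # earliest time a prune may run

# With an odd Integer timeout the interval is truncated; a Float is not.
odd_interval = 15 / 2
float_interval = 15.0 / 2
```

Because `/` on two Integers truncates in Ruby, passing a Float for `idle_timeout` is one way to get a fractional prune interval, if that matters for a given workload.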
Class Method Details
.default ⇒ ThreadPool
Returns the default/shared thread pool instance with unlimited max threads.
# File 'lib/temporalio/worker/thread_pool.rb', line 14
def self.default
  @default ||= new
end
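The `@default ||= new` idiom memoizes a single instance on the class, so every caller shares one pool. A small sketch of the same pattern, using a hypothetical `DemoPool` class rather than the real `ThreadPool`:

```ruby
# DemoPool is a hypothetical class used only to illustrate the idiom.
class DemoPool
  def self.default
    # Create the instance on first call, then reuse it.
    @default ||= new
  end
end

first = DemoPool.default
second = DemoPool.default
separate = DemoPool.new

same_instance = first.equal?(second)      # identical object on every call
distinct_from_new = !first.equal?(separate)
```

Calling `new` directly still yields an independent instance, which is the route to take when a pool needs its own `max_threads` or `idle_timeout`.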
Instance Method Details
#active_count ⇒ Integer
Returns the number of threads that are actively executing tasks.
# File 'lib/temporalio/worker/thread_pool.rb', line 71
def active_count
  @mutex.synchronize { @pool.length - @ready.length }
end
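The active count is the difference between all live workers and those sitting idle, read under the mutex so both lengths come from the same moment. A minimal sketch with stand-in arrays (the symbols are placeholders, not real worker objects):

```ruby
mutex = Mutex.new
pool = %i[worker_a worker_b worker_c]  # every live worker (stand-in values)
ready = %i[worker_b]                   # idle workers waiting for a task

# Same calculation as #active_count: live workers minus idle workers.
active = mutex.synchronize { pool.length - ready.length }
```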
#completed_task_count ⇒ Integer
Returns the number of tasks that have been completed by the pool since construction.
# File 'lib/temporalio/worker/thread_pool.rb', line 66
def completed_task_count
  @mutex.synchronize { @completed_task_count }
end
#execute { ... } ⇒ Object
Execute the given block in a thread. The block should never raise and should take no arguments.
# File 'lib/temporalio/worker/thread_pool.rb', line 47
def execute(&block)
  @mutex.synchronize do
    locked_assign_worker(&block) || locked_enqueue(&block)
    @scheduled_task_count += 1
    locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
  end
end
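Since the block passed to `#execute` is expected never to raise, one option is to wrap the work so that errors are captured rather than propagated. The sketch below is an illustration only: `never_raising` is a hypothetical helper, not part of the library, and a plain `Thread` stands in for the pool so the example runs without the gem.

```ruby
# Hypothetical helper: returns a zero-argument proc that runs the given
# work and records any StandardError instead of letting it escape.
def never_raising(errors, &work)
  proc do
    begin
      work.call
    rescue StandardError => e
      errors << e
    end
  end
end

errors = Queue.new  # thread-safe collection for captured errors
task = never_raising(errors) { raise ArgumentError, 'boom' }

# Stand-in for pool.execute(&task); a real pool would run the proc on a worker.
Thread.new(&task).join

captured_count = errors.size
captured = errors.pop
```

With a real pool the call would look like `pool.execute(&task)`; what happens to an error that does escape the block is not described on this page.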
#kill ⇒ Object
Kill each thread. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).
# File 'lib/temporalio/worker/thread_pool.rb', line 97
def kill
  @mutex.synchronize do
    # Kill all workers
    @pool.each(&:kill)
    @pool.clear
    @ready.clear
  end
end
#largest_length ⇒ Integer
Returns the largest number of threads that have been created in the pool since construction.
# File 'lib/temporalio/worker/thread_pool.rb', line 56
def largest_length
  @mutex.synchronize { @largest_length }
end
#length ⇒ Integer
Returns the number of threads currently in the pool.
# File 'lib/temporalio/worker/thread_pool.rb', line 76
def length
  @mutex.synchronize { @pool.length }
end
#queue_length ⇒ Integer
Returns the number of tasks in the queue awaiting execution.
# File 'lib/temporalio/worker/thread_pool.rb', line 81
def queue_length
  @mutex.synchronize { @queue.length }
end
#scheduled_task_count ⇒ Integer
Returns the number of tasks that have been scheduled for execution on the pool since construction.
# File 'lib/temporalio/worker/thread_pool.rb', line 61
def scheduled_task_count
  @mutex.synchronize { @scheduled_task_count }
end
#shutdown ⇒ Object
Gracefully shut down each thread when it is done with its current task. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).
# File 'lib/temporalio/worker/thread_pool.rb', line 88
def shutdown
  @mutex.synchronize do
    # Stop all workers
    @pool.each(&:stop)
  end
end
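The difference between a graceful stop and a kill can be sketched with plain threads. This is an illustration under assumptions: the pool's internal worker class and its `stop` method are not shown on this page, so the `:stop` sentinel below is a hypothetical mechanism, not the library's.

```ruby
# Graceful stop: the worker finishes its current task, then sees the
# sentinel and exits its loop on its own.
queue = Queue.new
done = []
graceful = Thread.new do
  while (task = queue.pop) != :stop
    done << task.call
  end
end
queue << -> { :first_task }
queue << :stop            # hypothetical stop signal
graceful.join

# Kill: the thread is terminated regardless of what it is doing.
blocked = Thread.new { sleep }  # would otherwise sleep forever
blocked.kill
blocked.join

graceful_alive = graceful.alive?
blocked_alive = blocked.alive?
```

In the graceful case the queued task still runs to completion before the thread exits; in the kill case any in-progress work is abandoned.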