Class: Temporalio::Testing::ActivityEnvironment

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/testing/activity_environment.rb

Overview

Test environment for testing activities.

Users can create this environment and then use #run to execute activities on it. Often, since mutable things like cancellation can be set, users create this for each activity that is run. There is no real performance penalty for creating an environment for every run.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(info: ActivityEnvironment.default_info, on_heartbeat: nil, cancellation: Cancellation.new, worker_shutdown_cancellation: Cancellation.new, payload_converter: Converters::PayloadConverter.default, logger: Logger.new(nil), activity_executors: Worker::ActivityExecutor.defaults, metric_meter: nil, client: nil) ⇒ ActivityEnvironment

Create a test environment for activities.

Parameters:



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/temporalio/testing/activity_environment.rb', line 54

def initialize(
  info: ActivityEnvironment.default_info,
  on_heartbeat: nil,
  cancellation: Cancellation.new,
  worker_shutdown_cancellation: Cancellation.new,
  payload_converter: Converters::PayloadConverter.default,
  logger: Logger.new(nil),
  activity_executors: Worker::ActivityExecutor.defaults,
  metric_meter: nil,
  client: nil
)
  @info = info
  @on_heartbeat = on_heartbeat
  @cancellation = cancellation
  @worker_shutdown_cancellation = worker_shutdown_cancellation
  @payload_converter = payload_converter
  @logger = logger
  @activity_executors = activity_executors
  @metric_meter = metric_meter
  @client = client
end

Class Method Details

.default_infoActivity::Info

Returns The activity info used by default. This is frozen, but ‘with` can be used to make a new instance with changes to pass in to #initialize.

Returns:

  • (Activity::Info)

    The activity info used by default. This is frozen, but ‘with` can be used to make a new instance with changes to pass in to #initialize.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/temporalio/testing/activity_environment.rb', line 18

def self.default_info
  @default_info ||= Activity::Info.new(
    activity_id: 'test',
    activity_type: 'unknown',
    attempt: 1,
    current_attempt_scheduled_time: Time.at(0),
    heartbeat_details: [],
    heartbeat_timeout: nil,
    local?: false,
    schedule_to_close_timeout: 1.0,
    scheduled_time: Time.at(0),
    start_to_close_timeout: 1.0,
    started_time: Time.at(0),
    task_queue: 'test',
    task_token: String.new('test', encoding: Encoding::ASCII_8BIT),
    workflow_id: 'test',
    workflow_namespace: 'default',
    workflow_run_id: 'test-run',
    workflow_type: 'test'
  )
end

Instance Method Details

#run(activity, *args) ⇒ Object

Run an activity and returns its result or raises its exception.

Parameters:

Returns:

  • Activity result.

Raises:

  • (ArgumentError)


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/temporalio/testing/activity_environment.rb', line 81

def run(activity, *args)
  defn = Activity::Definition::Info.from_activity(activity)
  executor = @activity_executors[defn.executor]
  raise ArgumentError, "Unknown executor: #{defn.executor}" if executor.nil?

  queue = Queue.new
  executor.execute_activity(defn) do
    Activity::Context._current_executor = executor
    executor.set_activity_context(defn, Context.new(
                                          info: @info.dup,
                                          instance:
                                            defn.instance.is_a?(Proc) ? defn.instance.call : defn.instance,
                                          on_heartbeat: @on_heartbeat,
                                          cancellation: @cancellation,
                                          worker_shutdown_cancellation: @worker_shutdown_cancellation,
                                          payload_converter: @payload_converter,
                                          logger: @logger,
                                          metric_meter: @metric_meter,
                                          client: @client
                                        ))
    queue.push([defn.proc.call(*args), nil])
  rescue Exception => e # rubocop:disable Lint/RescueException -- Intentionally capturing all exceptions
    queue.push([nil, e])
  ensure
    executor.set_activity_context(defn, nil)
    Activity::Context._current_executor = nil
  end

  result, err = queue.pop
  raise err unless err.nil?

  result
end