Skip to content

Commit 5feab3f

Browse files
authored
Add replaying_history_events? to allow logging in queries and validators (#378)
Related: temporalio/features#718
1 parent 00a5453 commit 5feab3f

File tree

9 files changed

+45
-9
lines changed

9 files changed

+45
-9
lines changed

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
5757
:pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter,
5858
:failure_converter, :cancellation, :continue_as_new_suggested, :current_deployment_version,
5959
:current_history_length, :current_history_size, :replaying, :random,
60-
:signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity
60+
:signal_handlers, :query_handlers, :update_handlers, :context_frozen, :assert_valid_local_activity,
61+
:in_query_or_validator
6162
attr_accessor :io_enabled, :current_details
6263

6364
def initialize(details)
@@ -91,6 +92,7 @@ def initialize(details)
9192
@current_history_length = 0
9293
@current_history_size = 0
9394
@replaying = false
95+
@in_query_or_validator = false
9496
@workflow_failure_exception_types = details.workflow_failure_exception_types
9597
@signal_handlers = HandlerHash.new(
9698
details.definition.signals,
@@ -182,7 +184,7 @@ def activate(activation)
182184
# Apply jobs and run event loop
183185
begin
184186
# Create instance if it doesn't already exist
185-
@instance ||= with_context_frozen { create_instance }
187+
@instance ||= with_context_frozen(in_query_or_validator: false) { create_instance }
186188

187189
# Apply jobs
188190
activation.jobs.each { |job| apply(job) }
@@ -439,7 +441,7 @@ def apply_query(job)
439441
end
440442
result_hint = defn.result_hint
441443

442-
with_context_frozen do
444+
with_context_frozen(in_query_or_validator: true) do
443445
@inbound.handle_query(
444446
Temporalio::Worker::Interceptor::Workflow::HandleQueryInput.new(
445447
id: job.query_id,
@@ -502,7 +504,7 @@ def apply_update(job)
502504
# other SDKs, we are re-converting the args between validate and update to disallow user mutation in
503505
# validator/interceptor.
504506
if job.run_validator && defn.validator_to_invoke
505-
with_context_frozen do
507+
with_context_frozen(in_query_or_validator: true) do
506508
@inbound.validate_update(
507509
Temporalio::Worker::Interceptor::Workflow::HandleUpdateInput.new(
508510
id: job.id,
@@ -663,11 +665,13 @@ def failure_exception?(err)
663665
@definition_options.failure_exception_types&.any? { |cls| err.is_a?(cls) }
664666
end
665667

666-
def with_context_frozen(&)
668+
def with_context_frozen(in_query_or_validator:, &)
667669
@context_frozen = true
670+
@in_query_or_validator = in_query_or_validator
668671
yield
669672
ensure
670673
@context_frozen = false
674+
@in_query_or_validator = false
671675
end
672676

673677
def convert_handler_args(payload_array:, defn:)

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ def replaying?
242242
@instance.replaying
243243
end
244244

245+
def replaying_history_events?
246+
@instance.replaying && !@instance.in_query_or_validator
247+
end
248+
245249
def search_attributes
246250
@instance.search_attributes
247251
end

temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_logger.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ def replay_safety_disabled(&)
2323
end
2424

2525
def add(...)
26-
if !@replay_safety_disabled && Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying?
26+
if !@replay_safety_disabled && Temporalio::Workflow.in_workflow? &&
27+
Temporalio::Workflow::Unsafe.replaying_history_events?
2728
return true
2829
end
2930

temporalio/lib/temporalio/internal/worker/workflow_instance/replay_safe_metric.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ class WorkflowInstance
99
# Wrapper for a metric that does not log on replay.
1010
class ReplaySafeMetric < SimpleDelegator
1111
def record(value, additional_attributes: nil)
12-
return if Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying?
12+
return if Temporalio::Workflow.in_workflow? &&
13+
Temporalio::Workflow::Unsafe.replaying_history_events?
1314

1415
super
1516
end

temporalio/lib/temporalio/workflow.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,11 +533,18 @@ def self._current_or_nil
533533
# Unsafe module contains only-in-workflow methods that are considered unsafe. These should not be used unless the
534534
# consequences are understood.
535535
module Unsafe
536-
# @return [Boolean] True if the workflow is replaying, false otherwise. Most code should not check this value.
536+
# @return [Boolean] True if the workflow is replaying (including during queries and update validators), false
537+
# otherwise. Most code should not check this value.
537538
def self.replaying?
538539
Workflow._current.replaying?
539540
end
540541

542+
# @return [Boolean] True if the workflow is replaying history events (excluding queries and update validators),
543+
# false otherwise. Most code should not check this value.
544+
def self.replaying_history_events?
545+
Workflow._current.replaying_history_events?
546+
end
547+
541548
# Run a block of code with illegal call tracing disabled. Users should be cautious about using this as it can
542549
# often signify unsafe code.
543550
#

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module Temporalio
3434
attr_reader update_handlers: Hash[String?, Workflow::Definition::Update]
3535
attr_reader context_frozen: bool
3636
attr_reader assert_valid_local_activity: ^(String) -> void
37+
attr_reader in_query_or_validator: bool
3738

3839
attr_accessor io_enabled: bool
3940
attr_accessor current_details: String?
@@ -76,7 +77,7 @@ module Temporalio
7677

7778
def failure_exception?: (Exception err) -> bool
7879

79-
def with_context_frozen: [T] { -> T } -> T
80+
def with_context_frozen: [T] (in_query_or_validator: bool) { -> T } -> T
8081

8182
def convert_handler_args: (
8283
payload_array: Array[untyped],

temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ module Temporalio
9191

9292
def replaying?: -> bool
9393

94+
def replaying_history_events?: -> bool
95+
9496
def search_attributes: -> SearchAttributes
9597

9698
def signal_handlers: -> HandlerHash[Workflow::Definition::Signal]

temporalio/sig/temporalio/workflow.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ module Temporalio
153153
module Unsafe
154154
def self.replaying?: -> bool
155155

156+
def self.replaying_history_events?: -> bool
157+
156158
def self.illegal_call_tracing_disabled: [T] { -> T } -> T
157159

158160
def self.io_enabled: [T] { -> T } -> T

temporalio/test/worker_workflow_test.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,12 @@ def update
823823
Temporalio::Workflow.sleep(0.01)
824824
end
825825

826+
workflow_query
827+
def query
828+
Temporalio::Workflow.logger.info('query-log')
829+
'query-result'
830+
end
831+
826832
workflow_signal
827833
def cause_task_failure
828834
raise 'Some failure'
@@ -836,6 +842,9 @@ def test_logger
836842
handle.execute_update(LoggerWorkflow.update)
837843
# Send signal which causes replay when cache disabled
838844
handle.signal(:some_signal)
845+
# Query during replay - should still allow logging
846+
result = handle.query(LoggerWorkflow.query)
847+
assert_equal 'query-result', result
839848
end
840849
end
841850
lines = out.split("\n")
@@ -851,6 +860,11 @@ def test_logger
851860
assert bad_lines.size >= 2
852861
refute_includes bad_lines.first, '"LoggerWorkflow"'
853862

863+
# Confirm query logs appear (should not be suppressed)
864+
query_lines = lines.select { |l| l.include?('query-log') }
865+
assert query_lines.size >= 1, 'Expected at least one query log'
866+
assert_includes query_lines.first, 'workflow_type'
867+
854868
# Confirm task failure logs
855869
out, = safe_capture_io do
856870
execute_workflow(LoggerWorkflow, logger: Logger.new($stdout)) do |handle|

0 commit comments

Comments
 (0)