Skip to content

Commit 57462bd

Browse files
committed
another protocol
1 parent 84cff3e commit 57462bd

File tree

3 files changed

+159
-147
lines changed

3 files changed

+159
-147
lines changed

lib/virtual_clock.ex

Lines changed: 115 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@ defmodule VirtualClock do
2626
# Track how long we've been waiting
2727
quiescence_patience: 0,
2828
# Track event discovery rate
29-
last_event_count: 0
29+
last_event_count: 0,
30+
# NEW: Explicit feedback system (backwards compatible)
31+
pending_responses: 0,
32+
advance_caller: nil,
33+
# Feature flag for backwards compatibility
34+
use_explicit_feedback: false
3035
end
3136

3237
defmodule ScheduledEvent do
@@ -120,6 +125,19 @@ defmodule VirtualClock do
120125
GenServer.call(clock, {:scheduled_count_until, until_time})
121126
end
122127

128+
@doc """
129+
Enable explicit feedback mode for deterministic quiescence detection.
130+
131+
When enabled, VirtualClock will wait for one explicit :actor_done message
132+
per event it delivered during the advance instead of using timeout-based heuristics.
133+
134+
This provides faster and more reliable quiescence detection but requires
135+
that actors are instrumented to send feedback.
136+
"""
137+
def enable_explicit_feedback(clock) do
138+
GenServer.call(clock, :enable_explicit_feedback)
139+
end
140+
123141
@doc """
124142
Waits for quiescence - when all scheduled events have been processed
125143
and no new events are being scheduled.
@@ -256,9 +274,15 @@ defmodule VirtualClock do
256274
@impl true
257275
def handle_call({:advance, amount}, from, state) do
258276
target_time = state.current_time + amount
259-
# Start the advance process
260-
send(self(), {:do_advance, target_time, from})
261-
{:noreply, state}
277+
278+
if state.use_explicit_feedback do
279+
# EXPLICIT FEEDBACK MODE: Process events and wait for actor feedback
280+
explicit_feedback_advance(state, target_time, from)
281+
else
282+
# HYBRID: Fast but backwards-compatible approach
283+
send(self(), {:do_advance, target_time, from})
284+
{:noreply, state}
285+
end
262286
end
263287

264288
@impl true
@@ -291,9 +315,36 @@ defmodule VirtualClock do
291315
{:reply, count, state}
292316
end
293317

318+
@impl true
319+
def handle_call(:enable_explicit_feedback, _from, state) do
320+
new_state = %{state | use_explicit_feedback: true}
321+
{:reply, :ok, new_state}
322+
end
323+
324+
# NEW: Handle explicit feedback from actors (backwards compatible)
325+
@impl true
326+
def handle_info(:actor_done, state) when state.use_explicit_feedback do
327+
new_pending = state.pending_responses - 1
328+
329+
if new_pending == 0 and state.advance_caller do
330+
# All actors are done - complete the advance!
331+
GenServer.reply(state.advance_caller, {:ok, state.current_time})
332+
{:noreply, %{state | pending_responses: 0, advance_caller: nil}}
333+
else
334+
# Still waiting for more actors
335+
{:noreply, %{state | pending_responses: new_pending}}
336+
end
337+
end
338+
339+
def handle_info(:actor_done, state) do
340+
# Ignore if not using explicit feedback (backwards compatibility)
341+
{:noreply, state}
342+
end
343+
294344
@impl true
295345
def handle_info({:do_advance, target_time, from}, state) do
296-
# Normal advance - process events and wait for quiescence at target time
346+
# Used by the default (non-explicit-feedback) mode only: handle_call({:advance, ...})
347+
# and advance_loop/3 send this message; explicit feedback mode never sends it
297348
advance_loop(state, target_time, from)
298349
end
299350

@@ -332,109 +383,90 @@ defmodule VirtualClock do
332383
end
333384
end
334385

335-
# Efficient advance loop that processes all events without real-time delays
386+
# SIMPLE: Process one time point per iteration (backwards compatible)
336387
defp advance_loop(state, target_time, from) do
337-
# Process events in chronological order, allowing for newly scheduled events
338-
# Use a loop to avoid stack overflow
339-
advance_loop_iterative(state, target_time, from)
340-
end
341-
342-
defp advance_loop_iterative(state, target_time, from) do
343-
# Process all events at the current time point in a tight loop
344-
# Then send a message to continue to the next time point
345388
case get_next_event_time(state.scheduled, target_time) do
346389
nil ->
347-
# No events up to target_time, advance to target_time and wait for quiescence
390+
# No events up to target_time - advance to target and finish
348391
new_state = %{state | current_time: target_time}
349-
wait_for_quiescence_and_finish(new_state, target_time, from)
392+
GenServer.reply(from, {:ok, target_time})
350393
{:noreply, new_state}
351394

352395
next_time when next_time <= target_time ->
353-
# Process ALL events at exactly the same time point in a tight loop
354-
process_all_events_at_time(state, next_time, target_time, from)
396+
# Process ALL events at this time point
397+
{triggered, remaining} = extract_events_at_time(state.scheduled, next_time)
398+
399+
# Send all events immediately
400+
Enum.each(triggered, fn event ->
401+
VirtualTimeGenServer.send_immediately(event.dest, event.message)
402+
end)
403+
404+
# Update state and continue with next iteration
405+
new_state = %{state | current_time: next_time, scheduled: remaining}
406+
407+
# Yield to allow actors to process and schedule new events
408+
# Small delay to ensure message queue processing
409+
Process.send_after(self(), {:do_advance, target_time, from}, 1)
410+
{:noreply, new_state}
355411

356412
_next_time ->
357-
# Next event is beyond target_time, advance to target_time and wait for quiescence
413+
# Next event is beyond target_time - advance to target and finish
358414
new_state = %{state | current_time: target_time}
359-
wait_for_quiescence_and_finish(new_state, target_time, from)
415+
GenServer.reply(from, {:ok, target_time})
360416
{:noreply, new_state}
361417
end
362418
end
363419

364-
defp process_all_events_at_time(state, current_time, target_time, from) do
365-
# Extract ALL events at the current time point at once
366-
{triggered, remaining} = extract_events_at_time(state.scheduled, current_time)
367-
368-
# Process all events at this time point in a tight loop
369-
Enum.each(triggered, fn event ->
370-
VirtualTimeGenServer.send_immediately(event.dest, event.message)
371-
end)
420+
# EXPLICIT FEEDBACK ADVANCE: Deterministic quiescence detection
421+
defp explicit_feedback_advance(state, target_time, from) do
422+
# Process all events up to target_time
423+
{new_state, sent_count} = process_events_to_target_time(state, target_time)
372424

373-
# Update state (advance time to current_time) and send message to continue to next time
374-
# This allows newly scheduled events to be picked up in the next iteration
375-
new_state = %{state | current_time: current_time, scheduled: remaining}
425+
if sent_count == 0 do
426+
# No events sent - advance complete immediately
427+
{:reply, {:ok, target_time}, %{new_state | current_time: target_time}}
428+
else
429+
# Events sent - wait for explicit actor feedback
430+
waiting_state = %{
431+
new_state
432+
| current_time: target_time,
433+
pending_responses: sent_count,
434+
advance_caller: from
435+
}
436+
437+
{:noreply, waiting_state}
438+
end
439+
end
376440

377-
# OPTIMIZED: Use immediate send for fast processing, but allow message queue processing
378-
Process.send_after(self(), {:do_advance, target_time, from}, 0)
379-
{:noreply, new_state}
441+
defp process_events_to_target_time(state, target_time) do
442+
process_events_to_target_time(state, target_time, 0)
380443
end
381444

382-
defp wait_for_quiescence_and_finish(state, target_time, from) do
383-
# Wait for quiescence at target_time
384-
# First, check if there are any events scheduled at exactly target_time
445+
defp process_events_to_target_time(state, target_time, sent_count) do
385446
case get_next_event_time(state.scheduled, target_time) do
386447
nil ->
387-
# No events at target_time, wait for quiescence and finish advance
388-
wait_for_all_events_processed(state, target_time, from)
448+
# No more events
449+
{state, sent_count}
389450

390-
next_time when next_time == target_time ->
391-
# There are events at exactly target_time, process them and continue waiting
451+
next_time when next_time <= target_time ->
452+
# Process events at next_time
392453
{triggered, remaining} = extract_events_at_time(state.scheduled, next_time)
454+
new_sent_count = sent_count + length(triggered)
393455

456+
# Send all events at this time
394457
Enum.each(triggered, fn event ->
395458
VirtualTimeGenServer.send_immediately(event.dest, event.message)
396459
end)
397460

398-
# Update state but don't advance time
399-
new_state = %{state | scheduled: remaining}
461+
new_state = %{state | scheduled: remaining, current_time: next_time}
462+
process_events_to_target_time(new_state, target_time, new_sent_count)
400463

401-
# Continue waiting for quiescence at target_time
402-
send(self(), {:do_advance, target_time, from})
403-
{:noreply, new_state}
404-
405-
_next_time ->
406-
# Events are scheduled for later times, wait for quiescence and finish
407-
wait_for_all_events_processed(state, target_time, from)
464+
_future_time ->
465+
# Next event is beyond target_time - stop
466+
{state, sent_count}
408467
end
409468
end
410469

411-
defp wait_for_all_events_processed(state, target_time, from) do
412-
# Check if there are new events scheduled at or before target_time
413-
# These would have been scheduled by message handlers processing the events we just sent
414-
# IMPORTANT: We must check state.scheduled here, which should already include
415-
# events scheduled by send_after calls during handler execution (they are synchronous)
416-
count = count_events_until(state.scheduled, target_time)
417-
418-
# Debug: Print state for troubleshooting
419-
# if target_time >= 86_400_000 * 5 do # Only debug large simulations
420-
# IO.puts("wait_for_all_events_processed: current_time=#{state.current_time}, target_time=#{target_time}, count=#{count}, scheduled_size=#{:gb_trees.size(state.scheduled)}")
421-
# end
422-
423-
# Smart quiescence detection with adaptive patience
424-
new_state = %{
425-
state
426-
| waiting_for_quiescence: {target_time, from},
427-
# Reset patience counter
428-
quiescence_patience: 0,
429-
last_event_count: count
430-
}
431-
432-
delay = calculate_smart_quiescence_delay(count, target_time, state)
433-
Process.send_after(self(), {:check_quiescence, target_time, from}, delay)
434-
435-
{:noreply, new_state}
436-
end
437-
438470
# Smart quiescence heuristics
439471
#
440472
# PERFORMANCE OPTIMIZATION: These functions implement intelligent quiescence detection
@@ -452,58 +484,19 @@ defmodule VirtualClock do
452484
# Result: Century backup went from 120+ second timeout to ~75 seconds completion
453485
# with all 36,500 events processed correctly.
454486

455-
defp calculate_smart_quiescence_delay(count, target_time, _state) do
456-
if count > 0 do
457-
# Events exist - give actors time to schedule next events
458-
# Even century gets some delay when events exist
459-
if target_time > 100_000_000_000, do: 3, else: 2
460-
else
461-
# No events - base delay on simulation scale - be more patient for stability
462-
cond do
463-
# Century backup: start with 15ms
464-
target_time > 100_000_000_000 -> 15
465-
# Large sims: 8ms (more patient)
466-
target_time > 1_000_000_000 -> 8
467-
# Normal sims: 30ms (extremely patient for test reliability)
468-
true -> 30
469-
end
470-
end
471-
end
472-
473-
defp should_continue_waiting(state, target_time) do
487+
defp should_continue_waiting(state, _target_time) do
474488
patience = state.quiescence_patience
475489

476-
# Progressive patience: Be VERY generous to prevent early termination
477-
# Small simulations often need MORE patience than large ones due to different patterns
478-
max_patience_cycles =
479-
cond do
480-
# Century: up to 15 cycles (large scale, optimized patterns)
481-
target_time > 100_000_000_000 -> 15
482-
# Large: up to 20 cycles (very patient)
483-
target_time > 1_000_000_000 -> 20
484-
# Normal: up to 25 cycles (extremely patient for test reliability)
485-
true -> 25
486-
end
490+
# ZERO DELAYS: More patience cycles since each cycle costs no real time
491+
# Generous since 0ms per cycle
492+
max_patience_cycles = 100
487493

488494
if patience >= max_patience_cycles do
489495
# We've waited long enough - declare quiescence
490496
{false, patience, 0}
491497
else
492-
# Continue waiting with exponential backoff for large sims
493-
delay =
494-
cond do
495-
target_time > 100_000_000_000 ->
496-
# Century backup: exponential backoff 15, 20, 25, 30ms...
497-
15 + patience * 5
498-
499-
target_time > 1_000_000_000 ->
500-
# Large sims: 5, 7, 9ms... (more conservative)
501-
5 + patience * 2
502-
503-
true ->
504-
# Normal: 15, 18, 21ms... (more patient)
505-
15 + patience * 3
506-
end
498+
# ZERO DELAYS: No real-time wasted, just yield to scheduler
499+
delay = 0
507500

508501
{true, patience + 1, delay}
509502
end

lib/virtual_time_gen_server.ex

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@ defmodule VirtualTimeGenServer.Wrapper do
2626
VirtualTimeGenServer.generate_incoming_trace_event(request, :call)
2727
end
2828

29-
case module.handle_call(request, from, state) do
30-
{:reply, reply, new_state} -> {:reply, reply, {module, new_state}}
31-
{:reply, reply, new_state, timeout} -> {:reply, reply, {module, new_state}, timeout}
32-
{:noreply, new_state} -> {:noreply, {module, new_state}}
33-
{:noreply, new_state, timeout} -> {:noreply, {module, new_state}, timeout}
34-
{:stop, reason, reply, new_state} -> {:stop, reason, reply, {module, new_state}}
35-
{:stop, reason, new_state} -> {:stop, reason, {module, new_state}}
36-
end
29+
result =
30+
case module.handle_call(request, from, state) do
31+
{:reply, reply, new_state} -> {:reply, reply, {module, new_state}}
32+
{:reply, reply, new_state, timeout} -> {:reply, reply, {module, new_state}, timeout}
33+
{:noreply, new_state} -> {:noreply, {module, new_state}}
34+
{:noreply, new_state, timeout} -> {:noreply, {module, new_state}, timeout}
35+
{:stop, reason, reply, new_state} -> {:stop, reason, reply, {module, new_state}}
36+
{:stop, reason, new_state} -> {:stop, reason, {module, new_state}}
37+
end
38+
39+
result
3740
end
3841
end
3942

@@ -45,20 +48,33 @@ defmodule VirtualTimeGenServer.Wrapper do
4548
VirtualTimeGenServer.generate_incoming_trace_event(request, :cast)
4649
end
4750

48-
case module.handle_cast(request, state) do
49-
{:noreply, new_state} -> {:noreply, {module, new_state}}
50-
{:noreply, new_state, timeout} -> {:noreply, {module, new_state}, timeout}
51-
{:stop, reason, new_state} -> {:stop, reason, {module, new_state}}
52-
end
51+
result =
52+
case module.handle_cast(request, state) do
53+
{:noreply, new_state} -> {:noreply, {module, new_state}}
54+
{:noreply, new_state, timeout} -> {:noreply, {module, new_state}, timeout}
55+
{:stop, reason, new_state} -> {:stop, reason, {module, new_state}}
56+
end
57+
58+
result
5359
end
5460

5561
def handle_info(msg, {module, state}) do
56-
case module.handle_info(msg, state) do
57-
{:noreply, new_state} -> {:noreply, {module, new_state}}
58-
{:noreply, new_state, {:continue, arg}} -> {:noreply, {module, new_state}, {:continue, arg}}
59-
{:noreply, new_state, timeout} -> {:noreply, {module, new_state}, timeout}
60-
{:stop, reason, new_state} -> {:stop, reason, {module, new_state}}
61-
end
62+
result =
63+
case module.handle_info(msg, state) do
64+
{:noreply, new_state} ->
65+
{:noreply, {module, new_state}}
66+
67+
{:noreply, new_state, {:continue, arg}} ->
68+
{:noreply, {module, new_state}, {:continue, arg}}
69+
70+
{:noreply, new_state, timeout} ->
71+
{:noreply, {module, new_state}, timeout}
72+
73+
{:stop, reason, new_state} ->
74+
{:stop, reason, {module, new_state}}
75+
end
76+
77+
result
6278
end
6379

6480
def handle_continue(arg, {module, state}) do

0 commit comments

Comments
 (0)