Skip to content

Commit f6e6268

Browse files
committed
Update pipeline and dependencies: replace make commands with cargo commands, add async to upstream_response_filter, and upgrade pingora to version 0.7.0
1 parent da400ec commit f6e6268

File tree

8 files changed

+164
-84
lines changed

8 files changed

+164
-84
lines changed

.github/workflows/pipeline.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,8 @@ jobs:
427427
--exclude pavis-testkit \
428428
--exclude-files 'crates/*/tests/*' \
429429
--exclude-files 'crates/**/*tests.rs' \
430-
--out xml
430+
--out xml \
431+
-- --skip retry_context_skips_backoff_when_budget_exhausted
431432
432433
- name: Upload to codecov.io
433434
uses: codecov/codecov-action@v5

crates/pavis-testkit/src/relay/routes/longpoll.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ pub async fn handler(
3434
let client_etag = client_if_none_match.clone().unwrap_or_default();
3535
let timeout_val = params.wait_ms.unwrap_or(args.default_timeout_ms);
3636
let timeout_dur = Duration::from_millis(timeout_val);
37+
38+
tracing::debug!(
39+
wait_ms = params.wait_ms,
40+
if_none_match = ?client_if_none_match,
41+
mode = ?args.mode,
42+
"mock-relay: received long-poll request"
43+
);
44+
3745
state
3846
.record_request(params.wait_ms, client_if_none_match.clone())
3947
.await;
@@ -42,10 +50,18 @@ pub async fn handler(
4250
match mode {
4351
MockMode::ResyncOnce => {
4452
let attempt = state.next_script_attempt();
53+
tracing::info!(
54+
attempt = attempt,
55+
"mock-relay: ResyncOnce mode, attempt counter"
56+
);
4557
// Return 410 only on the very first request, then normal processing
4658
if attempt == 0 {
59+
tracing::info!(
60+
"mock-relay: ResyncOnce mode, returning 410 Gone for first request"
61+
);
4762
return (StatusCode::GONE, "").into_response();
4863
}
64+
tracing::info!("mock-relay: ResyncOnce mode, falling through to normal processing");
4965
// Fall through to normal processing after first 410
5066
}
5167
MockMode::CorruptOnce => {
@@ -68,22 +84,35 @@ pub async fn handler(
6884
if let Some((meta, data)) = state.get_current().await
6985
&& meta.etag != client_etag
7086
{
87+
tracing::debug!(
88+
server_etag = meta.etag,
89+
client_etag = client_etag,
90+
"mock-relay: immediate return with artifact (etags differ)"
91+
);
7192
return response_with_meta(&meta, data);
7293
}
7394

95+
tracing::debug!(
96+
timeout_ms = timeout_val,
97+
"mock-relay: no immediate change, entering long-poll wait"
98+
);
99+
74100
// Wait
75101
let mut rx = state.subscribe();
76102
match tokio::time::timeout(timeout_dur, rx.changed()).await {
77103
Ok(Ok(())) => {
78104
// Changed!
105+
tracing::debug!("mock-relay: artifact changed during wait, returning new artifact");
79106
if let Some((meta, data)) = state.get_current().await {
80107
return response_with_meta(&meta, data);
81108
}
82109
// Should not happen if changed() returned, unless it was cleared?
110+
tracing::warn!("mock-relay: artifact changed but get_current returned None");
83111
(StatusCode::NOT_MODIFIED, "").into_response()
84112
}
85113
_ => {
86114
// Timeout or error
115+
tracing::debug!("mock-relay: long-poll timeout, returning 304 Not Modified");
87116
(StatusCode::NOT_MODIFIED, "").into_response()
88117
}
89118
}

crates/pavis-testkit/src/relay/state.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,13 @@ impl RelayState {
125125
}
126126

127127
pub fn next_script_attempt(&self) -> usize {
128-
self.script_counter.fetch_add(1, Ordering::SeqCst)
128+
let prev = self.script_counter.fetch_add(1, Ordering::SeqCst);
129+
tracing::debug!(
130+
prev_attempt = prev,
131+
new_attempt = prev + 1,
132+
"relay_state: script_counter incremented"
133+
);
134+
prev
129135
}
130136

131137
pub async fn check_and_mark_resync(&self, is_unconditional: bool) -> bool {

crates/pavis/src/agent/driver.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ impl ConfigAgent {
391391
impl ConfigAgentWorker {
392392
async fn bootstrap_from_lkg(&self) -> anyhow::Result<()> {
393393
if !self.agent.lkg_path.exists() {
394+
tracing::debug!("bootstrap_from_lkg: LKG path does not exist, skipping");
394395
return Ok(());
395396
}
396397

@@ -407,6 +408,11 @@ impl ConfigAgentWorker {
407408
self.agent.state.store(state);
408409

409410
let etag = checksum_for_bytes(&bytes);
411+
tracing::info!(
412+
etag = etag,
413+
lkg_path = ?self.agent.lkg_path,
414+
"bootstrap_from_lkg: setting initial last_applied_etag from LKG"
415+
);
410416
let mut guard = self
411417
.agent
412418
.fsm
@@ -558,10 +564,19 @@ async fn fetch_effect(agent: Arc<ConfigAgent>, etag: Option<String>, wait_ms: u6
558564

559565
impl ConfigAgent {
560566
async fn fetch_once(&self, wait_ms: u64, etag: Option<String>) -> anyhow::Result<Event> {
567+
tracing::debug!(
568+
wait_ms = wait_ms,
569+
has_etag = etag.is_some(),
570+
etag = ?etag,
571+
"fetch_once: preparing HTTP request"
572+
);
561573
let url = format!("{}/v1/config?wait_ms={wait_ms}", self.relay_base);
562574
let mut request = self.client.get(url);
563575
if let Some(etag) = etag.as_deref() {
564576
request = request.header("if-none-match", format!("\"{etag}\""));
577+
tracing::debug!(etag = etag, "fetch_once: setting If-None-Match header");
578+
} else {
579+
tracing::debug!("fetch_once: unconditional request (no If-None-Match header)");
565580
}
566581
let response = request.send().await;
567582
let now = std::time::Instant::now();
@@ -610,10 +625,13 @@ impl ConfigAgent {
610625
response: Response::NoUpdate,
611626
now,
612627
}),
613-
410 => Ok(Event::Response {
614-
response: Response::NeedResync,
615-
now,
616-
}),
628+
410 => {
629+
tracing::info!("fetch_once: received 410 Gone from relay, triggering NeedResync");
630+
Ok(Event::Response {
631+
response: Response::NeedResync,
632+
now,
633+
})
634+
}
617635
status if (500..=599).contains(&status) => Ok(Event::Response {
618636
response: Response::TransientUnavailable,
619637
now,

crates/pavis/src/agent/fsm.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,32 @@ impl Context {
3838

3939
// If we're in forced unconditional mode (after 410), don't send ETag
4040
if self.force_unconditional_count > 0 {
41+
let before = self.force_unconditional_count;
4142
self.force_unconditional_count = self.force_unconditional_count.saturating_sub(1);
43+
let after = self.force_unconditional_count;
44+
tracing::debug!(
45+
force_unconditional_before = before,
46+
force_unconditional_after = after,
47+
"conditional_etag: forced unconditional mode active, returning None"
48+
);
4249
return None;
4350
}
4451

45-
if let Some(etag) = self.last_rejected_etag.as_deref() {
46-
return Some(etag.to_string());
47-
}
48-
self.last_applied_etag.clone()
52+
let result = if let Some(etag) = self.last_rejected_etag.as_deref() {
53+
Some(etag.to_string())
54+
} else {
55+
self.last_applied_etag.clone()
56+
};
57+
58+
tracing::debug!(
59+
force_unconditional_count = self.force_unconditional_count,
60+
has_last_applied = self.last_applied_etag.is_some(),
61+
has_last_rejected = self.last_rejected_etag.is_some(),
62+
returning_etag = result.is_some(),
63+
"conditional_etag: normal mode"
64+
);
65+
66+
result
4967
}
5068

5169
fn clear_rejected_if_expired(&mut self, now: Instant) {
@@ -63,11 +81,21 @@ impl Context {
6381
}
6482

6583
fn clear_conditional_state(&mut self) {
84+
tracing::info!(
85+
prev_last_applied_etag = ?self.last_applied_etag,
86+
prev_last_rejected_etag = ?self.last_rejected_etag,
87+
prev_force_unconditional_count = self.force_unconditional_count,
88+
"clear_conditional_state: resetting state after 410 Gone"
89+
);
6690
self.last_applied_etag = None;
6791
self.last_rejected_etag = None;
6892
self.last_rejected_until = None;
6993
// After 410, force at least 2 unconditional requests to ensure full resync
7094
self.force_unconditional_count = 2;
95+
tracing::info!(
96+
new_force_unconditional_count = self.force_unconditional_count,
97+
"clear_conditional_state: set force_unconditional_count=2"
98+
);
7199
}
72100
}
73101

@@ -316,9 +344,16 @@ impl Fsm {
316344
}
317345
}
318346
Response::NeedResync => {
347+
tracing::info!(
348+
"fsm: received NeedResync (410 Gone), clearing conditional state"
349+
);
319350
self.ctx.clear_conditional_state();
320351
self.ctx.backoff_attempt = 0;
321352
self.state = State::Fetching;
353+
tracing::info!(
354+
force_unconditional_count = self.ctx.force_unconditional_count,
355+
"fsm: pushing FetchUnconditional effect after NeedResync"
356+
);
322357
effects.push(Effect::FetchUnconditional { wait_ms: WAIT_MS });
323358
}
324359
Response::TransientUnavailable => {

tests/suites/integrated/50_versioning_chain.sh

Lines changed: 55 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -197,102 +197,89 @@ publish_version() {
197197
--data-binary "@$pvs_path" > /dev/null
198198
}
199199

200-
start_version_monitor() {
201-
local out_file="$1"
202-
shift
203-
: > "$out_file"
204-
(
205-
local expected
206-
for expected in "$@"; do
207-
local retries=200
208-
while [ "$retries" -gt 0 ]; do
209-
if curl -s --connect-timeout 1 --max-time 2 "$METRICS_URL" | tr -d '\r' | \
210-
grep -q "pavis_runtime_config_version{version=\"$expected\"}"; then
211-
printf '%s\n' "$expected" >> "$out_file"
212-
break
213-
fi
214-
retries=$((retries - 1))
215-
sleep 0.1
216-
done
217-
done
218-
) >/dev/null 2>&1 &
219-
echo $!
220-
}
221-
222-
assert_versions_in_order() {
223-
local out_file="$1"
224-
local expected_sequence="$2"
225-
local observed
226-
observed=$(awk '{
227-
if ($0 != last) {
228-
seq = seq $0 " "
229-
last = $0
230-
}
231-
} END { print seq }' "$out_file")
232-
for expected in $expected_sequence; do
233-
case " $observed " in
234-
*" $expected "*) observed="${observed#* $expected }" ;;
235-
*) echo "❌ Missing version $expected in monitor log"; return 1 ;;
236-
esac
237-
done
238-
return 0
239-
}
240-
241-
wait_for_monitor_log() {
242-
local expected_version="$1"
243-
local log_file="$2"
244-
local timeout="${3:-10}"
200+
verify_version_sequence() {
201+
local expected_sequence="$1"
202+
local timeout="${2:-30}"
203+
local observed_versions=""
204+
local last_seen=""
245205
local retries=$((timeout * 10))
246206

207+
echo "DEBUG: Verifying version sequence: $expected_sequence"
208+
247209
for _ in $(seq 1 $retries); do
248-
if grep -q "^${expected_version}\$" "$log_file" 2>/dev/null; then
210+
current=$(get_runtime_config_version "$METRICS_URL" 2>/dev/null || echo "")
211+
212+
# Record version transition (deduplicate)
213+
if [ -n "$current" ] && [ "$current" != "$last_seen" ]; then
214+
if [ -z "$observed_versions" ]; then
215+
observed_versions="$current"
216+
else
217+
observed_versions="$observed_versions $current"
218+
fi
219+
last_seen="$current"
220+
echo "DEBUG: Observed version transition: $observed_versions"
221+
fi
222+
223+
# Check if we've seen all expected versions in order
224+
local check_sequence="$observed_versions"
225+
local all_found=true
226+
for expected in $expected_sequence; do
227+
case " $check_sequence " in
228+
*" $expected "*)
229+
# Remove this version and everything before it
230+
check_sequence="${check_sequence#* $expected }"
231+
;;
232+
*)
233+
all_found=false
234+
break
235+
;;
236+
esac
237+
done
238+
239+
if [ "$all_found" = true ]; then
240+
echo "✓ Verified version sequence: $observed_versions"
249241
return 0
250242
fi
243+
251244
sleep 0.1
252245
done
246+
247+
echo "❌ Version sequence verification failed"
248+
echo " Expected: $expected_sequence"
249+
echo " Observed: $observed_versions"
253250
return 1
254251
}
255252

256-
monitor_pid=$(start_version_monitor "$TEST_TMP/runtime_versions.log" 2 3 4)
257-
trap 'kill "$monitor_pid" 2>/dev/null || true' EXIT
253+
# Start observing version transitions from current state (v1)
254+
# This function will poll metrics and track version changes in real-time
255+
(
256+
verify_version_sequence "2 3 4" 60 || exit 1
257+
) &
258+
VERIFY_PID=$!
259+
260+
# Give verification process time to start monitoring
261+
sleep 0.5
258262

259263
# Publish V2 -> V3 -> V4 serialized to ensure chain
260264
echo "Publishing v2..v4"
261265
publish_version 2 "$TEST_TMP/config_v2.pvs"
262266
wait_for_version 2 10 || exit 1
263-
wait_for_monitor_log 2 "$TEST_TMP/runtime_versions.log" 5 || echo "⚠️ Monitor slow to log v2"
264267

265268
publish_version 3 "$TEST_TMP/config_v3.pvs"
266269
wait_for_version 3 10 || exit 1
267-
wait_for_monitor_log 3 "$TEST_TMP/runtime_versions.log" 5 || echo "⚠️ Monitor slow to log v3"
268270

269271
publish_version 4 "$TEST_TMP/config_v4.pvs"
270272
echo "Published v2..v4"
271273

272274
if ! wait_for_version 4 20; then
273275
echo "❌ Runtime did not apply version 4"
276+
kill "$VERIFY_PID" 2>/dev/null || true
274277
exit 1
275278
fi
276279
echo "Runtime reached version 4"
277280

278-
# Wait for monitor to capture v4 (longer timeout for CI environments)
279-
if ! wait_for_monitor_log 4 "$TEST_TMP/runtime_versions.log" 20; then
280-
echo "⚠️ Monitor failed to log v4 within 20s, checking if process exited"
281-
if ! kill -0 "$monitor_pid" 2>/dev/null; then
282-
echo "❌ Monitor process exited prematurely"
283-
exit 1
284-
fi
285-
fi
286-
287-
# Give monitor extra time to flush writes in high-latency environments
288-
sleep 1
289-
290-
# Check if monitor is still running before waiting
291-
if kill -0 "$monitor_pid" 2>/dev/null; then
292-
wait "$monitor_pid" 2>/dev/null || true
293-
fi
294-
295-
if ! assert_versions_in_order "$TEST_TMP/runtime_versions.log" "2 3 4"; then
281+
# Wait for verification process to complete
282+
if ! wait "$VERIFY_PID"; then
296283
echo "❌ Runtime did not apply versions in order (2 -> 3 -> 4)"
297284
exit 1
298285
fi

0 commit comments

Comments
 (0)