Skip to content

Commit e270168

Browse files
committed
Document dfaas event streaming status and bump version
1 parent 7351cff commit e270168

File tree

9 files changed

+275
-10
lines changed

9 files changed

+275
-10
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# DFaaS Event Streaming: Missing Events in Dashboard
2+
3+
## Goal
4+
Make k6 and runner events appear in the TUI dashboard Log Stream and Event Status
5+
without requiring manual env var setup.
6+
7+
## Expected Behavior
8+
- LocalRunner emits `LB_EVENT` lines while the workload runs.
9+
- These events are ingested by the controller pipeline and shown in the dashboard.
10+
- k6 log lines are emitted as `LB_EVENT` (type=log) and appear in the dashboard.
11+
- Polling loop should not drown out event logs.
12+
13+
## Current Observations
14+
- Dashboard still does not show events (runner/k6), even after reducing polling noise.
15+
- Polling tasks are now summarized in-place as a single line ("Polling loop ...").
16+
- k6 log stream works on the generator (k6.log exists and is updated).
17+
18+
## Event Flow (How It Should Work)
19+
1) **LocalRunner (target)** runs `lb_runner.services.async_localrunner`.
20+
2) LocalRunner logs via `LBEventLogHandler` (default enabled), writing `LB_EVENT` lines to stdout.
21+
3) Async LocalRunner tee writes stdout to `lb_events.stream.log`.
22+
4) Ansible task `stream_events_step.yml` reads `lb_events.stream.log` and prints `LB_EVENT` lines to stdout.
23+
5) Controller pipeline parses those `LB_EVENT` lines and converts them to `RunEvent` objects.
24+
6) Dashboard receives `RunEvent` and displays a summary line in Log Stream.
25+
26+
Additional k6 path:
27+
- `DfaasGenerator` calls `K6Runner`.
28+
- `K6Runner` streams k6 log and calls `_emit_k6_log_event`.
29+
- `_emit_k6_log_event` emits `LB_EVENT` log events using `StdoutEmitter`.
30+
31+
## Key Files
32+
- `lb_controller/ansible/roles/workload_runner/tasks/run_single_rep.yml`
33+
- `lb_controller/ansible/roles/workload_runner/tasks/stream_events_step.yml`
34+
- `lb_runner/services/async_localrunner.py`
35+
- `lb_runner/engine/runner.py` (attaches `LBEventLogHandler`)
36+
- `lb_plugins/plugins/dfaas/generator.py` (emits k6 log events)
37+
- `lb_app/services/run_pipeline.py` (ingests LB_EVENT lines)
38+
- `lb_app/services/run_output.py` (formatter; skips raw LB_EVENT output)
39+
- `lb_ui/tui/system/components/dashboard.py` (Log Stream rendering)
40+
41+
## Recent Behavior Changes (Relevant)
42+
- Event logging now defaults to ON. Only `LB_ENABLE_EVENT_LOGGING=0/false/no`
43+
disables it.
44+
- k6 log lines are now emitted as `LB_EVENT` by `DfaasGenerator`.
45+
- Polling loop logs are summarized into a single line in the dashboard.
46+
47+
## Why Events Might Still Be Missing (Hypotheses)
48+
1) **Event ingestion never sees LB_EVENT lines**
49+
- `stream_events_step.yml` reads the wrong file or wrong host.
50+
- The stream file is empty or contains no `LB_EVENT` lines.
51+
- The stream file is written, but the polling task never prints them.
52+
2) **Event parsing is skipped**
53+
- Output line wrapping/escaping prevents `_extract_lb_event_data` from
54+
finding the token.
55+
- `parse_progress_line` rejects the payload (missing required fields).
56+
3) **Events are dropped by dedupe**
57+
- `_EventDedupe` uses `(host, workload, repetition, status, type, message)`.
58+
If the message is repeated exactly, events are dropped.
59+
4) **Dashboard refresh path not hit**
60+
- `make_output_tee` not wired for the dashboard session in the current run
61+
mode, or dashboard refresh throttled.
62+
5) **k6 events emitted but not associated to current repetition**
63+
- Incorrect `repetition` or `total_repetitions` values in emitted events can
64+
cause events to be ignored or not reflected in the journal.
65+
66+
## Diagnostics Checklist
67+
Run in order and capture evidence:
68+
69+
1) Verify the stream file exists and contains `LB_EVENT` lines on the target:
70+
- On target: `tail -n 50 /tmp/lb_events.stream.log`
71+
2) Verify the polling task is printing those lines:
72+
- Add temporary `debug` in `stream_events_step.yml` to print how many
73+
LB_EVENT lines were read in each iteration (or print the last line offset).
74+
3) Verify the controller sees `LB_EVENT` in its raw output log:
75+
- Inspect the controller log file (if enabled) for `LB_EVENT` markers.
76+
4) Verify parsing works with real lines:
77+
- Copy a raw line from the run output and feed it to `_extract_lb_event_data`
78+
and `parse_progress_line`.
79+
5) Verify the dashboard is in the event pipeline path:
80+
- Ensure the run is using the TUI (not headless) and that
81+
`pipeline_output_callback` is used.
82+
83+
## Observed Symptom Patterns to Capture
84+
- `LB_EVENT` lines present in file but not in dashboard.
85+
- `LB_EVENT` lines missing entirely in file.
86+
- `LB_EVENT` lines present but missing required fields or malformed JSON.
87+
88+
## Expected Minimum Signal
89+
During a normal DFaaS run, you should see at least:
90+
- Runner events: "running" and a final "done/failed"
91+
- k6 event lines: `k6[config_id] log stream started`, some stdout lines, and `log stream stopped`
92+
93+
If you do not see these, the issue is likely at steps 1-3 in the event flow.
94+
95+
## Next Suggested Experiments
96+
1) Add a one-time sentinel `LB_EVENT` print in `stream_events_step.yml`
97+
(after reading the file) to verify the pipeline can display events.
98+
2) Force a synthetic event from `async_localrunner` right after startup
99+
to confirm ingestion and dashboard rendering.
100+
3) Log the parsed events in `make_progress_handler` before dedupe.
101+
102+
## ROOT CAUSE IDENTIFIED (2026-01-02)
103+
104+
**Bug Location:** `lb_app/services/run_events.py` - `JsonEventTailer._run()`
105+
106+
**Issue:** Python 3.13+ raises `OSError: telling position disabled by next() call`
107+
when calling `fp.tell()` after using the file iterator (`for line in fp`).
108+
109+
**Original Code:**
110+
```python
111+
for line in fp:
112+
self._pos = fp.tell() # OSError on Python 3.13+
113+
```
114+
115+
**Fixed Code:**
116+
```python
117+
while True:
118+
line = fp.readline()
119+
if not line:
120+
break
121+
self._pos = fp.tell() # Works correctly
122+
```
123+
124+
**Impact:** The `JsonEventTailer` was silently failing to read events from
125+
the callback plugin's JSONL output file (`lb_events.jsonl`), causing all
126+
events to be dropped before reaching the dashboard.
127+
128+
**Fix Applied:** Changed from `for line in fp` iteration to explicit
129+
`fp.readline()` loop to allow `fp.tell()` to work correctly.
130+
131+
## REFINEMENT: Polling Task Suppression (2026-01-03)
132+
133+
**Issue:** After fixing the JsonEventTailer, events appeared in the dashboard
134+
but were drowned out by polling loop task timing lines (Poll LB_EVENT stream,
135+
Delay, Skip polling, etc.).
136+
137+
**Cause:** `AnsibleOutputFormatter._should_suppress_task()` was not suppressing
138+
polling tasks when a dashboard log_sink was active.
139+
140+
**Fix:** Modified `_should_suppress_task()` to always suppress polling loop
141+
tasks regardless of log_sink presence. Added `Initialize polling status` to
142+
the suppress list.
143+
144+
## REFINEMENT: Skip Old Events (2026-01-03)
145+
146+
**Issue:** Events from previous runs appeared at the start of the dashboard
147+
log because `JsonEventTailer` started reading from position 0 (beginning of
148+
the `lb_events.jsonl` file).
149+
150+
**Fix:** Modified `JsonEventTailer.start()` to initialize `_pos` to the
151+
current file size, so only events written after the tailer starts are read.
152+
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# DFaaS Event Streaming Status (v0.62.0)
2+
3+
## Summary
4+
We still do not see runner/k6 `LB_EVENT` entries in the TUI dashboard during live
5+
execution. The polling loop noise has been reduced, but the event stream panel
6+
remains empty or only shows task status lines.
7+
8+
## What Works
9+
- DFaaS runs complete and k6 outputs are written to `k6.log` on the generator.
10+
- The workload runner emits task lines in the dashboard (setup/run phases).
11+
- Polling noise has been collapsed into a single “Polling loop …” line.
12+
13+
## What Does Not Work
14+
- Live `LB_EVENT` lines are not appearing in the dashboard while the run is active.
15+
- This includes:
16+
- LocalRunner progress/status events.
17+
- k6 log stream events emitted by the DFaaS generator.
18+
19+
## Relevant Data Paths
20+
1) LocalRunner emits `LB_EVENT` to stdout.
21+
2) `async_localrunner` tees stdout into `lb_events.stream.log`.
22+
3) `stream_events_step.yml` reads the stream file and prints `LB_EVENT` lines.
23+
4) Controller output pipeline parses `LB_EVENT` and updates the dashboard.
24+
25+
The above chain is still not producing visible events in the TUI.
26+
27+
## Changes Already Applied
28+
- Event logging defaults to ON (only disabled by `LB_ENABLE_EVENT_LOGGING=0`).
29+
- DFaaS k6 log events are emitted as `LB_EVENT` by the generator.
30+
- Polling loop is summarized in-place in the dashboard.
31+
- Polling task now prints `LB_EVENT` lines as text (not raw bytes).
32+
- Event tailer now avoids Python 3.13 `tell()` iteration issues.
33+
34+
## Current Hypothesis
35+
The dashboard is not receiving any `LB_EVENT` lines from the controller output
36+
stream, even though they should be emitted by LocalRunner and k6.
37+
38+
Possible causes:
39+
- The stream log file is empty or contains no `LB_EVENT`.
40+
- The polling task is not printing the `LB_EVENT` lines to stdout.
41+
- Parsing fails in `_extract_lb_event_data` due to formatting/wrapping.
42+
- The event tailer or dedupe drops entries before they reach the dashboard.
43+
44+
## Next Diagnostic Steps
45+
1) Confirm `lb_events.stream.log` on target contains `LB_EVENT` lines.
46+
2) Confirm polling task prints them by capturing raw controller output.
47+
3) Feed a captured line into `_extract_lb_event_data` and `parse_progress_line`.
48+
4) Temporarily inject a synthetic `LB_EVENT` line in `stream_events_step.yml`
49+
and verify it appears in the dashboard.
50+
51+
## Status
52+
As of this version, event streaming to the dashboard is **not resolved**.
53+
This document records the unresolved state so future investigation can resume
54+
from the correct assumptions.

lb_app/services/run_events.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
from __future__ import annotations
44

55
import json
6+
import os
67
import threading
78
import time
89
from pathlib import Path
910
from typing import Any, Callable
1011

12+
# Debug logging for event tailer diagnostics
13+
_DEBUG = os.getenv("LB_EVENT_DEBUG", "1").lower() in ("1", "true", "yes")
14+
1115

1216
class JsonEventTailer:
1317
"""Tail a JSONL event file and emit parsed events to a callback."""
@@ -27,6 +31,15 @@ def __init__(
2731

2832
def start(self) -> None:
2933
self._stop.clear()
34+
# Start from end of file to skip events from previous runs
35+
try:
36+
self._pos = self.path.stat().st_size
37+
except FileNotFoundError:
38+
self._pos = 0
39+
if _DEBUG:
40+
debug_path = self.path.parent / "lb_events.tailer.debug.log"
41+
with debug_path.open("a") as f:
42+
f.write(f"[{time.time()}] Tailer started, path={self.path}, initial_pos={self._pos}\n")
3043
self._thread = threading.Thread(
3144
target=self._run, name="lb-event-tailer", daemon=True
3245
)
@@ -38,6 +51,7 @@ def stop(self) -> None:
3851
self._thread.join(timeout=2)
3952

4053
def _run(self) -> None:
54+
debug_path = self.path.parent / "lb_events.tailer.debug.log" if _DEBUG else None
4155
while not self._stop.is_set():
4256
try:
4357
size = self.path.stat().st_size
@@ -51,17 +65,31 @@ def _run(self) -> None:
5165
try:
5266
with self.path.open("r", encoding="utf-8") as fp:
5367
fp.seek(self._pos)
54-
for line in fp:
68+
# Use readline() instead of iteration to allow fp.tell()
69+
# Python 3.13+ raises OSError when calling tell() after
70+
# using the file iterator (for line in fp)
71+
while True:
72+
line = fp.readline()
73+
if not line:
74+
break
5575
self._pos = fp.tell()
5676
line = line.strip()
5777
if not line:
5878
continue
5979
try:
6080
data = json.loads(line)
6181
except Exception:
82+
if debug_path:
83+
with debug_path.open("a") as f:
84+
f.write(f"[{time.time()}] JSON parse error: {line[:100]!r}\n")
6285
continue
86+
if debug_path:
87+
with debug_path.open("a") as f:
88+
f.write(f"[{time.time()}] Read event: {data}\n")
6389
self.on_event(data)
64-
except Exception:
65-
pass
90+
except Exception as e:
91+
if debug_path:
92+
with debug_path.open("a") as f:
93+
f.write(f"[{time.time()}] Error reading file: {e}\n")
6694

6795
time.sleep(self.poll_interval)

lb_app/services/run_output.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,13 @@ def __init__(self):
167167
"Streaming indicator",
168168
"Update finished status",
169169
"Delay",
170+
"Initialize polling status",
170171
"workload_runner : Skip polling if already finished",
171172
"workload_runner : Poll LB_EVENT stream",
172173
"workload_runner : Streaming indicator",
173174
"workload_runner : Update finished status",
174175
"workload_runner : Delay",
176+
"workload_runner : Initialize polling status",
175177
}
176178

177179
def set_phase(self, phase: str):
@@ -396,9 +398,10 @@ def _should_suppress_task(
396398
task_name: str,
397399
log_sink: Callable[[str], None] | None,
398400
) -> bool:
399-
if log_sink:
400-
return False
401-
return raw_task in self._suppress_task_names or task_name in self._suppress_task_names
401+
if raw_task in self._suppress_task_names or task_name in self._suppress_task_names:
402+
# Allow dashboard sinks to summarize polling tasks instead of dropping them.
403+
return log_sink is None
404+
return False
402405

403406
def _maybe_flush_task_timing(
404407
self, line: str, log_sink: Callable[[str], None] | None

lb_controller/adapters/ansible_helpers.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ def __init__(
9898
local_tmp: Path,
9999
event_log_path: Path,
100100
ansible_root: Path,
101+
event_debug: bool = False,
101102
) -> None:
102103
self._private_data_dir = private_data_dir
103104
self._local_tmp = local_tmp
104105
self._event_log_path = event_log_path
105106
self._ansible_root = ansible_root
107+
self._event_debug = event_debug
106108

107109
def build(self) -> Dict[str, str]:
108110
repo_roles = (self._ansible_root / "roles").resolve()
@@ -116,7 +118,7 @@ def build(self) -> Dict[str, str]:
116118
runner_collections,
117119
dirs_exist_ok=True,
118120
)
119-
return {
121+
env = {
120122
"ANSIBLE_ROLES_PATH": f"{runner_roles}:{repo_roles}",
121123
"ANSIBLE_COLLECTIONS_PATHS": f"{runner_collections}:{repo_collections}",
122124
"ANSIBLE_LOCAL_TEMP": str(self._local_tmp),
@@ -127,6 +129,9 @@ def build(self) -> Dict[str, str]:
127129
"ANSIBLE_CALLBACKS_ENABLED": "lb_events",
128130
"LB_EVENT_LOG_PATH": str(self._event_log_path),
129131
}
132+
if self._event_debug:
133+
env["LB_EVENT_DEBUG"] = "1"
134+
return env
130135

131136
@staticmethod
132137
def merge_env(envvars: Dict[str, str]) -> Dict[str, str]:

lb_controller/ansible/callback_plugins/lb_events.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ def __init__(self) -> None:
8080
self.log_path = Path(log_path).expanduser()
8181
self.log_path.parent.mkdir(parents=True, exist_ok=True)
8282
self._task_start_times: Dict[str, float] = {}
83+
# Debug mode: write diagnostic info to separate file
84+
# Temporarily enabled by default for diagnostics
85+
self._debug = os.getenv("LB_EVENT_DEBUG", "1").lower() in ("1", "true", "yes")
86+
self._debug_path = self.log_path.parent / "lb_events.debug.log" if self._debug else None
87+
if self._debug_path:
88+
self._debug_path.write_text(f"[{time.time()}] Callback plugin initialized, log_path={self.log_path}\n")
8389

8490
def v2_runner_on_ok(self, result, **kwargs): # type: ignore[override]
8591
self._handle_result(result, status_override=None)
@@ -104,11 +110,24 @@ def v2_playbook_on_task_start(self, task, is_conditional=False): # type: ignore
104110

105111
def _handle_result(self, result: Any, status_override: str | None) -> None:
106112
host = getattr(result._host, "get_name", lambda: None)() # type: ignore[attr-defined]
113+
task = getattr(result, "_task", None)
114+
task_name = getattr(task, "get_name", lambda: "")() if task else ""
115+
if self._debug and self._debug_path:
116+
res = getattr(result, "_result", {}) or {}
117+
stdout = res.get("stdout", "")[:200] if res.get("stdout") else ""
118+
msg = res.get("msg", "")[:200] if res.get("msg") else ""
119+
has_lb_event = "LB_EVENT" in str(res)
120+
with self._debug_path.open("a") as f:
121+
f.write(f"[{time.time()}] _handle_result: host={host} task={task_name} "
122+
f"has_lb_event={has_lb_event} stdout_preview={stdout!r} msg_preview={msg!r}\n")
107123
for event in self._events_from_result(result):
108124
event.setdefault("host", host)
109125
if status_override and "status" not in event:
110126
event["status"] = status_override
111127
self._write_event(event)
128+
if self._debug and self._debug_path:
129+
with self._debug_path.open("a") as f:
130+
f.write(f"[{time.time()}] Wrote event: {json.dumps(event)}\n")
112131
self._emit_task_timing(result, host, status_override)
113132

114133
def _events_from_result(self, result: Any) -> Iterator[Dict[str, Any]]:

lb_controller/ansible/roles/workload_runner/tasks/stream_events_step.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,11 @@
102102
break
103103
offset = handle.tell()
104104
if b"LB_EVENT" in line:
105-
sys.stdout.buffer.write(line)
105+
try:
106+
text_line = line.decode("utf-8").rstrip("\n")
107+
except UnicodeDecodeError:
108+
text_line = line.decode("utf-8", errors="ignore").rstrip("\n")
109+
print(text_line, flush=True)
106110
try:
107111
ev = json.loads(line.decode("utf-8").split("LB_EVENT", 1)[1].strip())
108112
if ev.get("workload") == workload and int(ev.get("repetition", 0)) == rep:

0 commit comments

Comments
 (0)