Skip to content

Commit b1c16e6

Browse files
authored
feat: .cat --follow builtin for xs exec (#130)
1 parent 8ad475c commit b1c16e6

File tree

5 files changed

+330
-5
lines changed

5 files changed

+330
-5
lines changed

src/api.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@ async fn handle_exec(store: &Store, body: hyper::body::Incoming) -> HTTPResult {
607607
// Add context-specific commands
608608
engine
609609
.add_commands(vec![
610-
Box::new(nu::commands::cat_command::CatCommand::new(
610+
Box::new(nu::commands::cat_stream_command::CatStreamCommand::new(
611611
store.clone(),
612612
context_id,
613613
)),
@@ -794,10 +794,12 @@ mod tests {
794794
// Add context-specific commands
795795
engine
796796
.add_commands(vec![
797-
Box::new(crate::nu::commands::cat_command::CatCommand::new(
798-
store.clone(),
799-
context_id,
800-
)),
797+
Box::new(
798+
crate::nu::commands::cat_stream_command::CatStreamCommand::new(
799+
store.clone(),
800+
context_id,
801+
),
802+
),
801803
Box::new(crate::nu::commands::head_command::HeadCommand::new(
802804
store.clone(),
803805
context_id,
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use nu_engine::CallExt;
2+
use nu_protocol::engine::{Call, Command, EngineState, Stack};
3+
use nu_protocol::{
4+
Category, ListStream, PipelineData, ShellError, Signature, SyntaxShape, Type, Value,
5+
};
6+
use std::time::Duration;
7+
8+
use crate::store::{FollowOption, ReadOptions, Store};
9+
10+
#[derive(Clone)]
11+
pub struct CatStreamCommand {
12+
store: Store,
13+
context_id: scru128::Scru128Id,
14+
}
15+
16+
impl CatStreamCommand {
17+
pub fn new(store: Store, context_id: scru128::Scru128Id) -> Self {
18+
Self { store, context_id }
19+
}
20+
}
21+
22+
impl Command for CatStreamCommand {
23+
fn name(&self) -> &str {
24+
".cat"
25+
}
26+
27+
fn signature(&self) -> Signature {
28+
Signature::build(".cat")
29+
.input_output_types(vec![(Type::Nothing, Type::Any)])
30+
.switch("follow", "long poll for new events", Some('f'))
31+
.named(
32+
"pulse",
33+
SyntaxShape::Int,
34+
"interval in ms for synthetic xs.pulse events",
35+
Some('p'),
36+
)
37+
.switch("tail", "start at end of stream", Some('t'))
38+
.switch("detail", "include all frame fields", Some('d'))
39+
.switch("all", "read across all contexts", Some('a'))
40+
.named(
41+
"limit",
42+
SyntaxShape::Int,
43+
"limit the number of frames to retrieve",
44+
None,
45+
)
46+
.named(
47+
"last-id",
48+
SyntaxShape::String,
49+
"start from a specific frame ID",
50+
None,
51+
)
52+
.named("topic", SyntaxShape::String, "filter by topic", Some('T'))
53+
.category(Category::Experimental)
54+
}
55+
56+
fn description(&self) -> &str {
57+
"Reads the event stream and returns frames (streaming version)"
58+
}
59+
60+
fn run(
61+
&self,
62+
engine_state: &EngineState,
63+
stack: &mut Stack,
64+
call: &Call,
65+
_input: PipelineData,
66+
) -> Result<PipelineData, ShellError> {
67+
let follow = call.has_flag(engine_state, stack, "follow")?;
68+
let pulse: Option<i64> = call.get_flag(engine_state, stack, "pulse")?;
69+
let tail = call.has_flag(engine_state, stack, "tail")?;
70+
let detail = call.has_flag(engine_state, stack, "detail")?;
71+
let all = call.has_flag(engine_state, stack, "all")?;
72+
let limit: Option<i64> = call.get_flag(engine_state, stack, "limit")?;
73+
let last_id: Option<String> = call.get_flag(engine_state, stack, "last-id")?;
74+
let topic: Option<String> = call.get_flag(engine_state, stack, "topic")?;
75+
76+
// Parse last_id
77+
let last_id: Option<scru128::Scru128Id> = last_id
78+
.as_deref()
79+
.map(|s| {
80+
s.parse().map_err(|e| ShellError::GenericError {
81+
error: "Invalid last-id".into(),
82+
msg: format!("Failed to parse Scru128Id: {e}"),
83+
span: Some(call.head),
84+
help: None,
85+
inner: vec![],
86+
})
87+
})
88+
.transpose()?;
89+
90+
// For non-follow mode, always use async path for consistency
91+
// The store.read() will handle topic filtering correctly
92+
93+
// Build ReadOptions for async path (follow mode or no topic filter)
94+
let options = ReadOptions::builder()
95+
.follow(if let Some(pulse_ms) = pulse {
96+
FollowOption::WithHeartbeat(Duration::from_millis(pulse_ms as u64))
97+
} else if follow {
98+
FollowOption::On
99+
} else {
100+
FollowOption::Off
101+
})
102+
.tail(tail)
103+
.maybe_last_id(last_id)
104+
.maybe_limit(limit.map(|l| l as usize))
105+
.maybe_context_id(if all { None } else { Some(self.context_id) })
106+
.maybe_topic(topic.clone())
107+
.build();
108+
109+
let store = self.store.clone();
110+
let span = call.head;
111+
let signals = engine_state.signals().clone();
112+
113+
// Create channel for async -> sync bridge
114+
let (tx, rx) = std::sync::mpsc::channel();
115+
116+
// Spawn thread to handle async store.read()
117+
std::thread::spawn(move || {
118+
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
119+
rt.block_on(async move {
120+
let mut receiver = store.read(options).await;
121+
122+
while let Some(frame) = receiver.recv().await {
123+
// Convert frame to Nu value
124+
let mut value = crate::nu::util::frame_to_value(&frame, span);
125+
126+
// Filter fields if not --detail
127+
if !detail {
128+
value = match value {
129+
Value::Record { val, .. } => {
130+
let mut filtered = val.into_owned();
131+
filtered.remove("context_id");
132+
filtered.remove("ttl");
133+
Value::record(filtered, span)
134+
}
135+
v => v,
136+
};
137+
}
138+
139+
if tx.send(value).is_err() {
140+
break;
141+
}
142+
}
143+
});
144+
});
145+
146+
// Create ListStream from channel
147+
let stream = ListStream::new(std::iter::from_fn(move || rx.recv().ok()), span, signals);
148+
149+
Ok(PipelineData::ListStream(stream, None))
150+
}
151+
}

src/nu/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod append_command;
22
pub mod append_command_buffered;
33
pub mod cas_command;
44
pub mod cat_command;
5+
pub mod cat_stream_command;
56
pub mod get_command;
67
pub mod head_command;
78
pub mod remove_command;

src/nu/util.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ pub fn frame_to_value(frame: &Frame, span: Span) -> Value {
4949
record.push("meta", json_to_value(meta, span));
5050
}
5151

52+
if let Some(ttl) = &frame.ttl {
53+
let ttl_str = match ttl {
54+
crate::store::TTL::Forever => "forever".to_string(),
55+
crate::store::TTL::Ephemeral => "ephemeral".to_string(),
56+
crate::store::TTL::Time(duration) => format!("{}s", duration.as_secs()),
57+
crate::store::TTL::Head(n) => format!("head:{}", n),
58+
};
59+
record.push("ttl", Value::string(ttl_str, span));
60+
}
61+
5262
Value::record(record, span)
5363
}
5464

tests/integration.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,167 @@ async fn test_exec_bytestream_behavior() {
415415
assert!(!output_str.starts_with('{')); // Not a JSON object
416416
}
417417

418+
#[tokio::test]
419+
async fn test_exec_cat_streaming() {
420+
let temp_dir = TempDir::new().expect("Failed to create temp dir");
421+
let store_path = temp_dir.path();
422+
423+
let mut child = spawn_xs_supervisor(store_path).await;
424+
425+
let sock_path = store_path.join("sock");
426+
let start = std::time::Instant::now();
427+
while !sock_path.exists() {
428+
if start.elapsed() > Duration::from_secs(5) {
429+
panic!("Timeout waiting for sock file");
430+
}
431+
tokio::time::sleep(Duration::from_millis(100)).await;
432+
}
433+
tokio::time::sleep(Duration::from_millis(500)).await;
434+
435+
// Append initial test data
436+
cmd!(cargo_bin("xs"), "append", store_path, "stream.test")
437+
.stdin_bytes(b"initial")
438+
.run()
439+
.unwrap();
440+
441+
// Test 1: .cat without --follow (snapshot mode)
442+
let output = cmd!(
443+
cargo_bin("xs"),
444+
"exec",
445+
store_path,
446+
".cat --topic stream.test"
447+
)
448+
.read()
449+
.unwrap();
450+
451+
let frames: Vec<serde_json::Value> = output
452+
.lines()
453+
.map(|l| serde_json::from_str(l).unwrap())
454+
.collect();
455+
assert_eq!(frames.len(), 1);
456+
assert_eq!(frames[0]["topic"], "stream.test");
457+
458+
// Test 2: .cat --follow streams new frames
459+
let mut follow_child = tokio::process::Command::new(cargo_bin("xs"))
460+
.arg("exec")
461+
.arg(store_path)
462+
.arg(".cat --topic stream.test --follow")
463+
.stdout(std::process::Stdio::piped())
464+
.spawn()
465+
.unwrap();
466+
467+
let stdout = follow_child.stdout.take().unwrap();
468+
let mut reader = tokio::io::BufReader::new(stdout);
469+
470+
// Read initial frame (historical)
471+
let mut line = String::new();
472+
let result = tokio::time::timeout(Duration::from_secs(1), reader.read_line(&mut line))
473+
.await
474+
.expect("Timeout reading initial frame")
475+
.expect("Failed to read initial frame");
476+
assert!(result > 0, "Should read initial frame");
477+
let initial_frame: serde_json::Value = serde_json::from_str(&line.trim()).unwrap();
478+
assert_eq!(initial_frame["topic"], "stream.test");
479+
480+
// Read threshold frame (indicates caught up to real-time)
481+
line.clear();
482+
let result = tokio::time::timeout(Duration::from_secs(1), reader.read_line(&mut line))
483+
.await
484+
.expect("Timeout reading threshold")
485+
.expect("Failed to read threshold");
486+
assert!(result > 0, "Should read threshold frame");
487+
let threshold_frame: serde_json::Value = serde_json::from_str(&line.trim()).unwrap();
488+
assert_eq!(
489+
threshold_frame["topic"], "xs.threshold",
490+
"Should receive threshold frame indicating caught up"
491+
);
492+
493+
// Append new frame while following
494+
cmd!(cargo_bin("xs"), "append", store_path, "stream.test")
495+
.stdin_bytes(b"streamed")
496+
.run()
497+
.unwrap();
498+
499+
// Should receive new frame via streaming
500+
line.clear();
501+
let result = tokio::time::timeout(Duration::from_secs(2), reader.read_line(&mut line))
502+
.await
503+
.expect("Timeout reading streamed frame")
504+
.expect("Failed to read streamed frame");
505+
assert!(result > 0, "Should read streamed frame");
506+
let streamed_frame: serde_json::Value = serde_json::from_str(&line.trim()).unwrap();
507+
assert_eq!(streamed_frame["topic"], "stream.test");
508+
509+
// Test 3: .cat --tail starts at end (skip for now - can block)
510+
follow_child.kill().await.unwrap();
511+
512+
// Test 4: .cat --limit respects limit
513+
let output = cmd!(
514+
cargo_bin("xs"),
515+
"exec",
516+
store_path,
517+
".cat --topic stream.test --limit 1"
518+
)
519+
.read()
520+
.unwrap();
521+
522+
eprintln!(
523+
"Output from .cat --topic stream.test --limit 1: '{}'",
524+
output
525+
);
526+
assert!(!output.trim().is_empty(), "Output should not be empty");
527+
528+
let frames: Vec<serde_json::Value> = output
529+
.lines()
530+
.filter(|l| !l.is_empty())
531+
.map(|l| serde_json::from_str(l).expect(&format!("Failed to parse JSON: {}", l)))
532+
.collect();
533+
assert_eq!(frames.len(), 1, "Should respect --limit flag");
534+
535+
// Test 5: .cat --detail includes context_id and ttl
536+
let output = cmd!(
537+
cargo_bin("xs"),
538+
"exec",
539+
store_path,
540+
".cat --topic stream.test --limit 1 --detail"
541+
)
542+
.read()
543+
.unwrap();
544+
545+
let frame: serde_json::Value = serde_json::from_str(output.lines().next().unwrap()).unwrap();
546+
assert!(
547+
frame.get("context_id").is_some(),
548+
"Should include context_id with --detail"
549+
);
550+
assert!(
551+
frame.get("ttl").is_some(),
552+
"Should include ttl with --detail"
553+
);
554+
555+
// Test 6: Without --detail, context_id and ttl are filtered
556+
let output = cmd!(
557+
cargo_bin("xs"),
558+
"exec",
559+
store_path,
560+
".cat --topic stream.test --limit 1"
561+
)
562+
.read()
563+
.unwrap();
564+
565+
let frame: serde_json::Value = serde_json::from_str(output.lines().next().unwrap()).unwrap();
566+
assert!(
567+
frame.get("context_id").is_none(),
568+
"Should not include context_id without --detail"
569+
);
570+
assert!(
571+
frame.get("ttl").is_none(),
572+
"Should not include ttl without --detail"
573+
);
574+
575+
// Clean up
576+
child.kill().await.unwrap();
577+
}
578+
418579
#[tokio::test]
419580
async fn test_iroh_networking() {
420581
let temp_dir = TempDir::new().expect("Failed to create temp dir");

0 commit comments

Comments
 (0)