Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit 40d8375

Browse files
authored
Support log snapshot (#52)
The log snapshot is basically printing a copy of the current logs buffer (then terminate) instead of the default `follow` behavior. by providing a `-s` flag to the log command line, the logs will print the current buffer, then exits. running `zinit log` will still automatically follow the logs as the default behavior
1 parent 6edf2c5 commit 40d8375

File tree

7 files changed

+70
-25
lines changed

7 files changed

+70
-25
lines changed

src/app/api.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ impl Api {
6565
};
6666

6767
let response = match Self::process(cmd, &mut stream, zinit).await {
68-
Ok(body) => Response {
68+
// When process returns None means we can terminate without
69+
// writing any result to the socket.
70+
Ok(None) => return,
71+
Ok(Some(body)) => Response {
6972
body,
7073
state: State::Ok,
7174
},
@@ -94,7 +97,7 @@ impl Api {
9497
cmd: String,
9598
stream: &mut BufStream<UnixStream>,
9699
zinit: ZInit,
97-
) -> Result<Value> {
100+
) -> Result<Option<Value>> {
98101
let parts = match shlex::split(&cmd) {
99102
Some(parts) => parts,
100103
None => bail!("invalid command syntax"),
@@ -104,23 +107,34 @@ impl Api {
104107
bail!("unknown command");
105108
}
106109

107-
match parts[0].as_ref() {
110+
if &parts[0] == "log" {
111+
match parts.len() {
112+
1 => Self::log(stream, zinit, true).await,
113+
2 if parts[1] == "snapshot" => Self::log(stream, zinit, false).await,
114+
_ => bail!("invalid log command arguments"),
115+
}?;
116+
117+
return Ok(None);
118+
}
119+
120+
let value = match parts[0].as_ref() {
108121
"list" => Self::list(zinit).await,
109122
"shutdown" => Self::shutdown(zinit).await,
110123
"reboot" => Self::reboot(zinit).await,
111-
"log" => Self::log(stream, zinit).await,
112124
"start" if parts.len() == 2 => Self::start(&parts[1], zinit).await,
113125
"stop" if parts.len() == 2 => Self::stop(&parts[1], zinit).await,
114126
"kill" if parts.len() == 3 => Self::kill(&parts[1], &parts[2], zinit).await,
115127
"status" if parts.len() == 2 => Self::status(&parts[1], zinit).await,
116128
"forget" if parts.len() == 2 => Self::forget(&parts[1], zinit).await,
117129
"monitor" if parts.len() == 2 => Self::monitor(&parts[1], zinit).await,
118130
_ => bail!("unknown command '{}' or wrong arguments count", parts[0]),
119-
}
131+
}?;
132+
133+
Ok(Some(value))
120134
}
121135

122-
async fn log(stream: &mut BufStream<UnixStream>, zinit: ZInit) -> Result<Value> {
123-
let mut logs = zinit.logs().await?;
136+
async fn log(stream: &mut BufStream<UnixStream>, zinit: ZInit, follow: bool) -> Result<Value> {
137+
let mut logs = zinit.logs(follow).await;
124138

125139
while let Some(line) = logs.recv().await {
126140
stream.write_all(line.as_bytes()).await?;
@@ -256,9 +270,18 @@ impl Client {
256270
&self,
257271
mut out: O,
258272
filter: Option<S>,
273+
follow: bool,
259274
) -> Result<()> {
260275
let mut con = self.connect().await?;
261-
con.write_all(b"log\n").await?;
276+
if follow {
277+
// default behavior of log with no extra arguments
278+
// is to stream all logs
279+
con.write_all(b"log\n").await?;
280+
} else {
281+
// adding a snapshot subcmd will make it auto terminate
282+
// immediate after
283+
con.write_all(b"log snapshot\n").await?;
284+
}
262285
con.flush().await?;
263286
match filter {
264287
None => tokio::io::copy(&mut con, &mut out).await?,

src/app/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ pub async fn kill(socket: &str, name: &str, signal: &str) -> Result<()> {
146146
client.kill(name, signal).await?;
147147
Ok(())
148148
}
149-
pub async fn logs(socket: &str, filter: Option<&str>) -> Result<()> {
149+
pub async fn logs(socket: &str, filter: Option<&str>, follow: bool) -> Result<()> {
150150
let client = api::Client::new(socket);
151-
client.logs(tokio::io::stdout(), filter).await
151+
if let Some(filter) = filter {
152+
client.status(filter).await?;
153+
}
154+
client.logs(tokio::io::stdout(), filter, follow).await
152155
}

src/main.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ async fn main() -> Result<()> {
107107
)
108108
.subcommand(
109109
SubCommand::with_name("log")
110+
.arg(
111+
Arg::with_name("snapshot")
112+
.short("s")
113+
.long("snapshot")
114+
.required(false)
115+
.help("if set log prints current buffer without following")
116+
)
110117
.arg(
111118
Arg::with_name("filter")
112119
.value_name("FILTER")
@@ -171,14 +178,21 @@ async fn main() -> Result<()> {
171178
)
172179
.await
173180
}
174-
("log", Some(matches)) => app::logs(socket, matches.value_of("filter")).await,
181+
("log", Some(matches)) => {
182+
app::logs(
183+
socket,
184+
matches.value_of("filter"),
185+
!matches.is_present("snapshot"),
186+
)
187+
.await
188+
}
175189
_ => app::list(socket).await, // default command
176190
};
177191

178192
match result {
179193
Ok(_) => Ok(()),
180194
Err(e) => {
181-
eprintln!("{}", e);
195+
eprintln!("{:#}", e);
182196
std::process::exit(1);
183197
}
184198
}

src/manager/buffer.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,12 @@ impl Ring {
9797
Ok(())
9898
}
9999

100-
pub async fn stream(&self) -> Result<Logs> {
100+
/// stream returns a continues stream that first receive
101+
/// a snapshot of the current buffer.
102+
/// then if follow is true the logs stream will remain
103+
/// open and fed each received line forever until the
104+
/// received closed the channel from its end.
105+
pub async fn stream(&self, follow: bool) -> Logs {
101106
let (tx, stream) = mpsc::channel::<Arc<String>>(100);
102107
let mut rx = self.sender.subscribe();
103108
let buffer = self
@@ -112,6 +117,10 @@ impl Ring {
112117
let _ = tx.send(Arc::clone(&item)).await;
113118
}
114119

120+
if !follow {
121+
return;
122+
}
123+
115124
loop {
116125
let line = match rx.recv().await {
117126
Ok(line) => line,
@@ -128,6 +137,6 @@ impl Ring {
128137
}
129138
});
130139

131-
Ok(stream)
140+
stream
132141
}
133142
}

src/manager/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ impl ProcessManager {
141141
});
142142
}
143143

144-
pub async fn stream(&self) -> Result<Logs> {
145-
self.ring.stream().await
144+
pub async fn stream(&self, follow: bool) -> Logs {
145+
self.ring.stream(follow).await
146146
}
147147

148148
pub fn signal(&self, pid: Pid, sig: signal::Signal) -> Result<()> {

src/zinit/config.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,15 @@ impl Default for Signal {
2323
}
2424
}
2525

26-
#[derive(Clone, Debug, Deserialize)]
26+
#[derive(Default, Clone, Debug, Deserialize)]
2727
#[serde(rename_all = "lowercase")]
2828
pub enum Log {
2929
None,
30+
#[default]
3031
Ring,
3132
Stdout,
3233
}
3334

34-
impl Default for Log {
35-
fn default() -> Self {
36-
Log::Ring
37-
}
38-
}
3935
fn default_shutdown_timeout_fn() -> u64 {
4036
DEFAULT_SHUTDOWN_TIMEOUT
4137
}

src/zinit/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ impl ZInit {
194194
let _ = self.shutdown().await;
195195
}
196196

197-
pub async fn logs(&self) -> Result<Logs> {
198-
self.pm.stream().await
197+
pub async fn logs(&self, follow: bool) -> Logs {
198+
self.pm.stream(follow).await
199199
}
200200

201201
pub async fn monitor<S: Into<String>>(&self, name: S, service: config::Service) -> Result<()> {
@@ -668,7 +668,7 @@ impl ZInit {
668668
drop(service);
669669
if config.one_shot {
670670
// we don't need to restart the service anymore
671-
let _ = self.notify.notify_waiters();
671+
self.notify.notify_waiters();
672672
break;
673673
}
674674
// we trying again in 2 seconds

0 commit comments

Comments
 (0)