Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.

Commit dd3c23c

Browse files
committed
fixed logging + integration of RPC with jsonrpsee crate
1 parent 30c6877 commit dd3c23c

File tree

9 files changed

+107
-34
lines changed

9 files changed

+107
-34
lines changed

src/app/api.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufStream};
1111
use tokio::net::UnixStream;
1212
use std::sync::atomic::{AtomicU64, Ordering};
1313

14-
use super::rpc::{ZinitRpcApiServer, ZinitServiceApiServer, ZinitSystemApiServer};
14+
use super::rpc::{ZinitLoggingApiServer, ZinitRpcApiServer, ZinitServiceApiServer, ZinitSystemApiServer};
1515

1616
// JSON-RPC 2.0 structures
1717
#[derive(Debug, Deserialize, Serialize)]
@@ -89,8 +89,15 @@ impl Api {
8989
let mut module = ZinitRpcApiServer::into_rpc(self.clone());
9090
module.merge(ZinitSystemApiServer::into_rpc(self.clone()))?;
9191
module.merge(ZinitServiceApiServer::into_rpc(self.clone()))?;
92+
module.merge(ZinitLoggingApiServer::into_rpc(self.clone()))?;
93+
9294

9395
let _handle = server.start(module).await?;
96+
97+
// TODO: ipv van server nen ipcserver, da moet gewoon nen jsonrpsee server buidler (voor http +ws ) en die spawnt
98+
// TODO: niewwe handle, terug alles merges,
99+
// let server_rpc = jsonrpsee::server::ServerBuilder::default().build()
100+
94101

95102
Ok(ApiServer { _handle })
96103
}

src/app/rpc.rs

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use crate::zinit::config;
22
use crate::{app::api::Status, zinit};
33
use async_trait::async_trait;
4-
use jsonrpsee::core::RpcResult;
4+
use jsonrpsee::core::{RpcResult, SubscriptionResult};
55
use jsonrpsee::proc_macros::rpc;
66
use jsonrpsee::types::{ErrorCode, ErrorObject, ErrorObjectOwned};
7+
use jsonrpsee::PendingSubscriptionSink;
78
use serde_json::{Map, Value};
89
use std::collections::HashMap;
910
use std::str::FromStr;
11+
use tokio_stream::StreamExt;
1012

1113
use super::api::Api;
1214

@@ -180,11 +182,10 @@ impl ZinitServiceApiServer for Api {
180182

181183
async fn kill(&self, name: String, signal: String) -> RpcResult<()> {
182184
if let Ok(sig) = nix::sys::signal::Signal::from_str(&signal.to_uppercase()) {
183-
184-
self.zinit
185-
.kill(name, sig)
186-
.await
187-
.map_err(|_e| ErrorObjectOwned::from(ErrorCode::InternalError))
185+
self.zinit
186+
.kill(name, sig)
187+
.await
188+
.map_err(|_e| ErrorObjectOwned::from(ErrorCode::InternalError))
188189
} else {
189190
Err(ErrorObjectOwned::from(ErrorCode::InternalError))
190191
}
@@ -205,7 +206,9 @@ impl ZinitServiceApiServer for Api {
205206

206207
// Check if the service file already exists
207208
if file_path.exists() {
208-
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_ALREADY_EXISTS)));
209+
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(
210+
SERVICE_ALREADY_EXISTS,
211+
)));
209212
}
210213

211214
// Convert the JSON content to YAML
@@ -236,7 +239,9 @@ impl ZinitServiceApiServer for Api {
236239

237240
// Check if the service file exists
238241
if !file_path.exists() {
239-
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_NOT_FOUND)));
242+
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(
243+
SERVICE_NOT_FOUND,
244+
)));
240245
}
241246

242247
// Delete the file
@@ -260,7 +265,9 @@ impl ZinitServiceApiServer for Api {
260265

261266
// Check if the service file exists
262267
if !file_path.exists() {
263-
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(SERVICE_NOT_FOUND)));
268+
return Err(ErrorObjectOwned::from(ErrorCode::ServerError(
269+
SERVICE_NOT_FOUND,
270+
)));
264271
}
265272

266273
// Read the file content
@@ -294,11 +301,17 @@ pub trait ZinitSystemApi {
294301
#[async_trait]
295302
impl ZinitSystemApiServer for Api {
296303
async fn shutdown(&self) -> RpcResult<()> {
297-
self.zinit.shutdown().await.map_err(|_e| ErrorObjectOwned::from(ErrorCode::ServerError(SHUTTING_DOWN)))
304+
self.zinit
305+
.shutdown()
306+
.await
307+
.map_err(|_e| ErrorObjectOwned::from(ErrorCode::ServerError(SHUTTING_DOWN)))
298308
}
299-
309+
300310
async fn reboot(&self) -> RpcResult<()> {
301-
self.zinit.reboot().await.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
311+
self.zinit
312+
.reboot()
313+
.await
314+
.map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))
302315
}
303316
}
304317

@@ -310,13 +323,61 @@ pub trait ZinitLoggingApi {
310323
/// Subscribe to log messages generated by zinit and monitored services.
311324
/// An optional filter can be provided to only receive logs containing the filter string.
312325
/// The subscription returns a stream of log lines (String).
313-
#[subscription(name = "subscribeLogs", item = String, unsubscribe = "log_unsubscribe")]
314-
async fn log_subscribe(&self, filter: Option<String>);
326+
#[subscription(name = "subscribeLogs", item = String)]
327+
async fn log_subscribe(&self, filter: Option<String>) -> SubscriptionResult;
315328
}
316329

317330
#[async_trait]
318331
impl ZinitLoggingApiServer for Api {
319332
async fn logs(&self, name: Option<String>) -> RpcResult<Vec<String>> {
320-
self.zinit.logs()
333+
let filter = name.map(|n| format!("{n}:"));
334+
Ok(
335+
tokio_stream::wrappers::ReceiverStream::new(self.zinit.logs(true, false).await)
336+
.filter_map(|l| {
337+
if let Some(ref filter) = filter {
338+
if l[4..].starts_with(filter) {
339+
Some(l.to_string())
340+
} else {
341+
None
342+
}
343+
} else {
344+
Some(l.to_string())
345+
}
346+
})
347+
.collect()
348+
.await,
349+
)
350+
}
351+
352+
async fn log_subscribe(
353+
&self,
354+
sink: PendingSubscriptionSink,
355+
name: Option<String>,
356+
) -> SubscriptionResult {
357+
let sink = sink.accept().await?;
358+
let filter = name.map(|n| format!("{n}:"));
359+
let mut stream =
360+
tokio_stream::wrappers::ReceiverStream::new(self.zinit.logs(false, true).await)
361+
.filter_map(|l| {
362+
if let Some(ref filter) = filter {
363+
if l[4..].starts_with(filter) {
364+
Some(l.to_string())
365+
} else {
366+
None
367+
}
368+
} else {
369+
Some(l.to_string())
370+
}
371+
});
372+
while let Some(log) = stream
373+
.next()
374+
.await
375+
{
376+
if sink.send(log.into()).await.is_err() {
377+
break;
378+
}
379+
}
380+
381+
Ok(())
321382
}
322383
}

src/bin/testapp.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use serde_json::json;
55
use std::env;
66
use tokio::time::{sleep, Duration};
77

8-
use zinit::app::api::Client;
98
use zinit::testapp;
109

1110
#[tokio::main]

src/bin/zinit-http.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ extern crate zinit;
33
use anyhow::Result;
44
use clap::{App, Arg};
55
use git_version::git_version;
6-
use zinit::app::api::Client;
76

87
const GIT_VERSION: &str = git_version!(args = ["--tags", "--always", "--dirty=-modified"]);
98

src/manager/buffer.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,23 @@ impl Ring {
102102
/// then if follow is true the logs stream will remain
103103
/// open and fed each received line forever until the
104104
/// received closed the channel from its end.
105-
pub async fn stream(&self, follow: bool) -> Logs {
105+
pub async fn stream(&self, existing_logs: bool, follow: bool) -> Logs {
106106
let (tx, stream) = mpsc::channel::<Arc<String>>(100);
107107
let mut rx = self.sender.subscribe();
108-
let buffer = self
109-
.buffer
110-
.lock()
111-
.await
112-
.into_iter()
113-
.cloned()
114-
.collect::<Vec<_>>();
108+
109+
let buffer = if existing_logs {
110+
// Get current exisiting logs
111+
self.buffer
112+
.lock()
113+
.await
114+
.into_iter()
115+
.cloned()
116+
.collect::<Vec<_>>()
117+
} else {
118+
// Don't care about existing logs
119+
vec![]
120+
};
121+
115122
tokio::spawn(async move {
116123
for item in buffer {
117124
let _ = tx.send(Arc::clone(&item)).await;

src/manager/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ impl ProcessManager {
138138
});
139139
}
140140

141-
pub async fn stream(&self, follow: bool) -> Logs {
142-
self.ring.stream(follow).await
141+
pub async fn stream(&self, existing_logs: bool, follow: bool) -> Logs {
142+
self.ring.stream(existing_logs, follow).await
143143
}
144144

145145
pub fn signal(&self, pid: Pid, sig: signal::Signal) -> Result<()> {

src/zinit/lifecycle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ impl LifecycleManager {
7777
}
7878

7979
/// Get logs from the process manager
80-
pub async fn logs(&self, follow: bool) -> Logs {
81-
self.pm.stream(follow).await
80+
pub async fn logs(&self, existing_logs: bool, follow: bool) -> Logs {
81+
self.pm.stream(existing_logs, follow).await
8282
}
8383

8484
/// Monitor a service

src/zinit/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ impl ZInit {
6161
}
6262

6363
/// Get logs from the process manager
64-
pub async fn logs(&self, follow: bool) -> Logs {
65-
self.lifecycle.logs(follow).await
64+
/// `existing_logs` TODO:
65+
pub async fn logs(&self, existing_logs: bool, follow: bool) -> Logs {
66+
self.lifecycle.logs(existing_logs, follow).await
6667
}
6768

6869
/// Monitor a service

zinit-client/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,4 @@ path = "examples/basic_usage.rs"
2121

2222
[[example]]
2323
name = "http_client"
24-
path = "examples/http_client.rs"
25-
log = "0.4"
24+
path = "examples/http_client.rs"

0 commit comments

Comments
 (0)