Skip to content

Commit bdebbc4

Browse files
committed
fix: format source files
Signed-off-by: Magyari Sandor Szilard <[email protected]>
1 parent de54ff7 commit bdebbc4

File tree

9 files changed

+183
-135
lines changed

9 files changed

+183
-135
lines changed

data-plane/core/slimrpc/examples/client.rs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,8 @@ struct Args {
5858

5959
#[tokio::main]
6060
async fn main() -> Result<()> {
61-
6261
// Initialize tracing
63-
tracing_subscriber::fmt()
64-
.with_max_level(Level::INFO)
65-
.init();
62+
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
6663

6764
let args = Args::parse();
6865

@@ -107,10 +104,24 @@ async fn main() -> Result<()> {
107104
unary_stream_example(&channel, &args.method, &args.message, timeout).await?;
108105
}
109106
"stream-unary" => {
110-
stream_unary_example(&channel, &args.method, &args.message, args.iterations, timeout).await?;
107+
stream_unary_example(
108+
&channel,
109+
&args.method,
110+
&args.message,
111+
args.iterations,
112+
timeout,
113+
)
114+
.await?;
111115
}
112116
"stream-stream" => {
113-
stream_stream_example(&channel, &args.method, &args.message, args.iterations, timeout).await?;
117+
stream_stream_example(
118+
&channel,
119+
&args.method,
120+
&args.message,
121+
args.iterations,
122+
timeout,
123+
)
124+
.await?;
114125
}
115126
_ => {
116127
anyhow::bail!("Unknown RPC type: {}", args.rpc_type);
@@ -122,15 +133,18 @@ async fn main() -> Result<()> {
122133
}
123134

124135
async fn unary_unary_example(
125-
channel: &Channel<slim_auth::shared_secret::SharedSecret, slim_auth::shared_secret::SharedSecret>,
136+
channel: &Channel<
137+
slim_auth::shared_secret::SharedSecret,
138+
slim_auth::shared_secret::SharedSecret,
139+
>,
126140
method: &str,
127141
message: &str,
128142
timeout: Option<Duration>,
129143
) -> Result<()> {
130144
info!("=== Unary-Unary RPC Example ===");
131-
145+
132146
let request = message.as_bytes().to_vec();
133-
147+
134148
info!("Sending request: {}", message);
135149
let response = channel
136150
.unary_unary(method, request, timeout, None)
@@ -144,15 +158,18 @@ async fn unary_unary_example(
144158
}
145159

146160
async fn unary_stream_example(
147-
channel: &Channel<slim_auth::shared_secret::SharedSecret, slim_auth::shared_secret::SharedSecret>,
161+
channel: &Channel<
162+
slim_auth::shared_secret::SharedSecret,
163+
slim_auth::shared_secret::SharedSecret,
164+
>,
148165
method: &str,
149166
message: &str,
150167
timeout: Option<Duration>,
151168
) -> Result<()> {
152169
info!("=== Unary-Stream RPC Example ===");
153-
170+
154171
let request = message.as_bytes().to_vec();
155-
172+
156173
info!("Sending request: {}", message);
157174
let mut response_stream = channel
158175
.unary_stream(method, request, timeout, None)
@@ -173,14 +190,17 @@ async fn unary_stream_example(
173190
}
174191

175192
async fn stream_unary_example(
176-
channel: &Channel<slim_auth::shared_secret::SharedSecret, slim_auth::shared_secret::SharedSecret>,
193+
channel: &Channel<
194+
slim_auth::shared_secret::SharedSecret,
195+
slim_auth::shared_secret::SharedSecret,
196+
>,
177197
method: &str,
178198
message: &str,
179199
iterations: usize,
180200
timeout: Option<Duration>,
181201
) -> Result<()> {
182202
info!("=== Stream-Unary RPC Example ===");
183-
203+
184204
// Create request stream with owned strings
185205
let message_owned = message.to_string();
186206
let requests = (0..iterations).map(move |i| {
@@ -202,14 +222,17 @@ async fn stream_unary_example(
202222
}
203223

204224
async fn stream_stream_example(
205-
channel: &Channel<slim_auth::shared_secret::SharedSecret, slim_auth::shared_secret::SharedSecret>,
225+
channel: &Channel<
226+
slim_auth::shared_secret::SharedSecret,
227+
slim_auth::shared_secret::SharedSecret,
228+
>,
206229
method: &str,
207230
message: &str,
208231
iterations: usize,
209232
timeout: Option<Duration>,
210233
) -> Result<()> {
211234
info!("=== Stream-Stream RPC Example ===");
212-
235+
213236
// Create request stream with owned strings
214237
let message_owned = message.to_string();
215238
let requests = (0..iterations).map(move |i| {
@@ -235,4 +258,4 @@ async fn stream_stream_example(
235258

236259
info!("Received {} responses", count);
237260
Ok(())
238-
}
261+
}

data-plane/core/slimrpc/examples/server.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use agntcy_slimrpc::{
5-
MessageContext, RequestStream, ResponseStream, Server, SessionContext, SLIMAppConfig,
6-
StreamStreamHandler, StreamUnaryHandler, UnaryStreamHandler, UnaryUnaryHandler, RPCHandler,
5+
MessageContext, RPCHandler, RequestStream, ResponseStream, SLIMAppConfig, Server,
6+
SessionContext, StreamStreamHandler, StreamUnaryHandler, UnaryStreamHandler, UnaryUnaryHandler,
77
};
88
use anyhow::{Context, Result};
99
use async_trait::async_trait;
@@ -50,9 +50,7 @@ impl UnaryUnaryHandler<Vec<u8>, Vec<u8>> for EchoUnaryHandler {
5050
let request_str = String::from_utf8_lossy(&request);
5151
info!(
5252
"UnaryUnary: Received '{}' from {} in session {}",
53-
request_str,
54-
msg_ctx.source_name,
55-
session_ctx.session_id
53+
request_str, msg_ctx.source_name, session_ctx.session_id
5654
);
5755

5856
let response = format!("Echo: {}", request_str);
@@ -73,9 +71,7 @@ impl UnaryStreamHandler<Vec<u8>, Vec<u8>> for EchoUnaryStreamHandler {
7371
let request_str = String::from_utf8_lossy(&request).to_string();
7472
info!(
7573
"UnaryStream: Received '{}' from {} in session {}",
76-
request_str,
77-
msg_ctx.source_name,
78-
session_ctx.session_id
74+
request_str, msg_ctx.source_name, session_ctx.session_id
7975
);
8076

8177
// Send 3 responses
@@ -97,22 +93,25 @@ impl StreamUnaryHandler<Vec<u8>, Vec<u8>> for EchoStreamUnaryHandler {
9793
mut request_stream: RequestStream<Vec<u8>>,
9894
session_ctx: SessionContext,
9995
) -> agntcy_slimrpc::error::Result<Vec<u8>> {
100-
info!("StreamUnary: Receiving stream in session {}", session_ctx.session_id);
96+
info!(
97+
"StreamUnary: Receiving stream in session {}",
98+
session_ctx.session_id
99+
);
101100

102101
let mut messages = Vec::new();
103102
while let Some(result) = request_stream.next().await {
104103
let (request, msg_ctx) = result?;
105104
let request_str = String::from_utf8_lossy(&request);
106105
info!(
107106
"StreamUnary: Received '{}' from {}",
108-
request_str,
109-
msg_ctx.source_name
107+
request_str, msg_ctx.source_name
110108
);
111109
messages.push(request_str.to_string());
112110
}
113111

114-
let response = format!("Echo: Received {} messages: [{}]",
115-
messages.len(),
112+
let response = format!(
113+
"Echo: Received {} messages: [{}]",
114+
messages.len(),
116115
messages.join(", ")
117116
);
118117
Ok(response.into_bytes())
@@ -128,7 +127,10 @@ impl StreamStreamHandler<Vec<u8>, Vec<u8>> for EchoStreamStreamHandler {
128127
mut request_stream: RequestStream<Vec<u8>>,
129128
session_ctx: SessionContext,
130129
) -> agntcy_slimrpc::error::Result<ResponseStream<Vec<u8>>> {
131-
info!("StreamStream: Receiving stream in session {}", session_ctx.session_id);
130+
info!(
131+
"StreamStream: Receiving stream in session {}",
132+
session_ctx.session_id
133+
);
132134

133135
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
134136

@@ -139,8 +141,7 @@ impl StreamStreamHandler<Vec<u8>, Vec<u8>> for EchoStreamStreamHandler {
139141
let request_str = String::from_utf8_lossy(&request);
140142
info!(
141143
"StreamStream: Received '{}' from {}",
142-
request_str,
143-
msg_ctx.source_name
144+
request_str, msg_ctx.source_name
144145
);
145146

146147
let response = format!("Echo: {}", request_str);
@@ -156,7 +157,9 @@ impl StreamStreamHandler<Vec<u8>, Vec<u8>> for EchoStreamStreamHandler {
156157
}
157158
});
158159

159-
Ok(Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx)))
160+
Ok(Box::pin(
161+
tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
162+
))
160163
}
161164
}
162165

@@ -174,9 +177,7 @@ impl UnaryUnaryHandler<Vec<u8>, Vec<u8>> for SimpleEchoHandler {
174177
let request_str = String::from_utf8_lossy(&request);
175178
info!(
176179
"SimpleEcho: '{}' from {} in session {}",
177-
request_str,
178-
msg_ctx.source_name,
179-
session_ctx.session_id
180+
request_str, msg_ctx.source_name, session_ctx.session_id
180181
);
181182

182183
// Simple echo back
@@ -186,11 +187,8 @@ impl UnaryUnaryHandler<Vec<u8>, Vec<u8>> for SimpleEchoHandler {
186187

187188
#[tokio::main]
188189
async fn main() -> Result<()> {
189-
190190
// Initialize tracing
191-
tracing_subscriber::fmt()
192-
.with_max_level(Level::INFO)
193-
.init();
191+
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
194192

195193
let args = Args::parse();
196194

@@ -257,4 +255,4 @@ async fn main() -> Result<()> {
257255
server.run().await?;
258256

259257
Ok(())
260-
}
258+
}

data-plane/core/slimrpc/src/channel.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use slim_auth::traits::{TokenProvider, Verifier};
1010
use slim_datapath::api::ProtoSessionType;
1111
use slim_datapath::messages::Name;
1212
use slim_service::app::App;
13-
use slim_session::SessionConfig;
1413
use slim_session::session_controller::SessionController;
14+
use slim_session::SessionConfig;
1515
use std::collections::HashMap;
1616
use std::pin::Pin;
1717
use std::sync::Arc;
@@ -46,15 +46,23 @@ where
4646
&self,
4747
method: &str,
4848
metadata: Option<HashMap<String, String>>,
49-
) -> Result<(Name, Arc<SessionController>, slim_session::context::SessionContext, HashMap<String, String>)> {
49+
) -> Result<(
50+
Name,
51+
Arc<SessionController>,
52+
slim_session::context::SessionContext,
53+
HashMap<String, String>,
54+
)> {
5055
let service_name = service_and_method_to_name(&self.remote, method)
5156
.context("Failed to create service name")?;
5257

53-
info!("Setting route for service {} with conn_id {}", service_name, self.conn_id);
54-
58+
info!(
59+
"Setting route for service {} with conn_id {}",
60+
service_name, self.conn_id
61+
);
62+
5563
// Set route using the stored connection ID
5664
self.app
57-
.set_route(&service_name, self.conn_id)
65+
.set_route(&service_name, self.conn_id)
5866
.await
5967
.context("Failed to set route")?;
6068

@@ -79,11 +87,14 @@ where
7987
.session_arc()
8088
.ok_or_else(|| SRPCError::Session("Failed to get session".to_string()))?;
8189

82-
init_ack
83-
.await
84-
.context("Failed to initialize session")?;
90+
init_ack.await.context("Failed to initialize session")?;
8591

86-
Ok((service_name, session, session_ctx, metadata.unwrap_or_default()))
92+
Ok((
93+
service_name,
94+
session,
95+
session_ctx,
96+
metadata.unwrap_or_default(),
97+
))
8798
}
8899

89100
async fn send_unary(
@@ -125,7 +136,7 @@ where
125136
// Send end of stream
126137
let mut end_metadata = metadata.clone();
127138
end_metadata.insert("code".to_string(), "0".to_string());
128-
139+
129140
session
130141
.publish(service_name, vec![], None, Some(end_metadata))
131142
.await
@@ -166,8 +177,11 @@ where
166177
.get_payload()
167178
.ok_or_else(|| SRPCError::Session("No payload in response".to_string()))?
168179
.as_application_payload()
169-
.map_err(|e| SRPCError::Session(format!("Failed to get application payload: {}", e)))?
170-
.blob.clone();
180+
.map_err(|e| {
181+
SRPCError::Session(format!("Failed to get application payload: {}", e))
182+
})?
183+
.blob
184+
.clone();
171185

172186
Ok((msg_ctx, response))
173187
})
@@ -229,7 +243,8 @@ where
229243
timeout: Option<Duration>,
230244
metadata: Option<HashMap<String, String>>,
231245
) -> Result<Vec<u8>> {
232-
let (service_name, session, mut session_ctx, metadata) = self.common_setup(method, metadata).await?;
246+
let (service_name, session, mut session_ctx, metadata) =
247+
self.common_setup(method, metadata).await?;
233248
let deadline = compute_deadline(timeout);
234249

235250
self.send_unary(request, &session, &service_name, metadata, deadline)
@@ -249,7 +264,8 @@ where
249264
timeout: Option<Duration>,
250265
metadata: Option<HashMap<String, String>>,
251266
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<u8>>> + Send>>> {
252-
let (service_name, session, session_ctx, metadata) = self.common_setup(method, metadata).await?;
267+
let (service_name, session, session_ctx, metadata) =
268+
self.common_setup(method, metadata).await?;
253269
let deadline = compute_deadline(timeout);
254270

255271
self.send_unary(request, &session, &service_name, metadata, deadline)
@@ -267,7 +283,8 @@ where
267283
timeout: Option<Duration>,
268284
metadata: Option<HashMap<String, String>>,
269285
) -> Result<Vec<u8>> {
270-
let (service_name, session, mut session_ctx, metadata) = self.common_setup(method, metadata).await?;
286+
let (service_name, session, mut session_ctx, metadata) =
287+
self.common_setup(method, metadata).await?;
271288
let deadline = compute_deadline(timeout);
272289

273290
self.send_stream(request_stream, &session, &service_name, metadata, deadline)
@@ -287,7 +304,8 @@ where
287304
timeout: Option<Duration>,
288305
metadata: Option<HashMap<String, String>>,
289306
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<u8>>> + Send>>> {
290-
let (service_name, session, session_ctx, metadata) = self.common_setup(method, metadata).await?;
307+
let (service_name, session, session_ctx, metadata) =
308+
self.common_setup(method, metadata).await?;
291309
let deadline = compute_deadline(timeout);
292310

293311
self.send_stream(request_stream, &session, &service_name, metadata, deadline)

0 commit comments

Comments
 (0)