Skip to content

Commit d2a772f

Browse files
authored
feat: support blocking subscriptions (#56)
1 parent 4d05390 commit d2a772f

File tree

4 files changed

+98
-10
lines changed

4 files changed

+98
-10
lines changed

relay_client/src/http.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,22 @@ impl Client {
128128
.map(|_| ())
129129
}
130130

131-
/// Subscribes on topic to receive messages.
131+
/// Subscribes on topic to receive messages. The request is resolved
132+
/// optimistically as soon as the relay receives it.
132133
pub async fn subscribe(&self, topic: Topic) -> Response<rpc::Subscribe> {
133-
self.request(rpc::Subscribe { topic }).await
134+
self.request(rpc::Subscribe {
135+
topic,
136+
block: false,
137+
})
138+
.await
139+
}
140+
141+
/// Subscribes on topic to receive messages. The request is resolved only
142+
/// when fully processed by the relay.
143+
/// Note: This function is experimental and will likely be removed in the
144+
/// future.
145+
pub async fn subscribe_blocking(&self, topic: Topic) -> Response<rpc::Subscribe> {
146+
self.request(rpc::Subscribe { topic, block: true }).await
134147
}
135148

136149
/// Unsubscribes from a topic.
@@ -223,13 +236,30 @@ impl Client {
223236
self.request(payload).await
224237
}
225238

226-
/// Subscribes on multiple topics to receive messages.
239+
/// Subscribes on multiple topics to receive messages. The request is
240+
/// resolved optimistically as soon as the relay receives it.
227241
pub async fn batch_subscribe(
228242
&self,
229243
topics: impl Into<Vec<Topic>>,
230244
) -> Response<rpc::BatchSubscribe> {
231245
self.request(rpc::BatchSubscribe {
232246
topics: topics.into(),
247+
block: false,
248+
})
249+
.await
250+
}
251+
252+
/// Subscribes on multiple topics to receive messages. The request is
253+
/// resolved only when fully processed by the relay.
254+
/// Note: This function is experimental and will likely be removed in the
255+
/// future.
256+
pub async fn batch_subscribe_blocking(
257+
&self,
258+
topics: impl Into<Vec<Topic>>,
259+
) -> Response<rpc::BatchSubscribe> {
260+
self.request(rpc::BatchSubscribe {
261+
topics: topics.into(),
262+
block: true,
233263
})
234264
.await
235265
}

relay_client/src/websocket.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,25 @@ impl Client {
165165
EmptyResponseFuture::new(response)
166166
}
167167

168-
/// Subscribes on topic to receive messages.
168+
/// Subscribes on topic to receive messages. The request is resolved
169+
/// optimistically as soon as the relay receives it.
169170
pub fn subscribe(&self, topic: Topic) -> ResponseFuture<Subscribe> {
170-
let (request, response) = create_request(Subscribe { topic });
171+
let (request, response) = create_request(Subscribe {
172+
topic,
173+
block: false,
174+
});
175+
176+
self.request(request);
177+
178+
response
179+
}
180+
181+
/// Subscribes on topic to receive messages. The request is resolved only
182+
/// when fully processed by the relay.
183+
/// Note: This function is experimental and will likely be removed in the
184+
/// future.
185+
pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<Subscribe> {
186+
let (request, response) = create_request(Subscribe { topic, block: true });
171187

172188
self.request(request);
173189

@@ -204,10 +220,30 @@ impl Client {
204220
FetchMessageStream::new(self.clone(), topics.into())
205221
}
206222

207-
/// Subscribes on multiple topics to receive messages.
223+
/// Subscribes on multiple topics to receive messages. The request is
224+
/// resolved optimistically as soon as the relay receives it.
208225
pub fn batch_subscribe(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchSubscribe> {
209226
let (request, response) = create_request(BatchSubscribe {
210227
topics: topics.into(),
228+
block: false,
229+
});
230+
231+
self.request(request);
232+
233+
response
234+
}
235+
236+
/// Subscribes on multiple topics to receive messages. The request is
237+
/// resolved only when fully processed by the relay.
238+
/// Note: This function is experimental and will likely be removed in the
239+
/// future.
240+
pub fn batch_subscribe_blocking(
241+
&self,
242+
topics: impl Into<Vec<Topic>>,
243+
) -> ResponseFuture<BatchSubscribe> {
244+
let (request, response) = create_request(BatchSubscribe {
245+
topics: topics.into(),
246+
block: true,
211247
});
212248

213249
self.request(request);

relay_rpc/src/rpc.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ pub struct ErrorData {
306306
pub struct Subscribe {
307307
/// The topic to subscribe to.
308308
pub topic: Topic,
309+
310+
/// Whether to disable optimistic response. By default optimistic response
311+
/// is enabled.
312+
#[serde(default)]
313+
pub block: bool,
309314
}
310315

311316
impl RequestPayload for Subscribe {
@@ -403,6 +408,11 @@ pub struct FetchResponse {
403408
pub struct BatchSubscribe {
404409
/// The topics to subscribe to.
405410
pub topics: Vec<Topic>,
411+
412+
/// Whether to disable optimistic response. By default optimistic response
413+
/// is enabled.
414+
#[serde(default)]
415+
pub block: bool,
406416
}
407417

408418
impl RequestPayload for BatchSubscribe {

relay_rpc/src/rpc/tests.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,15 @@ fn subscribe() {
3131
1659980684711969.into(),
3232
Params::Subscribe(Subscribe {
3333
topic: "c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840".into(),
34+
block: false,
3435
}),
3536
));
3637

3738
let serialized = serde_json::to_string(&payload).unwrap();
3839

3940
assert_eq!(
4041
&serialized,
41-
r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"}}"#
42+
r#"{"id":1659980684711969,"jsonrpc":"2.0","method":"irn_subscribe","params":{"topic":"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840","block":false}}"#
4243
);
4344

4445
let deserialized: Payload = serde_json::from_str(&serialized).unwrap();
@@ -206,7 +207,8 @@ fn deserialize_batch_methods() {
206207
topics: vec![
207208
Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9840"),
208209
Topic::from("c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c9841")
209-
]
210+
],
211+
block: false
210212
})
211213
})
212214
);
@@ -332,6 +334,7 @@ fn validation() {
332334
jsonrpc: jsonrpc.clone(),
333335
params: Params::Subscribe(Subscribe {
334336
topic: topic.clone(),
337+
block: false,
335338
}),
336339
};
337340
assert_eq!(request.validate(), Ok(()));
@@ -342,6 +345,7 @@ fn validation() {
342345
jsonrpc: jsonrpc.clone(),
343346
params: Params::Subscribe(Subscribe {
344347
topic: Topic::from("invalid"),
348+
block: false,
345349
}),
346350
};
347351
assert_eq!(
@@ -459,6 +463,7 @@ fn validation() {
459463
jsonrpc: jsonrpc.clone(),
460464
params: Params::BatchSubscribe(BatchSubscribe {
461465
topics: vec![topic.clone()],
466+
block: false,
462467
}),
463468
};
464469
assert_eq!(request.validate(), Ok(()));
@@ -467,7 +472,10 @@ fn validation() {
467472
let request = Request {
468473
id,
469474
jsonrpc: jsonrpc.clone(),
470-
params: Params::BatchSubscribe(BatchSubscribe { topics: vec![] }),
475+
params: Params::BatchSubscribe(BatchSubscribe {
476+
topics: vec![],
477+
block: false,
478+
}),
471479
};
472480
assert_eq!(request.validate(), Err(ValidationError::BatchEmpty));
473481

@@ -478,7 +486,10 @@ fn validation() {
478486
let request = Request {
479487
id,
480488
jsonrpc: jsonrpc.clone(),
481-
params: Params::BatchSubscribe(BatchSubscribe { topics }),
489+
params: Params::BatchSubscribe(BatchSubscribe {
490+
topics,
491+
block: false,
492+
}),
482493
};
483494
assert_eq!(
484495
request.validate(),
@@ -496,6 +507,7 @@ fn validation() {
496507
topics: vec![Topic::from(
497508
"c4163cf65859106b3f5435fc296e7765411178ed452d1c30337a6230138c98401",
498509
)],
510+
block: false,
499511
}),
500512
};
501513
assert_eq!(

0 commit comments

Comments
 (0)