Skip to content

Commit 8c5d6d2

Browse files
feat!: add ClientBuilder::connect_lazy
1 parent 39d8a15 commit 8c5d6d2

File tree

3 files changed

+127
-34
lines changed

3 files changed

+127
-34
lines changed

watermelon/src/client/builder.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,17 @@ impl ClientBuilder {
209209
pub async fn connect(self, addr: ServerAddr) -> Result<Client, ConnectHandlerError> {
210210
Client::connect(addr, self).await
211211
}
212+
213+
/// Creates a new [`Client`], connecting to the given address in the background.
214+
///
215+
/// This method is for applications that want to construct a client without
216+
/// waiting for the server connection to be established. This may result in
217+
/// the connection never succeeding, despite the continuous attempts
218+
/// made by the client, and the [`Client`] buffer filling up with requests
219+
/// and blocking all subsequent commands forever.
220+
pub fn connect_lazy(self, addr: ServerAddr) -> Client {
221+
Client::connect_lazy(addr, self)
222+
}
212223
}
213224

214225
impl Default for ClientBuilder {

watermelon/src/client/mod.rs

Lines changed: 108 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -144,34 +144,13 @@ impl Client {
144144
HandlerOutput::ServerError
145145
| HandlerOutput::Disconnected
146146
| HandlerOutput::UnexpectedState => {
147-
let mut recycle = handle.recycle().await;
148-
149-
let mut delay = MIN_RECONNECT_DELAY;
150-
151-
loop {
152-
select! {
153-
biased;
154-
() = recycle.wait_shutdown() => {
155-
return;
156-
},
157-
() = sleep(delay) => {},
158-
}
159-
160-
match Handler::connect(&addr, &builder, recycle).await {
161-
Ok(Some(new_handle)) => {
162-
handle = new_handle;
163-
break;
164-
}
165-
Ok(None) => {
166-
// shutdown
167-
return;
168-
}
169-
Err((_err, prev_recycle)) => {
170-
recycle = prev_recycle;
171-
delay *= 2;
172-
delay = delay.min(MAX_RECONNECT_DELAY);
173-
}
174-
}
147+
let recycle = handle.recycle().await;
148+
if let Some(new_handle) =
149+
connect(&addr, &builder, recycle, MIN_RECONNECT_DELAY).await
150+
{
151+
handle = new_handle;
152+
} else {
153+
break;
175154
}
176155
}
177156
HandlerOutput::Closed => break,
@@ -194,6 +173,63 @@ impl Client {
194173
})
195174
}
196175

176+
pub(crate) fn connect_lazy(addr: ServerAddr, builder: ClientBuilder) -> Self {
177+
let (sender, receiver) = mpsc::channel(CLIENT_OP_CHANNEL_SIZE);
178+
179+
let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
180+
181+
let quick_info = Arc::new(RawQuickInfo::new());
182+
183+
let recycle = RecycledHandler::new(
184+
receiver,
185+
Arc::clone(&quick_info),
186+
&builder,
187+
shutdown_receiver,
188+
);
189+
let info = Arc::clone(recycle.info());
190+
let multiplexed_subscription_prefix = recycle.multiplexed_subscription_prefix().clone();
191+
let inbox_prefix = builder.inbox_prefix.clone();
192+
let default_response_timeout = builder.default_response_timeout;
193+
194+
let handler = tokio::spawn(async move {
195+
let Some(mut handle) = connect(&addr, &builder, recycle, Duration::ZERO).await else {
196+
return;
197+
};
198+
#[expect(clippy::while_let_loop)]
199+
loop {
200+
match (&mut handle).await {
201+
HandlerOutput::ServerError
202+
| HandlerOutput::Disconnected
203+
| HandlerOutput::UnexpectedState => {
204+
let recycle = handle.recycle().await;
205+
if let Some(new_handle) =
206+
connect(&addr, &builder, recycle, MIN_RECONNECT_DELAY).await
207+
{
208+
handle = new_handle;
209+
} else {
210+
break;
211+
}
212+
}
213+
HandlerOutput::Closed => break,
214+
}
215+
}
216+
});
217+
218+
Self {
219+
inner: Arc::new(ClientInner {
220+
info,
221+
sender,
222+
quick_info,
223+
multiplexed_subscription_prefix,
224+
next_subscription_id: AtomicU64::new(u64::from(MULTIPLEXED_SUBSCRIPTION_ID) + 1),
225+
inbox_prefix,
226+
default_response_timeout,
227+
handler,
228+
shutdown_sender,
229+
}),
230+
}
231+
}
232+
197233
#[cfg(test)]
198234
pub(crate) fn test(client_to_handler_chan_size: usize) -> (Self, TestHandler) {
199235
let builder = Self::builder();
@@ -417,13 +453,13 @@ impl Client {
417453
///
418454
/// Consider calling [`Client::quick_info`] if you only need
419455
/// information about Lame Duck Mode.
456+
///
457+
/// Returns `None` if the client was created using
458+
/// [`ClientBuilder::connect_lazy`] AND the connection
459+
/// has not been successfully established yet.
420460
#[must_use]
421-
#[expect(
422-
clippy::missing_panics_doc,
423-
reason = "we don't expect the panic to ever happen"
424-
)]
425-
pub fn server_info(&self) -> Arc<ServerInfo> {
426-
self.inner.info.load_full().expect("never connected")
461+
pub fn server_info(&self) -> Option<Arc<ServerInfo>> {
462+
self.inner.info.load_full()
427463
}
428464

429465
/// Get information about the client
@@ -535,3 +571,41 @@ pub(crate) fn create_inbox_subject(prefix: &Subject) -> Subject {
535571

536572
Subject::from_dangerous_value(subject.into())
537573
}
574+
575+
async fn connect(
576+
addr: &ServerAddr,
577+
builder: &ClientBuilder,
578+
mut recycle: RecycledHandler,
579+
initial_delay: Duration,
580+
) -> Option<Handler> {
581+
let mut delay = initial_delay;
582+
583+
loop {
584+
select! {
585+
biased;
586+
() = recycle.wait_shutdown() => {
587+
return None;
588+
},
589+
() = sleep(delay) => {},
590+
}
591+
592+
match Handler::connect(addr, builder, recycle).await {
593+
Ok(Some(new_handle)) => {
594+
return Some(new_handle);
595+
}
596+
Ok(None) => {
597+
// shutdown
598+
return None;
599+
}
600+
Err((_err, prev_recycle)) => {
601+
recycle = prev_recycle;
602+
if delay < MIN_RECONNECT_DELAY {
603+
delay = MIN_RECONNECT_DELAY;
604+
} else {
605+
delay *= 2;
606+
delay = delay.min(MAX_RECONNECT_DELAY);
607+
}
608+
}
609+
}
610+
}
611+
}

watermelon/src/handler/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,14 @@ impl RecycledHandler {
691691
}
692692
}
693693

694+
pub(crate) fn info(&self) -> &Arc<ArcSwapOption<ServerInfo>> {
695+
&self.info
696+
}
697+
698+
pub(crate) fn multiplexed_subscription_prefix(&self) -> &Subject {
699+
&self.multiplexed_subscription_prefix
700+
}
701+
694702
pub(crate) async fn wait_shutdown(&mut self) {
695703
let _ = self.shutdown_recv.recv().await;
696704
}

0 commit comments

Comments
 (0)