Skip to content

Commit 1c0da12

Browse files
authored
Merge pull request #1767 from tursodatabase/lucio/retry-replica-proxy-conn
sqld: retry proxy conn on unavailable
2 parents 0916848 + 208c940 commit 1c0da12

File tree

1 file changed

+29
-12
lines changed

1 file changed

+29
-12
lines changed

libsql-server/src/connection/write_proxy.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::sync::Arc;
2+
use std::time::Duration;
23

34
use futures_core::future::BoxFuture;
45
use futures_core::Stream;
@@ -11,7 +12,7 @@ use parking_lot::Mutex as PMutex;
1112
use tokio::sync::{mpsc, watch, Mutex};
1213
use tokio_stream::StreamExt;
1314
use tonic::transport::Channel;
14-
use tonic::{Request, Streaming};
15+
use tonic::{Code, Request, Streaming};
1516

1617
use crate::connection::program::{DescribeCol, DescribeParam};
1718
use crate::error::Error;
@@ -249,19 +250,35 @@ impl RemoteConnection {
249250
ctx: RequestContext,
250251
builder_config: QueryBuilderConfig,
251252
) -> crate::Result<Self> {
252-
let (request_sender, receiver) = mpsc::channel(1);
253+
let mut retries = 0;
253254

254-
let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
255-
let mut req = Request::new(stream);
256-
ctx.upgrade_grpc_request(&mut req);
257-
let response_stream = client.stream_exec(req).await?.into_inner();
255+
loop {
256+
let (request_sender, receiver) = mpsc::channel(1);
258257

259-
Ok(Self {
260-
response_stream,
261-
request_sender,
262-
current_request_id: 0,
263-
builder_config,
264-
})
258+
let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
259+
let mut req = Request::new(stream);
260+
ctx.upgrade_grpc_request(&mut req);
261+
let response_stream = match client.stream_exec(req).await {
262+
Ok(i) => i.into_inner(),
263+
Err(e) => {
264+
if e.code() == Code::Unavailable {
265+
tracing::error!("retrying proxy connection: {}", e);
266+
tokio::time::sleep(Duration::from_millis(500) * 2u32.pow(retries)).await;
267+
retries += 1;
268+
continue;
269+
} else {
270+
return Err(e.into());
271+
}
272+
}
273+
};
274+
275+
return Ok(Self {
276+
response_stream,
277+
request_sender,
278+
current_request_id: 0,
279+
builder_config,
280+
});
281+
}
265282
}
266283
}
267284

0 commit comments

Comments
 (0)