Skip to content

Commit 4f91917

Browse files
committed
type-erase config override
1 parent 94217e2 commit 4f91917

File tree

5 files changed

+67
-44
lines changed

5 files changed

+67
-44
lines changed

libsql-wal/src/storage/async_storage.rs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! `AsyncStorage` is a `Storage` implementation that defer storage to a background thread. The
22
//! durable frame_no is notified asynchronously.
33
4+
use std::any::Any;
45
use std::sync::Arc;
56

67
use chrono::Utc;
@@ -22,9 +23,9 @@ use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};
2223
///
2324
/// On shutdown, attempts to empty the queue, and flush the receiver. When the last handle of the
2425
/// receiver is dropped, and the queue is empty, exit.
25-
pub struct AsyncStorageLoop<B: Backend, IO: Io, S> {
26-
receiver: mpsc::UnboundedReceiver<StorageLoopMessage<B::Config, S>>,
27-
scheduler: Scheduler<B::Config, S>,
26+
pub struct AsyncStorageLoop<B, IO: Io, S> {
27+
receiver: mpsc::UnboundedReceiver<StorageLoopMessage<S>>,
28+
scheduler: Scheduler<S>,
2829
backend: Arc<B>,
2930
io: Arc<IO>,
3031
max_in_flight: usize,
@@ -112,14 +113,28 @@ where
112113
fn fetch_durable_frame_no_async(
113114
&self,
114115
namespace: NamespaceName,
115-
ret: oneshot::Sender<u64>,
116-
config_override: Option<Arc<B::Config>>,
116+
ret: oneshot::Sender<super::Result<u64>>,
117+
config_override: Option<Arc<dyn Any + Send + Sync>>,
117118
) {
118119
let backend = self.backend.clone();
119-
let config = config_override.unwrap_or_else(|| backend.default_config());
120+
let config = match config_override
121+
.map(|c| c.downcast::<B::Config>())
122+
.transpose()
123+
{
124+
Ok(Some(config)) => config,
125+
Ok(None) => backend.default_config(),
126+
Err(_) => {
127+
let _ = ret.send(Err(super::Error::InvalidConfigType));
128+
return;
129+
}
130+
};
131+
120132
tokio::spawn(async move {
121-
let meta = backend.meta(&config, &namespace).await.unwrap();
122-
let _ = ret.send(meta.max_frame_no);
133+
let res = backend
134+
.meta(&config, &namespace)
135+
.await
136+
.map(|meta| meta.max_frame_no);
137+
let _ = ret.send(res);
123138
});
124139
}
125140
}
@@ -132,18 +147,18 @@ pub struct BottomlessConfig<C> {
132147
pub config: C,
133148
}
134149

135-
enum StorageLoopMessage<C, S> {
136-
StoreReq(StoreSegmentRequest<C, S>),
150+
enum StorageLoopMessage<S> {
151+
StoreReq(StoreSegmentRequest<S>),
137152
DurableFrameNoReq {
138153
namespace: NamespaceName,
139-
config_override: Option<Arc<C>>,
140-
ret: oneshot::Sender<u64>,
154+
config_override: Option<Arc<dyn Any + Send + Sync>>,
155+
ret: oneshot::Sender<super::Result<u64>>,
141156
},
142157
}
143158

144-
pub struct AsyncStorage<B: Backend, S> {
159+
pub struct AsyncStorage<B, S> {
145160
/// send request to the main loop
146-
job_sender: mpsc::UnboundedSender<StorageLoopMessage<B::Config, S>>,
161+
job_sender: mpsc::UnboundedSender<StorageLoopMessage<S>>,
147162
force_shutdown: oneshot::Sender<()>,
148163
backend: Arc<B>,
149164
}
@@ -163,11 +178,15 @@ where
163178
config_override: Option<Arc<Self::Config>>,
164179
on_store_callback: OnStoreCallback,
165180
) {
181+
fn into_any<T: Sync + Send + 'static>(t: Arc<T>) -> Arc<dyn Any + Sync + Send> {
182+
t
183+
}
184+
166185
let req = StoreSegmentRequest {
167186
namespace: namespace.clone(),
168187
segment,
169188
created_at: Utc::now(),
170-
storage_config_override: config_override,
189+
storage_config_override: config_override.map(into_any),
171190
on_store_callback,
172191
};
173192

libsql-wal/src/storage/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ pub enum Error {
1919
// We may recover from this error, and rebuild the index from the data file.
2020
#[error("invalid index: {0}")]
2121
InvalidIndex(&'static str),
22+
#[error("Provided config is of an invalid type")]
23+
InvalidConfigType,
2224
}
2325

2426
impl Error {

libsql-wal/src/storage/job.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ use crate::segment::Segment;
99

1010
/// A request, with an id
1111
#[derive(Debug)]
12-
pub(crate) struct IndexedRequest<C, T> {
13-
pub(crate) request: StoreSegmentRequest<C, T>,
12+
pub(crate) struct IndexedRequest<T> {
13+
pub(crate) request: StoreSegmentRequest<T>,
1414
pub(crate) id: u64,
1515
}
1616

17-
impl<C, T> Deref for IndexedRequest<C, T> {
18-
type Target = StoreSegmentRequest<C, T>;
17+
impl<T> Deref for IndexedRequest<T> {
18+
type Target = StoreSegmentRequest<T>;
1919

2020
fn deref(&self) -> &Self::Target {
2121
&self.request
@@ -24,10 +24,10 @@ impl<C, T> Deref for IndexedRequest<C, T> {
2424

2525
/// A storage Job to be performed
2626
#[derive(Debug)]
27-
pub(crate) struct Job<C, T> {
27+
pub(crate) struct Job<T> {
2828
/// Segment to store.
2929
// TODO: implement request batching (merge segment and send).
30-
pub(crate) request: IndexedRequest<C, T>,
30+
pub(crate) request: IndexedRequest<T>,
3131
}
3232

3333
// #[repr(transparent)]
@@ -42,14 +42,14 @@ pub(crate) struct Job<C, T> {
4242
// }
4343
// }
4444
//
45-
impl<C, Seg> Job<C, Seg>
45+
impl<Seg> Job<Seg>
4646
where
4747
Seg: Segment,
4848
{
4949
/// Perform the job and return the JobResult. This is not allowed to panic.
50-
pub(crate) async fn perform<B, IO>(self, backend: B, io: IO) -> JobResult<C, Seg>
50+
pub(crate) async fn perform<B, IO>(self, backend: B, io: IO) -> JobResult<Seg>
5151
where
52-
B: Backend<Config = C>,
52+
B: Backend,
5353
IO: Io,
5454
{
5555
let result = self.try_perform(backend, io).await;
@@ -58,7 +58,7 @@ where
5858

5959
async fn try_perform<B, IO>(&self, backend: B, io: IO) -> Result<u64>
6060
where
61-
B: Backend<Config = C>,
61+
B: Backend,
6262
IO: Io,
6363
{
6464
let segment = &self.request.segment;
@@ -81,6 +81,9 @@ where
8181
.request
8282
.storage_config_override
8383
.clone()
84+
.map(|c| c.downcast::<B::Config>())
85+
.transpose()
86+
.map_err(|_| super::Error::InvalidConfigType)?
8487
.unwrap_or_else(|| backend.default_config());
8588

8689
backend.store(&config, meta, tmp, new_index).await?;
@@ -90,9 +93,9 @@ where
9093
}
9194

9295
#[derive(Debug)]
93-
pub(crate) struct JobResult<C, S> {
96+
pub(crate) struct JobResult<S> {
9497
/// The job that was performed
95-
pub(crate) job: Job<C, S>,
98+
pub(crate) job: Job<S>,
9699
/// The outcome of the job: the new durable index, or an error.
97100
pub(crate) result: Result<u64>,
98101
}

libsql-wal/src/storage/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::any::Any;
12
use std::collections::BTreeMap;
23
use std::fmt;
34
use std::future::Future;
@@ -437,7 +438,7 @@ impl<IO: Io> Storage for TestStorage<IO> {
437438
}
438439
}
439440

440-
pub struct StoreSegmentRequest<C, S> {
441+
pub struct StoreSegmentRequest<S> {
441442
namespace: NamespaceName,
442443
/// Path to the segment. Read-only for bottomless
443444
segment: S,
@@ -446,22 +447,20 @@ pub struct StoreSegmentRequest<C, S> {
446447

447448
/// alternative configuration to use with the storage layer.
448449
/// e.g: S3 overrides
449-
storage_config_override: Option<Arc<C>>,
450+
storage_config_override: Option<Arc<dyn Any + Send + Sync>>,
450451
/// Called after the segment was stored, with the new durable index
451452
on_store_callback: OnStoreCallback,
452453
}
453454

454-
impl<C, S> fmt::Debug for StoreSegmentRequest<C, S>
455+
impl<S> fmt::Debug for StoreSegmentRequest<S>
455456
where
456-
C: fmt::Debug,
457457
S: fmt::Debug,
458458
{
459459
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460460
f.debug_struct("StoreSegmentRequest")
461461
.field("namespace", &self.namespace)
462462
.field("segment", &self.segment)
463463
.field("created_at", &self.created_at)
464-
.field("storage_config_override", &self.storage_config_override)
465464
.finish()
466465
}
467466
}

libsql-wal/src/storage/scheduler.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@ use super::job::{IndexedRequest, Job, JobResult};
55
use super::StoreSegmentRequest;
66
use libsql_sys::name::NamespaceName;
77

8-
struct NamespaceRequests<C, F> {
9-
requests: VecDeque<IndexedRequest<C, F>>,
8+
struct NamespaceRequests<F> {
9+
requests: VecDeque<IndexedRequest<F>>,
1010
/// there's work in flight for this namespace
1111
in_flight: bool,
1212
}
1313

14-
impl<C, F> Default for NamespaceRequests<C, F> {
14+
impl<F> Default for NamespaceRequests<F> {
1515
fn default() -> Self {
1616
Self {
1717
requests: Default::default(),
@@ -28,14 +28,14 @@ impl<C, F> Default for NamespaceRequests<C, F> {
2828
/// processed, because only the most recent segment is checked for durability. This property
2929
/// ensures that all segments are present up to the max durable index.
3030
/// It is generic over C: the storage config type (for config overrides), and T, the segment type
31-
pub(crate) struct Scheduler<C, T> {
31+
pub(crate) struct Scheduler<T> {
3232
/// notify new durability index for namespace
33-
requests: HashMap<NamespaceName, NamespaceRequests<C, T>>,
33+
requests: HashMap<NamespaceName, NamespaceRequests<T>>,
3434
queue: priority_queue::PriorityQueue<NamespaceName, Reverse<u64>>,
3535
next_request_id: u64,
3636
}
3737

38-
impl<C, T> Scheduler<C, T> {
38+
impl<T> Scheduler<T> {
3939
pub fn new() -> Self {
4040
Self {
4141
requests: Default::default(),
@@ -46,7 +46,7 @@ impl<C, T> Scheduler<C, T> {
4646

4747
/// Register a new request with the scheduler
4848
#[tracing::instrument(skip_all)]
49-
pub fn register(&mut self, request: StoreSegmentRequest<C, T>) {
49+
pub fn register(&mut self, request: StoreSegmentRequest<T>) {
5050
// invariant: new segment comes immediately after the latest segment for that namespace. This means:
5151
// - immediately after the last registered segment, if there is any
5252
// - immediately after the last durable index
@@ -71,7 +71,7 @@ impl<C, T> Scheduler<C, T> {
7171
/// be scheduled, and returns description of the job to be performed. No other job for this
7272
/// namespace will be scheduled, until the `JobResult` is reported
7373
#[tracing::instrument(skip_all)]
74-
pub fn schedule(&mut self) -> Option<Job<C, T>> {
74+
pub fn schedule(&mut self) -> Option<Job<T>> {
7575
let (name, _) = self.queue.pop()?;
7676
let requests = self
7777
.requests
@@ -90,7 +90,7 @@ impl<C, T> Scheduler<C, T> {
9090
/// Report the job result to the scheduler. If the job result was a success, the request as
9191
/// removed from the queue, else, the job is rescheduled
9292
#[tracing::instrument(skip_all, fields(req_id = result.job.request.id))]
93-
pub async fn report(&mut self, result: JobResult<C, T>) {
93+
pub async fn report(&mut self, result: JobResult<T>) {
9494
// re-schedule, or report new max durable frame_no for segment
9595
let name = result.job.request.request.namespace.clone();
9696
let requests = self
@@ -151,7 +151,7 @@ mod test {
151151

152152
#[tokio::test]
153153
async fn schedule_simple() {
154-
let mut scheduler = Scheduler::<(), ()>::new();
154+
let mut scheduler = Scheduler::<()>::new();
155155

156156
let ns1 = NamespaceName::from("test1");
157157
let ns2 = NamespaceName::from("test2");
@@ -224,7 +224,7 @@ mod test {
224224

225225
#[tokio::test]
226226
async fn job_error_reschedule() {
227-
let mut scheduler = Scheduler::<(), ()>::new();
227+
let mut scheduler = Scheduler::<()>::new();
228228

229229
let ns1 = NamespaceName::from("test1");
230230
let ns2 = NamespaceName::from("test2");
@@ -264,7 +264,7 @@ mod test {
264264

265265
#[tokio::test]
266266
async fn schedule_while_in_flight() {
267-
let mut scheduler = Scheduler::<(), ()>::new();
267+
let mut scheduler = Scheduler::<()>::new();
268268

269269
let ns1 = NamespaceName::from("test1");
270270

0 commit comments

Comments
 (0)