@@ -6,26 +6,29 @@ use std::time::Duration;
6
6
use anyhow:: Context as _;
7
7
use bottomless:: replicator:: Options ;
8
8
use bytes:: Bytes ;
9
+ use enclose:: enclose;
9
10
use futures:: Stream ;
10
11
use libsql_sys:: wal:: Sqlite3WalManager ;
11
12
use tokio:: io:: AsyncBufReadExt as _;
12
13
use tokio:: sync:: watch;
13
14
use tokio:: task:: JoinSet ;
14
15
use tokio_util:: io:: StreamReader ;
15
- use enclose:: enclose;
16
16
17
17
use crate :: connection:: config:: DatabaseConfig ;
18
18
use crate :: connection:: connection_manager:: InnerWalManager ;
19
19
use crate :: connection:: libsql:: { open_conn, MakeLibSqlConn } ;
20
20
use crate :: connection:: { Connection as _, MakeConnection as _} ;
21
+ use crate :: database:: { PrimaryConnection , PrimaryConnectionMaker } ;
21
22
use crate :: error:: LoadDumpError ;
23
+ use crate :: namespace:: broadcasters:: BroadcasterHandle ;
24
+ use crate :: namespace:: meta_store:: MetaStoreHandle ;
25
+ use crate :: namespace:: replication_wal:: { make_replication_wal_wrapper, ReplicationWalWrapper } ;
26
+ use crate :: namespace:: {
27
+ NamespaceBottomlessDbId , NamespaceBottomlessDbIdInit , NamespaceName , ResolveNamespacePathFn ,
28
+ RestoreOption ,
29
+ } ;
22
30
use crate :: replication:: { FrameNo , ReplicationLogger } ;
23
31
use crate :: stats:: Stats ;
24
- use crate :: namespace:: { NamespaceBottomlessDbId , NamespaceBottomlessDbIdInit , NamespaceName , ResolveNamespacePathFn , RestoreOption } ;
25
- use crate :: namespace:: replication_wal:: { make_replication_wal_wrapper, ReplicationWalWrapper } ;
26
- use crate :: namespace:: meta_store:: MetaStoreHandle ;
27
- use crate :: namespace:: broadcasters:: BroadcasterHandle ;
28
- use crate :: database:: { PrimaryConnection , PrimaryConnectionMaker } ;
29
32
use crate :: { StatsSender , BLOCKING_RT , DB_CREATE_TIMEOUT , DEFAULT_AUTO_CHECKPOINT } ;
30
33
31
34
use super :: { BaseNamespaceConfig , PrimaryExtraConfig } ;
@@ -74,8 +77,7 @@ pub(super) async fn make_primary_connection_maker(
74
77
tracing:: debug!( "Checkpointed before initializing bottomless" ) ;
75
78
let options = make_bottomless_options ( options, bottomless_db_id, name. clone ( ) ) ;
76
79
let ( replicator, did_recover) =
77
- init_bottomless_replicator ( db_path. join ( "data" ) , options, & restore_option)
78
- . await ?;
80
+ init_bottomless_replicator ( db_path. join ( "data" ) , options, & restore_option) . await ?;
79
81
tracing:: debug!( "Completed init of bottomless replicator" ) ;
80
82
is_dirty |= did_recover;
81
83
Some ( replicator)
@@ -93,14 +95,14 @@ pub(super) async fn make_primary_connection_maker(
93
95
} ;
94
96
95
97
let logger = Arc :: new ( ReplicationLogger :: open (
96
- & db_path,
97
- primary_config. max_log_size ,
98
- primary_config. max_log_duration ,
99
- is_dirty,
100
- auto_checkpoint,
101
- primary_config. scripted_backup . clone ( ) ,
102
- name. clone ( ) ,
103
- None ,
98
+ & db_path,
99
+ primary_config. max_log_size ,
100
+ primary_config. max_log_duration ,
101
+ is_dirty,
102
+ auto_checkpoint,
103
+ primary_config. scripted_backup . clone ( ) ,
104
+ name. clone ( ) ,
105
+ None ,
104
106
) ?) ;
105
107
106
108
tracing:: debug!( "sending stats" ) ;
@@ -113,7 +115,7 @@ pub(super) async fn make_primary_connection_maker(
113
115
name. clone ( ) ,
114
116
logger. new_frame_notifier . subscribe ( ) ,
115
117
)
116
- . await ?;
118
+ . await ?;
117
119
118
120
tracing:: debug!( "Making replication wal wrapper" ) ;
119
121
let wal_wrapper = make_replication_wal_wrapper ( bottomless_replicator, logger. clone ( ) ) ;
@@ -136,13 +138,13 @@ pub(super) async fn make_primary_connection_maker(
136
138
resolve_attach_path,
137
139
make_wal_manager. clone ( ) ,
138
140
)
139
- . await ?
140
- . throttled (
141
- base_config. max_concurrent_connections . clone ( ) ,
142
- Some ( DB_CREATE_TIMEOUT ) ,
143
- base_config. max_total_response_size ,
144
- base_config. max_concurrent_requests ,
145
- ) ;
141
+ . await ?
142
+ . throttled (
143
+ base_config. max_concurrent_connections . clone ( ) ,
144
+ Some ( DB_CREATE_TIMEOUT ) ,
145
+ base_config. max_total_response_size ,
146
+ base_config. max_concurrent_requests ,
147
+ ) ;
146
148
147
149
tracing:: debug!( "Completed opening libsql connection" ) ;
148
150
@@ -356,10 +358,7 @@ pub(super) async fn make_stats(
356
358
}
357
359
} ) ;
358
360
359
- join_set. spawn ( run_storage_monitor (
360
- db_path. into ( ) ,
361
- Arc :: downgrade ( & stats) ,
362
- ) ) ;
361
+ join_set. spawn ( run_storage_monitor ( db_path. into ( ) , Arc :: downgrade ( & stats) ) ) ;
363
362
364
363
tracing:: debug!( "done sending stats, and creating bg tasks" ) ;
365
364
@@ -369,10 +368,7 @@ pub(super) async fn make_stats(
369
368
// Periodically check the storage used by the database and save it in the Stats structure.
370
369
// TODO: Once we have a separate fiber that does WAL checkpoints, running this routine
371
370
// right after checkpointing is exactly where it should be done.
372
- async fn run_storage_monitor (
373
- db_path : PathBuf ,
374
- stats : Weak < Stats > ,
375
- ) -> anyhow:: Result < ( ) > {
371
+ async fn run_storage_monitor ( db_path : PathBuf , stats : Weak < Stats > ) -> anyhow:: Result < ( ) > {
376
372
// on initialization, the database file doesn't exist yet, so we wait a bit for it to be
377
373
// created
378
374
tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
0 commit comments