Skip to content

Commit d40569c

Browse files
committed
Treat logs as remote file backends
Resolves #43.
1 parent 37a2817 commit d40569c

File tree

12 files changed

+405
-43
lines changed

12 files changed

+405
-43
lines changed

src/bin/seamdbd.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use clap::Parser;
2020
use pgwire::tokio::process_socket;
2121
use seamdb::cluster::{ClusterEnv, EtcdClusterMetaDaemon, EtcdNodeRegistry, NodeId};
2222
use seamdb::endpoint::{Endpoint, ServiceUri};
23-
use seamdb::fs::MemoryFileSystemFactory;
2423
use seamdb::log::{KafkaLogFactory, LogManager, MemoryLogFactory};
2524
use seamdb::protos::TableDescriptor;
2625
use seamdb::sql::postgresql::PostgresqlHandlerFactory;
@@ -85,7 +84,7 @@ async fn main() {
8584
let (nodes, lease) =
8685
EtcdNodeRegistry::join(cluster_uri.clone(), node_id.clone(), Some(endpoint.to_owned())).await.unwrap();
8786
let log_manager = new_log_manager(log_uri).await.unwrap();
88-
let cluster_env = ClusterEnv::new(log_manager.into(), MemoryFileSystemFactory.into(), nodes).with_replicas(1);
87+
let cluster_env = ClusterEnv::new(log_manager.into(), nodes).with_replicas(1);
8988
let mut cluster_meta_handle =
9089
EtcdClusterMetaDaemon::start(args.cluster_name, cluster_uri.clone(), cluster_env.clone()).await.unwrap();
9190
let descriptor_watcher = cluster_meta_handle.watch_descriptor(None).await.unwrap();

src/cluster/env.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub struct ClusterEnv {
3535
}
3636

3737
impl ClusterEnv {
38-
pub fn new(log: Arc<LogManager>, fs: Arc<FileSystemManager>, nodes: Arc<dyn NodeRegistry>) -> Self {
38+
pub fn with_filesystem(log: Arc<LogManager>, fs: Arc<FileSystemManager>, nodes: Arc<dyn NodeRegistry>) -> Self {
3939
Self {
4040
log,
4141
fs,
@@ -48,6 +48,11 @@ impl ClusterEnv {
4848
}
4949
}
5050

51+
pub fn new(log: Arc<LogManager>, nodes: Arc<dyn NodeRegistry>) -> Self {
52+
let fs = Arc::new(FileSystemManager::from(log.clone()));
53+
Self::with_filesystem(log, fs, nodes)
54+
}
55+
5156
pub fn with_replicas(self, replicas: usize) -> Self {
5257
Self { replicas: replicas.max(1), ..self }
5358
}

src/cluster/meta.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,6 @@ mod tests {
840840
use crate::cluster::etcd::EtcdHelper;
841841
use crate::cluster::{EtcdNodeRegistry, *};
842842
use crate::endpoint::{Endpoint, ServiceUri};
843-
use crate::fs::MemoryFileSystemFactory;
844843
use crate::log::tests::*;
845844
use crate::log::LogRegistry;
846845
use crate::protos::{
@@ -1029,7 +1028,7 @@ mod tests {
10291028

10301029
let (nodes, _cluster_lease) =
10311030
EtcdNodeRegistry::join(cluster_uri.clone(), NodeId::new_random(), None).await.unwrap();
1032-
let cluster_env = ClusterEnv::new(log_manager.clone(), MemoryFileSystemFactory.into(), nodes).with_replicas(2);
1031+
let cluster_env = ClusterEnv::new(log_manager.clone(), nodes).with_replicas(2);
10331032

10341033
let mut cluster_meta_handle =
10351034
EtcdClusterMetaDaemon::start("seamdb1", cluster_uri.clone(), cluster_env).await.unwrap();
@@ -1095,7 +1094,7 @@ mod tests {
10951094

10961095
let (nodes2, _cluster_lease2) =
10971096
EtcdNodeRegistry::join(cluster_uri.clone(), NodeId::new_random(), None).await.unwrap();
1098-
let cluster_env2 = ClusterEnv::new(log_manager, MemoryFileSystemFactory.into(), nodes2).with_replicas(2);
1097+
let cluster_env2 = ClusterEnv::new(log_manager, nodes2).with_replicas(2);
10991098
let mut cluster_meta_handle2 =
11001099
EtcdClusterMetaDaemon::start("seamdb1", cluster_uri.clone(), cluster_env2).await.unwrap();
11011100

src/fs/mod.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use thiserror::Error;
2929

3030
pub use self::memory::MemoryFileSystemFactory;
3131
use crate::endpoint::{OwnedResourceUri, OwnedServiceUri, ResourceUri};
32+
use crate::log::fs::{LogFileSystem, LogFileSystemFactory};
33+
use crate::log::LogManager;
3234

3335
pub struct FileMeta {
3436
pub last_modified: SystemTime,
@@ -132,6 +134,20 @@ impl From<MemoryFileSystemFactory> for Arc<FileSystemManager> {
132134
}
133135
}
134136

137+
impl From<Arc<LogManager>> for FileSystemManager {
138+
fn from(logs: Arc<LogManager>) -> Self {
139+
let filesystems = logs
140+
.clients()
141+
.cloned()
142+
.map(|client| (client.location().into_owned(), Arc::new(LogFileSystem::new(client)) as Arc<dyn FileSystem>))
143+
.collect();
144+
let default = Arc::new(LogFileSystem::new(logs.default_client().clone()));
145+
let mut factories = FileSystemFactories::default();
146+
factories.register(Arc::new(LogFileSystemFactory::new(logs))).unwrap();
147+
Self { factories, default, filesystems }
148+
}
149+
}
150+
135151
impl FileSystemManager {
136152
pub async fn new(factories: FileSystemFactories, filesystem: OwnedServiceUri) -> Result<Self> {
137153
let default = factories.open(filesystem).await?;
@@ -140,6 +156,15 @@ impl FileSystemManager {
140156
Ok(manager)
141157
}
142158

159+
pub fn add_log_filesystems(&mut self, logs: Arc<LogManager>) -> Result<()> {
160+
self.factories.register(Arc::new(LogFileSystemFactory::new(logs.clone())))?;
161+
for client in logs.clients() {
162+
let fs = Arc::new(LogFileSystem::new(client.clone()));
163+
self.mount_fs(fs);
164+
}
165+
Ok(())
166+
}
167+
143168
pub async fn open(&self, uri: &ResourceUri<'_>) -> Result<Box<dyn FileReader>> {
144169
let (fs, path) = self.locate_file(uri)?;
145170
fs.open(path).await

0 commit comments

Comments
 (0)