Skip to content

Commit 150a635

Browse files
ldanilekConvex, Inc.
authored andcommitted
pass modules_storage to module source fetch (#24963)
this PR does the plumbing to make the `modules_storage: Arc<dyn Storage>` available in the functions that fetch module source code. Module source code is (still) fetched only from `_module_versions`, but we eventually want it to be fetched from S3 instead. this PR is piping the storage through the funrun and application structs, and also adding some Debug impls which i found useful while figuring this out. in particular is the `StorageForInstanceImpl` naming and pattern okay? GitOrigin-RevId: 3aaeab09e43c64ee76c3ea22bc2197a610363a93
1 parent bf58529 commit 150a635

File tree

9 files changed

+95
-28
lines changed

9 files changed

+95
-28
lines changed

crates/application/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,9 @@ impl<RT: Runtime> Application<RT> {
464464
snapshot_import_pause_client: PauseClient,
465465
scheduled_jobs_pause_client: PauseClient,
466466
) -> anyhow::Result<Self> {
467-
let module_cache = ModuleCacheWorker::start(runtime.clone(), database.clone()).await;
467+
let module_cache =
468+
ModuleCacheWorker::start(runtime.clone(), database.clone(), modules_storage.clone())
469+
.await;
468470
let module_loader = Arc::new(module_cache.clone());
469471

470472
let system_env_vars = btreemap! {

crates/application/src/module_cache/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use model::modules::{
3636
MODULE_VERSIONS_TABLE,
3737
};
3838
use parking_lot::Mutex;
39+
use storage::Storage;
3940
use value::ResolvedDocumentId;
4041

4142
mod metrics;
@@ -46,11 +47,16 @@ const MAX_BACKOFF: Duration = Duration::from_secs(30);
4647
pub struct ModuleCacheWorker<RT: Runtime> {
4748
rt: RT,
4849
database: Database<RT>,
50+
modules_storage: Arc<dyn Storage>,
4951
cache: AsyncLru<RT, (ResolvedDocumentId, ModuleVersion), ModuleVersionMetadata>,
5052
}
5153

5254
impl<RT: Runtime> ModuleCacheWorker<RT> {
53-
pub async fn start(rt: RT, database: Database<RT>) -> ModuleCache<RT> {
55+
pub async fn start(
56+
rt: RT,
57+
database: Database<RT>,
58+
modules_storage: Arc<dyn Storage>,
59+
) -> ModuleCache<RT> {
5460
let cache = AsyncLru::new(
5561
rt.clone(),
5662
*MODULE_CACHE_MAX_SIZE_BYTES,
@@ -60,12 +66,14 @@ impl<RT: Runtime> ModuleCacheWorker<RT> {
6066
let worker = Self {
6167
rt: rt.clone(),
6268
database: database.clone(),
69+
modules_storage: modules_storage.clone(),
6370
cache: cache.clone(),
6471
};
6572

6673
let worker_handle = rt.spawn("module_cache_worker", worker.go());
6774
ModuleCache {
6875
database,
76+
modules_storage,
6977
cache,
7078
worker: Arc::new(Mutex::new(worker_handle)),
7179
}
@@ -103,6 +111,7 @@ impl<RT: Runtime> ModuleCacheWorker<RT> {
103111
for key in referenced_versions {
104112
let fetcher = ModuleVersionFetcher {
105113
database: self.database.clone(),
114+
modules_storage: self.modules_storage.clone(),
106115
};
107116
self.cache
108117
.get(key, fetcher.generate_value(key).boxed())
@@ -127,6 +136,9 @@ impl<RT: Runtime> ModuleCacheWorker<RT> {
127136
#[derive(Clone)]
128137
pub struct ModuleVersionFetcher<RT: Runtime> {
129138
database: Database<RT>,
139+
// TODO(lee) read module source from storage.
140+
#[allow(unused)]
141+
modules_storage: Arc<dyn Storage>,
130142
}
131143

132144
impl<RT: Runtime> ModuleVersionFetcher<RT> {
@@ -145,6 +157,8 @@ impl<RT: Runtime> ModuleVersionFetcher<RT> {
145157
pub struct ModuleCache<RT: Runtime> {
146158
database: Database<RT>,
147159

160+
modules_storage: Arc<dyn Storage>,
161+
148162
cache: AsyncLru<RT, (ResolvedDocumentId, ModuleVersion), ModuleVersionMetadata>,
149163

150164
worker: Arc<Mutex<RT::Handle>>,
@@ -160,6 +174,7 @@ impl<RT: Runtime> Clone for ModuleCache<RT> {
160174
fn clone(&self) -> Self {
161175
Self {
162176
database: self.database.clone(),
177+
modules_storage: self.modules_storage.clone(),
163178
cache: self.cache.clone(),
164179
worker: self.worker.clone(),
165180
}
@@ -189,6 +204,7 @@ impl<RT: Runtime> ModuleLoader<RT> for ModuleCache<RT> {
189204
let key = (module_metadata.id(), module_metadata.latest_version);
190205
let fetcher = ModuleVersionFetcher {
191206
database: self.database.clone(),
207+
modules_storage: self.modules_storage.clone(),
192208
};
193209
let result = self
194210
.cache

crates/application/src/test_helpers.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ use file_storage::{
3434
FileStorage,
3535
TransactionalFileStorage,
3636
};
37-
use function_runner::server::InProcessFunctionRunner;
37+
use function_runner::server::{
38+
InProcessFunctionRunner,
39+
InstanceStorage,
40+
};
3841
use isolate::test_helpers::{
3942
TEST_SOURCE,
4043
TEST_SOURCE_ISOLATE_ONLY,
@@ -188,7 +191,10 @@ impl<RT: Runtime> ApplicationTestExt<RT> for Application<RT> {
188191
convex_origin.clone(),
189192
rt.clone(),
190193
persistence.reader(),
191-
files_storage.clone(),
194+
InstanceStorage {
195+
files_storage: files_storage.clone(),
196+
modules_storage: modules_storage.clone(),
197+
},
192198
database.clone(),
193199
fetch_client.clone(),
194200
)

crates/function_runner/src/module_cache.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use model::modules::{
2525
ModuleModel,
2626
MODULE_VERSIONS_TABLE,
2727
};
28+
use storage::Storage;
2829
use value::ResolvedDocumentId;
2930

3031
use crate::in_memory_indexes::TransactionIngredients;
@@ -54,6 +55,7 @@ pub(crate) struct FunctionRunnerModuleLoader<RT: Runtime> {
5455
pub cache: ModuleCache<RT>,
5556
pub instance_name: String,
5657
pub transaction_ingredients: TransactionIngredients<RT>,
58+
pub modules_storage: Arc<dyn Storage>,
5759
}
5860

5961
#[async_trait]
@@ -89,7 +91,7 @@ impl<RT: Runtime> ModuleLoader<RT> for FunctionRunnerModuleLoader<RT> {
8991
.0
9092
.get(
9193
key.clone(),
92-
get_module(transaction, module_metadata).boxed(),
94+
get_module(transaction, self.modules_storage.clone(), module_metadata).boxed(),
9395
)
9496
.await?;
9597
// Record read dependency on the module version so the transactions

crates/function_runner/src/server.rs

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
collections::BTreeMap,
3+
fmt::Debug,
34
sync::{
45
Arc,
56
Weak,
@@ -91,7 +92,10 @@ use parking_lot::{
9192
Mutex,
9293
RwLock,
9394
};
94-
use storage::Storage;
95+
use storage::{
96+
Storage,
97+
StorageUseCase,
98+
};
9599
use sync_types::Timestamp;
96100
use usage_tracking::{
97101
FunctionUsageStats,
@@ -118,23 +122,35 @@ const MAX_ISOLATE_WORKERS: usize = 128;
118122
const ACTIVE_CONCURRENCY_PERMITS_LOG_FREQUENCY: Duration = Duration::from_secs(10);
119123

120124
#[async_trait]
121-
pub trait StorageForInstance<RT: Runtime>: Clone + Send + Sync + 'static {
125+
pub trait StorageForInstance<RT: Runtime>: Debug + Clone + Send + Sync + 'static {
122126
/// Gets a storage impl for a instance. Agnostic to what kind of storage -
123127
/// local or s3, or how it was loaded (e.g. passed directly within backend,
124128
/// loaded from a transaction created in Funrun)
125129
async fn storage_for_instance(
126130
&self,
127131
transaction: &mut Transaction<RT>,
132+
use_case: StorageUseCase,
128133
) -> anyhow::Result<Arc<dyn Storage>>;
129134
}
130135

136+
#[derive(Clone, Debug)]
137+
pub struct InstanceStorage {
138+
pub files_storage: Arc<dyn Storage>,
139+
pub modules_storage: Arc<dyn Storage>,
140+
}
141+
131142
#[async_trait]
132-
impl<RT: Runtime> StorageForInstance<RT> for Arc<dyn Storage> {
143+
impl<RT: Runtime> StorageForInstance<RT> for InstanceStorage {
133144
async fn storage_for_instance(
134145
&self,
135146
_transaction: &mut Transaction<RT>,
147+
use_case: StorageUseCase,
136148
) -> anyhow::Result<Arc<dyn Storage>> {
137-
Ok(self.clone())
149+
match use_case {
150+
StorageUseCase::Files => Ok(self.files_storage.clone()),
151+
StorageUseCase::Modules => Ok(self.modules_storage.clone()),
152+
_ => anyhow::bail!("function runner storage does not support {use_case}"),
153+
}
138154
}
139155
}
140156

@@ -357,8 +373,15 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
357373
)
358374
.await?;
359375
let mut transaction = transaction_ingredients.clone().try_into()?;
360-
let storage = self.storage.storage_for_instance(&mut transaction).await?;
376+
let storage = self
377+
.storage
378+
.storage_for_instance(&mut transaction, StorageUseCase::Files)
379+
.await?;
361380
let file_storage = TransactionalFileStorage::new(self.rt.clone(), storage, convex_origin);
381+
let modules_storage = self
382+
.storage
383+
.storage_for_instance(&mut transaction, StorageUseCase::Modules)
384+
.await?;
362385

363386
let key_broker = KeyBroker::new(&instance_name, instance_secret)?;
364387
let environment_data = EnvironmentData {
@@ -369,6 +392,7 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
369392
instance_name: instance_name.clone(),
370393
cache: self.module_cache.clone(),
371394
transaction_ingredients,
395+
modules_storage,
372396
}),
373397
};
374398

@@ -434,7 +458,7 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
434458
}
435459

436460
pub struct InProcessFunctionRunner<RT: Runtime> {
437-
server: FunctionRunnerCore<RT, Arc<dyn Storage>>,
461+
server: FunctionRunnerCore<RT, InstanceStorage>,
438462
persistence_reader: Arc<dyn PersistenceReader>,
439463

440464
// Static information about the backend.
@@ -455,7 +479,7 @@ impl<RT: Runtime> InProcessFunctionRunner<RT> {
455479
convex_origin: ConvexOrigin,
456480
rt: RT,
457481
persistence_reader: Arc<dyn PersistenceReader>,
458-
storage: Arc<dyn Storage>,
482+
storage: InstanceStorage,
459483
database: Database<RT>,
460484
fetch_client: Arc<dyn FetchClient>,
461485
) -> anyhow::Result<Self> {
@@ -567,16 +591,19 @@ mod tests {
567591
};
568592
use model::initialize_application_system_tables;
569593
use runtime::testing::TestRuntime;
570-
use storage::{
571-
LocalDirStorage,
572-
Storage,
573-
};
594+
use storage::LocalDirStorage;
574595

575-
use crate::server::FunctionRunnerCore;
596+
use crate::server::{
597+
FunctionRunnerCore,
598+
InstanceStorage,
599+
};
576600
#[convex_macro::test_runtime]
577601
async fn test_scheduler_workers_limit_requests(rt: TestRuntime) -> anyhow::Result<()> {
578602
initialize_v8();
579-
let storage = Arc::new(LocalDirStorage::new(rt.clone())?) as Arc<dyn Storage>;
603+
let storage = InstanceStorage {
604+
files_storage: Arc::new(LocalDirStorage::new(rt.clone())?),
605+
modules_storage: Arc::new(LocalDirStorage::new(rt.clone())?),
606+
};
580607
let function_runner_core = FunctionRunnerCore::_new(rt.clone(), storage, 100, 1).await?;
581608
let (mut pause1, pause_client1) = PauseController::new([PAUSE_REQUEST]);
582609
let DbFixtures { db, .. } = DbFixtures::new(&rt).await?;
@@ -591,7 +618,7 @@ mod tests {
591618
let request2 = bogus_udf_request(&db, client1, None, sender).await?;
592619
function_runner_core.send_request(request2)?;
593620
let response =
594-
FunctionRunnerCore::<TestRuntime, Arc<dyn Storage>>::receive_response(rx2).await?;
621+
FunctionRunnerCore::<TestRuntime, InstanceStorage>::receive_response(rx2).await?;
595622
let err = response.unwrap_err();
596623
assert!(err.is_rejected_before_execution());
597624
assert!(err.to_string().contains(NO_AVAILABLE_WORKERS));
@@ -603,7 +630,10 @@ mod tests {
603630
rt: TestRuntime,
604631
) -> anyhow::Result<()> {
605632
initialize_v8();
606-
let storage = Arc::new(LocalDirStorage::new(rt.clone())?) as Arc<dyn Storage>;
633+
let storage = InstanceStorage {
634+
files_storage: Arc::new(LocalDirStorage::new(rt.clone())?),
635+
modules_storage: Arc::new(LocalDirStorage::new(rt.clone())?),
636+
};
607637
let function_runner_core = FunctionRunnerCore::_new(rt.clone(), storage, 50, 2).await?;
608638
let (mut pause1, pause_client1) = PauseController::new([PAUSE_REQUEST]);
609639
let DbFixtures { db, .. } = DbFixtures::new(&rt).await?;
@@ -619,14 +649,17 @@ mod tests {
619649
let client2 = "client2";
620650
let request2 = bogus_udf_request(&db, client2, None, sender).await?;
621651
function_runner_core.send_request(request2)?;
622-
FunctionRunnerCore::<TestRuntime, Arc<dyn Storage>>::receive_response(rx2).await??;
652+
FunctionRunnerCore::<TestRuntime, InstanceStorage>::receive_response(rx2).await??;
623653
Ok(())
624654
}
625655

626656
#[convex_macro::test_runtime]
627657
async fn test_scheduler_throttles_same_client(rt: TestRuntime) -> anyhow::Result<()> {
628658
initialize_v8();
629-
let storage = Arc::new(LocalDirStorage::new(rt.clone())?) as Arc<dyn Storage>;
659+
let storage = InstanceStorage {
660+
files_storage: Arc::new(LocalDirStorage::new(rt.clone())?),
661+
modules_storage: Arc::new(LocalDirStorage::new(rt.clone())?),
662+
};
630663
let function_runner_core = FunctionRunnerCore::_new(rt.clone(), storage, 50, 2).await?;
631664
let (mut pause1, pause_client1) = PauseController::new([PAUSE_REQUEST]);
632665
let DbFixtures { db, .. } = DbFixtures::new(&rt).await?;
@@ -642,7 +675,7 @@ mod tests {
642675
let request2 = bogus_udf_request(&db, client, None, sender).await?;
643676
function_runner_core.send_request(request2)?;
644677
let response =
645-
FunctionRunnerCore::<TestRuntime, Arc<dyn Storage>>::receive_response(rx2).await?;
678+
FunctionRunnerCore::<TestRuntime, InstanceStorage>::receive_response(rx2).await?;
646679
let err = response.unwrap_err();
647680
assert!(err.is_rejected_before_execution());
648681
assert!(err.to_string().contains(NO_AVAILABLE_WORKERS));

crates/isolate/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ sha1 = { workspace = true }
7272
sha2 = { workspace = true }
7373
sourcemap = { workspace = true }
7474
spki = { workspace = true }
75-
storage = { path = "../storage", optional = true }
75+
storage = { path = "../storage" }
7676
sync_types = { package = "convex_sync_types", path = "../convex/sync_types" }
7777
thiserror = { workspace = true }
7878
tokio = { workspace = true }
@@ -128,7 +128,6 @@ testing = [
128128
"proptest-http",
129129
"runtime/testing",
130130
"search/testing",
131-
"storage",
132131
"storage/testing",
133132
"usage_tracking/testing",
134133
"value/testing",

crates/isolate/src/environment/helpers/module_loader.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use model::modules::{
1717
types::ModuleMetadata,
1818
ModuleModel,
1919
};
20+
use storage::Storage;
2021
use sync_types::{
2122
CanonicalizedModulePath,
2223
CanonicalizedUdfPath,
@@ -114,6 +115,8 @@ impl<RT: Runtime> ModuleLoader<RT> for TransactionModuleLoader {
114115

115116
pub async fn get_module<RT: Runtime>(
116117
mut tx: Transaction<RT>,
118+
// TODO(lee) fetch from module storage
119+
_modules_storage: Arc<dyn Storage>,
117120
module_metadata: ParsedDocument<ModuleMetadata>,
118121
) -> anyhow::Result<ModuleVersionMetadata> {
119122
let _timer = module_load_timer();

crates/local_backend/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ use file_storage::{
4545
TransactionalFileStorage,
4646
};
4747
use function_runner::{
48-
server::InProcessFunctionRunner,
48+
server::{
49+
InProcessFunctionRunner,
50+
InstanceStorage,
51+
},
4952
FunctionRunner,
5053
};
5154
use model::{
@@ -209,7 +212,10 @@ pub async fn make_app(
209212
config.convex_origin_url(),
210213
runtime.clone(),
211214
persistence.reader(),
212-
files_storage.clone(),
215+
InstanceStorage {
216+
files_storage: files_storage.clone(),
217+
modules_storage: modules_storage.clone(),
218+
},
213219
database.clone(),
214220
fetch_client.clone(),
215221
)

crates/storage/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ impl StorageExt for Arc<dyn Storage> {
521521
let full_size = self
522522
.get_object_attributes(object_key)
523523
.await?
524-
.context("object does not exist")?
524+
.with_context(|| format!("object {object_key:?} does not exist in {self:?}"))?
525525
.size;
526526
let inner =
527527
StorageObjectReader::new_inner_reader_starting_at(self.clone(), object_key.clone(), 0);

0 commit comments

Comments
 (0)