Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit 5df62de

Browse files
committed
feat: Implement func run and func run log layerdb fallbacks on the new client
1 parent d148711 commit 5df62de

File tree

8 files changed

+146
-23
lines changed

8 files changed

+146
-23
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/dal/src/context.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,15 @@ impl SiDbContext for DalContext {
506506
fn change_set_id(&self) -> ChangeSetId {
507507
self.change_set_id()
508508
}
509+
510+
// TODO get rid of these after we don't need layer db fallbacks
511+
fn func_run_layer_db(&self) -> &si_layer_cache::db::func_run::FuncRunLayerDb {
512+
(&self.services_context.layer_db).func_run()
513+
}
514+
515+
fn func_run_log_layer_db(&self) -> &si_layer_cache::db::func_run_log::FuncRunLogLayerDb {
516+
(&self.services_context.layer_db).func_run_log()
517+
}
509518
}
510519

511520
impl SiDbTransactions for Transactions {

lib/si-db/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ rust_library(
1111
"//lib/si-data-pg:si-data-pg",
1212
"//lib/si-events-rs:si-events",
1313
"//lib/si-id:si-id",
14+
"//lib/si-layer-cache:si-layer-cache",
1415
"//lib/telemetry-rs:telemetry",
1516
"//third-party/rust:async-trait",
1617
"//third-party/rust:chrono",

lib/si-db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ si-data-nats = { path = "../../lib/si-data-nats" }
2323
si-data-pg = { path = "../../lib/si-data-pg" }
2424
si-events = { path = "../../lib/si-events-rs" }
2525
si-id = { path = "../../lib/si-id" }
26+
si-layer-cache = { path = "../../lib/si-layer-cache" }
2627
strum = { workspace = true }
2728
telemetry = { path = "../../lib/telemetry-rs" }
2829
thiserror = { workspace = true }

lib/si-db/src/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
use async_trait::async_trait;
22
use si_id::ChangeSetId;
3+
use si_layer_cache::db::{
4+
func_run::FuncRunLayerDb,
5+
func_run_log::FuncRunLogLayerDb,
6+
};
37
use tokio::sync::MappedMutexGuard;
48

59
use crate::{
@@ -21,4 +25,7 @@ pub trait SiDbContext {
2125
fn tenancy(&self) -> &Tenancy;
2226
fn visibility(&self) -> &Visibility;
2327
fn change_set_id(&self) -> ChangeSetId;
28+
// TODO get rid of these after we don't need layer db fallbacks
29+
fn func_run_layer_db(&self) -> &FuncRunLayerDb;
30+
fn func_run_log_layer_db(&self) -> &FuncRunLogLayerDb;
2431
}

lib/si-db/src/func_run.rs

Lines changed: 98 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use si_events::{
24
ActionId,
35
AttributeValueId,
@@ -84,6 +86,7 @@ impl FuncRunDb {
8486
let postcard_bytes =
8587
postcard::to_stdvec(&func_run).map_err(|e| SiDbError::Postcard(e.to_string()))?;
8688

89+
// Write to si-db
8790
ctx.txns()
8891
.await?
8992
.pg()
@@ -147,9 +150,21 @@ impl FuncRunDb {
147150
)
148151
.await?;
149152

153+
// Also write to layer-db for backward compatibility during migration
154+
ctx.func_run_layer_db()
155+
.write(
156+
Arc::new(func_run.clone()),
157+
None, // web_events
158+
*func_run.tenancy(),
159+
*func_run.actor(),
160+
)
161+
.await
162+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?;
163+
150164
Ok(())
151165
}
152166

167+
// NOTE(victor): This is only used by migration so it does not write to layer-db
153168
/// Write multiple func runs to the database in a single INSERT query.
154169
/// This is more efficient than calling upsert multiple times.
155170
pub async fn upsert_batch(ctx: &impl SiDbContext, func_runs: Vec<FuncRun>) -> SiDbResult<()> {
@@ -270,6 +285,7 @@ impl FuncRunDb {
270285
Ok(())
271286
}
272287

288+
// NOTE(victor): This is only used by migration so it does not fallback to layer-db
273289
/// Returns the IDs from the input batch that do NOT exist in the database.
274290
/// This is useful for determining which func runs need to be migrated.
275291
pub async fn find_missing_ids(
@@ -323,7 +339,11 @@ impl FuncRunDb {
323339
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
324340
Ok(Some(func_run))
325341
} else {
326-
Ok(None)
342+
// Fall back to layer-db if not found in si-db
343+
ctx.func_run_layer_db()
344+
.get_last_run_for_action_id_opt(workspace_pk, action_id)
345+
.await
346+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
327347
}
328348
}
329349

@@ -360,6 +380,18 @@ impl FuncRunDb {
360380
func_runs.push(func_run);
361381
}
362382

383+
// If si-db returned empty, fall back to layer-db
384+
if func_runs.is_empty() {
385+
if let Some(layer_func_runs) = ctx
386+
.func_run_layer_db()
387+
.list_management_history(workspace_pk, change_set_id)
388+
.await
389+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
390+
{
391+
return Ok(layer_func_runs);
392+
}
393+
}
394+
363395
Ok(func_runs)
364396
}
365397

@@ -386,7 +418,16 @@ impl FuncRunDb {
386418
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
387419
Ok(Some(func_run))
388420
} else {
389-
Ok(None)
421+
// Fall back to layer-db if not found in si-db
422+
ctx.func_run_layer_db()
423+
.get_last_management_run_for_func_and_component_id(
424+
workspace_pk,
425+
change_set_id,
426+
component_id,
427+
func_id,
428+
)
429+
.await
430+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
390431
}
391432
}
392433

@@ -411,7 +452,11 @@ impl FuncRunDb {
411452
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
412453
Ok(Some(func_run))
413454
} else {
414-
Ok(None)
455+
// Fall back to layer-db if not found in si-db
456+
ctx.func_run_layer_db()
457+
.get_last_qualification_for_attribute_value_id(workspace_pk, attribute_value_id)
458+
.await
459+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
415460
}
416461
}
417462

@@ -432,29 +477,19 @@ impl FuncRunDb {
432477
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
433478
Ok(Some(func_run))
434479
} else {
435-
Ok(None)
480+
// Fall back to layer-db if not found in si-db
481+
ctx.func_run_layer_db()
482+
.try_read(key)
483+
.await
484+
.map(|arc_func_run| Some((*arc_func_run).clone()))
485+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
436486
}
437487
}
438488

439489
pub async fn try_read(ctx: &impl SiDbContext, key: FuncRunId) -> SiDbResult<FuncRun> {
440-
let maybe_row = ctx
441-
.txns()
490+
Self::read(ctx, key)
442491
.await?
443-
.pg()
444-
.query_opt(
445-
&format!("SELECT value FROM {DBNAME} WHERE key = $1"),
446-
&[&key.to_string()],
447-
)
448-
.await?;
449-
450-
if let Some(row) = maybe_row {
451-
let value_bytes: Vec<u8> = row.try_get("value")?;
452-
let func_run: FuncRun = postcard::from_bytes(&value_bytes)
453-
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
454-
Ok(func_run)
455-
} else {
456-
Err(SiDbError::MissingFuncRun(key))
457-
}
492+
.ok_or(SiDbError::MissingFuncRun(key))
458493
}
459494

460495
pub async fn read_many_for_workspace(
@@ -476,6 +511,18 @@ impl FuncRunDb {
476511
func_runs.push(func_run);
477512
}
478513

514+
// Fall back to layer-db if si-db returned no results
515+
if func_runs.is_empty() {
516+
if let Some(layer_func_runs) = ctx
517+
.func_run_layer_db()
518+
.read_many_for_workspace(workspace_pk)
519+
.await
520+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
521+
{
522+
return Ok(layer_func_runs.into_iter().map(|arc| (*arc).clone()).collect());
523+
}
524+
}
525+
479526
Ok(func_runs)
480527
}
481528

@@ -527,6 +574,18 @@ impl FuncRunDb {
527574
func_runs.push(func_run);
528575
}
529576

577+
// Fall back to layer-db if si-db returned no results
578+
if func_runs.is_empty() {
579+
if let Some(layer_func_runs) = ctx
580+
.func_run_layer_db()
581+
.read_many_for_workspace_paginated(workspace_pk, change_set_id, limit, cursor)
582+
.await
583+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
584+
{
585+
return Ok(layer_func_runs.into_iter().map(|arc| (*arc).clone()).collect());
586+
}
587+
}
588+
530589
Ok(func_runs)
531590
}
532591

@@ -585,6 +644,24 @@ impl FuncRunDb {
585644
func_runs.push(func_run);
586645
}
587646

647+
// Fall back to layer-db if si-db returned no results
648+
if func_runs.is_empty() {
649+
if let Some(layer_func_runs) = ctx
650+
.func_run_layer_db()
651+
.read_many_for_component_paginated(
652+
workspace_pk,
653+
change_set_id,
654+
component_id,
655+
limit,
656+
cursor,
657+
)
658+
.await
659+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
660+
{
661+
return Ok(layer_func_runs.into_iter().map(|arc| (*arc).clone()).collect());
662+
}
663+
}
664+
588665
Ok(func_runs)
589666
}
590667
}

lib/si-db/src/func_run_log.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use si_events::{
24
FuncRunId,
35
FuncRunLog,
@@ -18,11 +20,12 @@ pub struct FuncRunLogDb {}
1820

1921
impl FuncRunLogDb {
2022
/// Write a new func run log to the database.
21-
/// This function can be used to replace the layer-cache write() function.
23+
/// This function writes to both si-db and layer-db for backward compatibility during migration.
2224
pub async fn upsert(ctx: &impl SiDbContext, func_run_log: FuncRunLog) -> SiDbResult<()> {
2325
let postcard_bytes =
2426
postcard::to_stdvec(&func_run_log).map_err(|e| SiDbError::Postcard(e.to_string()))?;
2527

28+
// Write to si-db
2629
ctx.txns()
2730
.await?
2831
.pg()
@@ -63,6 +66,23 @@ impl FuncRunLogDb {
6366
)
6467
.await?;
6568

69+
// Also write to layer-db for backward compatibility during migration
70+
// Convert HistoryActor to Actor
71+
let actor = match ctx.history_actor() {
72+
crate::history_event::HistoryActor::SystemInit => si_events::Actor::System,
73+
crate::history_event::HistoryActor::User(pk) => si_events::Actor::User(*pk),
74+
};
75+
76+
ctx.func_run_log_layer_db()
77+
.write(
78+
Arc::new(func_run_log.clone()),
79+
None, // web_events
80+
func_run_log.tenancy(),
81+
actor,
82+
)
83+
.await
84+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?;
85+
6686
Ok(())
6787
}
6888

@@ -86,7 +106,12 @@ impl FuncRunLogDb {
86106
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
87107
Ok(Some(func_run_log))
88108
} else {
89-
Ok(None)
109+
// Fall back to layer-db if not found in si-db
110+
ctx.func_run_log_layer_db()
111+
.get_for_func_run_id(func_run_id)
112+
.await
113+
.map(|opt_arc| opt_arc.map(|arc| (*arc).clone()))
114+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
90115
}
91116
}
92117

lib/si-db/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ mod embedded {
6161
pub enum SiDbError {
6262
#[error("action id not found: {0}")]
6363
ActionIdNotFound(si_events::ActionId),
64+
#[error("layer db error: {0}")]
65+
LayerDb(String),
6466
#[error("missing func run: {0}")]
6567
MissingFuncRun(si_events::FuncRunId),
6668
#[error("nats error")]

0 commit comments

Comments
 (0)