Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit c40d4df

Browse files
committed
feat: Implement func run and func run log layerdb fallbacks on the new client
1 parent d148711 commit c40d4df

File tree

10 files changed

+171
-46
lines changed

10 files changed

+171
-46
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/dal/src/context.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,15 @@ impl SiDbContext for DalContext {
506506
fn change_set_id(&self) -> ChangeSetId {
507507
self.change_set_id()
508508
}
509+
510+
// TODO get rid of these after we don't need layer db fallbacks
511+
fn func_run_layer_db(&self) -> &si_layer_cache::db::func_run::FuncRunLayerDb {
512+
self.services_context.layer_db.func_run()
513+
}
514+
515+
fn func_run_log_layer_db(&self) -> &si_layer_cache::db::func_run_log::FuncRunLogLayerDb {
516+
self.services_context.layer_db.func_run_log()
517+
}
509518
}
510519

511520
impl SiDbTransactions for Transactions {

lib/si-db/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ rust_library(
1111
"//lib/si-data-pg:si-data-pg",
1212
"//lib/si-events-rs:si-events",
1313
"//lib/si-id:si-id",
14+
"//lib/si-layer-cache:si-layer-cache",
1415
"//lib/telemetry-rs:telemetry",
1516
"//third-party/rust:async-trait",
1617
"//third-party/rust:chrono",

lib/si-db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ si-data-nats = { path = "../../lib/si-data-nats" }
2323
si-data-pg = { path = "../../lib/si-data-pg" }
2424
si-events = { path = "../../lib/si-events-rs" }
2525
si-id = { path = "../../lib/si-id" }
26+
si-layer-cache = { path = "../../lib/si-layer-cache" }
2627
strum = { workspace = true }
2728
telemetry = { path = "../../lib/telemetry-rs" }
2829
thiserror = { workspace = true }

lib/si-db/src/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
use async_trait::async_trait;
22
use si_id::ChangeSetId;
3+
use si_layer_cache::db::{
4+
func_run::FuncRunLayerDb,
5+
func_run_log::FuncRunLogLayerDb,
6+
};
37
use tokio::sync::MappedMutexGuard;
48

59
use crate::{
@@ -21,4 +25,7 @@ pub trait SiDbContext {
2125
fn tenancy(&self) -> &Tenancy;
2226
fn visibility(&self) -> &Visibility;
2327
fn change_set_id(&self) -> ChangeSetId;
28+
// TODO get rid of these after we don't need layer db fallbacks
29+
fn func_run_layer_db(&self) -> &FuncRunLayerDb;
30+
fn func_run_log_layer_db(&self) -> &FuncRunLogLayerDb;
2431
}

lib/si-db/src/func_run.rs

Lines changed: 114 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use si_events::{
24
ActionId,
35
AttributeValueId,
@@ -84,13 +86,13 @@ impl FuncRunDb {
8486
let postcard_bytes =
8587
postcard::to_stdvec(&func_run).map_err(|e| SiDbError::Postcard(e.to_string()))?;
8688

89+
// Write to si-db
8790
ctx.txns()
8891
.await?
8992
.pg()
9093
.execute(
9194
"INSERT INTO func_runs (
9295
key,
93-
sort_key,
9496
created_at,
9597
updated_at,
9698
state,
@@ -118,16 +120,14 @@ impl FuncRunDb {
118120
$11,
119121
$12,
120122
$13,
121-
$14,
122-
$15
123+
$14
123124
) ON CONFLICT (key) DO UPDATE SET
124125
updated_at = EXCLUDED.updated_at,
125126
state = EXCLUDED.state,
126127
json_value = EXCLUDED.json_value,
127128
value = EXCLUDED.value",
128129
&[
129130
&func_run.id().to_string(),
130-
&func_run.tenancy().workspace_pk.to_string(),
131131
&func_run.created_at(),
132132
&func_run.updated_at(),
133133
&func_run.state().to_string(),
@@ -147,9 +147,21 @@ impl FuncRunDb {
147147
)
148148
.await?;
149149

150+
// Also write to layer-db for backward compatibility during migration
151+
ctx.func_run_layer_db()
152+
.write(
153+
Arc::new(func_run.clone()),
154+
None, // web_events
155+
*func_run.tenancy(),
156+
*func_run.actor(),
157+
)
158+
.await
159+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?;
160+
150161
Ok(())
151162
}
152163

164+
// NOTE(victor): This is only used by migration so it does not write to layer-db
153165
/// Write multiple func runs to the database in a single INSERT query.
154166
/// This is more efficient than calling upsert multiple times.
155167
pub async fn upsert_batch(ctx: &impl SiDbContext, func_runs: Vec<FuncRun>) -> SiDbResult<()> {
@@ -179,14 +191,15 @@ impl FuncRunDb {
179191
let mut values_clauses = Vec::new();
180192
let mut row_data_vec = Vec::new();
181193
let mut param_index = 1;
194+
const COL_COUNT: usize = 14;
182195

183196
for func_run in &func_runs {
184197
let json: serde_json::Value = serde_json::to_value(func_run)?;
185198
let postcard_bytes =
186199
postcard::to_stdvec(func_run).map_err(|e| SiDbError::Postcard(e.to_string()))?;
187200

188-
// Create placeholders for this row ($1, $2, ... $15)
189-
let placeholders: Vec<String> = (param_index..param_index + 15)
201+
// Create placeholders for this row ($1, $2, ... $COL_COUNT)
202+
let placeholders: Vec<String> = (param_index..param_index + COL_COUNT)
190203
.map(|i| format!("${i}"))
191204
.collect();
192205
values_clauses.push(format!("({})", placeholders.join(", ")));
@@ -210,13 +223,12 @@ impl FuncRunDb {
210223
postcard_bytes,
211224
});
212225

213-
param_index += 15;
226+
param_index += COL_COUNT;
214227
}
215228

216229
let query = format!(
217230
"INSERT INTO func_runs (
218231
key,
219-
sort_key,
220232
created_at,
221233
updated_at,
222234
state,
@@ -240,7 +252,8 @@ impl FuncRunDb {
240252
);
241253

242254
// Build the parameter array dynamically
243-
// We need to store the slices separately to keep them alive
255+
// This looks like extra work, but since the pg library expects refs of everything,
256+
// we had to create the row_data_vec to own the values while we pass them down
244257
let postcard_slices: Vec<&[u8]> = row_data_vec
245258
.iter()
246259
.map(|rd| rd.postcard_bytes.as_slice())
@@ -249,7 +262,6 @@ impl FuncRunDb {
249262
let mut params: Vec<&(dyn postgres_types::ToSql + Sync)> = Vec::new();
250263
for (idx, row_data) in row_data_vec.iter().enumerate() {
251264
params.push(&row_data.id);
252-
params.push(&row_data.workspace_pk);
253265
params.push(&row_data.created_at);
254266
params.push(&row_data.updated_at);
255267
params.push(&row_data.state);
@@ -270,6 +282,7 @@ impl FuncRunDb {
270282
Ok(())
271283
}
272284

285+
// NOTE(victor): This is only used by migration so it does not fallback to layer-db
273286
/// Returns the IDs from the input batch that do NOT exist in the database.
274287
/// This is useful for determining which func runs need to be migrated.
275288
pub async fn find_missing_ids(
@@ -323,7 +336,11 @@ impl FuncRunDb {
323336
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
324337
Ok(Some(func_run))
325338
} else {
326-
Ok(None)
339+
// Fall back to layer-db if not found in si-db
340+
ctx.func_run_layer_db()
341+
.get_last_run_for_action_id_opt(workspace_pk, action_id)
342+
.await
343+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
327344
}
328345
}
329346

@@ -360,6 +377,18 @@ impl FuncRunDb {
360377
func_runs.push(func_run);
361378
}
362379

380+
// If si-db returned empty, fall back to layer-db
381+
if func_runs.is_empty() {
382+
if let Some(layer_func_runs) = ctx
383+
.func_run_layer_db()
384+
.list_management_history(workspace_pk, change_set_id)
385+
.await
386+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
387+
{
388+
return Ok(layer_func_runs);
389+
}
390+
}
391+
363392
Ok(func_runs)
364393
}
365394

@@ -386,7 +415,16 @@ impl FuncRunDb {
386415
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
387416
Ok(Some(func_run))
388417
} else {
389-
Ok(None)
418+
// Fall back to layer-db if not found in si-db
419+
ctx.func_run_layer_db()
420+
.get_last_management_run_for_func_and_component_id(
421+
workspace_pk,
422+
change_set_id,
423+
component_id,
424+
func_id,
425+
)
426+
.await
427+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
390428
}
391429
}
392430

@@ -411,7 +449,11 @@ impl FuncRunDb {
411449
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
412450
Ok(Some(func_run))
413451
} else {
414-
Ok(None)
452+
// Fall back to layer-db if not found in si-db
453+
ctx.func_run_layer_db()
454+
.get_last_qualification_for_attribute_value_id(workspace_pk, attribute_value_id)
455+
.await
456+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
415457
}
416458
}
417459

@@ -432,29 +474,19 @@ impl FuncRunDb {
432474
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
433475
Ok(Some(func_run))
434476
} else {
435-
Ok(None)
477+
// Fall back to layer-db if not found in si-db
478+
ctx.func_run_layer_db()
479+
.try_read(key)
480+
.await
481+
.map(|arc_func_run| Some((*arc_func_run).clone()))
482+
.map_err(|e| SiDbError::LayerDb(e.to_string()))
436483
}
437484
}
438485

439486
pub async fn try_read(ctx: &impl SiDbContext, key: FuncRunId) -> SiDbResult<FuncRun> {
440-
let maybe_row = ctx
441-
.txns()
487+
Self::read(ctx, key)
442488
.await?
443-
.pg()
444-
.query_opt(
445-
&format!("SELECT value FROM {DBNAME} WHERE key = $1"),
446-
&[&key.to_string()],
447-
)
448-
.await?;
449-
450-
if let Some(row) = maybe_row {
451-
let value_bytes: Vec<u8> = row.try_get("value")?;
452-
let func_run: FuncRun = postcard::from_bytes(&value_bytes)
453-
.map_err(|e| SiDbError::Postcard(e.to_string()))?;
454-
Ok(func_run)
455-
} else {
456-
Err(SiDbError::MissingFuncRun(key))
457-
}
489+
.ok_or(SiDbError::MissingFuncRun(key))
458490
}
459491

460492
pub async fn read_many_for_workspace(
@@ -476,6 +508,21 @@ impl FuncRunDb {
476508
func_runs.push(func_run);
477509
}
478510

511+
// Fall back to layer-db if si-db returned no results
512+
if func_runs.is_empty() {
513+
if let Some(layer_func_runs) = ctx
514+
.func_run_layer_db()
515+
.read_many_for_workspace(workspace_pk)
516+
.await
517+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
518+
{
519+
return Ok(layer_func_runs
520+
.into_iter()
521+
.map(|arc| (*arc).clone())
522+
.collect());
523+
}
524+
}
525+
479526
Ok(func_runs)
480527
}
481528

@@ -527,6 +574,21 @@ impl FuncRunDb {
527574
func_runs.push(func_run);
528575
}
529576

577+
// Fall back to layer-db if si-db returned no results
578+
if func_runs.is_empty() {
579+
if let Some(layer_func_runs) = ctx
580+
.func_run_layer_db()
581+
.read_many_for_workspace_paginated(workspace_pk, change_set_id, limit, cursor)
582+
.await
583+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
584+
{
585+
return Ok(layer_func_runs
586+
.into_iter()
587+
.map(|arc| (*arc).clone())
588+
.collect());
589+
}
590+
}
591+
530592
Ok(func_runs)
531593
}
532594

@@ -585,6 +647,27 @@ impl FuncRunDb {
585647
func_runs.push(func_run);
586648
}
587649

650+
// Fall back to layer-db if si-db returned no results
651+
if func_runs.is_empty() {
652+
if let Some(layer_func_runs) = ctx
653+
.func_run_layer_db()
654+
.read_many_for_component_paginated(
655+
workspace_pk,
656+
change_set_id,
657+
component_id,
658+
limit,
659+
cursor,
660+
)
661+
.await
662+
.map_err(|e| SiDbError::LayerDb(e.to_string()))?
663+
{
664+
return Ok(layer_func_runs
665+
.into_iter()
666+
.map(|arc| (*arc).clone())
667+
.collect());
668+
}
669+
}
670+
588671
Ok(func_runs)
589672
}
590673
}

0 commit comments

Comments
 (0)