Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit b9cd8d4

Browse files
authored
Merge pull request #8308 from systeminit/jhelwig/layer-cache-s3-to-pg-fallback-populates-s3
feat(layer-cache) Populate S3 with any data found through S3->PG fallback
2 parents 7ed9656 + 0b8c571 commit b9cd8d4

File tree

2 files changed

+87
-6
lines changed

2 files changed

+87
-6
lines changed

lib/dal/tests/integration_test/policy_report.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ async fn grouped_by_name(ctx: &mut DalContext) -> Result<()> {
 384 384         assert_eq!(&policy_many_reports, &groups[0].name);
 385 385         assert_eq!(
 386 386             MAX_REPORTS_PER_GROUP as usize, // expected
 387     -           groups[0].results.len() // actual
     387 +           groups[0].results.len() // actual
 388 388         );
 389 389         // Verify all reports have the correct name
 390 390         for report in &groups[0].results {
[NOTE: the removed/added lines read identically here because the change was whitespace-only (indentation), which the page extraction did not preserve.]
@@ -405,16 +405,16 @@ async fn grouped_by_name(ctx: &mut DalContext) -> Result<()> {
 405 405         // Verify group 2: policy with one report
 406 406         assert_eq!(&policy_one_report, &groups[1].name);
 407 407         assert_eq!(
 408     -           1, // expected
 409     -           groups[1].results.len() // actual
     408 +           1, // expected
     409 +           groups[1].results.len() // actual
 410 410         );
 411 411         assert_eq!(&policy_one_report, &groups[1].results[0].name);
 412 412
 413 413         // Verify group 3: policy with two reports
 414 414         assert_eq!(&policy_two_reports, &groups[2].name);
 415 415         assert_eq!(
 416     -           2, // expected
 417     -           groups[2].results.len() // actual
     416 +           2, // expected
     417 +           groups[2].results.len() // actual
 418 418         );
 419 419         for report in &groups[2].results {
 420 420             assert_eq!(&policy_two_reports, &report.name);
[NOTE: the removed/added lines read identically here because the changes were whitespace-only (indentation), which the page extraction did not preserve.]

lib/si-layer-cache/src/layer_cache.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ use serde::{
  11  11     de::DeserializeOwned,
  12  12 };
  13  13 use si_data_pg::PgPool;
      14 +use si_events::{
      15 +    Actor,
      16 +    ChangeSetId,
      17 +    Tenancy,
      18 +    WorkspacePk,
      19 +};
  14  20 use si_runtime::DedicatedExecutor;
  15  21 use telemetry::prelude::*;
  16  22 use telemetry_utils::monotonic;
@@ -24,6 +30,10 @@ use crate::{
  24  30     LayerDbError,
  25  31     db::serialize,
  26  32     error::LayerDbResult,
      33 +    event::{
      34 +        LayeredEvent,
      35 +        LayeredEventKind,
      36 +    },
  27  37     hybrid_cache::{
  28  38         Cache,
  29  39         CacheConfig,
@@ -272,6 +282,11 @@ where
 272 282
 273 283         let result = self.pg.get(&key).await?;
 274 284
     285 +       // Write back to S3 if found in PG
     286 +       if let Some(ref bytes) = result {
     287 +           self.queue_s3_writeback(key.clone(), bytes.clone());
     288 +       }
     289 +
 275 290         let result_label: &'static str =
 276 291             if result.is_some() { "hit" } else { "miss" };
 277 292         monotonic!(
@@ -447,6 +462,11 @@ where
 447 462
 448 463         let result = self.pg.get(&key).await?;
 449 464
     465 +       // Write back to S3 if found in PG
     466 +       if let Some(ref bytes) = result {
     467 +           self.queue_s3_writeback(key.clone(), bytes.clone());
     468 +       }
     469 +
 450 470         let result_label: &'static str =
 451 471             if result.is_some() { "hit" } else { "miss" };
 452 472         monotonic!(
@@ -557,6 +577,11 @@ where
 557 577         );
 558 578
 559 579         if let Some(pg_results) = self.pg.get_many(&still_not_found).await? {
     580 +           // Queue write-backs for all PG-sourced data
     581 +           for (key, bytes) in &pg_results {
     582 +               self.queue_s3_writeback(Arc::from(key.as_str()), bytes.clone());
     583 +           }
     584 +
 560 585             // Merge S3 and PG results
 561 586             let mut combined = s3_results;
 562 587             combined.extend(pg_results);
@@ -576,7 +601,16 @@ where
 576 601                 to_backend = BackendType::Postgres.as_ref()
 577 602             );
 578 603
 579     -           self.pg.get_many(&not_found).await?
     604 +           let pg_results = self.pg.get_many(&not_found).await?;
     605 +
     606 +           // Queue write-backs for all PG-sourced data
     607 +           if let Some(ref results) = pg_results {
     608 +               for (key, bytes) in results {
     609 +                   self.queue_s3_writeback(Arc::from(key.as_str()), bytes.clone());
     610 +               }
     611 +           }
     612 +
     613 +           pg_results
 580 614         }
 581 615     }
 582 616

@@ -688,4 +722,51 @@ where
 688 722
 689 723         s3_layer.put_direct(key, value).await
 690 724     }
     725 +
     726 +    /// Queue an async write-back to S3 for data found during PG fallback.
     727 +    /// This is a fire-and-forget operation - failures are logged but don't affect the read.
     728 +    fn queue_s3_writeback(&self, key: Arc<str>, value: Vec<u8>) {
     729 +        // Only queue if we have S3 layers configured
     730 +        let Some(s3_layers) = &self.s3_layers else {
     731 +            return;
     732 +        };
     733 +
     734 +        let Some(s3_layer) = s3_layers.get(self.name.as_str()) else {
     735 +            return;
     736 +        };
     737 +
     738 +        monotonic!(
     739 +            layer_cache_fallback_writeback_attempted = 1,
     740 +            cache_name = self.name.as_str(),
     741 +            backend = BackendType::S3.as_ref()
     742 +        );
     743 +
     744 +        // Create a write-back event with synthetic metadata.
     745 +        // This is a caching operation, not a domain event.
     746 +        // The S3Layer will automatically transform the key to the correct S3 format.
     747 +        let event = LayeredEvent::new(
     748 +            LayeredEventKind::Raw,
     749 +            Arc::new(self.name.clone()),
     750 +            key, // Original PG key - S3Layer transforms it
     751 +            Arc::new(value),
     752 +            Arc::new(String::new()), // Empty sort key - not used by S3Layer
     753 +            None, // No web events for write-back
     754 +            Tenancy::new(WorkspacePk::NONE, ChangeSetId::new()),
     755 +            Actor::System,
     756 +        );
     757 +
     758 +        // Queue for async S3 write - log on failure but don't block read
     759 +        if let Err(e) = s3_layer.insert(event) {
     760 +            warn!(
     761 +                cache.name = self.name.as_str(),
     762 +                error = ?e,
     763 +                "failed to queue S3 write-back for fallback data"
     764 +            );
     765 +            monotonic!(
     766 +                layer_cache_fallback_writeback_failed = 1,
     767 +                cache_name = self.name.as_str(),
     768 +                backend = BackendType::S3.as_ref()
     769 +            );
     770 +        }
     771 +    }
 691 772 }

0 commit comments

Comments (0)