Skip to content

Commit b3a681d

Browse files
authored
s3_scrubber: updates for sharding (#6281)
This is a lightweight change to keep the scrubber providing sensible output when using sharding:
- The timeline count was wrong when using sharding.
- When checking for tenant existence, we didn't re-use results between different shards of the same tenant.

Closes: #5929
1 parent b5ed6f2 commit b3a681d

File tree

3 files changed

+75
-23
lines changed

3 files changed

+75
-23
lines changed

s3_scrubber/src/garbage.rs

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
//! S3 objects which are either not referenced by any metadata, or are referenced by a
33
//! control plane tenant/timeline in a deleted state.
44
5-
use std::{collections::HashMap, sync::Arc};
5+
use std::{
6+
collections::{HashMap, HashSet},
7+
sync::Arc,
8+
};
69

710
use anyhow::Context;
811
use aws_sdk_s3::{
@@ -118,6 +121,13 @@ const S3_CONCURRENCY: usize = 32;
118121
// How many concurrent API requests to make to the console API.
119122
const CONSOLE_CONCURRENCY: usize = 128;
120123

124+
struct ConsoleCache {
125+
/// Set of tenants found in the control plane API
126+
projects: HashMap<TenantId, ProjectData>,
127+
/// Set of tenants for which the control plane API returned 404
128+
not_found: HashSet<TenantId>,
129+
}
130+
121131
async fn find_garbage_inner(
122132
bucket_config: BucketConfig,
123133
console_config: ConsoleConfig,
@@ -143,23 +153,49 @@ async fn find_garbage_inner(
143153
console_projects.len()
144154
);
145155

146-
// TODO(sharding): batch calls into Console so that we only call once for each TenantId,
147-
// rather than checking the same TenantId for multiple TenantShardId
156+
// Because many tenant shards may look up the same TenantId, we maintain a cache.
157+
let console_cache = Arc::new(std::sync::Mutex::new(ConsoleCache {
158+
projects: console_projects,
159+
not_found: HashSet::new(),
160+
}));
148161

149162
// Enumerate Tenants in S3, and check if each one exists in Console
150163
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
151164
let tenants = stream_tenants(&s3_client, &target);
152165
let tenants_checked = tenants.map_ok(|t| {
153166
let api_client = cloud_admin_api_client.clone();
154-
let console_projects = &console_projects;
167+
let console_cache = console_cache.clone();
155168
async move {
156-
match console_projects.get(&t.tenant_id) {
169+
// Check cache before issuing API call
170+
let project_data = {
171+
let cache = console_cache.lock().unwrap();
172+
let result = cache.projects.get(&t.tenant_id).cloned();
173+
if result.is_none() && cache.not_found.contains(&t.tenant_id) {
174+
return Ok((t, None));
175+
}
176+
result
177+
};
178+
179+
match project_data {
157180
Some(project_data) => Ok((t, Some(project_data.clone()))),
158-
None => api_client
159-
.find_tenant_project(t.tenant_id)
160-
.await
161-
.map_err(|e| anyhow::anyhow!(e))
162-
.map(|r| (t, r)),
181+
None => {
182+
let project_data = api_client
183+
.find_tenant_project(t.tenant_id)
184+
.await
185+
.map_err(|e| anyhow::anyhow!(e));
186+
187+
// Populate cache with result of API call
188+
{
189+
let mut cache = console_cache.lock().unwrap();
190+
if let Ok(Some(project_data)) = &project_data {
191+
cache.projects.insert(t.tenant_id, project_data.clone());
192+
} else if let Ok(None) = &project_data {
193+
cache.not_found.insert(t.tenant_id);
194+
}
195+
}
196+
197+
project_data.map(|r| (t, r))
198+
}
163199
}
164200
}
165201
});

s3_scrubber/src/scan_metadata.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use utils::id::TenantId;
1717

1818
#[derive(Serialize)]
1919
pub struct MetadataSummary {
20-
count: usize,
20+
tenant_count: usize,
21+
timeline_count: usize,
22+
timeline_shard_count: usize,
2123
with_errors: HashSet<TenantShardTimelineId>,
2224
with_warnings: HashSet<TenantShardTimelineId>,
2325
with_orphans: HashSet<TenantShardTimelineId>,
@@ -87,7 +89,9 @@ impl MinMaxHisto {
8789
impl MetadataSummary {
8890
fn new() -> Self {
8991
Self {
90-
count: 0,
92+
tenant_count: 0,
93+
timeline_count: 0,
94+
timeline_shard_count: 0,
9195
with_errors: HashSet::new(),
9296
with_warnings: HashSet::new(),
9397
with_orphans: HashSet::new(),
@@ -112,7 +116,7 @@ impl MetadataSummary {
112116
}
113117

114118
fn update_data(&mut self, data: &S3TimelineBlobData) {
115-
self.count += 1;
119+
self.timeline_shard_count += 1;
116120
if let BlobDataParseResult::Parsed {
117121
index_part,
118122
index_part_generation: _,
@@ -158,16 +162,20 @@ impl MetadataSummary {
158162
);
159163

160164
format!(
161-
"Timelines: {0}
162-
With errors: {1}
163-
With warnings: {2}
164-
With orphan layers: {3}
165+
"Tenants: {}
166+
Timelines: {}
167+
Timeline-shards: {}
168+
With errors: {}
169+
With warnings: {}
170+
With orphan layers: {}
165171
Index versions: {version_summary}
166-
Timeline size bytes: {4}
167-
Layer size bytes: {5}
168-
Timeline layer count: {6}
172+
Timeline size bytes: {}
173+
Layer size bytes: {}
174+
Timeline layer count: {}
169175
",
170-
self.count,
176+
self.tenant_count,
177+
self.timeline_count,
178+
self.timeline_shard_count,
171179
self.with_errors.len(),
172180
self.with_warnings.len(),
173181
self.with_orphans.len(),
@@ -182,7 +190,7 @@ Timeline layer count: {6}
182190
}
183191

184192
pub fn is_empty(&self) -> bool {
185-
self.count == 0
193+
self.timeline_shard_count == 0
186194
}
187195
}
188196

@@ -233,8 +241,12 @@ pub async fn scan_metadata(
233241
mut tenant_objects: TenantObjectListing,
234242
timelines: Vec<(TenantShardTimelineId, S3TimelineBlobData)>,
235243
) {
244+
summary.tenant_count += 1;
245+
246+
let mut timeline_ids = HashSet::new();
236247
let mut timeline_generations = HashMap::new();
237248
for (ttid, data) in timelines {
249+
timeline_ids.insert(ttid.timeline_id);
238250
// Stash the generation of each timeline, for later use identifying orphan layers
239251
if let BlobDataParseResult::Parsed {
240252
index_part: _index_part,
@@ -252,6 +264,8 @@ pub async fn scan_metadata(
252264
summary.update_analysis(&ttid, &analysis);
253265
}
254266

267+
summary.timeline_count += timeline_ids.len();
268+
255269
// Identifying orphan layers must be done on a tenant-wide basis, because individual
256270
// shards' layers may be referenced by other shards.
257271
//

test_runner/regress/test_pageserver_generations.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ def parse_generation_suffix(key):
254254
metadata_summary = S3Scrubber(
255255
neon_env_builder.test_output_dir, neon_env_builder
256256
).scan_metadata()
257-
assert metadata_summary["count"] == 1 # Scrubber should have seen our timeline
257+
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
258+
assert metadata_summary["timeline_count"] == 1
259+
assert metadata_summary["timeline_shard_count"] == 1
258260
assert not metadata_summary["with_errors"]
259261
assert not metadata_summary["with_warnings"]
260262

0 commit comments

Comments (0)