Commit 1abbb52
fix: Ensure series key metadata is persisted to Parquet snapshots (#26449)
* chore: Ensure Parquet sort key is serialised with snapshots
* chore: PR feedback, rename state variable to match intent
* chore: Use `Default` trait to implement `TableBuffer::new`
* chore: Fix change in file size with extra metadata
* chore: Add rustdoc for `sort_key` field
1 parent 760c898 commit 1abbb52

File tree: 5 files changed, +148 −63 lines changed

influxdb3_catalog/src/catalog.rs (95 additions, 16 deletions)
@@ -31,13 +31,15 @@ use std::borrow::Cow;
 use std::cmp::Ordering;
 use std::collections::BTreeMap;
 use std::hash::Hash;
+use std::iter;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{Mutex, MutexGuard};
 use uuid::Uuid;

 mod metrics;
 mod update;
+use schema::sort::SortKey;
 pub use schema::{InfluxColumnType, InfluxFieldType};
 pub use update::{CatalogUpdate, DatabaseCatalogTransaction, Prompt};

@@ -1579,6 +1581,8 @@ pub struct TableDefinition {
     pub series_key: Vec<ColumnId>,
     /// The names of the columns in the table's series key
     pub series_key_names: Vec<Arc<str>>,
+    /// The sort key for the table when persisted to storage.
+    pub sort_key: SortKey,
     /// Last cache definitions for the table
     pub last_caches: Repository<LastCacheId, LastCacheDefinition>,
     /// Distinct cache definitions for the table
@@ -1642,19 +1646,32 @@ impl TableDefinition {
         schema_builder.with_series_key(&series_key_names);
         let schema = schema_builder.build().expect("schema should be valid");

+        let sort_key =
+            Self::make_sort_key(&series_key_names, columns.contains_name(TIME_COLUMN_NAME));
+
         Ok(Self {
             table_id,
             table_name,
             schema,
             columns,
             series_key,
             series_key_names,
+            sort_key,
             last_caches: Repository::new(),
             distinct_caches: Repository::new(),
             deleted: false,
         })
     }

+    fn make_sort_key(series_key_names: &[Arc<str>], add_time: bool) -> SortKey {
+        let iter = series_key_names.iter().cloned();
+        if add_time {
+            SortKey::from_columns(iter.chain(iter::once(TIME_COLUMN_NAME.into())))
+        } else {
+            SortKey::from_columns(iter)
+        }
+    }
+
     /// Create a new table definition from a catalog op
     pub fn new_from_op(table_definition: &CreateTableLog) -> Self {
         let mut columns = Vec::with_capacity(table_definition.field_definitions.len());
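
The rule `make_sort_key` encodes is simple: series key columns in their declared order, with the `time` column appended last when the table has one. A minimal standalone usage sketch of that rule, assuming the iox `schema` crate's `SortKey` (as imported above) and that `TIME_COLUMN_NAME` is the usual `"time"` constant:

use std::iter;
use std::sync::Arc;

use schema::sort::SortKey;

const TIME_COLUMN_NAME: &str = "time"; // assumed; matches the constant used in the diff

// Mirror of make_sort_key: tags first (in series key order), then "time" if present.
fn sort_key_for(series_key_names: &[Arc<str>], has_time: bool) -> SortKey {
    let tags = series_key_names.iter().cloned();
    if has_time {
        SortKey::from_columns(tags.chain(iter::once(TIME_COLUMN_NAME.into())))
    } else {
        SortKey::from_columns(tags)
    }
}

fn main() {
    let names: Vec<Arc<str>> = vec!["t2".into(), "t1".into()];
    assert_eq!(
        sort_key_for(&names, true),
        SortKey::from_columns(vec!["t2", "t1", "time"])
    );
}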
@@ -1745,29 +1762,42 @@ impl TableDefinition {
         for col_def in self.columns.resource_iter().cloned() {
             cols.insert(Arc::clone(&col_def.name), col_def);
         }
+
+        let mut sort_key_changed = false;
+
         for (id, name, column_type) in columns {
             let nullable = name.as_ref() != TIME_COLUMN_NAME;
             assert!(
                 cols.insert(
                     Arc::clone(&name),
-                    Arc::new(ColumnDefinition::new(id, name, column_type, nullable))
+                    Arc::new(ColumnDefinition::new(
+                        id,
+                        Arc::clone(&name),
+                        column_type,
+                        nullable
+                    ))
                 )
                 .is_none(),
                 "attempted to add existing column"
             );
             // add new tags to the series key in the order provided
             if matches!(column_type, InfluxColumnType::Tag) && !self.series_key.contains(&id) {
                 self.series_key.push(id);
+                self.series_key_names.push(name);
+                sort_key_changed = true;
+            } else if matches!(column_type, InfluxColumnType::Timestamp)
+                && !self.series_key.contains(&id)
+            {
+                sort_key_changed = true;
             }
         }

         let mut schema_builder = SchemaBuilder::with_capacity(cols.len());
-        // TODO: may need to capture some schema-level metadata, currently, this causes trouble in
-        // tests, so I am omitting this for now:
-        // schema_builder.measurement(&self.name);
+        schema_builder.measurement(self.table_name.as_ref());
         for (name, col_def) in &cols {
             schema_builder.influx_column(name.as_ref(), col_def.data_type);
         }
+        schema_builder.with_series_key(&self.series_key_names);
         let schema = schema_builder.build().expect("schema should be valid");
         self.schema = schema;

@@ -1779,6 +1809,13 @@ impl TableDefinition {
         }
         self.columns = new_columns;

+        if sort_key_changed {
+            self.sort_key = Self::make_sort_key(
+                &self.series_key_names,
+                self.columns.contains_name(TIME_COLUMN_NAME),
+            );
+        }
+
         Ok(())
     }

@@ -2183,40 +2220,82 @@ mod tests {
                 TableDefinition::new(
                     TableId::from(0),
                     "test".into(),
-                    vec![(
-                        ColumnId::from(0),
-                        "test".into(),
-                        InfluxColumnType::Field(InfluxFieldType::String),
-                    )],
-                    vec![],
+                    vec![
+                        (
+                            ColumnId::from(0),
+                            "test".into(),
+                            InfluxColumnType::Field(InfluxFieldType::String),
+                        ),
+                        (ColumnId::from(1), "test999".into(), InfluxColumnType::Tag),
+                    ],
+                    vec![ColumnId::from(1)],
                 )
                 .unwrap(),
             ),
         )
         .unwrap();

         let mut table = database.tables.get_by_id(&TableId::from(0)).unwrap();
-        println!("table: {table:#?}");
-        assert_eq!(table.columns.len(), 1);
+        assert_eq!(table.columns.len(), 2);
         assert_eq!(table.column_id_to_name_unchecked(&0.into()), "test".into());
+        assert_eq!(
+            table.column_id_to_name_unchecked(&1.into()),
+            "test999".into()
+        );
+        assert_eq!(table.series_key.len(), 1);
+        assert_eq!(table.series_key_names.len(), 1);
+        assert_eq!(table.sort_key, SortKey::from_columns(vec!["test999"]));
+        assert_eq!(table.schema.primary_key(), &["test999"]);

+        // add time and verify key is updated
         Arc::make_mut(&mut table)
             .add_columns(vec![(
-                ColumnId::from(1),
+                ColumnId::from(2),
+                TIME_COLUMN_NAME.into(),
+                InfluxColumnType::Timestamp,
+            )])
+            .unwrap();
+        assert_eq!(table.series_key.len(), 1);
+        assert_eq!(table.series_key_names.len(), 1);
+        assert_eq!(
+            table.sort_key,
+            SortKey::from_columns(vec!["test999", TIME_COLUMN_NAME])
+        );
+        assert_eq!(table.schema.primary_key(), &["test999", TIME_COLUMN_NAME]);
+
+        Arc::make_mut(&mut table)
+            .add_columns(vec![(
+                ColumnId::from(3),
                 "test2".into(),
                 InfluxColumnType::Tag,
             )])
             .unwrap();
+
+        // Verify the series key, series key names and sort key are updated when a tag column is added,
+        // and that the "time" column is still at the end.
+        assert_eq!(table.series_key.len(), 2);
+        assert_eq!(table.series_key_names, &["test999".into(), "test2".into()]);
+        assert_eq!(
+            table.sort_key,
+            SortKey::from_columns(vec!["test999", "test2", TIME_COLUMN_NAME])
+        );
+
         let schema = table.influx_schema();
         assert_eq!(
             schema.field(0).0,
             InfluxColumnType::Field(InfluxFieldType::String)
         );
         assert_eq!(schema.field(1).0, InfluxColumnType::Tag);
+        assert_eq!(schema.field(2).0, InfluxColumnType::Tag);

-        println!("table: {table:#?}");
-        assert_eq!(table.columns.len(), 2);
-        assert_eq!(table.column_name_to_id_unchecked("test2"), 1.into());
+        assert_eq!(table.columns.len(), 4);
+        assert_eq!(table.column_name_to_id_unchecked("test2"), 3.into());
+
+        // Verify the schema is updated.
+        assert_eq!(table.schema.len(), 4);
+        assert_eq!(table.schema.measurement(), Some(&"test".to_owned()));
+        let pk = table.schema.primary_key();
+        assert_eq!(pk, &["test999", "test2", TIME_COLUMN_NAME]);
     }

     #[tokio::test]

influxdb3_catalog/src/snapshot/versions/v2.rs (1 addition, 0 deletions)
@@ -423,6 +423,7 @@ impl Snapshot for TableDefinition {
             columns: table_def.columns,
             series_key: table_def.series_key,
             series_key_names: table_def.series_key_names,
+            sort_key: table_def.sort_key,
             last_caches: Repository::from_snapshot(snap.last_caches),
             distinct_caches: Repository::from_snapshot(snap.distinct_caches),
             deleted: snap.deleted,
influxdb3_server/src/query_executor/mod.rs (16 additions, 16 deletions)
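
These fixtures change only in the expected `size_bytes` column: 1961 becomes 2061, i.e. each persisted Parquet file grows by 100 bytes, which lines up with the commit message's note about the file-size change from the extra series key metadata now written into every snapshot file.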
@@ -942,9 +942,9 @@ mod tests {
             "+------------+------------+-----------+----------+----------+",
             "| table_name | size_bytes | row_count | min_time | max_time |",
             "+------------+------------+-----------+----------+----------+",
-            "| cpu        | 1961       | 3         | 0        | 20       |",
-            "| cpu        | 1961       | 3         | 30       | 50       |",
-            "| cpu        | 1961       | 3         | 60       | 80       |",
+            "| cpu        | 2061       | 3         | 0        | 20       |",
+            "| cpu        | 2061       | 3         | 30       | 50       |",
+            "| cpu        | 2061       | 3         | 60       | 80       |",
             "+------------+------------+-----------+----------+----------+",
         ],
     },
@@ -957,9 +957,9 @@ mod tests {
             "+------------+------------+-----------+----------+----------+",
             "| table_name | size_bytes | row_count | min_time | max_time |",
             "+------------+------------+-----------+----------+----------+",
-            "| mem        | 1961       | 3         | 0        | 20       |",
-            "| mem        | 1961       | 3         | 30       | 50       |",
-            "| mem        | 1961       | 3         | 60       | 80       |",
+            "| mem        | 2061       | 3         | 0        | 20       |",
+            "| mem        | 2061       | 3         | 30       | 50       |",
+            "| mem        | 2061       | 3         | 60       | 80       |",
             "+------------+------------+-----------+----------+----------+",
         ],
     },
@@ -971,12 +971,12 @@ mod tests {
             "+------------+------------+-----------+----------+----------+",
             "| table_name | size_bytes | row_count | min_time | max_time |",
             "+------------+------------+-----------+----------+----------+",
-            "| cpu        | 1961       | 3         | 0        | 20       |",
-            "| cpu        | 1961       | 3         | 30       | 50       |",
-            "| cpu        | 1961       | 3         | 60       | 80       |",
-            "| mem        | 1961       | 3         | 0        | 20       |",
-            "| mem        | 1961       | 3         | 30       | 50       |",
-            "| mem        | 1961       | 3         | 60       | 80       |",
+            "| cpu        | 2061       | 3         | 0        | 20       |",
+            "| cpu        | 2061       | 3         | 30       | 50       |",
+            "| cpu        | 2061       | 3         | 60       | 80       |",
+            "| mem        | 2061       | 3         | 0        | 20       |",
+            "| mem        | 2061       | 3         | 30       | 50       |",
+            "| mem        | 2061       | 3         | 60       | 80       |",
             "+------------+------------+-----------+----------+----------+",
         ],
     },
@@ -989,10 +989,10 @@ mod tests {
             "+------------+------------+-----------+----------+----------+",
             "| table_name | size_bytes | row_count | min_time | max_time |",
             "+------------+------------+-----------+----------+----------+",
-            "| cpu        | 1961       | 3         | 0        | 20       |",
-            "| cpu        | 1961       | 3         | 30       | 50       |",
-            "| cpu        | 1961       | 3         | 60       | 80       |",
-            "| mem        | 1961       | 3         | 60       | 80       |",
+            "| cpu        | 2061       | 3         | 0        | 20       |",
+            "| cpu        | 2061       | 3         | 30       | 50       |",
+            "| cpu        | 2061       | 3         | 60       | 80       |",
+            "| mem        | 2061       | 3         | 60       | 80       |",
             "+------------+------------+-----------+----------+----------+",
         ],
     },

influxdb3_write/src/write_buffer/queryable_buffer.rs (27 additions, 18 deletions)
@@ -183,6 +183,7 @@ impl QueryableBuffer {
                 let table_def = db_schema
                     .table_definition_by_id(table_id)
                     .expect("table exists");
+                let sort_key = table_def.sort_key.clone();
                 let snapshot_chunks =
                     table_buffer.snapshot(table_def, snapshot_details.end_time_marker);

@@ -206,7 +207,7 @@ impl QueryableBuffer {
                     batch: chunk.record_batch,
                     schema: chunk.schema,
                     timestamp_min_max: chunk.timestamp_min_max,
-                    sort_key: table_buffer.sort_key.clone(),
+                    sort_key: sort_key.clone(),
                 };

                 persisting_chunks.push(persist_job);
@@ -447,25 +448,10 @@ impl BufferState {
     }

     fn add_write_batch(&mut self, write_batch: &WriteBatch) {
-        let db_schema = self
-            .catalog
-            .db_schema_by_id(&write_batch.database_id)
-            .expect("database should exist");
-
         let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default();

         for (table_id, table_chunks) in &write_batch.table_chunks {
-            let table_buffer = database_buffer.entry(*table_id).or_insert_with(|| {
-                let table_def = db_schema
-                    .table_definition_by_id(table_id)
-                    .expect("table should exist");
-                let sort_key = table_def
-                    .series_key
-                    .iter()
-                    .map(|c| Arc::clone(&table_def.column_id_to_name_unchecked(c)));
-
-                TableBuffer::new(SortKey::from_columns(sort_key))
-            });
+            let table_buffer = database_buffer.entry(*table_id).or_default();
             for (chunk_time, chunk) in &table_chunks.chunk_time_to_chunk {
                 table_buffer.buffer_chunk(*chunk_time, &chunk.rows);
             }
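
The commit message's `Default` bullet explains the `or_default()` call above: once the sort key comes from the catalog's `TableDefinition` at snapshot time, `TableBuffer` no longer needs a constructor argument. The table_buffer.rs side of that change is not shown in this view; below is a self-contained sketch of the refactor pattern, with illustrative names rather than the repo's actual types:

use std::collections::HashMap;

// Illustrative stand-in for TableBuffer: once every field has a Default impl,
// deriving Default replaces a hand-written `new(sort_key)` constructor.
#[derive(Default, Debug)]
struct Buffer {
    rows: Vec<String>,
}

fn main() {
    let mut db_to_table: HashMap<u32, Buffer> = HashMap::new();
    // entry(..).or_insert_with(|| Buffer::new(args)) collapses to or_default()
    let table_buffer = db_to_table.entry(7).or_default();
    table_buffer.rows.push("foo,t2=a,t1=b f1=1i 0".to_string());
    assert_eq!(db_to_table[&7].rows.len(), 1);
}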
@@ -618,6 +604,8 @@ mod tests {
     use iox_time::{MockProvider, Time, TimeProvider};
     use object_store::ObjectStore;
     use object_store::memory::InMemory;
+    use parquet::arrow::arrow_reader;
+    use parquet::arrow::arrow_reader::ArrowReaderOptions;
     use parquet_file::storage::{ParquetStorage, StorageId};
     use std::num::NonZeroUsize;


@@ -687,7 +675,7 @@ mod tests {
687675
// create the initial write with two tags
688676
let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog)).unwrap();
689677
let lp = format!(
690-
"foo,t1=a,t2=b f1=1i {}",
678+
"foo,t2=a,t1=b f1=1i {}",
691679
time_provider.now().timestamp_nanos()
692680
);
693681

@@ -766,6 +754,10 @@ mod tests {
         // validate we have a single persisted file
         let db = catalog.db_schema("testdb").unwrap();
         let table = db.table_definition("foo").unwrap();
+        assert_eq!(
+            table.sort_key,
+            SortKey::from_columns(vec!["t2", "t1", "time"])
+        );
         let files = queryable_buffer
             .persisted_files
             .get_files(db.id, table.table_id);
@@ -801,5 +793,22 @@ mod tests {
             .persisted_files
             .get_files(db.id, table.table_id);
         assert_eq!(files.len(), 2);
+
+        // Verify the `iox::series::key` metadata is present in the parquet file
+        {
+            let path = Path::from(files[0].path.as_str());
+            let res = object_store
+                .get(&path)
+                .await
+                .unwrap()
+                .bytes()
+                .await
+                .unwrap();
+            let metadata =
+                arrow_reader::ArrowReaderMetadata::load(&res, ArrowReaderOptions::new()).unwrap();
+            let schema: Schema = Schema::try_from(Arc::clone(metadata.schema())).unwrap();
+            let primary_key = schema.primary_key();
+            assert_eq!(primary_key, &["t2", "t1", "time"]);
+        }
     }
 }
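
For readers who want to poke at a persisted file outside the test, here is a minimal standalone sketch of the same inspection, assuming the `parquet` and `bytes` crates; the `iox::series::key` entry lives in the Arrow schema metadata embedded in the Parquet footer:

use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};

// Load the Arrow schema embedded in a Parquet footer and print its
// schema-level metadata, where IOx-style writers store keys such as
// "iox::series::key".
fn dump_schema_metadata(file_bytes: Bytes) {
    let md = ArrowReaderMetadata::load(&file_bytes, ArrowReaderOptions::new())
        .expect("valid parquet footer");
    for (key, value) in md.schema().metadata() {
        println!("{key} = {value}");
    }
}

Running this against one of the snapshot files written by the test above should show the series key columns recorded in the metadata, matching the `["t2", "t1", "time"]` primary key asserted in the diff.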
