
Commit 6e9446a (parent 56df158)

test: reproduce problem with NULL backfill of omitted tag cols (#26448)

* test: reproduce problem with NULL backfill of omitted tag cols
* fix: do not fill tag columns with empty string on persist
* chore: clippy
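The bug and its fix hinge on a distinction that Arrow makes explicit: in a dictionary-encoded tag column, an empty string is a present value, while NULL is tracked separately in a validity bitmap. A minimal sketch of that distinction, using the same `arrow` builder type as the code removed below (this snippet is illustrative and not part of the commit):

use arrow::array::{Array, StringDictionaryBuilder};
use arrow::datatypes::Int32Type;

fn main() {
    let mut builder: StringDictionaryBuilder<Int32Type> = StringDictionaryBuilder::new();
    builder.append_value("a"); // a tag value that was actually written
    builder.append_value(""); // the buggy backfill: an empty string, which is NOT null
    builder.append_null(); // what an omitted tag column should contain
    let col = builder.finish();
    assert!(!col.is_null(0));
    assert!(!col.is_null(1)); // an empty string still counts as a present value
    assert!(col.is_null(2)); // only this row is truly NULL
}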

File tree

2 files changed: +120 -18 lines

influxdb3_write/src/write_buffer/mod.rs

Lines changed: 120 additions & 0 deletions
@@ -660,6 +660,8 @@ mod tests {
     use crate::paths::SnapshotInfoFilePath;
     use crate::persister::Persister;
     use crate::test_helpers::WriteBufferTester;
+    use arrow::array::AsArray;
+    use arrow::datatypes::Int32Type;
     use arrow::record_batch::RecordBatch;
     use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
     use bytes::Bytes;
@@ -3007,6 +3009,124 @@ mod tests {
         let _buf = init().await;
     }

+    /// Check for the case where tags that exist in a table, but have not been written to since
+    /// the most recent server start, are persisted as NULL and not as an empty string
+    #[test_log::test(tokio::test)]
+    async fn test_null_tags_persisted_as_null_not_empty_string() {
+        // Set up the object store and write buffer for the first time:
+        let object_store = Arc::new(InMemory::new());
+        let wal_config = WalConfig {
+            gen1_duration: influxdb3_wal::Gen1Duration::new_1m(),
+            max_write_buffer_size: 100,
+            flush_interval: Duration::from_millis(10),
+            snapshot_size: 1,
+        };
+        let (wb, _ctx, _tp) = setup(
+            Time::from_timestamp_nanos(0),
+            Arc::clone(&object_store) as _,
+            wal_config,
+        )
+        .await;
+
+        // Do enough writes to trigger a snapshot, so that the row containing `tag` is flushed
+        // out of the write buffer and will not be brought back when it is restarted:
+        do_writes(
+            "foo",
+            wb.as_ref(),
+            &[
+                // the first write has `tag`, but all subsequent writes do not
+                TestWrite {
+                    lp: "bar,tag=a val=1",
+                    time_seconds: 1,
+                },
+                TestWrite {
+                    lp: "bar val=2",
+                    time_seconds: 2,
+                },
+                TestWrite {
+                    lp: "bar val=3",
+                    time_seconds: 3,
+                },
+            ],
+        )
+        .await;
+
+        // Wait until there is a snapshot; this ensures that the row with the tag value is
+        // flushed out of the write buffer and will not be replayed from the WAL on the next
+        // startup:
+        verify_snapshot_count(1, &wb.persister).await;
+
+        // Drop the write buffer so we can re-initialize:
+        drop(wb);
+
+        // Re-initialize the write buffer:
+        let (wb, ctx, _tp) = setup(
+            Time::from_timestamp_nanos(0),
+            Arc::clone(&object_store) as _,
+            wal_config,
+        )
+        .await;
+
+        // Do enough writes again to trigger a snapshot; the `tag` column is never written
+        // here, so the persistence step is responsible for filling in that column with NULL:
+        do_writes(
+            "foo",
+            wb.as_ref(),
+            &[
+                TestWrite {
+                    lp: "bar val=4",
+                    time_seconds: 4,
+                },
+                TestWrite {
+                    lp: "bar val=5",
+                    time_seconds: 5,
+                },
+                TestWrite {
+                    lp: "bar val=6",
+                    time_seconds: 6,
+                },
+            ],
+        )
+        .await;
+
+        // Wait for a snapshot again, to make sure that when we query below, we are drawing
+        // from parquet and not from in-memory data:
+        verify_snapshot_count(2, &wb.persister).await;
+
+        // Get batches from the buffer, including what is still in the queryable buffer in
+        // memory as well as what has been persisted as parquet:
+        let batches = wb.get_record_batches_unchecked("foo", "bar", &ctx).await;
+        assert_batches_sorted_eq!(
+            [
+                "+-----+----------------------+-----+",
+                "| tag | time                 | val |",
+                "+-----+----------------------+-----+",
+                "|     | 1970-01-01T00:00:02Z | 2.0 |",
+                "|     | 1970-01-01T00:00:03Z | 3.0 |",
+                "|     | 1970-01-01T00:00:04Z | 4.0 |",
+                "|     | 1970-01-01T00:00:05Z | 5.0 |",
+                "|     | 1970-01-01T00:00:06Z | 6.0 |",
+                "| a   | 1970-01-01T00:00:01Z | 1.0 |",
+                "+-----+----------------------+-----+",
+            ],
+            &batches
+        );
+
+        debug!("record batches:\n\n{batches:#?}");
+
+        // Iterate over the `tag` column values and check that none of them is an empty string:
+        for batch in batches {
+            let tag_col = batch
+                .column_by_name("tag")
+                .unwrap()
+                .as_dictionary::<Int32Type>();
+            let tag_vals = tag_col.values().as_string::<i32>();
+            for s in tag_vals.iter().flatten() {
+                assert!(!s.is_empty(), "there should not be any empty strings");
+            }
+        }
+    }
+
     struct TestWrite<LP> {
         lp: LP,
         time_seconds: i64,
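For reference, the verification loop at the end of the test leans on dictionary encoding: `values()` returns only the distinct strings stored in the dictionary, so checking those for `""` covers every row of the column. A self-contained sketch of the same downcast chain (the builder setup is illustrative; the test itself gets its batches from the write buffer):

use std::sync::Arc;
use arrow::array::{ArrayRef, AsArray, StringDictionaryBuilder};
use arrow::datatypes::Int32Type;

fn main() {
    // Build a small dictionary column standing in for a `tag` column:
    let mut builder: StringDictionaryBuilder<Int32Type> = StringDictionaryBuilder::new();
    builder.append_value("a");
    builder.append_null();
    let col: ArrayRef = Arc::new(builder.finish());

    // The same chain the test uses: an Int32-keyed dictionary over Utf8 values.
    let tag_vals = col.as_dictionary::<Int32Type>().values().as_string::<i32>();

    // `iter()` yields Option<&str>; `flatten()` skips any NULL slots in the
    // values array, leaving only concrete strings to check:
    for s in tag_vals.iter().flatten() {
        assert!(!s.is_empty(), "there should not be any empty strings");
    }
}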

influxdb3_write/src/write_buffer/table_buffer.rs

Lines changed: 0 additions & 18 deletions
@@ -386,24 +386,6 @@ impl MutableTableChunk {
             cols.push(col);
         }

-        // ensure that every series key column is present in the batch
-        for col_id in &table_def.series_key {
-            if !cols_in_batch.contains(col_id) {
-                let col_name = table_def
-                    .column_id_to_name(col_id)
-                    .expect("valid column id");
-                schema_builder.influx_column(col_name.as_ref(), InfluxColumnType::Tag);
-                let mut tag_builder: StringDictionaryBuilder<Int32Type> =
-                    StringDictionaryBuilder::new();
-                for _ in 0..self.row_count {
-                    tag_builder.append_value("");
-                }
-
-                cols.push(Arc::new(tag_builder.finish()));
-                cols_in_batch.insert(*col_id);
-            }
-        }
-
         // ensure that every field column is present in the batch
         for (col_id, col_def) in table_def.columns.iter() {
            if !cols_in_batch.contains(col_id) {