Skip to content

Commit 959555a

Browse files
authored
fix: Distinct Value Cache handles NULL values (#26457)
Closes #26451
1 parent 4dc61df commit 959555a

File tree

2 files changed

+126
-68
lines changed

2 files changed

+126
-68
lines changed

influxdb3_cache/src/distinct_cache/cache.rs

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,12 @@ impl DistinctCache {
9898
}
9999
attempted => return Err(CacheError::NonTagOrStringColumn { attempted }),
100100
};
101-
102-
builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false)));
101+
let is_nullable = col.nullable;
102+
builder.push(Arc::new(Field::new(
103+
col.name.as_ref(),
104+
data_type,
105+
is_nullable,
106+
)));
103107
}
104108
Ok(Self {
105109
time_provider,
@@ -114,19 +118,15 @@ impl DistinctCache {
114118

115119
/// Push a [`Row`] from the WAL into the cache; any cached column that is missing from the row is stored as a NULL (`None`) value.
116120
pub(crate) fn push(&mut self, row: &Row) {
117-
let mut values = Vec::with_capacity(self.column_ids.len());
118-
for id in &self.column_ids {
119-
let Some(value) = row
120-
.fields
121-
.iter()
122-
.find(|f| &f.id == id)
123-
.map(|f| Value::from(&f.value))
124-
else {
125-
// ignore the row if it does not contain all columns in the cache:
126-
return;
127-
};
128-
values.push(value);
129-
}
121+
let values = self
122+
.column_ids
123+
.iter()
124+
.map(|id| {
125+
row.fields
126+
.iter()
127+
.find_map(|f| (f.id == *id).then(|| Value::from(&f.value)))
128+
})
129+
.collect::<Vec<_>>();
130130
let mut target = &mut self.data;
131131
let mut val_iter = values.into_iter().peekable();
132132
let mut is_new = false;
@@ -294,7 +294,7 @@ impl DistinctCache {
294294
/// whose values hold the last seen time as an [`i64`] of each value, and an optional reference to
295295
/// the node in the next level of the tree.
296296
#[derive(Debug, Default)]
297-
pub(crate) struct Node(pub(crate) BTreeMap<Value, (i64, Option<Node>)>);
297+
pub(crate) struct Node(pub(crate) BTreeMap<Option<Value>, (i64, Option<Node>)>);
298298

299299
impl Node {
300300
/// Remove all elements before the given nanosecond timestamp returning `true` if the resulting
@@ -396,20 +396,30 @@ impl Node {
396396
);
397397
if count > 0 {
398398
if let Some(builder) = builder {
399-
// we are not on a terminal node in the cache, so create a block, as this value
400-
// repeated `count` times, i.e., depending on how many values come out of
401-
// subsequent nodes:
402-
let block = builder.append_block(value.0.as_bytes().into());
403-
for _ in 0..count {
404-
builder
405-
.try_append_view(block, 0u32, value.0.len() as u32)
406-
.expect("append view for known valid block, offset and length");
399+
if let Some(value) = &value {
400+
// we are not on a terminal node in the cache, so create a block, as this value
401+
// repeated `count` times, i.e., depending on how many values come out of
402+
// subsequent nodes:
403+
let block = builder.append_block(value.0.as_bytes().into());
404+
for _ in 0..count {
405+
builder
406+
.try_append_view(block, 0u32, value.0.len() as u32)
407+
.expect("append view for known valid block, offset and length");
408+
}
409+
} else {
410+
for _ in 0..count {
411+
builder.append_null();
412+
}
407413
}
408414
}
409415
total_count += count;
410416
} else if next_predicates.is_empty() && next_builders.is_empty() {
411417
if let Some(builder) = builder {
412-
builder.append_value(value.0);
418+
if let Some(value) = value {
419+
builder.append_value(value.0);
420+
} else {
421+
builder.append_null();
422+
}
413423
total_count += 1;
414424
}
415425
}
@@ -419,7 +429,12 @@ impl Node {
419429
break;
420430
}
421431
} else if let Some(builder) = builder {
422-
builder.append_value(value.0);
432+
if let Some(value) = value {
433+
// if we are at a terminal node in the cache, just append the value:
434+
builder.append_value(value.0);
435+
} else {
436+
builder.append_null();
437+
}
423438
total_count += 1;
424439
}
425440
}
@@ -433,21 +448,27 @@ impl Node {
433448
expired_time_ns: i64,
434449
predicate: &Predicate,
435450
limit: usize,
436-
) -> Vec<(Value, Option<&Node>)> {
451+
) -> Vec<(Option<Value>, Option<&Node>)> {
437452
match &predicate {
438453
Predicate::In(in_list) => in_list
439454
.iter()
440455
.filter_map(|v| {
441-
self.0.get_key_value(v).and_then(|(v, (t, n))| {
442-
(t > &expired_time_ns).then(|| (v.clone(), n.as_ref()))
443-
})
456+
self.0
457+
.get_key_value(&Some(v.clone()))
458+
.and_then(|(v, (t, n))| {
459+
(t > &expired_time_ns).then(|| (v.clone(), n.as_ref()))
460+
})
444461
})
445462
.take(limit)
446463
.collect(),
447464
Predicate::NotIn(not_in_set) => self
448465
.0
449466
.iter()
450-
.filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v))
467+
.filter(|(v, (t, _))| {
468+
t > &expired_time_ns
469+
// If the value is None or the value is not in the set, include it
470+
&& (v.is_none() || !not_in_set.contains(v.as_ref().unwrap()))
471+
})
451472
.map(|(v, (_, n))| (v.clone(), n.as_ref()))
452473
.take(limit)
453474
.collect(),

influxdb3_cache/src/distinct_cache/mod.rs

Lines changed: 74 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,25 @@ mod tests {
3434
// write some data to get a set of rows destined for the WAL, and an updated catalog:
3535
let rows = writer
3636
.write_lp_to_rows(
37-
"\
38-
cpu,region=us-east,host=a usage=100\n\
39-
cpu,region=us-east,host=b usage=100\n\
40-
cpu,region=us-west,host=c usage=100\n\
41-
cpu,region=us-west,host=d usage=100\n\
42-
cpu,region=ca-east,host=e usage=100\n\
43-
cpu,region=ca-east,host=f usage=100\n\
44-
cpu,region=ca-cent,host=g usage=100\n\
45-
cpu,region=ca-cent,host=h usage=100\n\
46-
cpu,region=eu-east,host=i usage=100\n\
47-
cpu,region=eu-east,host=j usage=100\n\
48-
cpu,region=eu-cent,host=k usage=100\n\
49-
cpu,region=eu-cent,host=l usage=100\n\
50-
",
37+
r#"\
38+
cpu,region=us-east,host=a usage=100
39+
cpu,region=us-east,host=b usage=100
40+
cpu,region=us-west,host=c usage=100
41+
cpu,region=us-west,host=d usage=100
42+
cpu,region=ca-east,host=e usage=100
43+
cpu,region=ca-east,host=f usage=100
44+
cpu,region=ca-cent,host=g usage=100
45+
cpu,region=ca-cent,host=h usage=100
46+
cpu,region=eu-east,host=i usage=100
47+
cpu,region=eu-east,host=j usage=100
48+
cpu,region=eu-cent,host=k usage=100
49+
cpu,region=eu-cent,host=l usage=100
50+
cpu,region=us-east usage=200
51+
cpu,region=us-west usage=200
52+
cpu,region=ca-cent usage=200
53+
cpu,host=m usage=300
54+
cpu,host=n usage=300
55+
"#,
5156
0,
5257
)
5358
.await;
@@ -97,6 +102,9 @@ mod tests {
97102
"+---------+------+",
98103
"| region | host |",
99104
"+---------+------+",
105+
"| | m |",
106+
"| | n |",
107+
"| ca-cent | |",
100108
"| ca-cent | g |",
101109
"| ca-cent | h |",
102110
"| ca-east | e |",
@@ -105,8 +113,10 @@ mod tests {
105113
"| eu-cent | l |",
106114
"| eu-east | i |",
107115
"| eu-east | j |",
116+
"| us-east | |",
108117
"| us-east | a |",
109118
"| us-east | b |",
119+
"| us-west | |",
110120
"| us-west | c |",
111121
"| us-west | d |",
112122
"+---------+------+",
@@ -122,6 +132,7 @@ mod tests {
122132
"+---------+------+",
123133
"| region | host |",
124134
"+---------+------+",
135+
"| ca-cent | |",
125136
"| ca-cent | g |",
126137
"| ca-cent | h |",
127138
"| ca-east | e |",
@@ -144,6 +155,21 @@ mod tests {
144155
"+---------+------+",
145156
],
146157
},
158+
TestCase {
159+
desc: "in predicate on region and not in host",
160+
predicates: create_predicate_map(&[
161+
(region_col_id, Predicate::new_in(["ca-cent", "ca-east"])),
162+
(host_col_id, Predicate::new_not_in(["g", "e", "h"])),
163+
]),
164+
expected: &[
165+
"+---------+------+",
166+
"| region | host |",
167+
"+---------+------+",
168+
"| ca-cent | |",
169+
"| ca-east | f |",
170+
"+---------+------+",
171+
],
172+
},
147173
TestCase {
148174
desc: "not in predicate on region",
149175
predicates: create_predicate_map(&[(
@@ -154,12 +180,16 @@ mod tests {
154180
"+---------+------+",
155181
"| region | host |",
156182
"+---------+------+",
183+
"| | m |",
184+
"| | n |",
157185
"| eu-cent | k |",
158186
"| eu-cent | l |",
159187
"| eu-east | i |",
160188
"| eu-east | j |",
189+
"| us-east | |",
161190
"| us-east | a |",
162191
"| us-east | b |",
192+
"| us-west | |",
163193
"| us-west | c |",
164194
"| us-west | d |",
165195
"+---------+------+",
@@ -169,16 +199,19 @@ mod tests {
169199
desc: "not in predicate on region and host",
170200
predicates: create_predicate_map(&[
171201
(region_col_id, Predicate::new_not_in(["ca-cent", "ca-east"])),
172-
(host_col_id, Predicate::new_not_in(["j", "k"])),
202+
(host_col_id, Predicate::new_not_in(["j", "k", "m"])),
173203
]),
174204
expected: &[
175205
"+---------+------+",
176206
"| region | host |",
177207
"+---------+------+",
208+
"| | n |",
178209
"| eu-cent | l |",
179210
"| eu-east | i |",
211+
"| us-east | |",
180212
"| us-east | a |",
181213
"| us-east | b |",
214+
"| us-west | |",
182215
"| us-west | c |",
183216
"| us-west | d |",
184217
"+---------+------+",
@@ -936,10 +969,8 @@ mod tests {
936969
);
937970
}
938971

939-
// NB: This test was added as part of https://github.com/influxdata/influxdb/issues/25564
940-
// If we choose to support nulls in the distinct cache then this test will fail
941972
#[test_log::test(tokio::test)]
942-
async fn test_row_with_nulls_ignored() {
973+
async fn test_row_with_nulls_not_ignored() {
943974
let writer = TestWriter::new().await;
944975
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
945976
let cat = writer.catalog();
@@ -994,26 +1025,32 @@ mod tests {
9941025
DistinctCacheFunction::new(writer.db_schema().id, Arc::clone(&distinct_cache_provider));
9951026
ctx.register_udtf(DISTINCT_CACHE_UDTF_NAME, Arc::new(distinct_func));
9961027

997-
let results = ctx
998-
.sql("select * from distinct_cache('bar')")
999-
.await
1000-
.unwrap()
1001-
.collect()
1002-
.await
1003-
.unwrap();
1028+
// Verify all values are returned with no predicate
1029+
{
1030+
let results = ctx
1031+
.sql("select * from distinct_cache('bar')")
1032+
.await
1033+
.unwrap()
1034+
.collect()
1035+
.await
1036+
.unwrap();
10041037

1005-
assert_batches_eq!(
1006-
[
1007-
"+----+----+----+",
1008-
"| t1 | t2 | t3 |",
1009-
"+----+----+----+",
1010-
"| A | A | A |",
1011-
"| A | A | B |",
1012-
"| A | B | B |",
1013-
"+----+----+----+",
1014-
],
1015-
&results
1016-
);
1038+
assert_batches_eq!(
1039+
[
1040+
"+----+----+----+",
1041+
"| t1 | t2 | t3 |",
1042+
"+----+----+----+",
1043+
"| | B | B |",
1044+
"| A | A | A |",
1045+
"| A | A | B |",
1046+
"| A | B | B |",
1047+
"| B | | B |",
1048+
"| B | A | |",
1049+
"+----+----+----+",
1050+
],
1051+
&results
1052+
);
1053+
}
10171054
}
10181055

10191056
#[test_log::test(tokio::test)]

0 commit comments

Comments
 (0)