Skip to content

Commit 4a917c5

Browse files
authored
refactor: remove variants for unused series key type (#26443)
* refactor: remove unused Key type from write buffer. The write buffer had a Key variant for handling the experimental v3 write API, which was phased out and removed in an earlier iteration of influxdb3. * refactor: remove key column type from last cache.
1 parent 1ec063b commit 4a917c5

File tree

6 files changed

+13
-103
lines changed

6 files changed

+13
-103
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d
157157
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
158158
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
159159
executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
160-
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f", features = ["v3"]}
160+
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f"}
161161
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
162162
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }
163163
iox_catalog = { git = "https://github.com/influxdata/influxdb3_core", rev = "26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f" }

influxdb3_cache/src/distinct_cache/cache.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,6 @@ pub(crate) struct Value(Arc<str>);
462462
impl From<&FieldData> for Value {
463463
fn from(field: &FieldData) -> Self {
464464
match field {
465-
FieldData::Key(s) => Self(Arc::from(s.as_str())),
466465
FieldData::Tag(s) => Self(Arc::from(s.as_str())),
467466
FieldData::String(s) => Self(Arc::from(s.as_str())),
468467
FieldData::Timestamp(_)

influxdb3_cache/src/last_cache/cache.rs

Lines changed: 9 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ pub(crate) struct LastCache {
5454
pub(crate) value_columns: ValueColumnType,
5555
/// The Arrow Schema for the table that this cache is associated with
5656
pub(crate) schema: ArrowSchemaRef,
57-
/// Stores the series key for tables for ensuring non-nullability in the column buffer for
58-
/// series key columns
59-
pub(crate) series_key: HashSet<ColumnId>,
6057
/// The internal state of the cache
6158
pub(crate) state: LastCacheState,
6259
}
@@ -227,7 +224,6 @@ impl LastCache {
227224
},
228225
},
229226
schema: Arc::new(schema_builder.finish()),
230-
series_key: table_def.series_key.iter().copied().collect(),
231227
state: LastCacheState::Init,
232228
})
233229
}
@@ -250,11 +246,6 @@ impl LastCache {
250246
"provided value columns are not the same",
251247
));
252248
}
253-
if self.series_key != other.series_key {
254-
return Err(Error::cache_already_exists(
255-
"the series key is not the same",
256-
));
257-
}
258249
Ok(())
259250
}
260251

@@ -317,7 +308,6 @@ impl LastCache {
317308
self.ttl,
318309
Arc::clone(&table_def),
319310
Arc::clone(&self.key_column_ids),
320-
&self.series_key,
321311
&self.value_columns,
322312
))
323313
}
@@ -330,7 +320,6 @@ impl LastCache {
330320
self.ttl,
331321
Arc::clone(&table_def),
332322
Arc::clone(&self.key_column_ids),
333-
&self.series_key,
334323
&self.value_columns,
335324
));
336325
}
@@ -684,9 +673,7 @@ impl KeyValue {
684673
impl From<&FieldData> for KeyValue {
685674
fn from(field: &FieldData) -> Self {
686675
match field {
687-
FieldData::Key(s) | FieldData::Tag(s) | FieldData::String(s) => {
688-
Self::String(s.to_owned())
689-
}
676+
FieldData::Tag(s) | FieldData::String(s) => Self::String(s.to_owned()),
690677
FieldData::Integer(i) => Self::Int(*i),
691678
FieldData::UInteger(u) => Self::UInt(*u),
692679
FieldData::Boolean(b) => Self::Bool(*b),
@@ -733,7 +720,6 @@ impl LastCacheStore {
733720
ttl: Duration,
734721
table_def: Arc<TableDefinition>,
735722
key_column_ids: Arc<IndexSet<ColumnId>>,
736-
series_keys: &HashSet<ColumnId>,
737723
value_columns: &ValueColumnType,
738724
) -> Self {
739725
let (cache, value_column_ids) = match value_columns {
@@ -742,16 +728,7 @@ impl LastCacheStore {
742728
.columns
743729
.iter()
744730
.filter(|&(col_id, _)| (!key_column_ids.contains(col_id)))
745-
.map(|(col_id, col_def)| {
746-
(
747-
*col_id,
748-
CacheColumn::new(
749-
col_def.data_type,
750-
count,
751-
series_keys.contains(col_id),
752-
),
753-
)
754-
})
731+
.map(|(col_id, col_def)| (*col_id, CacheColumn::new(col_def.data_type, count)))
755732
.collect();
756733
(cache, None)
757734
}
@@ -763,16 +740,7 @@ impl LastCacheStore {
763740
.column_definition_by_id(id)
764741
.expect("valid column id")
765742
})
766-
.map(|col_def| {
767-
(
768-
col_def.id,
769-
CacheColumn::new(
770-
col_def.data_type,
771-
count,
772-
series_keys.contains(&col_def.id),
773-
),
774-
)
775-
})
743+
.map(|col_def| (col_def.id, CacheColumn::new(col_def.data_type, count)))
776744
.collect();
777745
(cache, Some(columns.clone()))
778746
}
@@ -832,7 +800,7 @@ impl LastCacheStore {
832800
// In this case, there is not an entry for the field in the cache, so if the
833801
// value is not one of the key columns, then it is a new field being added.
834802
let col = self.cache.entry(field.id).or_insert_with(|| {
835-
CacheColumn::new(data_type_from_buffer_field(field), self.count, false)
803+
CacheColumn::new(data_type_from_buffer_field(field), self.count)
836804
});
837805
// Back-fill the new cache entry with nulls, then push the new value:
838806
for _ in 0..starting_cache_size {
@@ -985,10 +953,10 @@ pub(crate) struct CacheColumn {
985953

986954
impl CacheColumn {
987955
/// Create a new [`CacheColumn`] for the given arrow [`DataType`] and size
988-
fn new(data_type: InfluxColumnType, size: usize, is_series_key: bool) -> Self {
956+
fn new(data_type: InfluxColumnType, size: usize) -> Self {
989957
Self {
990958
size,
991-
data: CacheColumnData::new(data_type, size, is_series_key),
959+
data: CacheColumnData::new(data_type, size),
992960
}
993961
}
994962

@@ -1022,21 +990,14 @@ enum CacheColumnData {
1022990
String(VecDeque<Option<String>>),
1023991
Bool(VecDeque<Option<bool>>),
1024992
Tag(VecDeque<Option<String>>),
1025-
Key(VecDeque<String>),
1026993
Time(VecDeque<i64>),
1027994
}
1028995

1029996
impl CacheColumnData {
1030997
/// Create a new [`CacheColumnData`]
1031-
fn new(data_type: InfluxColumnType, size: usize, is_series_key: bool) -> Self {
998+
fn new(data_type: InfluxColumnType, size: usize) -> Self {
1032999
match data_type {
1033-
InfluxColumnType::Tag => {
1034-
if is_series_key {
1035-
Self::Key(VecDeque::with_capacity(size))
1036-
} else {
1037-
Self::Tag(VecDeque::with_capacity(size))
1038-
}
1039-
}
1000+
InfluxColumnType::Tag => Self::Tag(VecDeque::with_capacity(size)),
10401001
InfluxColumnType::Field(field) => match field {
10411002
InfluxFieldType::Float => Self::F64(VecDeque::with_capacity(size)),
10421003
InfluxFieldType::Integer => Self::I64(VecDeque::with_capacity(size)),
@@ -1057,7 +1018,6 @@ impl CacheColumnData {
10571018
CacheColumnData::String(buf) => buf.len(),
10581019
CacheColumnData::Bool(buf) => buf.len(),
10591020
CacheColumnData::Tag(buf) => buf.len(),
1060-
CacheColumnData::Key(buf) => buf.len(),
10611021
CacheColumnData::Time(buf) => buf.len(),
10621022
}
10631023
}
@@ -1066,8 +1026,6 @@ impl CacheColumnData {
10661026
fn push_front(&mut self, field_data: &FieldData) {
10671027
match (field_data, self) {
10681028
(FieldData::Timestamp(val), CacheColumnData::Time(buf)) => buf.push_front(*val),
1069-
(FieldData::Key(val), CacheColumnData::Key(buf)) => buf.push_front(val.to_owned()),
1070-
(FieldData::Tag(val), CacheColumnData::Key(buf)) => buf.push_front(val.to_owned()),
10711029
(FieldData::Tag(val), CacheColumnData::Tag(buf)) => {
10721030
buf.push_front(Some(val.to_owned()))
10731031
}
@@ -1090,7 +1048,6 @@ impl CacheColumnData {
10901048
CacheColumnData::String(buf) => buf.push_front(None),
10911049
CacheColumnData::Bool(buf) => buf.push_front(None),
10921050
CacheColumnData::Tag(buf) => buf.push_front(None),
1093-
CacheColumnData::Key(_) => panic!("pushed null value to series key column in cache"),
10941051
CacheColumnData::Time(_) => panic!("pushed null value to time column in cache"),
10951052
}
10961053
}
@@ -1151,14 +1108,6 @@ impl CacheColumnData {
11511108
});
11521109
Arc::new(b.finish())
11531110
}
1154-
CacheColumnData::Key(buf) => {
1155-
let mut b: GenericByteDictionaryBuilder<Int32Type, GenericStringType<i32>> =
1156-
StringDictionaryBuilder::new();
1157-
buf.iter().take(n_non_expired).for_each(|val| {
1158-
b.append_value(val);
1159-
});
1160-
Arc::new(b.finish())
1161-
}
11621111
CacheColumnData::Time(buf) => {
11631112
let mut b = TimestampNanosecondBuilder::new();
11641113
buf.iter()
@@ -1177,7 +1126,6 @@ impl CacheColumnData {
11771126
CacheColumnData::String(buf) => buf.truncate(len),
11781127
CacheColumnData::Bool(buf) => buf.truncate(len),
11791128
CacheColumnData::Tag(buf) => buf.truncate(len),
1180-
CacheColumnData::Key(buf) => buf.truncate(len),
11811129
CacheColumnData::Time(buf) => buf.truncate(len),
11821130
}
11831131
}
@@ -1186,7 +1134,7 @@ impl CacheColumnData {
11861134
fn data_type_from_buffer_field(field: &Field) -> InfluxColumnType {
11871135
match field.value {
11881136
FieldData::Timestamp(_) => InfluxColumnType::Timestamp,
1189-
FieldData::Key(_) | FieldData::Tag(_) => InfluxColumnType::Tag,
1137+
FieldData::Tag(_) => InfluxColumnType::Tag,
11901138
FieldData::String(_) => InfluxColumnType::Field(InfluxFieldType::String),
11911139
FieldData::Integer(_) => InfluxColumnType::Field(InfluxFieldType::Integer),
11921140
FieldData::UInteger(_) => InfluxColumnType::Field(InfluxFieldType::UInteger),

influxdb3_py_api/src/system_py.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -560,11 +560,6 @@ pub fn execute_python_with_batch(
560560
.set_item(field_name.as_ref(), t.as_str())
561561
.context("failed to set tag field")?;
562562
}
563-
FieldData::Key(k) => {
564-
py_row
565-
.set_item(field_name.as_ref(), k.as_str())
566-
.context("failed to set key field")?;
567-
}
568563
FieldData::Timestamp(t) => {
569564
py_row
570565
.set_item(field_name.as_ref(), t)

influxdb3_wal/src/lib.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use data_types::Timestamp;
1313
use hashbrown::HashMap;
1414
use indexmap::IndexMap;
1515
use influxdb_line_protocol::FieldValue;
16-
use influxdb_line_protocol::v3::SeriesValue;
1716
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
1817
use influxdb3_shutdown::ShutdownToken;
1918
use iox_time::Time;
@@ -388,7 +387,6 @@ pub struct Row {
388387
#[derive(Clone, Debug, Serialize, Deserialize)]
389388
pub enum FieldData {
390389
Timestamp(i64),
391-
Key(String),
392390
Tag(String),
393391
String(String),
394392
Integer(i64),
@@ -402,7 +400,6 @@ impl PartialEq for FieldData {
402400
match (self, other) {
403401
(FieldData::Timestamp(a), FieldData::Timestamp(b)) => a == b,
404402
(FieldData::Tag(a), FieldData::Tag(b)) => a == b,
405-
(FieldData::Key(a), FieldData::Key(b)) => a == b,
406403
(FieldData::String(a), FieldData::String(b)) => a == b,
407404
(FieldData::Integer(a), FieldData::Integer(b)) => a == b,
408405
(FieldData::UInteger(a), FieldData::UInteger(b)) => a == b,
@@ -415,14 +412,6 @@ impl PartialEq for FieldData {
415412

416413
impl Eq for FieldData {}
417414

418-
impl<'a> From<&SeriesValue<'a>> for FieldData {
419-
fn from(sk: &SeriesValue<'a>) -> Self {
420-
match sk {
421-
SeriesValue::String(s) => Self::Key(s.to_string()),
422-
}
423-
}
424-
}
425-
426415
impl<'a> From<FieldValue<'a>> for FieldData {
427416
fn from(value: FieldValue<'a>) -> Self {
428417
match value {

influxdb3_write/src/write_buffer/table_buffer.rs

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -264,22 +264,6 @@ impl MutableTableChunk {
264264
panic!("unexpected field type");
265265
}
266266
}
267-
FieldData::Key(v) => {
268-
if let Entry::Vacant(e) = self.data.entry(f.id) {
269-
let key_builder = StringDictionaryBuilder::new();
270-
if self.row_count > 0 {
271-
panic!(
272-
"series key columns must be passed in the very first write for a table"
273-
);
274-
}
275-
e.insert(Builder::Key(key_builder));
276-
}
277-
let b = self.data.get_mut(&f.id).expect("key builder should exist");
278-
let Builder::Key(b) = b else {
279-
panic!("unexpected field type");
280-
};
281-
b.append_value(v);
282-
}
283267
FieldData::String(v) => {
284268
let b = self.data.entry(f.id).or_insert_with(|| {
285269
let mut string_builder = StringBuilder::new();
@@ -359,7 +343,7 @@ impl MutableTableChunk {
359343
Builder::I64(b) => b.append_null(),
360344
Builder::U64(b) => b.append_null(),
361345
Builder::String(b) => b.append_null(),
362-
Builder::Tag(b) | Builder::Key(b) => {
346+
Builder::Tag(b) => {
363347
// NOTE: we use an empty string "" for tags that are omitted
364348
b.append_value("");
365349
}
@@ -513,9 +497,6 @@ pub(super) enum Builder {
513497
U64(UInt64Builder),
514498
String(StringBuilder),
515499
Tag(StringDictionaryBuilder<Int32Type>),
516-
// For now we use a string dict to be consistent with tags, but in future
517-
// keys, like fields may support different data types.
518-
Key(StringDictionaryBuilder<Int32Type>),
519500
Time(TimestampNanosecondBuilder),
520501
}
521502

@@ -528,7 +509,6 @@ impl Builder {
528509
Self::U64(b) => Arc::new(b.finish_cloned()),
529510
Self::String(b) => Arc::new(b.finish_cloned()),
530511
Self::Tag(b) => Arc::new(b.finish_cloned()),
531-
Self::Key(b) => Arc::new(b.finish_cloned()),
532512
Self::Time(b) => Arc::new(b.finish_cloned()),
533513
}
534514
}
@@ -556,7 +536,6 @@ impl Builder {
556536
Arc::new(b.finish()),
557537
),
558538
Self::Tag(mut b) => (InfluxColumnType::Tag, Arc::new(b.finish())),
559-
Self::Key(mut b) => (InfluxColumnType::Tag, Arc::new(b.finish())),
560539
Self::Time(mut b) => (InfluxColumnType::Timestamp, Arc::new(b.finish())),
561540
}
562541
}
@@ -578,7 +557,7 @@ impl Builder {
578557
+ b.offsets_slice().len()
579558
+ b.validity_slice().map(|s| s.len()).unwrap_or(0)
580559
}
581-
Self::Tag(b) | Self::Key(b) => {
560+
Self::Tag(b) => {
582561
let b = b.finish_cloned();
583562
b.keys().len() * size_of::<i32>() + b.values().get_array_memory_size()
584563
}
@@ -724,7 +703,7 @@ mod tests {
724703
table_buffer.buffer_chunk(0, &rows);
725704

726705
let size = table_buffer.computed_size();
727-
assert_eq!(size, 17763);
706+
assert_eq!(size, 17739);
728707
}
729708

730709
#[test]

0 commit comments

Comments
 (0)