Skip to content

Commit b1acefe

Browse files
committed
WIP: Logging of table columns and source columns
1 parent a9c23c5 commit b1acefe

File tree

2 files changed

+14
-5
lines changed

2 files changed

+14
-5
lines changed

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1446,6 +1446,7 @@ impl ChunkStore {
         in_memory: bool,
     ) -> Result<Vec<ChunkUploadJob>, CubeError> {
         let mut rows = rows.0;
+        log::debug!("build_index_chunks rows.len(): {}, columns: {:?}", rows.len(), columns);
         let mut futures = Vec::new();
         for index in indexes.iter() {
             let index_columns = index.get_row().columns();
@@ -1492,7 +1493,7 @@ fn remap_columns(
     old_columns: &[Column],
     new_columns: &[Column],
 ) -> Result<Vec<ArrayRef>, CubeError> {
-    assert_eq!(old_columns.len(), old.len());
+    assert_eq!(old_columns.len(), old.len(), "old_columns: {}", old_columns.iter().map(|c| c.get_name()).join(", "));
     let mut new = Vec::with_capacity(new_columns.len());
     for new_column in new_columns.iter() {
         let old_column = old_columns

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ impl StreamingServiceImpl {
                 user,
                 password,
                 url,
-            } => Ok(Arc::new(KSqlStreamingSource {
+            } => {
+                log::debug!("source_by: KSql: table columns: {:?}", table.get_row().get_columns());
+                Ok(Arc::new(KSqlStreamingSource {
                 user: user.clone(),
                 password: password.clone(),
                 table: table_name,
@@ -143,13 +145,16 @@ impl StreamingServiceImpl {
                 columns: table.get_row().get_columns().clone(),
                 seq_column_index: seq_column.get_index(),

-            })),
+            })) },
             SourceCredentials::Kafka {
                 user,
                 password,
                 host,
                 use_ssl,
-            } => Ok(Arc::new(KafkaStreamingSource::try_new(
+            } => {
+                log::debug!("source_by: Kafka: table columns: {:?}, source columns: {:?}", table.get_row().get_columns(), table.get_row().source_columns());
+
+                Ok(Arc::new(KafkaStreamingSource::try_new(
                 table.get_id(),
                 table.get_row().unique_key_columns()
                     .ok_or_else(|| CubeError::internal(format!("Streaming table without unique key columns: {:?}", table)))?
@@ -170,7 +175,7 @@ impl StreamingServiceImpl {
                 *use_ssl,
                 trace_obj,
                 self.metadata_cache_factory.clone(),
-            ).await?)),
+            ).await?)) },
         }
     }

@@ -317,6 +322,7 @@ impl StreamingService for StreamingServiceImpl {
         let rows = new_rows;
         debug!("Received {} rows for {}", rows.len(), location);
         let table_cols = source.source_columns().as_slice();
+        log::debug!("stream_table: table_cols (source_columns): {:?}, table columns: {:?}", table_cols, table.get_row().get_columns());
         let mut builders = create_array_builders(table_cols);

         let mut start_seq: Option<i64> = None;
@@ -350,7 +356,9 @@ impl StreamingService for StreamingServiceImpl {
             .create_replay_handle(table.get_id(), location_index, seq_pointer)
             .await?;
         let data = finish(builders);
+        log::debug!("stream_table: after finish data.len(): {}, table columns: {:?}", data.len(), table.get_row().get_columns());
         let data = source.apply_post_processing(data).await?;
+        log::debug!("stream_table: after apply_post_processing data.len(): {}, table columns: {:?}", data.len(), table.get_row().get_columns());

         let partition_started_at = SystemTime::now();
         let new_chunks = self

0 commit comments

Comments
 (0)