Skip to content

Commit 4124715

Browse files
committed
WIP: logging of columns, round three
1 parent 73d617e commit 4124715

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

rust/cubestore/cubestore/src/import/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,7 @@ impl ImportService for ImportServiceImpl {
827827
)));
828828
}
829829
if Table::is_stream_location(location) {
830+
log::debug!("ImportService::import_table_part to stream table for table id {}, location {:?}", table.get_id(), location);
830831
self.streaming_service.stream_table(table, location).await?;
831832
} else {
832833
self.do_import(&table, *format, location, data_loaded_size.clone())
@@ -844,6 +845,7 @@ impl ImportService for ImportServiceImpl {
844845
) -> Result<(), CubeError> {
845846
let table = self.meta_store.get_table_by_id(table_id).await?;
846847
if Table::is_stream_location(location) {
848+
log::debug!("ImportService::validate_table_location for table id {}, location {:?}", table.get_id(), location);
847849
self.streaming_service
848850
.validate_table_location(table, location)
849851
.await?;

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl StreamingServiceImpl {
131131
password,
132132
url,
133133
} => {
134-
log::debug!("source_by: KSql: table columns: {:?}", table.get_row().get_columns());
134+
log::debug!("source_by: KSql: table id: {}, table columns: [{}]", table.get_id(), table.get_row().get_columns().iter().map(|c| c.get_name()).join(", "));
135135
Ok(Arc::new(KSqlStreamingSource {
136136
user: user.clone(),
137137
password: password.clone(),
@@ -152,7 +152,7 @@ impl StreamingServiceImpl {
152152
host,
153153
use_ssl,
154154
} => {
155-
log::debug!("source_by: Kafka: table columns: {:?}, source columns: {:?}", table.get_row().get_columns(), table.get_row().source_columns());
155+
log::debug!("source_by: Kafka: table id: {}, table columns: [{}], source columns: {:?}", table.get_id(), table.get_row().get_columns().iter().map(|c| c.get_name()).join(", "), table.get_row().source_columns().as_ref().map(|cvec| cvec.iter().map(|c| c.get_name()).join(", ")));
156156

157157
Ok(Arc::new(KafkaStreamingSource::try_new(
158158
table.get_id(),
@@ -242,7 +242,7 @@ impl StreamingService for StreamingServiceImpl {
242242
.meta_store
243243
.get_trace_obj_by_table_id(table.get_id())
244244
.await?;
245-
245+
log::debug!("StreamingServiceImpl::stream_table for table id {}", table.get_id());
246246
let source = self.source_by(&table, location, trace_obj).await?;
247247
let seq_column = table.get_row().seq_column().ok_or_else(|| {
248248
CubeError::internal(format!(
@@ -425,6 +425,7 @@ impl StreamingService for StreamingServiceImpl {
425425
table: IdRow<Table>,
426426
location: &str,
427427
) -> Result<(), CubeError> {
428+
log::debug!("StreamingServiceImpl::validate_location for table id {}", table.get_id());
428429
let source = self.source_by(&table, location, None).await?;
429430
source.validate_table_location()?;
430431
Ok(())

0 commit comments

Comments
 (0)