
Commit 4524579

chore(cubestore): Work with parse_timestamp in kafka filters (#6101)
1 parent 1c1cade commit 4524579
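
This change registers a PARSE_TIMESTAMP scalar UDF in the Kafka streaming source's query context (TopicTableProvider), so that a stream table's select_statement can filter on timestamp columns by comparing them against parsed RFC 3339 literals. The UDF takes three string arguments (value, format, timezone), accepts only scalar arguments and the UTC timezone, and returns a nanosecond-precision timestamp; a new streaming test exercises such a filter end to end.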

File tree: 2 files changed, +159 -9 lines changed

rust/cubestore/cubestore/src/streaming/kafka.rs
rust/cubestore/cubestore/src/streaming/mod.rs


rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 79 additions & 3 deletions
@@ -7,10 +7,11 @@ use crate::streaming::{parse_json_payload_and_key, StreamingSource};
 use crate::table::{Row, TableValue};
 use crate::CubeError;
 use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
-use arrow::{datatypes::Schema, datatypes::SchemaRef};
 use async_std::stream;
 use async_trait::async_trait;
+use chrono::DateTime;
 use datafusion::catalog::TableReference;
 use datafusion::cube_ext;
 use datafusion::datasource::datasource::Statistics;
@@ -19,11 +20,14 @@ use datafusion::error::DataFusionError;
 use datafusion::logical_plan::Expr as DExpr;
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::functions::Signature;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::udaf::AggregateUDF;
 use datafusion::physical_plan::udf::ScalarUDF;
+use datafusion::physical_plan::ColumnarValue;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::ExecutionContext;
+use datafusion::scalar::ScalarValue;
 use datafusion::sql::parser::Statement as DFStatement;
 use datafusion::sql::planner::{ContextProvider, SqlToRel};
 use futures::Stream;
@@ -85,6 +89,7 @@ impl KafkaStreamingSource {
                     "Error while parsing `select_statement`: {}. Select statement ignored",
                     e
                 );
+
                 None
             }
         }
@@ -189,8 +194,79 @@ impl ContextProvider for TopicTableProvider {
         }
     }
 
-    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
-        None
+    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
+        match name {
+            "parse_timestamp" | "PARSE_TIMESTAMP" => {
+                let meta = ScalarUDF {
+                    name: "PARSE_TIMESTAMP".to_string(),
+                    signature: Signature::Exact(vec![
+                        DataType::Utf8,
+                        DataType::Utf8,
+                        DataType::Utf8,
+                    ]),
+                    return_type: Arc::new(|_| {
+                        Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
+                    }),
+
+                    fun: Arc::new(move |inputs| {
+                        if inputs.len() != 3 {
+                            return Err(DataFusionError::Execution(
+                                "Expected 3 arguments in PARSE_TIMESTAMP".to_string(),
+                            ));
+                        }
+                        match &inputs[1] {
+                            ColumnarValue::Scalar(ScalarValue::Utf8(Some(_))) => {}
+                            _ => {
+                                return Err(DataFusionError::Execution(
+                                    "Only scalar arguments are supported in PARSE_TIMESTAMP"
+                                        .to_string(),
+                                ));
+                            }
+                        };
+                        match &inputs[2] {
+                            ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => {
+                                if s.to_lowercase() != "utc" {
+                                    return Err(DataFusionError::Execution(
+                                        "Only UTC timezone supported in PARSE_TIMESTAMP"
+                                            .to_string(),
+                                    ));
+                                }
+                            }
+                            _ => {
+                                return Err(DataFusionError::Execution(
+                                    "Only scalar arguments are supported in PARSE_TIMESTAMP"
+                                        .to_string(),
+                                ));
+                            }
+                        };
+                        match &inputs[0] {
+                            ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => {
+                                let ts = match DateTime::parse_from_rfc3339(s) {
+                                    Ok(ts) => ts,
+                                    Err(e) => {
+                                        return Err(DataFusionError::Execution(format!(
+                                            "Error while parsing timestamp: {}",
+                                            e
+                                        )));
+                                    }
+                                };
+                                Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+                                    Some(ts.timestamp_nanos()),
+                                )))
+                            }
+                            _ => {
+                                return Err(DataFusionError::Execution(
+                                    "Only scalar arguments are supported in PARSE_TIMESTAMP"
+                                        .to_string(),
+                                ));
+                            }
+                        }
+                    }),
+                };
+                Some(Arc::new(meta))
+            }
+            _ => None,
+        }
     }
 
     fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
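
For orientation, here is a minimal standalone sketch of what the UDF body above does with its first argument, using the same chrono calls the diff relies on (DateTime::parse_from_rfc3339 and timestamp_nanos); the helper name parse_timestamp_utc is only for illustration and is not part of the commit.

use chrono::DateTime;

// Sketch of the parsing branch of PARSE_TIMESTAMP: an RFC 3339 literal is
// converted to nanoseconds since the Unix epoch, which the UDF then wraps
// in ScalarValue::TimestampNanosecond.
fn parse_timestamp_utc(s: &str) -> Result<i64, String> {
    DateTime::parse_from_rfc3339(s)
        .map(|ts| ts.timestamp_nanos())
        .map_err(|e| format!("Error while parsing timestamp: {}", e))
}

fn main() {
    // 1970-01-01T10:00:00.000Z is 36,000 seconds after the epoch.
    assert_eq!(
        parse_timestamp_utc("1970-01-01T10:00:00.000Z"),
        Ok(36_000_000_000_000)
    );
}

Note that the format argument is only validated as a scalar string here; parsing always goes through RFC 3339, and any timezone other than 'UTC' is rejected.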

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 80 additions & 6 deletions
@@ -911,6 +911,7 @@ mod tests {
     use crate::streaming::kafka::KafkaMessage;
     use crate::streaming::{KSqlQuery, KSqlQuerySchema, KsqlClient, KsqlResponse};
     use crate::TableId;
+    use chrono::{SecondsFormat, TimeZone, Utc};
     use sqlparser::ast::{BinaryOperator, Expr, SetExpr, Statement, Value};
     use sqlparser::parser::Parser;
     use sqlparser::tokenizer::Tokenizer;
@@ -1069,14 +1070,18 @@ mod tests {
                 continue;
             }
 
+            let ts_string = Utc
+                .timestamp_opt(i, 0)
+                .unwrap()
+                .to_rfc3339_opts(SecondsFormat::Millis, true);
             messages.push(KafkaMessage::MockMessage {
                 // Keys in kafka can have suffixes which contain arbitrary metadata like window size
                 key: Some(format!(
                     "{}foo",
                     serde_json::json!({ "MESSAGEID": i.to_string() }).to_string()
                 )),
                 payload: Some(
-                    serde_json::json!({ "ANONYMOUSID": j.to_string(), "TIMESTAMP": i })
+                    serde_json::json!({ "ANONYMOUSID": j.to_string(), "FILTER_ID":i, "TIMESTAMP": ts_string })
                         .to_string(),
                 ),
                 offset: i,
@@ -1397,6 +1402,7 @@ mod tests {
             })
             .await
         }, async move |services| {
+            //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC')
            let service = services.sql_service;
 
            let _ = service.exec_query("CREATE SCHEMA test").await.unwrap();
@@ -1409,9 +1415,9 @@ mod tests {
            let listener = services.cluster.job_result_listener();
 
            let _ = service
-                .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `TIMESTAMP` int) \
-                            WITH (stream_offset = 'earliest', select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE TIMESTAMP >= 10000 and TIMESTAMP < 14000') \
-                            unique key (`ANONYMOUSID`, `MESSAGEID`, `TIMESTAMP`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
+                .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int) \
+                            WITH (stream_offset = 'earliest', select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE FILTER_ID >= 10000 and FILTER_ID < 14000') \
+                            unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`) INDEX by_anonymous(`ANONYMOUSID`, `FILTER_ID`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
                .await
                .unwrap();
 
@@ -1428,17 +1434,85 @@ mod tests {
            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(8000)])]);
 
            let result = service
-                .exec_query("SELECT min(TIMESTAMP) FROM test.events_by_type_1 ")
+                .exec_query("SELECT min(FILTER_ID) FROM test.events_by_type_1 ")
                .await
                .unwrap();
            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000)])]);
 
            let result = service
-                .exec_query("SELECT max(TIMESTAMP) FROM test.events_by_type_1 ")
+                .exec_query("SELECT max(FILTER_ID) FROM test.events_by_type_1 ")
                .await
                .unwrap();
            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(13999)])]);
        })
        .await;
    }
+
+    #[tokio::test]
+    async fn streaming_filter_kafka_parse_timestamp() {
+        Config::test("streaming_filter_kafka_parse_timestamp").update_config(|mut c| {
+            c.stream_replay_check_interval_secs = 1;
+            c.compaction_in_memory_chunks_max_lifetime_threshold = 8;
+            c.partition_split_threshold = 1000000;
+            c.max_partition_split_threshold = 1000000;
+            c.compaction_chunks_count_threshold = 100;
+            c.compaction_chunks_total_size_threshold = 100000;
+            c.stale_stream_timeout = 1;
+            c.wal_split_threshold = 16384;
+            c
+        }).start_with_injector_override(async move |injector| {
+            injector.register_typed::<dyn KafkaClientService, _, _, _>(async move |_| {
+                Arc::new(MockKafkaClient)
+            })
+            .await
+        }, async move |services| {
+            //PARSE_TIMESTAMP('2023-01-24T23:59:59.999Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSX', 'UTC')
+            let service = services.sql_service;
+
+            let _ = service.exec_query("CREATE SCHEMA test").await.unwrap();
+
+            service
+                .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')")
+                .await
+                .unwrap();
+
+            let listener = services.cluster.job_result_listener();
+
+            let _ = service
+                .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `FILTER_ID` int, `TIMESTAMP` timestamp) \
+                            WITH (stream_offset = 'earliest', select_statement = 'SELECT * FROM EVENTS_BY_TYPE \
+                            WHERE TIMESTAMP >= PARSE_TIMESTAMP(\\'1970-01-01T10:00:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \
+                            AND
+                            TIMESTAMP < PARSE_TIMESTAMP(\\'1970-01-01T11:10:00.000Z\\', \\'yyyy-MM-dd\\'\\'T\\'\\'HH:mm:ss.SSSX\\', \\'UTC\\') \
+                            ') \
+                            unique key (`ANONYMOUSID`, `MESSAGEID`, `FILTER_ID`, `TIMESTAMP`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
+                .await
+                .unwrap();
+
+            let wait = listener.wait_for_job_results(vec![
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())),
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())),
+            ]);
+            timeout(Duration::from_secs(15), wait).await.unwrap().unwrap();
+
+            let result = service
+                .exec_query("SELECT COUNT(*) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(8400)])]);
+
+            let result = service
+                .exec_query("SELECT min(FILTER_ID) FROM test.events_by_type_1 ")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10 * 3600)])]);
+
+            let result = service
+                .exec_query("SELECT max(FILTER_ID) FROM test.events_by_type_1 ")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(11 * 3600 + 600 - 1)])]);
+        })
+        .await;
+    }
 }
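
For reference, the asserted values in the new test follow from the mock data: each message carries TIMESTAMP equal to FILTER_ID seconds after the Unix epoch, and the filter keeps 1970-01-01T10:00:00Z <= TIMESTAMP < 1970-01-01T11:10:00Z, i.e. FILTER_ID in [36000, 40200). That is 4200 rows from each of the two stream locations (8400 total), with min(FILTER_ID) = 10 * 3600 = 36000 and max(FILTER_ID) = 11 * 3600 + 600 - 1 = 40199.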

0 commit comments
