
Commit 3b1b1ab

feat(cubestore): Filter for data from kafka streams (#6054)
1 parent 848fff3 commit 3b1b1ab

File tree

2 files changed: +257 additions, -5 deletions

rust/cubestore/cubestore/src/streaming/kafka.rs
rust/cubestore/cubestore/src/streaming/mod.rs
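
This commit makes CubeStore honour the `select_statement` option of a Kafka streaming table: the statement's WHERE clause is planned against the topic's columns and applied to each ingested batch before the rows are written. A minimal usage sketch, lifted from the new `streaming_filter_kafka` test further down (table, topic, and column names are the test's, not a required naming scheme; the test's INDEX clause is omitted for brevity):

// Hedged sketch: only rows whose TIMESTAMP falls in [10000, 14000) are ingested
// from the EVENTS_BY_TYPE topic; everything else is filtered out before insert.
const CREATE_FILTERED_STREAM_TABLE: &str =
    "CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `TIMESTAMP` int) \
     WITH (stream_offset = 'earliest', select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE TIMESTAMP >= 10000 and TIMESTAMP < 14000') \
     unique key (`ANONYMOUSID`, `MESSAGEID`, `TIMESTAMP`) \
     location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'";

fn main() {
    // In the commit's tests this statement is passed to SqlService::exec_query;
    // printing it keeps the sketch self-contained.
    println!("{}", CREATE_FILTERED_STREAM_TABLE);
}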

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 182 additions & 3 deletions
@@ -2,12 +2,30 @@ use crate::config::injection::DIService;
 use crate::config::ConfigObj;
 use crate::metastore::table::StreamOffset;
 use crate::metastore::Column;
+use crate::sql::MySqlDialectWithBackTicks;
 use crate::streaming::{parse_json_payload_and_key, StreamingSource};
 use crate::table::{Row, TableValue};
 use crate::CubeError;
+use arrow::array::ArrayRef;
+use arrow::record_batch::RecordBatch;
+use arrow::{datatypes::Schema, datatypes::SchemaRef};
 use async_std::stream;
 use async_trait::async_trait;
+use datafusion::catalog::TableReference;
 use datafusion::cube_ext;
+use datafusion::datasource::datasource::Statistics;
+use datafusion::datasource::TableProvider;
+use datafusion::error::DataFusionError;
+use datafusion::logical_plan::Expr as DExpr;
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::empty::EmptyExec;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::udaf::AggregateUDF;
+use datafusion::physical_plan::udf::ScalarUDF;
+use datafusion::physical_plan::{collect, ExecutionPlan};
+use datafusion::prelude::ExecutionContext;
+use datafusion::sql::parser::Statement as DFStatement;
+use datafusion::sql::planner::{ContextProvider, SqlToRel};
 use futures::Stream;
 use json::object::Object;
 use json::JsonValue;
@@ -16,6 +34,10 @@ use rdkafka::error::KafkaResult;
 use rdkafka::message::BorrowedMessage;
 use rdkafka::util::Timeout;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
+use sqlparser::ast::{Query, SetExpr, Statement};
+use sqlparser::parser::Parser;
+use sqlparser::tokenizer::Tokenizer;
+use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
@@ -28,18 +50,18 @@ pub struct KafkaStreamingSource {
     password: Option<String>,
     topic: String,
     host: String,
-    // TODO Support parsing of filters and applying before insert
-    _select_statement: Option<String>,
     offset: Option<StreamOffset>,
     partition: usize,
     kafka_client: Arc<dyn KafkaClientService>,
     use_ssl: bool,
+    post_filter: Option<Arc<dyn ExecutionPlan>>,
 }

 impl KafkaStreamingSource {
     pub fn new(
         table_id: u64,
         unique_key_columns: Vec<Column>,
+        columns: Vec<Column>,
         user: Option<String>,
         password: Option<String>,
         topic: String,
@@ -50,18 +72,156 @@ impl KafkaStreamingSource {
         kafka_client: Arc<dyn KafkaClientService>,
         use_ssl: bool,
     ) -> Self {
+        let post_filter = if let Some(select_statement) = select_statement {
+            let planner = KafkaFilterPlanner {
+                topic: topic.clone(),
+                columns,
+            };
+            match planner.parse_select_statement(select_statement.clone()) {
+                Ok(p) => p,
+                Err(e) => {
+                    //FIXME May be we should stop execution here
+                    log::error!(
+                        "Error while parsing `select_statement`: {}. Select statement ignored",
+                        e
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        };
+
         KafkaStreamingSource {
             table_id,
             unique_key_columns,
             user,
             password,
             topic,
             host,
-            _select_statement: select_statement,
             offset,
             partition,
             kafka_client,
             use_ssl,
+            post_filter,
+        }
+    }
+}
+
+pub struct KafkaFilterPlanner {
+    topic: String,
+    columns: Vec<Column>,
+}
+
+impl KafkaFilterPlanner {
+    fn parse_select_statement(
+        &self,
+        select_statement: String,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>, CubeError> {
+        let dialect = &MySqlDialectWithBackTicks {};
+        let mut tokenizer = Tokenizer::new(dialect, &select_statement);
+        let tokens = tokenizer.tokenize().unwrap();
+        let statement = Parser::new(tokens, dialect).parse_statement()?;
+
+        match &statement {
+            Statement::Query(box Query {
+                body: SetExpr::Select(s),
+                ..
+            }) => {
+                if s.selection.is_none() {
+                    return Ok(None);
+                }
+                let provider = TopicTableProvider::new(self.topic.clone(), &self.columns);
+                let query_planner = SqlToRel::new(&provider);
+                let logical_plan =
+                    query_planner.statement_to_plan(&DFStatement::Statement(statement.clone()))?;
+                let physical_filter = Self::make_physical_filter(&logical_plan)?;
+                Ok(physical_filter)
+            }
+            _ => Err(CubeError::user(format!(
+                "{} is not valid select query",
+                select_statement
+            ))),
+        }
+    }
+
+    /// Only Projection > Filter > TableScan plans are allowed
+    fn make_physical_filter(
+        plan: &LogicalPlan,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>, CubeError> {
+        match plan {
+            LogicalPlan::Projection { input, .. } => match input.as_ref() {
+                filter_plan @ LogicalPlan::Filter { input, .. } => match input.as_ref() {
+                    LogicalPlan::TableScan { .. } => {
+                        let plan_ctx = Arc::new(ExecutionContext::new());
+                        let phys_plan = plan_ctx.create_physical_plan(&filter_plan)?;
+                        Ok(Some(phys_plan))
+                    }
+                    _ => Ok(None),
+                },
+                _ => Ok(None),
+            },
+            _ => Ok(None),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct TopicTableProvider {
+    topic: String,
+    schema: SchemaRef,
+}
+
+impl TopicTableProvider {
+    pub fn new(topic: String, columns: &Vec<Column>) -> Self {
+        let schema = Arc::new(Schema::new(
+            columns.iter().map(|c| c.clone().into()).collect::<Vec<_>>(),
+        ));
+        Self { topic, schema }
+    }
+}
+
+impl ContextProvider for TopicTableProvider {
+    fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
+        match name {
+            TableReference::Bare { table } if table == self.topic => Some(Arc::new(self.clone())),
+            _ => None,
+        }
+    }
+
+    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
+        None
+    }
+
+    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
+        None
+    }
+}
+
+impl TableProvider for TopicTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn scan(
+        &self,
+        _projection: &Option<Vec<usize>>,
+        _batch_size: usize,
+        _filters: &[DExpr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        Ok(Arc::new(EmptyExec::new(false, self.schema())))
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics {
+            num_rows: None,
+            total_byte_size: None,
+            column_statistics: None,
         }
     }
 }
@@ -297,6 +457,25 @@ impl StreamingSource for KafkaStreamingSource {
         Ok(stream)
     }

+    async fn apply_post_filter(&self, data: Vec<ArrayRef>) -> Result<Vec<ArrayRef>, CubeError> {
+        if let Some(post_filter) = &self.post_filter {
+            let schema = post_filter.children()[0].schema();
+            let batch = RecordBatch::try_new(schema.clone(), data)?;
+            let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?);
+            let filter = post_filter.with_new_children(vec![input])?;
+            let mut out_batches = collect(filter).await?;
+            let res = if out_batches.len() == 1 {
+                out_batches.pop().unwrap()
+            } else {
+                RecordBatch::concat(&schema, &out_batches)?
+            };
+
+            Ok(res.columns().to_vec())
+        } else {
+            Ok(data)
+        }
+    }
+
     fn validate_table_location(&self) -> Result<(), CubeError> {
         // TODO
         // self.query(None)?;
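
The doc comment on `make_physical_filter` captures the key restriction: the planned `select_statement` is only honoured when it reduces to a Projection over a Filter over a TableScan of the topic itself; any other plan shape (joins, aggregations, nested queries) yields no post filter, and rows pass through unfiltered. A simplified, self-contained illustration of that gate, using a toy `Plan` enum rather than the real DataFusion `LogicalPlan` (all names below are hypothetical):

// Toy model of the plan-shape check: only Projection -> Filter -> TableScan
// is accepted; anything else means "no post filter".
#[derive(Debug)]
enum Plan {
    Projection { input: Box<Plan> },
    Filter { input: Box<Plan> },
    TableScan,
    Other,
}

fn is_supported_filter_shape(plan: &Plan) -> bool {
    match plan {
        Plan::Projection { input } => match input.as_ref() {
            Plan::Filter { input } => matches!(input.as_ref(), Plan::TableScan),
            _ => false,
        },
        _ => false,
    }
}

fn main() {
    // `SELECT * FROM topic WHERE ...` plans to the supported shape:
    let supported = Plan::Projection {
        input: Box::new(Plan::Filter {
            input: Box::new(Plan::TableScan),
        }),
    };
    // Anything more elaborate under the projection is rejected:
    let unsupported = Plan::Projection {
        input: Box::new(Plan::Other),
    };
    assert!(is_supported_filter_shape(&supported));
    assert!(!is_supported_filter_shape(&unsupported));
}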

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 75 additions & 2 deletions
@@ -14,6 +14,7 @@ use crate::table::{Row, TableValue, TimestampValue};
 use crate::util::decimal::Decimal;
 use crate::CubeError;
 use arrow::array::ArrayBuilder;
+use arrow::array::ArrayRef;
 use async_trait::async_trait;
 use chrono::Utc;
 use datafusion::cube_ext::ordfloat::OrdF64;
@@ -133,6 +134,7 @@ impl StreamingServiceImpl {
             table.get_row().unique_key_columns()
                 .ok_or_else(|| CubeError::internal(format!("Streaming table without unique key columns: {:?}", table)))?
                 .into_iter().cloned().collect(),
+            table.get_row().get_columns().clone(),
             user.clone(),
             password.clone(),
             table_name,
@@ -307,11 +309,14 @@ impl StreamingService for StreamingServiceImpl {
             .meta_store
             .create_replay_handle(table.get_id(), location_index, seq_pointer)
             .await?;
+        let data = finish(builders);
+        let data = source.apply_post_filter(data).await?;
+
         let new_chunks = self
             .chunk_store
             .partition_data(
                 table.get_id(),
-                finish(builders),
+                data,
                 table.get_row().get_columns().as_slice(),
                 true,
             )
@@ -355,6 +360,10 @@ pub trait StreamingSource: Send + Sync {
         initial_seq_value: Option<i64>,
     ) -> Result<Pin<Box<dyn Stream<Item = Result<Row, CubeError>> + Send>>, CubeError>;

+    async fn apply_post_filter(&self, data: Vec<ArrayRef>) -> Result<Vec<ArrayRef>, CubeError> {
+        Ok(data)
+    }
+
     fn validate_table_location(&self) -> Result<(), CubeError>;
 }
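
The hook lands on the `StreamingSource` trait with a pass-through default, so only sources that actually build a post filter (currently the Kafka source) override it. A minimal, dependency-free sketch of the same default-method pattern (toy types, not the real trait, and synchronous for brevity):

// Toy illustration: the trait ships a no-op default; one implementor overrides it.
trait Source {
    // Default: pass the data through unchanged.
    fn apply_post_filter(&self, data: Vec<i64>) -> Vec<i64> {
        data
    }
}

struct PassThroughSource;
impl Source for PassThroughSource {}

struct FilteringSource {
    min: i64,
}

impl Source for FilteringSource {
    fn apply_post_filter(&self, data: Vec<i64>) -> Vec<i64> {
        data.into_iter().filter(|v| *v >= self.min).collect()
    }
}

fn main() {
    let rows = vec![9_999, 10_000, 13_999];
    assert_eq!(PassThroughSource.apply_post_filter(rows.clone()), rows);
    assert_eq!(
        FilteringSource { min: 10_000 }.apply_post_filter(rows),
        vec![10_000, 13_999]
    );
}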

@@ -1067,7 +1076,8 @@ mod tests {
                     serde_json::json!({ "MESSAGEID": i.to_string() }).to_string()
                 )),
                 payload: Some(
-                    serde_json::json!({ "ANONYMOUSID": j.to_string() }).to_string(),
+                    serde_json::json!({ "ANONYMOUSID": j.to_string(), "TIMESTAMP": i })
+                        .to_string(),
                 ),
                 offset: i,
             });
@@ -1368,4 +1378,67 @@ mod tests {
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn streaming_filter_kafka() {
+        Config::test("streaming_filter_kafka").update_config(|mut c| {
+            c.stream_replay_check_interval_secs = 1;
+            c.compaction_in_memory_chunks_max_lifetime_threshold = 8;
+            c.partition_split_threshold = 1000000;
+            c.max_partition_split_threshold = 1000000;
+            c.compaction_chunks_count_threshold = 100;
+            c.compaction_chunks_total_size_threshold = 100000;
+            c.stale_stream_timeout = 1;
+            c.wal_split_threshold = 16384;
+            c
+        }).start_with_injector_override(async move |injector| {
+            injector.register_typed::<dyn KafkaClientService, _, _, _>(async move |_| {
+                Arc::new(MockKafkaClient)
+            })
+            .await
+        }, async move |services| {
+            let service = services.sql_service;
+
+            let _ = service.exec_query("CREATE SCHEMA test").await.unwrap();
+
+            service
+                .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')")
+                .await
+                .unwrap();
+
+            let listener = services.cluster.job_result_listener();
+
+            let _ = service
+                .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text, `TIMESTAMP` int) \
+                            WITH (stream_offset = 'earliest', select_statement = 'SELECT * FROM EVENTS_BY_TYPE WHERE TIMESTAMP >= 10000 and TIMESTAMP < 14000') \
+                            unique key (`ANONYMOUSID`, `MESSAGEID`, `TIMESTAMP`) INDEX by_anonymous(`ANONYMOUSID`, `TIMESTAMP`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
                .await
                .unwrap();
+
+            let wait = listener.wait_for_job_results(vec![
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())),
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())),
+            ]);
+            timeout(Duration::from_secs(15), wait).await.unwrap().unwrap();
+
+            let result = service
+                .exec_query("SELECT COUNT(*) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(8000)])]);
+
+            let result = service
+                .exec_query("SELECT min(TIMESTAMP) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(10000)])]);
+
+            let result = service
+                .exec_query("SELECT max(TIMESTAMP) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(13999)])]);
+        })
+        .await;
+    }
 }
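
The asserted values follow from the mock generator and the filter range, assuming (as the earlier payload hunk suggests) one message per offset with TIMESTAMP equal to the offset, read from the two stream locations:

// Back-of-the-envelope check of the test's expectations.
fn main() {
    // Filter from the test: TIMESTAMP >= 10000 AND TIMESTAMP < 14000 (half-open range).
    let (lo, hi) = (10_000i64, 14_000i64);
    // Two locations: stream://kafka/EVENTS_BY_TYPE/0 and /1.
    let partitions = 2;
    let rows_per_partition = hi - lo; // 4000 matching offsets per partition
    assert_eq!(rows_per_partition * partitions, 8_000); // SELECT COUNT(*)
    assert_eq!(lo, 10_000); // SELECT min(TIMESTAMP)
    assert_eq!(hi - 1, 13_999); // SELECT max(TIMESTAMP)
}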
