Commit 28be260

chore(cubestore): Direct kafka download support for ksql streams and tables -- handle ksql meta in a key and
1 parent 2a94c9c commit 28be260

2 files changed: +238 -14 lines changed

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 30 additions & 12 deletions
@@ -9,6 +9,8 @@ use async_std::stream;
 use async_trait::async_trait;
 use datafusion::cube_ext;
 use futures::Stream;
+use json::object::Object;
+use json::JsonValue;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::error::KafkaResult;
 use rdkafka::message::BorrowedMessage;
@@ -86,24 +88,32 @@ pub struct KafkaClientServiceImpl {
 
 pub enum KafkaMessage<'a> {
     BorrowedMessage(BorrowedMessage<'a>),
+    MockMessage {
+        key: Option<String>,
+        payload: Option<String>,
+        offset: i64,
+    },
 }
 
 impl<'a> KafkaMessage<'a> {
     pub fn key(&self) -> Option<&[u8]> {
         match self {
             KafkaMessage::BorrowedMessage(m) => m.key(),
+            KafkaMessage::MockMessage { key, .. } => key.as_ref().map(|k| k.as_bytes()),
         }
     }
 
     pub fn payload(&self) -> Option<&[u8]> {
         match self {
             KafkaMessage::BorrowedMessage(m) => m.payload(),
+            KafkaMessage::MockMessage { payload, .. } => payload.as_ref().map(|k| k.as_bytes()),
         }
     }
 
     pub fn offset(&self) -> i64 {
         match self {
             KafkaMessage::BorrowedMessage(m) => m.offset(),
+            KafkaMessage::MockMessage { offset, .. } => *offset,
         }
     }
 }
@@ -242,29 +252,37 @@ impl StreamingSource for KafkaStreamingSource {
                 &self.password,
                 self.use_ssl,
                 Arc::new(move |m| -> Result<_, _> {
-                    if let Some((payload_str, key_str)) = m
-                        .payload()
-                        .map(|p| String::from_utf8_lossy(p))
-                        .zip(m.key().map(|p| String::from_utf8_lossy(p)))
-                    {
+                    if let Some(payload_str) = m.payload().map(|p| String::from_utf8_lossy(p)) {
                         let payload = json::parse(payload_str.as_ref()).map_err(|e| {
                             CubeError::user(format!("Can't parse '{}' payload: {}", payload_str, e))
                         })?;
-                        // TODO Handle properly meta in a key added by ksql after \0
-                        let key = json::parse(key_str.as_ref().split("\0").next().unwrap())
-                            .map_err(|e| {
-                                CubeError::user(format!("Can't parse '{}' key: {}", key_str, e))
-                            })?;
+                        // Kafka can store additional metadata in a key suffix, e.g. information about window size.
+                        // Another use case: streams usually don't have any keys at all.
+                        let mut key = JsonValue::Object(Object::new());
+                        if let Some(key_str) = m.key().map(|p| String::from_utf8_lossy(p)) {
+                            if key_str.starts_with("{") {
+                                if let Some(last_brace) = key_str.find("}") {
+                                    key = json::parse(&key_str.as_ref()[0..last_brace + 1])
+                                        .map_err(|e| {
+                                            CubeError::user(format!(
+                                                "Can't parse '{}' key: {}",
+                                                key_str, e
+                                            ))
+                                        })?;
+                                }
+                            }
+                        }
+
                         let mut values = parse_json_payload_and_key(
                             &column_to_move,
                             &unique_key_columns,
                             payload,
-                            key,
+                            &key,
                         )
                         .map_err(|e| {
                             CubeError::user(format!(
                                 "Can't parse kafka row with '{}' key and '{}' payload: {}",
-                                key_str, payload_str, e
+                                key, payload_str, e
                             ))
                         })?;
                         values[seq_column_to_move.get_index()] = TableValue::Int(m.offset());
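For readers following the key handling above: the consumer no longer requires a key at all, and when one is present only the leading JSON object is parsed, so any ksql metadata appended after the closing brace (window bounds, for example) is ignored. Below is a minimal standalone sketch of that behavior using the same `json` crate; the helper name and the `main` driver are illustrative, not part of the commit.

```rust
use json::object::Object;
use json::JsonValue;

// Hypothetical helper mirroring the key handling in the diff above.
fn parse_ksql_key(raw_key: Option<&str>) -> json::Result<JsonValue> {
    // Streams often carry no key at all; fall back to an empty JSON object.
    let mut key = JsonValue::Object(Object::new());
    if let Some(key_str) = raw_key {
        if key_str.starts_with('{') {
            // Parse only up to the first closing brace; anything after it
            // (e.g. ksql window metadata) is dropped.
            if let Some(brace) = key_str.find('}') {
                key = json::parse(&key_str[0..brace + 1])?;
            }
        }
    }
    Ok(key)
}

fn main() -> json::Result<()> {
    // A key with a trailing metadata suffix, like the mock test messages below.
    let key = parse_ksql_key(Some("{\"MESSAGEID\":\"42\"}foo"))?;
    assert_eq!(key["MESSAGEID"], "42");

    // No key (typical for plain streams) yields an empty object.
    assert!(parse_ksql_key(None)?.is_empty());
    Ok(())
}
```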

rust/cubestore/cubestore/src/streaming/mod.rs

Lines changed: 208 additions & 2 deletions
@@ -418,7 +418,7 @@ pub fn parse_json_payload_and_key(
     columns: &Vec<Column>,
     unique_key_columns: &Vec<Column>,
     payload: JsonValue,
-    key: JsonValue,
+    key: &JsonValue,
 ) -> Result<Vec<TableValue>, CubeError> {
     match payload {
         JsonValue::Object(obj) => columns
@@ -427,7 +427,7 @@
             let mut field_value = obj.get(col.get_name());
             if field_value.is_none() {
                 if unique_key_columns.iter().any(|c| c.get_name() == col.get_name()) {
-                    field_value = match &key {
+                    field_value = match key {
                         JsonValue::Object(obj) => obj.get(col.get_name()),
                         x if unique_key_columns.len() == 1 => Some(x),
                         x => return Err(CubeError::internal(format!(
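The hunk above is cut off mid-match, so for context: a unique-key column missing from the payload is looked up in the (now borrowed) key object, and a bare scalar key can stand in only when there is exactly one unique key column. Below is a simplified sketch of that lookup with a hypothetical signature; the real parse_json_payload_and_key additionally converts values to TableValue and reports errors via CubeError.

```rust
use json::object::Object;
use json::JsonValue;

// Hypothetical, simplified version of the unique-key fallback shown above.
fn resolve_field<'a>(
    payload: &'a Object,
    key: &'a JsonValue,
    column_name: &str,
    unique_key_columns: &[&str],
) -> Option<&'a JsonValue> {
    payload.get(column_name).or_else(|| {
        if !unique_key_columns.contains(&column_name) {
            return None;
        }
        match key {
            // A key that parsed to a JSON object is looked up by column name.
            JsonValue::Object(obj) => obj.get(column_name),
            // A bare scalar key can only fill a single unique key column.
            x if unique_key_columns.len() == 1 => Some(x),
            _ => None,
        }
    })
}
```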
@@ -889,6 +889,7 @@ mod tests {
     use std::time::Duration;
 
     use pretty_assertions::assert_eq;
+    use rdkafka::Offset;
 
     use crate::cluster::Cluster;
     use crate::config::Config;
@@ -898,6 +899,7 @@ mod tests {
     use crate::metastore::job::JobType;
     use crate::scheduler::SchedulerImpl;
    use crate::sql::MySqlDialectWithBackTicks;
+    use crate::streaming::kafka::KafkaMessage;
     use crate::streaming::{KSqlQuery, KSqlQuerySchema, KsqlClient, KsqlResponse};
     use crate::TableId;
     use sqlparser::ast::{BinaryOperator, Expr, SetExpr, Statement, Value};
@@ -1022,6 +1024,69 @@ mod tests {
         }
     }
 
+    pub struct MockKafkaClient;
+
+    crate::di_service!(MockKafkaClient, [KafkaClientService]);
+
+    #[async_trait::async_trait]
+    impl KafkaClientService for MockKafkaClient {
+        async fn create_message_stream(
+            &self,
+            _table_id: u64,
+            _topic: String,
+            partition: i32,
+            offset: Offset,
+            _hosts: Vec<String>,
+            _user: &Option<String>,
+            _password: &Option<String>,
+            _use_ssl: bool,
+            to_row: Arc<dyn Fn(KafkaMessage) -> Result<Option<Row>, CubeError> + Send + Sync>,
+        ) -> Result<Pin<Box<dyn Stream<Item = Result<Row, CubeError>> + Send>>, CubeError> {
+            let max_offset = 50000;
+            let offset = match offset {
+                Offset::Beginning => 0,
+                Offset::End => max_offset,
+                Offset::Stored => 0,
+                Offset::Invalid => 0,
+                Offset::Offset(offset) => offset,
+                Offset::OffsetTail(offset) => max_offset - offset,
+            };
+
+            let mut messages = Vec::new();
+
+            for i in offset..max_offset {
+                for j in 0..2 {
+                    if partition != j {
+                        continue;
+                    }
+
+                    messages.push(KafkaMessage::MockMessage {
+                        // Keys in kafka can have suffixes which contain arbitrary metadata like window size
+                        key: Some(format!(
+                            "{}foo",
+                            serde_json::json!({ "MESSAGEID": i.to_string() }).to_string()
+                        )),
+                        payload: Some(
+                            serde_json::json!({ "ANONYMOUSID": j.to_string() }).to_string(),
+                        ),
+                        offset: i,
+                    });
+                }
+            }
+
+            let rows = messages
+                .into_iter()
+                .map(|m| to_row(m))
+                .collect::<Result<Vec<_>, _>>()?
+                .into_iter()
+                .flatten()
+                .map(|m| Ok(m))
+                .collect::<Vec<_>>();
+
+            Ok(Box::pin(stream::iter(rows)))
+        }
+    }
+
     #[tokio::test]
     async fn streaming_replay() {
         Config::test("streaming_replay").update_config(|mut c| {
@@ -1162,4 +1227,145 @@
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn streaming_replay_kafka() {
+        Config::test("streaming_replay_kafka").update_config(|mut c| {
+            c.stream_replay_check_interval_secs = 1;
+            c.compaction_in_memory_chunks_max_lifetime_threshold = 8;
+            c.partition_split_threshold = 1000000;
+            c.max_partition_split_threshold = 1000000;
+            c.compaction_chunks_count_threshold = 100;
+            c.compaction_chunks_total_size_threshold = 100000;
+            c.stale_stream_timeout = 1;
+            c.wal_split_threshold = 16384;
+            c
+        }).start_with_injector_override(async move |injector| {
+            injector.register_typed::<dyn KafkaClientService, _, _, _>(async move |_| {
+                Arc::new(MockKafkaClient)
+            })
+            .await
+        }, async move |services| {
+            let chunk_store = services.injector.get_service_typed::<dyn ChunkDataStore>().await;
+            let scheduler = services.injector.get_service_typed::<SchedulerImpl>().await;
+            let service = services.sql_service;
+            let meta_store = services.meta_store;
+
+            let _ = service.exec_query("CREATE SCHEMA test").await.unwrap();
+
+            service
+                .exec_query("CREATE SOURCE OR UPDATE kafka AS 'kafka' VALUES (user = 'foo', password = 'bar', host = 'localhost:9092')")
+                .await
+                .unwrap();
+
+            let listener = services.cluster.job_result_listener();
+
+            let _ = service
+                .exec_query("CREATE TABLE test.events_by_type_1 (`ANONYMOUSID` text, `MESSAGEID` text) WITH (stream_offset = 'earliest') unique key (`ANONYMOUSID`, `MESSAGEID`) INDEX by_anonymous(`ANONYMOUSID`) location 'stream://kafka/EVENTS_BY_TYPE/0', 'stream://kafka/EVENTS_BY_TYPE/1'")
+                .await
+                .unwrap();
+
+            let wait = listener.wait_for_job_results(vec![
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())),
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())),
+            ]);
+            timeout(Duration::from_secs(15), wait).await.unwrap().unwrap();
+
+            let result = service
+                .exec_query("SELECT COUNT(*) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(100000)])]);
+
+            let listener = services.cluster.job_result_listener();
+            let chunks = meta_store.chunks_table().all_rows().await.unwrap();
+            let replay_handles = meta_store.get_replay_handles_by_ids(chunks.iter().filter_map(|c| c.get_row().replay_handle_id().clone()).collect()).await.unwrap();
+            let mut middle_chunk = None;
+            for chunk in chunks.iter() {
+                if let Some(handle_id) = chunk.get_row().replay_handle_id() {
+                    let handle = replay_handles.iter().find(|h| h.get_id() == *handle_id).unwrap();
+                    if let Some(seq_pointers) = handle.get_row().seq_pointers_by_location() {
+                        if seq_pointers.iter().any(|p| p.as_ref().map(|p| p.start_seq().as_ref().zip(p.end_seq().as_ref()).map(|(a, b)| *a > 0 && *b <= 32768).unwrap_or(false)).unwrap_or(false)) {
+                            chunk_store.free_memory_chunk(chunk.get_id()).await.unwrap();
+                            middle_chunk = Some(chunk.clone());
+                            break;
+                        }
+                    }
+                }
+            }
+            Delay::new(Duration::from_millis(10000)).await;
+            scheduler.schedule_compaction_in_memory_chunks_if_needed(&meta_store.get_partition(middle_chunk.unwrap().get_row().get_partition_id()).await.unwrap()).await.unwrap();
+
+            let wait = listener.wait_for_job_results(vec![
+                (RowKey::Table(TableId::Partitions, 1), JobType::InMemoryChunksCompaction),
+            ]);
+            timeout(Duration::from_secs(10), wait).await.unwrap().unwrap();
+
+            println!("chunks: {:#?}", service
+                .exec_query("SELECT * FROM system.chunks")
+                .await
+                .unwrap()
+            );
+            println!("replay handles: {:#?}", service
+                .exec_query("SELECT * FROM system.replay_handles")
+                .await
+                .unwrap()
+            );
+
+            let result = service
+                .exec_query("SELECT COUNT(*) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(100000 - 16384)])]);
+
+            let listener = services.cluster.job_result_listener();
+
+            scheduler.reconcile_table_imports().await.unwrap();
+
+            let wait = listener.wait_for_job_results(vec![
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/0".to_string())),
+                (RowKey::Table(TableId::Tables, 1), JobType::TableImportCSV("stream://kafka/EVENTS_BY_TYPE/1".to_string())),
+            ]);
+            timeout(Duration::from_secs(10), wait).await.unwrap().unwrap();
+            Delay::new(Duration::from_millis(10000)).await;
+
+            let result = service
+                .exec_query("SELECT COUNT(*) FROM test.events_by_type_1")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows(), &vec![Row::new(vec![TableValue::Int(100000)])]);
+
+            println!("replay handles pre merge: {:#?}", service
+                .exec_query("SELECT * FROM system.replay_handles")
+                .await
+                .unwrap()
+            );
+
+            scheduler.merge_replay_handles().await.unwrap();
+
+            let result = service
+                .exec_query("SELECT * FROM system.replay_handles WHERE has_failed_to_persist_chunks = true")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows().len(), 0);
+
+            println!("replay handles after merge: {:#?}", service
+                .exec_query("SELECT * FROM system.replay_handles")
+                .await
+                .unwrap()
+            );
+
+            service
+                .exec_query("DROP TABLE test.events_by_type_1")
+                .await
+                .unwrap();
+
+            let result = service
+                .exec_query("SELECT * FROM system.replay_handles")
+                .await
+                .unwrap();
+            assert_eq!(result.get_rows().len(), 0);
+        })
+        .await;
+    }
 }
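The row counts asserted in streaming_replay_kafka follow directly from the mock and config values above; a small standalone sketch of the arithmetic (not part of the commit):

```rust
fn main() {
    // MockKafkaClient emits offsets 0..50000 for each of the two partitions/locations.
    let max_offset: i64 = 50_000;
    let partitions: i64 = 2;
    let total_rows = max_offset * partitions;
    assert_eq!(total_rows, 100_000); // the first and final COUNT(*) assertions

    // wal_split_threshold = 16384, and the test frees an in-memory chunk that
    // appears to hold one WAL split worth of rows, so the count drops by that
    // amount until reconcile_table_imports() replays the stream.
    let wal_split_threshold: i64 = 16_384;
    assert_eq!(total_rows - wal_split_threshold, 83_616); // the 100000 - 16384 assertion
}
```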
