Skip to content

Commit fbfc71a

Browse files
author
fn500i
committed
sending relation event
sending publication tables Signed-off-by: fn500i <[email protected]>
1 parent badb994 commit fbfc71a

File tree

2 files changed

+36
-23
lines changed

2 files changed

+36
-23
lines changed

src/connectors/impls/psql_repl/postgres_replication.rs

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ use std::{
1818
},
1919
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
2020
};
21-
// use serde::Deserialize;
22-
// use serde_json::Value;
23-
// use simd_json::ValueAccess;
24-
// use tokio_postgres::{replication::LogicalReplicationStream, types::PgLsn, Client, SimpleQueryMessage, GenericClient};
21+
2522
use tokio_postgres::{replication::LogicalReplicationStream, types::PgLsn, Client, SimpleQueryMessage};
2623
mod serializer;
2724
use serializer::SerializedXLogDataBody;
@@ -152,12 +149,12 @@ async fn produce_replication<'a>(
152149
// println!("======== END OF the DELETE MESSAGE JSON ==========");
153150
// }
154151
//
155-
LogicalReplicationMessage::Relation(_relation) => {
152+
// LogicalReplicationMessage::Relation(_relation) => {
156153
// println!("======== RELATION ==========");
157154
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
158155
// println!("{}", serialized_xlog);
159156
// println!("======== END OF the RELATION MESSAGE JSON ==========");
160-
}
157+
// }
161158
_ => yield xlog_data,
162159
}
163160
}
@@ -364,6 +361,11 @@ struct SourceTable {
364361
casts: Vec<MirScalarExpr>,
365362
}
366363

364+
#[derive(Serialize, Deserialize)]
365+
struct PublicationTables {
366+
publication_tables: Vec<PostgresTableDesc>,
367+
}
368+
367369
pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_value::Value<'static>>) -> Result<(), anyhow::Error> {
368370
let publication = "gamespub";
369371
let publication_tables =
@@ -372,10 +374,18 @@ pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_val
372374
let source_id = "source_id";
373375
let mut _replication_lsn = PgLsn::from(0);
374376

375-
println!("======== BEGIN SNAPSHOT ==========");
377+
// println!("======== BEGIN SNAPSHOT ==========");
376378

377379
// Validate publication tables against the state snapshot
378-
dbg!(&publication_tables);
380+
// dbg!(&publication_tables);
381+
let mut postgres_tables = Vec::new();
382+
for postgres_table_desc in &publication_tables {
383+
postgres_tables.push(postgres_table_desc.clone());
384+
}
385+
let publication_tables_json = serde_json::to_string(&PublicationTables{publication_tables: postgres_tables}).unwrap();
386+
let json_obj : tremor_value::Value = serde_json::from_str(&publication_tables_json)?;
387+
tx.send(json_obj.into_static()).await?;
388+
379389
let source_tables: BTreeMap<u32, SourceTable> = publication_tables
380390
.into_iter()
381391
.map(|t| {
@@ -402,14 +412,14 @@ pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_val
402412
))
403413
.await?;
404414

405-
dbg!(&res);
415+
// dbg!(&res);
406416
let slot_lsn = parse_single_row(&res, "confirmed_flush_lsn");
407417
client
408418
.simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
409419
.await?;
410420
let (slot_lsn, snapshot_lsn, temp_slot): (PgLsn, PgLsn, _) = match slot_lsn {
411421
Ok(slot_lsn) => {
412-
dbg!(&slot_lsn);
422+
// dbg!(&slot_lsn);
413423
// The main slot already exists which means we can't use it for the snapshot. So
414424
// we'll create a temporary replication slot in order to both set the transaction's
415425
// snapshot to be a consistent point and also to find out the LSN that the snapshot
@@ -418,32 +428,32 @@ pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_val
418428
// When this happens we'll most likely be snapshotting at a later LSN than the slot
419429
// which we will take care below by rewinding.
420430
let temp_slot = uuid::Uuid::new_v4().to_string().replace('-', "");
421-
dbg!(&temp_slot);
431+
// dbg!(&temp_slot);
422432
let res = client
423433
.simple_query(&format!(
424434
r#"CREATE_REPLICATION_SLOT {:?} TEMPORARY LOGICAL "pgoutput" USE_SNAPSHOT"#,
425435
temp_slot
426436
))
427437
.await?;
428-
dbg!(&res);
438+
// dbg!(&res);
429439
let snapshot_lsn = parse_single_row(&res, "consistent_point")?;
430440
(slot_lsn, snapshot_lsn, Some(temp_slot))
431441
}
432-
Err(e) => {
433-
dbg!(e);
442+
Err(_e) => {
443+
// dbg!(e);
434444
let res = client
435445
.simple_query(&format!(
436446
r#"CREATE_REPLICATION_SLOT {:?} LOGICAL "pgoutput" USE_SNAPSHOT"#,
437447
slot
438448
))
439449
.await?;
440-
dbg!(&res);
450+
// dbg!(&res);
441451
let slot_lsn: PgLsn = parse_single_row(&res, "consistent_point")?;
442452
(slot_lsn, slot_lsn, None)
443453
}
444454
};
445455

446-
dbg!(&slot_lsn, &snapshot_lsn, &temp_slot);
456+
// dbg!(&slot_lsn, &snapshot_lsn, &temp_slot);
447457

448458
let mut stream = Box::pin(produce_snapshot(&client, &source_tables).enumerate());
449459

@@ -457,11 +467,11 @@ pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_val
457467
// ));
458468
// // });
459469
// }
460-
let (output, row) = event?;
470+
let (_output, _row) = event?;
461471

462-
dbg!(output, row, slot_lsn, 1);
472+
// dbg!(output, row, slot_lsn, 1);
463473
}
464-
println!("======== END SNAPSHOT ==========");
474+
// println!("======== END SNAPSHOT ==========");
465475

466476
if let Some(temp_slot) = temp_slot {
467477
let _ = client
@@ -491,7 +501,7 @@ pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_val
491501
)
492502
.await;
493503
tokio::pin!(replication_stream);
494-
println!("======== STARTING WHILE LOOP ==========");
504+
// println!("======== STARTING WHILE LOOP ==========");
495505
while let Some(event) = replication_stream.next().await {
496506
// let event = event?;
497507
let serialized_event = serde_json::to_string_pretty(&SerializedXLogDataBody(event?)).unwrap();

src/connectors/impls/psql_repl/postgres_replication/serializer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ impl<'a> Serialize for SerializedTuple<'a> {
1313
where
1414
S: Serializer,
1515
{
16-
let data = self.0.tuple_data().iter().map(|d| SerializedTupleData::from_tuple_data(d)).collect::<Vec<_>>();
16+
let data = self.0.tuple_data().iter().map(|d| {
17+
SerializedTupleData::from_tuple_data(d)
18+
}).collect::<Vec<_>>();
1719
let mut state = serializer.serialize_struct("Tuple", 1)?;
1820
state.serialize_field("data", &data)?;
1921
state.end()
@@ -52,6 +54,7 @@ impl<'a, 'b> From<&'b Tuple> for SerializedTuple<'a> where 'b: 'a {
5254
/// a tuple. This struct is used by SerializedTuple and SerializedOptionTuple
5355
/// to serialize tuple data into JSON format.
5456
#[derive(Serialize)]
57+
#[serde(untagged)]
5558
enum SerializedTupleData {
5659
Null,
5760
UnchangedToast,
@@ -157,7 +160,6 @@ impl fmt::Display for CustomError {
157160
write!(f, "{}", self.message)
158161
}
159162
}
160-
161163
///SerializedXLogDataBody: A wrapper struct around a XLogDataBody object that provides a
162164
/// Serialize implementation for it.
163165
/// This struct is used to serialize logical replication messages into JSON format.
@@ -209,6 +211,7 @@ impl<'a> Serialize for SerializedLogicalReplicationMessage<'a> {
209211
state.serialize_field("name", &name)?;
210212
}
211213
LogicalReplicationMessage::Relation(ref msg) => {
214+
state.serialize_field("type", "RELATION")?;
212215
state.serialize_field("rel_id", &msg.rel_id())?;
213216
let namespace = msg
214217
.namespace()
@@ -277,4 +280,4 @@ impl<'a> Serialize for SerializedLogicalReplicationMessage<'a> {
277280
}
278281
state.end()
279282
}
280-
}
283+
}

0 commit comments

Comments
 (0)