Skip to content

Commit 08dfc26

Browse files
author
fn500i
committed
adding publication and slot as config params
Signed-off-by: fn500i <[email protected]>
1 parent 9e017e7 commit 08dfc26

File tree

2 files changed

+27
-38
lines changed

2 files changed

+27
-38
lines changed

src/connectors/impls/psql_repl.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ pub(crate) struct Config {
3636
pub password: String,
3737
/// Database name
3838
pub dbname: String,
39+
/// Publication name
40+
pub publication: String,
41+
/// Replication slot name
42+
pub replication_slot: String,
3943
}
4044

4145
impl ConfigImpl for Config {}
@@ -63,13 +67,16 @@ impl ConnectorBuilder for Builder {
6367
port: Option::from(config.port.clone()),
6468
path: vec![config.host.to_string()],
6569
};
66-
70+
let publication = config.publication;
71+
let replication_slot = config.replication_slot;
6772
let pg_config= TokioPgConfig::from_str(&format!("host={} port={} user={} password={} dbname={}", config.host, config.port, config.username, config.password, config.dbname))?;
6873
let connection_config = MzConfig::new(pg_config, mz_postgres_util::TunnelConfig::Direct)?;
6974
let (tx,rx) = bounded(qsize());
7075

7176
Ok(Box::new(PostgresReplication {
7277
connection_config,
78+
publication,
79+
replication_slot,
7380
origin_uri,
7481
rx: Some(rx),
7582
tx,
@@ -80,6 +87,8 @@ impl ConnectorBuilder for Builder {
8087
#[derive(Debug)]
8188
pub(crate) struct PostgresReplication {
8289
connection_config : MzConfig,
90+
publication: String,
91+
replication_slot: String,
8392
origin_uri: EventOriginUri,
8493
rx : Option<Receiver<Value<'static>>>,
8594
tx: Sender<Value<'static>>,
@@ -94,6 +103,8 @@ impl Connector for PostgresReplication {
94103
) -> Result<Option<SourceAddr>> {
95104
let source = PostgresReplicationSource::new(
96105
self.connection_config.clone(),
106+
self.publication.clone(),
107+
self.replication_slot.clone(),
97108
self.rx.take().ok_or_else(already_created_error)?,
98109
self.tx.clone(),
99110
self.origin_uri.clone());
@@ -107,15 +118,19 @@ impl Connector for PostgresReplication {
107118

108119
struct PostgresReplicationSource {
109120
connection_config : MzConfig,
121+
publication: String,
122+
replication_slot: String,
110123
rx: Receiver<Value<'static>>,
111124
tx: Sender<Value<'static>>,
112125
origin_uri: EventOriginUri,
113126
}
114127

115128
impl PostgresReplicationSource {
116-
fn new(connection_config: MzConfig, rx: Receiver<Value<'static>>, tx: Sender<Value<'static>>, origin_uri: EventOriginUri) -> Self {
129+
fn new(connection_config: MzConfig, publication: String,replication_slot: String,rx: Receiver<Value<'static>>, tx: Sender<Value<'static>>, origin_uri: EventOriginUri) -> Self {
117130
Self {
118131
connection_config,
132+
publication,
133+
replication_slot,
119134
rx,
120135
tx,
121136
origin_uri,
@@ -127,8 +142,10 @@ impl PostgresReplicationSource {
127142
impl Source for PostgresReplicationSource {
128143
async fn connect(&mut self, _ctx: &SourceContext, _attempt: &Attempt) -> Result<bool> {
129144
let conn_config = self.connection_config.clone();
145+
let publication = self.publication.clone();
146+
let replication_slot = self.replication_slot.clone();
130147
let tx = self.tx.clone();
131-
task::spawn(async move {postgres_replication::replication(conn_config,tx).await.unwrap();});
148+
task::spawn(async move {postgres_replication::replication(conn_config, &publication, &replication_slot,tx).await.unwrap();});
132149
Ok(true)
133150
}
134151

src/connectors/impls/psql_repl/postgres_replication.rs

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -93,62 +93,36 @@ async fn produce_replication<'a>(
9393
Some(Ok(postgres_protocol::message::backend::ReplicationMessage::XLogData(xlog_data))) => {
9494
last_data_message = Instant::now();
9595
match xlog_data.data() {
96-
LogicalReplicationMessage::Origin(origin) => {
97-
// metrics.transactions.inc();
98-
last_commit_lsn = PgLsn::from(origin.commit_lsn());
99-
100-
// println!("======== ORIGIN ==========");
101-
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
102-
// println!("{}", serialized_xlog);
103-
// println!("======== END OF the ORIGIN MESSAGE JSON ==========");
104-
105-
// for (output, row) in deletes.drain(..) {
106-
// yield Event::Message(last_commit_lsn, (output, row, -1));
107-
// }
108-
// for (output, row) in inserts.drain(..) {
109-
// yield Event::Message(last_commit_lsn, (output, row, 1));
110-
// }
111-
// yield Event::Progress([PgLsn::from(u64::from(last_commit_lsn) + 1)]);
112-
// metrics.lsn.set(last_commit_lsn.into());
96+
LogicalReplicationMessage::Origin(_origin) => {
11397
}
11498

11599
LogicalReplicationMessage::Commit(commit) => {
116100
last_commit_lsn = PgLsn::from(commit.end_lsn());
117-
// println!("======== COMMIT ==========");
118-
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
119-
// println!("{}", serialized_xlog);
120-
// println!("======== END OF the COMMIT MESSAGE JSON ==========");
121101
}
122102
LogicalReplicationMessage::Begin(_begin) => {
123-
// last_commit_lsn = PgLsn::from(begin.final_lsn());
124-
// println!("======== BEGIN ==========");
125-
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
126-
// println!("{}", serialized_xlog);
127-
// println!("======== END OF the BEGIN MESSAGE JSON ==========");
128-
//
129103
}
130-
//
104+
131105
// LogicalReplicationMessage::Insert(_insert) => {
132106
// println!("======== INSERT ==========");
133107
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
134108
// println!("{}", serialized_xlog);
135109
// println!("======== END OF the INSERT MESSAGE JSON ==========");
136110
// }
137-
//
111+
138112
// LogicalReplicationMessage::Update(_update) => {
139113
// println!("======== UPDATE ==========");
140114
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
141115
// println!("{}", serialized_xlog);
142116
// println!("======== END OF the UPDATE MESSAGE JSON ==========");
143117
// }
144-
//
118+
145119
// LogicalReplicationMessage::Delete(_delete) => {
146120
// println!("======== DELETE ==========");
147121
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
148122
// println!("{}", serialized_xlog);
149123
// println!("======== END OF the DELETE MESSAGE JSON ==========");
150124
// }
151-
//
125+
152126
// LogicalReplicationMessage::Relation(_relation) => {
153127
// println!("======== RELATION ==========");
154128
// let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
@@ -366,15 +340,13 @@ struct PublicationTables {
366340
publication_tables: Vec<PostgresTableDesc>,
367341
}
368342

369-
pub(crate) async fn replication(connection_config:MzConfig, tx:Sender<tremor_value::Value<'static>>) -> Result<(), anyhow::Error> {
370-
let publication = "gamespub";
343+
pub(crate) async fn replication(connection_config:MzConfig, publication : &str, slot: &str, tx:Sender<tremor_value::Value<'static>>) -> Result<(), anyhow::Error> {
371344
let publication_tables =
372345
mz_postgres_util::publication_info(&connection_config, publication, None).await?;
373-
let slot = "gamess";
374346
let source_id = "source_id";
375347
let mut _replication_lsn = PgLsn::from(0);
376348

377-
// println!("======== BEGIN SNAPSHOT ==========");
349+
// println!("======== BEGIN SNA≠PSHOT ==========");
378350

379351
// Validate publication tables against the state snapshot
380352
// dbg!(&publication_tables);

0 commit comments

Comments
 (0)