@@ -42,26 +42,8 @@ async fn produce_replication<'a>(
42
42
committed_lsn : Arc < AtomicU64 > ,
43
43
) -> impl Stream < Item = Result < XLogDataBody < LogicalReplicationMessage > , ReplicationError > > + ' a {
44
44
async_stream:: try_stream!( {
45
- // //let mut last_data_message = Instant::now();
46
- // let mut inserts = vec![];
47
- // let mut deletes = vec![];
48
-
49
45
let mut last_feedback = Instant :: now( ) ;
50
-
51
- // Scratch space to use while evaluating casts
52
- // let mut datum_vec = DatumVec::new();
53
-
54
46
let mut last_commit_lsn = as_of;
55
- // let mut observed_wal_end = as_of;
56
- // The outer loop alternates the client between streaming the replication slot and using
57
- // normal SQL queries with pg admin functions to fast-foward our cursor in the event of WAL
58
- // lag.
59
- //
60
- // TODO(petrosagg): we need to do the above because a replication slot can be active only
61
- // one place which is why we need to do this dance of entering and exiting replication mode
62
- // in order to be able to use the administrative functions below. Perhaps it's worth
63
- // creating two independent slots so that we can use the secondary to check without
64
- // interrupting the stream on the first one
65
47
loop {
66
48
let client = client_config. clone( ) . connect_replication( ) . await ?;
67
49
let query = format!(
@@ -101,40 +83,11 @@ async fn produce_replication<'a>(
101
83
}
102
84
LogicalReplicationMessage :: Begin ( _begin) => {
103
85
}
104
-
105
- // LogicalReplicationMessage::Insert(_insert) => {
106
- // println!("======== INSERT ==========");
107
- // let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
108
- // println!("{}", serialized_xlog);
109
- // println!("======== END OF the INSERT MESSAGE JSON ==========");
110
- // }
111
-
112
- // LogicalReplicationMessage::Update(_update) => {
113
- // println!("======== UPDATE ==========");
114
- // let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
115
- // println!("{}", serialized_xlog);
116
- // println!("======== END OF the UPDATE MESSAGE JSON ==========");
117
- // }
118
-
119
- // LogicalReplicationMessage::Delete(_delete) => {
120
- // println!("======== DELETE ==========");
121
- // let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
122
- // println!("{}", serialized_xlog);
123
- // println!("======== END OF the DELETE MESSAGE JSON ==========");
124
- // }
125
-
126
- // LogicalReplicationMessage::Relation(_relation) => {
127
- // println!("======== RELATION ==========");
128
- // let serialized_xlog = serde_json::to_string_pretty(&SerializedXLogDataBody(xlog_data)).unwrap();
129
- // println!("{}", serialized_xlog);
130
- // println!("======== END OF the RELATION MESSAGE JSON ==========");
131
- // }
132
86
_ => yield xlog_data,
133
87
}
134
88
}
135
89
Some ( Ok ( ReplicationMessage :: PrimaryKeepAlive ( keepalive) ) ) => {
136
90
needs_status_update = needs_status_update || keepalive. reply( ) == 1 ;
137
- // observed_wal_end = PgLsn::from(keepalive.wal_end());
138
91
139
92
if last_data_message. elapsed( ) > WAL_LAG_GRACE_PERIOD {
140
93
break ;
@@ -193,7 +146,6 @@ async fn produce_replication<'a>(
193
146
publication = publication
194
147
) ;
195
148
196
- // let peek_binary_start_time = Instant::now();
197
149
let rows = client. simple_query( & query) . await ?;
198
150
199
151
let changes = rows
@@ -215,14 +167,6 @@ async fn produce_replication<'a>(
215
167
. count( ) ;
216
168
217
169
dbg!( changes) ;
218
-
219
- // If there are no changes until the end of the WAL it's safe to fast forward
220
- // if changes == 0 {
221
- // last_commit_lsn = observed_wal_end;
222
- // // `Progress` events are _frontiers_, so we add 1, just like when we
223
- // // handle data in `Commit` above.
224
- // yield Event::Progress([PgLsn::from(u64::from(last_commit_lsn) + 1)]);
225
- // }
226
170
}
227
171
} )
228
172
}
@@ -290,8 +234,6 @@ fn produce_snapshot<'a>(
290
234
291
235
tokio:: pin!( reader) ;
292
236
let mut text_row = Row :: default ( ) ;
293
- // TODO: once tokio-stream is released with https://github.com/tokio-rs/tokio/pull/4502
294
- // we can convert this into a single `timeout(...)` call on the reader CopyOutStream
295
237
while let Some ( b) = tokio:: time:: timeout( Duration :: from_secs( 30 ) , reader. next( ) )
296
238
. await ?
297
239
. transpose( ) ?
@@ -345,17 +287,11 @@ pub(crate) async fn replication(connection_config:MzConfig, publication : &str,
345
287
mz_postgres_util:: publication_info ( & connection_config, publication, None ) . await ?;
346
288
let source_id = "source_id" ;
347
289
let mut _replication_lsn = PgLsn :: from ( 0 ) ;
348
-
349
- // println!("======== BEGIN SNAPSHOT ==========");
350
-
351
- // Validate publication tables against the state snapshot
352
- // dbg!(&publication_tables);
353
- let mut postgres_tables = Vec :: new ( ) ;
354
- for postgres_table_desc in & publication_tables {
355
- postgres_tables. push ( postgres_table_desc. clone ( ) ) ;
356
- }
357
- let publication_tables_json = serde_json:: to_string ( & PublicationTables { publication_tables : postgres_tables} ) . unwrap ( ) ;
358
- let json_obj : tremor_value:: Value = serde_json:: from_str ( & publication_tables_json) ?;
290
+ // let mut postgres_tables = Vec::new();
291
+ // let publication_tables = mz_postgres_util::publication_info(&connection_config, publication, None).await?;
292
+ let publication_tables_json = serde_json:: to_string ( & PublicationTables { publication_tables : publication_tables. clone ( ) } ) ?;
293
+ let json_obj = serde_json:: from_str :: < tremor_value:: Value > ( & publication_tables_json) ?;
294
+ dbg ! ( & publication_tables_json) ;
359
295
tx. send ( json_obj. into_static ( ) ) . await ?;
360
296
361
297
let source_tables: BTreeMap < u32 , SourceTable > = publication_tables
@@ -373,25 +309,19 @@ pub(crate) async fn replication(connection_config:MzConfig, publication : &str,
373
309
. collect ( ) ;
374
310
let client = connection_config. clone ( ) . connect_replication ( ) . await ?;
375
311
376
- // Technically there is TOCTOU problem here but it makes the code easier and if we end
377
- // up attempting to create a slot and it already exists we will simply retry
378
- // Also, we must check if the slot exists before we start a transaction because creating a
379
- // slot must be the first statement in a transaction
380
312
let res = client
381
313
. simple_query ( & format ! (
382
314
r#"SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = '{}'"# ,
383
315
slot
384
316
) )
385
317
. await ?;
386
318
387
- // dbg!(&res);
388
319
let slot_lsn = parse_single_row ( & res, "confirmed_flush_lsn" ) ;
389
320
client
390
321
. simple_query ( "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;" )
391
322
. await ?;
392
323
let ( slot_lsn, snapshot_lsn, temp_slot) : ( PgLsn , PgLsn , _ ) = match slot_lsn {
393
324
Ok ( slot_lsn) => {
394
- // dbg!(&slot_lsn);
395
325
// The main slot already exists which means we can't use it for the snapshot. So
396
326
// we'll create a temporary replication slot in order to both set the transaction's
397
327
// snapshot to be a consistent point and also to find out the LSN that the snapshot
@@ -400,50 +330,33 @@ pub(crate) async fn replication(connection_config:MzConfig, publication : &str,
400
330
// When this happens we'll most likely be snapshotting at a later LSN than the slot
401
331
// which we will take care below by rewinding.
402
332
let temp_slot = uuid:: Uuid :: new_v4 ( ) . to_string ( ) . replace ( '-' , "" ) ;
403
- // dbg!(&temp_slot);
404
333
let res = client
405
334
. simple_query ( & format ! (
406
335
r#"CREATE_REPLICATION_SLOT {:?} TEMPORARY LOGICAL "pgoutput" USE_SNAPSHOT"# ,
407
336
temp_slot
408
337
) )
409
338
. await ?;
410
- // dbg!(&res);
411
339
let snapshot_lsn = parse_single_row ( & res, "consistent_point" ) ?;
412
340
( slot_lsn, snapshot_lsn, Some ( temp_slot) )
413
341
}
414
342
Err ( _e) => {
415
- // dbg!(e);
416
343
let res = client
417
344
. simple_query ( & format ! (
418
345
r#"CREATE_REPLICATION_SLOT {:?} LOGICAL "pgoutput" USE_SNAPSHOT"# ,
419
346
slot
420
347
) )
421
348
. await ?;
422
- // dbg!(&res);
423
349
let slot_lsn: PgLsn = parse_single_row ( & res, "consistent_point" ) ?;
424
350
( slot_lsn, slot_lsn, None )
425
351
}
426
352
} ;
427
353
428
- // dbg!(&slot_lsn, &snapshot_lsn, &temp_slot);
429
-
430
354
let mut stream = Box :: pin ( produce_snapshot ( & client, & source_tables) . enumerate ( ) ) ;
431
355
432
356
while let Some ( ( _i, event) ) = stream. as_mut ( ) . next ( ) . await {
433
- // if i > 0 {
434
- // // Failure scenario after we have produced at least one row, but before a
435
- // // successful `COMMIT`
436
- // // fail::fail_point!("pg_snapshot_failure", |_| {
437
- // return Err(anyhow::anyhow!(
438
- // "recoverable errors should crash the process"
439
- // ));
440
- // // });
441
- // }
442
357
let ( _output, _row) = event?;
443
-
444
- // dbg!(output, row, slot_lsn, 1);
445
358
}
446
- // println!("======== END SNAPSHOT ==========");
359
+
447
360
448
361
if let Some ( temp_slot) = temp_slot {
449
362
let _ = client
0 commit comments