@@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z';
39
39
// replication message tags
40
40
pub const XLOG_DATA_TAG : u8 = b'w' ;
41
41
pub const PRIMARY_KEEPALIVE_TAG : u8 = b'k' ;
42
+ pub const INTERPRETED_WAL_RECORD_TAG : u8 = b'0' ;
42
43
43
44
// logical replication message tags
44
45
const BEGIN_TAG : u8 = b'B' ;
@@ -325,6 +326,7 @@ impl Message {
325
326
pub enum ReplicationMessage < D > {
326
327
XLogData ( XLogDataBody < D > ) ,
327
328
PrimaryKeepAlive ( PrimaryKeepAliveBody ) ,
329
+ RawInterpretedWalRecords ( RawInterpretedWalRecordsBody < D > ) ,
328
330
}
329
331
330
332
impl ReplicationMessage < Bytes > {
@@ -370,6 +372,15 @@ impl ReplicationMessage<Bytes> {
370
372
reply,
371
373
} )
372
374
}
375
+ INTERPRETED_WAL_RECORD_TAG => {
376
+ let streaming_lsn = buf. read_u64 :: < BigEndian > ( ) ?;
377
+ let wal_end = buf. read_u64 :: < BigEndian > ( ) ?;
378
+ ReplicationMessage :: RawInterpretedWalRecords ( RawInterpretedWalRecordsBody {
379
+ streaming_lsn,
380
+ wal_end,
381
+ data : buf. read_all ( ) ,
382
+ } )
383
+ }
373
384
tag => {
374
385
return Err ( io:: Error :: new (
375
386
io:: ErrorKind :: InvalidInput ,
@@ -950,6 +961,30 @@ impl<D> XLogDataBody<D> {
950
961
}
951
962
}
952
963
964
+ #[ derive( Debug ) ]
965
+ pub struct RawInterpretedWalRecordsBody < D > {
966
+ streaming_lsn : u64 ,
967
+ wal_end : u64 ,
968
+ data : D ,
969
+ }
970
+
971
+ impl < D > RawInterpretedWalRecordsBody < D > {
972
+ #[ inline]
973
+ pub fn streaming_lsn ( & self ) -> u64 {
974
+ self . streaming_lsn
975
+ }
976
+
977
+ #[ inline]
978
+ pub fn wal_end ( & self ) -> u64 {
979
+ self . wal_end
980
+ }
981
+
982
+ #[ inline]
983
+ pub fn data ( & self ) -> & D {
984
+ & self . data
985
+ }
986
+ }
987
+
953
988
#[ derive( Debug ) ]
954
989
pub struct PrimaryKeepAliveBody {
955
990
wal_end : u64 ,
0 commit comments