@@ -18,8 +18,6 @@ use crate::{
18
18
errors:: already_created_error,
19
19
} ;
20
20
use crate :: connectors:: prelude:: * ;
21
- use tremor_common:: time:: nanotime;
22
-
23
21
use mz_postgres_util:: { Config as MzConfig } ;
24
22
use tokio:: task;
25
23
use tokio_postgres:: config:: Config as TokioPgConfig ;
@@ -28,8 +26,6 @@ mod postgres_replication;
28
26
#[ derive( Deserialize , Debug , Clone ) ]
29
27
#[ serde( deny_unknown_fields) ]
30
28
pub ( crate ) struct Config {
31
- /// Interval in nanoseconds
32
- pub interval : u64 ,
33
29
/// Host name
34
30
pub host : String ,
35
31
/// Port number
@@ -63,19 +59,16 @@ impl ConnectorBuilder for Builder {
63
59
let config = Config :: new ( raw) ?;
64
60
let origin_uri = EventOriginUri {
65
61
scheme : "tremor-psql-repl" . to_string ( ) ,
66
- host : config. host ,
67
- port : Option :: from ( config. port ) ,
68
- path : vec ! [ config. interval . to_string( ) ] ,
62
+ host : config. host . clone ( ) ,
63
+ port : Option :: from ( config. port . clone ( ) ) ,
64
+ path : vec ! [ config. host . to_string( ) ] ,
69
65
} ;
70
- let database = config. dbname ;
71
- let username = config. username ;
72
- let password = config. password ;
73
- let pg_config= TokioPgConfig :: from_str ( & format ! ( "host={} port=5432 user={} password={} dbname={}" , origin_uri. host, username, password, database) ) ?;
66
+
67
+ let pg_config= TokioPgConfig :: from_str ( & format ! ( "host={} port={} user={} password={} dbname={}" , config. host, config. port, config. username, config. password, config. dbname) ) ?;
74
68
let connection_config = MzConfig :: new ( pg_config, mz_postgres_util:: TunnelConfig :: Direct ) ?;
75
69
let ( tx, rx) = bounded ( qsize ( ) ) ;
76
70
77
71
Ok ( Box :: new ( PostgresReplication {
78
- interval : config. interval ,
79
72
connection_config,
80
73
origin_uri,
81
74
rx : Some ( rx) ,
@@ -86,7 +79,6 @@ impl ConnectorBuilder for Builder {
86
79
87
80
#[ derive( Debug ) ]
88
81
pub ( crate ) struct PostgresReplication {
89
- interval : u64 ,
90
82
connection_config : MzConfig ,
91
83
origin_uri : EventOriginUri ,
92
84
rx : Option < Receiver < Value < ' static > > > ,
@@ -101,7 +93,6 @@ impl Connector for PostgresReplication {
101
93
builder : SourceManagerBuilder ,
102
94
) -> Result < Option < SourceAddr > > {
103
95
let source = PostgresReplicationSource :: new (
104
- self . interval ,
105
96
self . connection_config . clone ( ) ,
106
97
self . rx . take ( ) . ok_or_else ( already_created_error) ?,
107
98
self . tx . clone ( ) ,
@@ -115,22 +106,18 @@ impl Connector for PostgresReplication {
115
106
}
116
107
117
108
struct PostgresReplicationSource {
118
- interval_ns : u64 ,
119
- next : u64 ,
120
109
connection_config : MzConfig ,
121
110
rx : Receiver < Value < ' static > > ,
122
111
tx : Sender < Value < ' static > > ,
123
112
origin_uri : EventOriginUri ,
124
113
}
125
114
126
115
impl PostgresReplicationSource {
127
- fn new ( interval_ns : u64 , connection_config : MzConfig , rx : Receiver < Value < ' static > > , tx : Sender < Value < ' static > > , origin_uri : EventOriginUri ) -> Self {
116
+ fn new ( connection_config : MzConfig , rx : Receiver < Value < ' static > > , tx : Sender < Value < ' static > > , origin_uri : EventOriginUri ) -> Self {
128
117
Self {
129
- interval_ns,
130
118
connection_config,
131
119
rx,
132
120
tx,
133
- next : nanotime ( ) + interval_ns, // dummy placeholer
134
121
origin_uri,
135
122
}
136
123
}
@@ -139,8 +126,6 @@ impl PostgresReplicationSource {
139
126
#[ async_trait:: async_trait( ) ]
140
127
impl Source for PostgresReplicationSource {
141
128
async fn connect ( & mut self , _ctx : & SourceContext , _attempt : & Attempt ) -> Result < bool > {
142
- self . next = nanotime ( ) + self . interval_ns ;
143
- // postgres_replication::replication(self.connection_config.clone(),self.tx.clone()).await?;
144
129
let conn_config = self . connection_config . clone ( ) ;
145
130
let tx = self . tx . clone ( ) ;
146
131
task:: spawn ( async move { postgres_replication:: replication ( conn_config, tx) . await . unwrap ( ) ; } ) ;
0 commit comments