1- use etcd_client:: { Client , DeleteOptions , GetOptions , KeyValue , PutOptions } ;
1+ use etcd_client:: { Client , ConnectOptions , TlsOptions , Identity , Certificate , Error , DeleteOptions , GetOptions , KeyValue , PutOptions } ;
2+ use std:: time:: Duration ;
23use pgrx:: pg_sys:: panic:: ErrorReport ;
34use pgrx:: PgSqlErrorCode ;
45use supabase_wrappers:: prelude:: * ;
@@ -19,6 +20,33 @@ pub(crate) struct EtcdFdw {
1920 fetch_key : bool ,
2021 fetch_value : bool ,
2122}
23+ pub struct EtcdConfig {
24+ pub endpoints : Vec < String > ,
25+ pub ca_cert_path : Option < String > ,
26+ pub client_cert_path : Option < String > ,
27+ pub client_key_path : Option < String > ,
28+ pub username : Option < String > ,
29+ pub password : Option < String > ,
30+ pub servername : Option < String > ,
31+ pub connect_timeout : Duration ,
32+ pub request_timeout : Duration ,
33+ }
34+
35+ impl Default for EtcdConfig {
36+ fn default ( ) -> Self {
37+ Self {
38+ endpoints : Vec :: new ( ) ,
39+ ca_cert_path : None ,
40+ client_cert_path : None ,
41+ client_key_path : None ,
42+ username : None ,
43+ password : None ,
44+ servername : None ,
45+ connect_timeout : Duration :: from_secs ( 10 ) ,
46+ request_timeout : Duration :: from_secs ( 30 ) ,
47+ }
48+ }
49+ }
2250
2351#[ derive( Error , Debug ) ]
2452pub enum EtcdFdwError {
@@ -34,6 +62,12 @@ pub enum EtcdFdwError {
3462 #[ error( "No connection string option was specified. Specify it with connstr" ) ]
3563 NoConnStr ( ( ) ) ,
3664
65+ #[ error( "KeyFile and CertFile must both be present." ) ]
66+ CertKeyMismatch ( ( ) ) ,
67+
68+ #[ error( "Username and Password must both be specified." ) ]
69+ UserPassMismatch ( ( ) ) ,
70+
3771 #[ error( "Column {0} is not contained in the input dataset" ) ]
3872 MissingColumn ( String ) ,
3973
@@ -42,6 +76,9 @@ pub enum EtcdFdwError {
4276
4377 #[ error( "Key {0} doesn't exist in etcd" ) ]
4478 KeyDoesntExist ( String ) ,
79+
80+ #[ error( "Invalid option '{0}' with value '{1}'" ) ]
81+ InvalidOption ( String , String ) ,
4582}
4683
4784impl From < EtcdFdwError > for ErrorReport {
@@ -50,20 +87,128 @@ impl From<EtcdFdwError> for ErrorReport {
5087 }
5188}
5289
90+ /// Check whether dependent options exits
91+ /// i.e username & pass, cert & key
92+ fn require_pair < T > (
93+ a : & Option < T > ,
94+ b : & Option < T > ,
95+ err : EtcdFdwError ,
96+ ) -> Result < ( ) , EtcdFdwError > {
97+ match ( a, b) {
98+ ( Some ( _) , None ) | ( None , Some ( _) ) => Err ( err) ,
99+ _ => Ok ( ( ) ) ,
100+ }
101+ }
102+
103+ /// Helper function for parsing timeouts
104+ fn parse_timeout (
105+ options : & std:: collections:: HashMap < String , String > ,
106+ key : & str ,
107+ default : Duration ,
108+ ) -> Result < Duration , EtcdFdwError > {
109+ if let Some ( val) = options. get ( key) {
110+ match val. parse :: < u64 > ( ) {
111+ Ok ( secs) => Ok ( Duration :: from_secs ( secs) ) ,
112+ Err ( _) => Err ( EtcdFdwError :: InvalidOption ( key. to_string ( ) , val. clone ( ) ) ) ,
113+ }
114+ } else {
115+ Ok ( default)
116+ }
117+ }
118+
119+
120+
121+ /// Use this to connect to etcd.
122+ /// Parse the certs/key paths and read them as bytes
123+ /// Sets the `TlsOptions` if available to support sll connection
124+ pub async fn connect_etcd ( config : EtcdConfig ) -> Result < Client , Error > {
125+ let mut connect_options = ConnectOptions :: new ( )
126+ . with_connect_timeout ( config. connect_timeout )
127+ . with_timeout ( config. request_timeout ) ;
128+
129+ let use_tls = config. ca_cert_path . is_some ( ) || config. client_cert_path . is_some ( ) ;
130+
131+ if use_tls {
132+ let mut tls_options = TlsOptions :: new ( ) ;
133+
134+ // Load CA cert if provided
135+ if let Some ( ca_path) = & config. ca_cert_path {
136+ let ca_bytes = std:: fs:: read ( ca_path) . map_err ( Error :: IoError ) ?;
137+ let ca_cert = Certificate :: from_pem ( ca_bytes) ;
138+ tls_options = tls_options. ca_certificate ( ca_cert) ;
139+ }
140+
141+ // Load client cert and key if both provided
142+ if let ( Some ( cert_path) , Some ( key_path) ) = ( & config. client_cert_path , & config. client_key_path ) {
143+ let cert_bytes = std:: fs:: read ( cert_path) . map_err ( Error :: IoError ) ?;
144+ let key_bytes = std:: fs:: read ( key_path) . map_err ( Error :: IoError ) ?;
145+ let identity = Identity :: from_pem ( cert_bytes, key_bytes) ;
146+ tls_options = tls_options. identity ( identity) ;
147+ }
148+
149+ // Load domain name if provided
150+ if let Some ( domain) = & config. servername {
151+ tls_options = tls_options. domain_name ( domain) ;
152+ }
153+
154+ connect_options = connect_options. with_tls ( tls_options) ;
155+ }
156+
157+ // Load Username and Password
158+ if let ( Some ( user) , Some ( pass) ) = ( & config. username , & config. password ) {
159+ connect_options = connect_options. with_user ( user, pass) ;
160+ }
161+
162+ let endpoints: Vec < & str > = config. endpoints . iter ( ) . map ( |s| s. as_str ( ) ) . collect ( ) ;
163+ Client :: connect ( endpoints, Some ( connect_options) ) . await
164+ }
165+
166+
53167type EtcdFdwResult < T > = std:: result:: Result < T , EtcdFdwError > ;
54168
55169impl ForeignDataWrapper < EtcdFdwError > for EtcdFdw {
56170 fn new ( server : ForeignServer ) -> EtcdFdwResult < EtcdFdw > {
171+ let mut config = EtcdConfig :: default ( ) ;
172+
57173 // Open connection to etcd specified through the server parameter
58174 let rt = tokio:: runtime:: Runtime :: new ( ) . expect ( "Tokio runtime should be initialized" ) ;
59175
60176 // Add parsing for the multi host connection string things here
61- let server_name = match server. options . get ( "connstr" ) {
62- Some ( x) => x,
177+ let connstr = match server. options . get ( "connstr" ) {
178+ Some ( x) => x. clone ( ) ,
63179 None => return Err ( EtcdFdwError :: NoConnStr ( ( ) ) ) ,
64180 } ;
65181
66- let client = match rt. block_on ( Client :: connect ( & [ server_name] , None ) ) {
182+ // TODO: username & pass should be captured separately i.e. from CREATE USER MAPPING
183+ let cacert_path = server. options . get ( "ssl_ca" ) . cloned ( ) ;
184+ let cert_path = server. options . get ( "ssl_cert" ) . cloned ( ) ;
185+ let key_path = server. options . get ( "ssl_key" ) . cloned ( ) ;
186+ let servername = server. options . get ( "ssl_servername" ) . cloned ( ) ;
187+ let username = server. options . get ( "username" ) . cloned ( ) ;
188+ let password = server. options . get ( "password" ) . cloned ( ) ;
189+
190+ // Parse timeouts with defaults
191+ let connect_timeout = parse_timeout ( & server. options , "connect_timeout" , config. connect_timeout ) ?;
192+ let request_timeout = parse_timeout ( & server. options , "request_timeout" , config. request_timeout ) ?;
193+
194+ // ssl_cert + ssl_key must be both present or both absent
195+ // username + password must be both present or both absent
196+ require_pair ( & cert_path, & key_path, EtcdFdwError :: CertKeyMismatch ( ( ) ) ) ?;
197+ require_pair ( & username, & password, EtcdFdwError :: UserPassMismatch ( ( ) ) ) ?;
198+
199+ config = EtcdConfig {
200+ endpoints : vec ! [ connstr] ,
201+ ca_cert_path : cacert_path,
202+ client_cert_path : cert_path,
203+ client_key_path : key_path,
204+ username : username,
205+ password : password,
206+ servername : servername,
207+ connect_timeout : connect_timeout,
208+ request_timeout : request_timeout,
209+ } ;
210+
211+ let client = match rt. block_on ( connect_etcd ( config) ) {
67212 Ok ( x) => x,
68213 Err ( e) => return Err ( EtcdFdwError :: ClientConnectionError ( e. to_string ( ) ) ) ,
69214 } ;
0 commit comments