Skip to content

Commit 5119856

Browse files
Add more CREATE FOREIGN TABLE options (#21)
Add more CREATE FOREIGN TABLE options and improve options handling and validation. prefix keys_only revision key range_eng consistency
1 parent 4249520 commit 5119856

File tree

2 files changed

+150
-21
lines changed

2 files changed

+150
-21
lines changed

README.md

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127
2929
```
3030

3131
```sql
32-
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid 'key');
32+
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key');
3333
```
3434

3535
```sql
@@ -93,6 +93,57 @@ Usage
9393

9494
Timeout in seconds to each request after the connection has been established.
9595

96+
97+
## CREATE FOREIGN TABLE options
98+
99+
`etcd_fdw` accepts the following table-level options via the
100+
`CREATE FOREIGN TABLE` command.
101+
102+
- **rowid_column** as *string*, mandatory, no default
103+
104+
Specifies which column should be treated as the unique row identifier.
105+
Usually set to key.
106+
107+
- **prefix** as *string*, optional, no default
108+
109+
Restrict the scan to keys beginning with this prefix.
110+
If not provided, the FDW will fetch all keys from the etcd server
111+
112+
- **keys_only** as *string*, optional, default `false`
113+
114+
If set to true, only the keys are fetched, not the values.
115+
Useful to reduce network overhead when values are not needed.
116+
117+
- **revision** as *string*, optional, default `0`
118+
119+
Read key-value data at a specific etcd revision.
120+
If 0, the latest revision is used.
121+
122+
- **key** as *string*, optional, no default
123+
124+
The starting key to fetch from etcd.
125+
126+
This option defines the beginning of the range.
127+
If neither `prefix` nor `key` is specified, the FDW will default to `\0` (the lowest possible key).
128+
129+
- **range_end** as *string*, optional, no default
130+
131+
The exclusive end of the key range. Restricts the scan to the half-open interval `[key, range_end)`.
132+
133+
All keys between key (inclusive) and range_end (exclusive) will be returned.
134+
If range_end is omitted, only the single key defined by key will be returned (unless prefix is used).
135+
136+
- **consistency** as *string*, optional, default `l`
137+
138+
Specifies the read consistency level for etcd queries.
139+
140+
141+
Linearizable(`l`), Ensures the result reflects the latest consensus state of the cluster.
142+
Linearizable reads have higher latency but guarantee fresh data.
143+
144+
Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus.
145+
Serializable reads are faster and lighter on the cluster, but may return stale data in some cases
146+
96147
## What doesn't work
97148
etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements.
98149
What does work is the following workflow:

src/lib.rs

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ pgrx::pg_module_magic!();
1616
pub(crate) struct EtcdFdw {
1717
client: Client,
1818
rt: Runtime,
19-
prefix: String,
2019
fetch_results: Vec<KeyValue>,
2120
fetch_key: bool,
2221
fetch_value: bool,
@@ -75,11 +74,20 @@ pub enum EtcdFdwError {
7574
#[error("Key {0} already exists in etcd. No duplicates allowed")]
7675
KeyAlreadyExists(String),
7776

77+
#[error("Options 'prefix' and 'range_end' cannot be used together")]
78+
ConflictingPrefixAndRange,
79+
80+
#[error("Options 'prefix' and 'key' should not be used together")]
81+
ConflictingPrefixAndKey,
82+
7883
#[error("Key {0} doesn't exist in etcd")]
7984
KeyDoesntExist(String),
8085

8186
#[error("Invalid option '{0}' with value '{1}'")]
8287
InvalidOption(String, String),
88+
89+
#[error("{0}")]
90+
OptionsError(#[from] OptionsError),
8391
}
8492

8593
impl From<EtcdFdwError> for ErrorReport {
@@ -90,13 +98,13 @@ impl From<EtcdFdwError> for ErrorReport {
9098

9199
/// Check whether dependent options exits
92100
/// i.e username & pass, cert & key
93-
fn require_pair<T>(
94-
a: &Option<T>,
95-
b: &Option<T>,
101+
fn require_pair(
102+
a: bool,
103+
b: bool,
96104
err: EtcdFdwError,
97105
) -> Result<(), EtcdFdwError> {
98106
match (a, b) {
99-
(Some(_), None) | (None, Some(_)) => Err(err),
107+
(true, false) | (false, true) => Err(err),
100108
_ => Ok(()),
101109
}
102110
}
@@ -194,8 +202,8 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
194202

195203
// ssl_cert + ssl_key must be both present or both absent
196204
// username + password must be both present or both absent
197-
require_pair(&cert_path, &key_path, EtcdFdwError::CertKeyMismatch(()))?;
198-
require_pair(&username, &password, EtcdFdwError::UserPassMismatch(()))?;
205+
require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?;
206+
require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?;
199207

200208
config = EtcdConfig {
201209
endpoints: vec![connstr],
@@ -213,16 +221,12 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
213221
Ok(x) => x,
214222
Err(e) => return Err(EtcdFdwError::ClientConnectionError(e.to_string())),
215223
};
216-
let prefix = match server.options.get("prefix") {
217-
Some(x) => x.clone(),
218-
None => String::from(""),
219-
};
224+
220225
let fetch_results = vec![];
221226

222227
Ok(Self {
223228
client,
224229
rt,
225-
prefix,
226230
fetch_results,
227231
fetch_key: false,
228232
fetch_value: false,
@@ -235,23 +239,65 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
235239
columns: &[Column],
236240
_sorts: &[Sort],
237241
limit: &Option<Limit>,
238-
_options: &std::collections::HashMap<String, String>,
242+
options: &std::collections::HashMap<String, String>,
239243
) -> Result<(), EtcdFdwError> {
240-
// Select get all rows as a result into a field of the struct
241-
// Build Query options from parameters
242-
let mut get_options = GetOptions::new().with_all_keys();
243-
match limit {
244-
Some(x) => get_options = get_options.with_limit(x.count),
245-
None => (),
244+
// parse the options defined when `CREATE FOREIGN TABLE`
245+
let prefix = options.get("prefix").cloned();
246+
let range_end = options.get("range_end").cloned();
247+
let key_start = options.get("key").cloned();
248+
let keys_only = options.get("keys_only").map(|v| v == "true").unwrap_or(false);
249+
let revision = options.get("revision").and_then(|v| v.parse::<i64>().ok()).unwrap_or(0);
250+
let serializable = options.get("consistency").map(|v| v == "s").unwrap_or(false);
251+
let mut get_options = GetOptions::new();
252+
253+
// prefix and range are mutually exclusive
254+
match (prefix.as_ref(), range_end.as_ref()) {
255+
(Some(_), Some(_)) => {
256+
return Err(EtcdFdwError::ConflictingPrefixAndRange);
257+
}
258+
(Some(_), None) => {
259+
get_options = get_options.with_prefix();
260+
}
261+
(None, Some(r)) => {
262+
get_options = get_options.with_range(r.clone());
263+
}
264+
(None, None) => {
265+
if key_start.is_none() {
266+
get_options = get_options.with_all_keys();
267+
}
268+
}
269+
}
270+
271+
if let Some(x) = limit {
272+
get_options = get_options.with_limit(x.count);
273+
}
274+
275+
if keys_only {
276+
get_options = get_options.with_keys_only();
277+
}
278+
279+
if revision > 0 {
280+
get_options = get_options.with_revision(revision);
246281
}
282+
283+
if serializable {
284+
get_options = get_options.with_serializable();
285+
}
286+
287+
// preference order : prefix > key_start > default "\0"
288+
// samllest possible valid key '\0'
289+
let key = prefix.clone()
290+
.or_else(|| key_start.clone())
291+
.unwrap_or_else(|| String::from("\0"));
292+
247293
// Check if columns contains key and value
248294
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
249295
self.fetch_key = colnames.contains(&String::from("key"));
250296
self.fetch_value = colnames.contains(&String::from("value"));
251297

252298
let result = self
253299
.rt
254-
.block_on(self.client.get(self.prefix.clone(), Some(get_options)));
300+
.block_on(self.client.get(key, Some(get_options)));
255301
let mut result_unwrapped = match result {
256302
Ok(x) => x,
257303
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
@@ -424,6 +470,38 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
424470
// This currently also does nothing
425471
Ok(())
426472
}
473+
474+
fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> EtcdFdwResult<()> {
475+
if let Some(oid) = catalog {
476+
if oid == FOREIGN_SERVER_RELATION_ID {
477+
check_options_contain(&options, "connstr")?;
478+
479+
let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok();
480+
let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok();
481+
let username_exists = check_options_contain(&options, "username").is_ok();
482+
let password_exists = check_options_contain(&options, "password").is_ok();
483+
484+
require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?;
485+
require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?;
486+
} else if oid == FOREIGN_TABLE_RELATION_ID {
487+
check_options_contain(&options, "rowid_column")?;
488+
489+
let prefix_exists = check_options_contain(&options, "prefix").is_ok();
490+
let rannge_exists = check_options_contain(&options, "range_end").is_ok();
491+
let key_exists = check_options_contain(&options, "key").is_ok();
492+
493+
if prefix_exists && rannge_exists {
494+
return Err(EtcdFdwError::ConflictingPrefixAndRange);
495+
}
496+
497+
if prefix_exists && key_exists {
498+
return Err(EtcdFdwError::ConflictingPrefixAndKey);
499+
}
500+
}
501+
}
502+
503+
Ok(())
504+
}
427505
}
428506

429507
#[cfg(test)]

0 commit comments

Comments
 (0)