Skip to content

Commit 544ce6b

Browse files
authored
Implement missing CRUD functionality (#13)
* fix,feat: Implement column name based filtering for the scan functions since we run into problems with delete and update if we don't. Implement insertion and deletion * feat: Implement the update functionality. All CRUD functions are now done
1 parent 223d807 commit 544ce6b

File tree

1 file changed

+145
-21
lines changed

1 file changed

+145
-21
lines changed

src/lib.rs

Lines changed: 145 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
use etcd_client::{Client, GetOptions, KeyValue};
2-
use futures::Stream;
1+
use etcd_client::{Client, DeleteOptions, GetOptions, KeyValue, PutOptions};
32
use pgrx::pg_sys::panic::ErrorReport;
4-
use pgrx::pg_sys::ErrorContextCallback;
53
use pgrx::PgSqlErrorCode;
64
use supabase_wrappers::prelude::*;
75
use thiserror::Error;
8-
use tokio::runtime::*;
96

107
pgrx::pg_module_magic!();
118

@@ -19,23 +16,37 @@ pub(crate) struct EtcdFdw {
1916
rt: Runtime,
2017
prefix: String,
2118
fetch_results: Vec<KeyValue>,
19+
fetch_key: bool,
20+
fetch_value: bool,
2221
}
2322

2423
#[derive(Error, Debug)]
2524
pub enum EtcdFdwError {
26-
#[error("Failed to fetch from etcd")]
25+
#[error("Failed to fetch from etcd: {0}")]
2726
FetchError(String),
2827

29-
#[error("Failed to connect to client")]
28+
#[error("Failed to send update to etcd: {0}")]
29+
UpdateError(String),
30+
31+
#[error("Failed to connect to client: {0}")]
3032
ClientConnectionError(String),
3133

3234
#[error("No connection string option was specified. Specify it with connstr")]
3335
NoConnStr(()),
36+
37+
#[error("Column {0} is not contained in the input dataset")]
38+
MissingColumn(String),
39+
40+
#[error("Key {0} already exists in etcd. No duplicates allowed")]
41+
KeyAlreadyExists(String),
42+
43+
#[error("Key {0} doesn't exist in etcd")]
44+
KeyDoesntExist(String),
3445
}
3546

3647
impl From<EtcdFdwError> for ErrorReport {
37-
fn from(_value: EtcdFdwError) -> Self {
38-
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, "", "")
48+
fn from(value: EtcdFdwError) -> Self {
49+
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{}", value), "")
3950
}
4051
}
4152

@@ -44,7 +55,7 @@ type EtcdFdwResult<T> = std::result::Result<T, EtcdFdwError>;
4455
impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
4556
fn new(server: ForeignServer) -> EtcdFdwResult<EtcdFdw> {
4657
// Open connection to etcd specified through the server parameter
47-
let rt = tokio::runtime::Runtime::new().unwrap();
58+
let rt = tokio::runtime::Runtime::new().expect("Tokio runtime should be initialized");
4859

4960
// Add parsing for the multi host connection string things here
5061
let server_name = match server.options.get("connstr") {
@@ -67,13 +78,15 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
6778
rt,
6879
prefix,
6980
fetch_results,
81+
fetch_key: false,
82+
fetch_value: false,
7083
})
7184
}
7285

7386
fn begin_scan(
7487
&mut self,
7588
_quals: &[Qual],
76-
_columns: &[Column],
89+
columns: &[Column],
7790
_sorts: &[Sort],
7891
limit: &Option<Limit>,
7992
_options: &std::collections::HashMap<String, String>,
@@ -85,7 +98,10 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
8598
Some(x) => get_options = get_options.with_limit(x.count),
8699
None => (),
87100
}
88-
// Also do quals, columns and sorts.
101+
// Check if columns contains key and value
102+
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
103+
self.fetch_key = colnames.contains(&String::from("key"));
104+
self.fetch_value = colnames.contains(&String::from("value"));
89105

90106
let result = self
91107
.rt
@@ -110,34 +126,141 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
110126
let value = x
111127
.value_str()
112128
.expect("Expected a value, but the value was empty");
113-
row.push("key", Some(Cell::String(key.to_string())));
114-
row.push("value", Some(Cell::String(value.to_string())));
129+
if self.fetch_key {
130+
row.push("key", Some(Cell::String(key.to_string())));
131+
}
132+
if self.fetch_value {
133+
row.push("value", Some(Cell::String(value.to_string())));
134+
}
115135
}))
116136
}
117137
}
118138

119139
fn end_scan(&mut self) -> EtcdFdwResult<()> {
120140
self.fetch_results = vec![];
141+
self.fetch_key = false;
142+
self.fetch_value = false;
121143
Ok(())
122144
}
123145

124146
fn begin_modify(
125147
&mut self,
126148
_options: &std::collections::HashMap<String, String>,
127149
) -> Result<(), EtcdFdwError> {
128-
todo!("Begin modify is not yet implemented")
150+
// This currently does nothing
151+
Ok(())
129152
}
130153

131-
fn insert(&mut self, _row: &Row) -> Result<(), EtcdFdwError> {
132-
todo!("Insert is not yet implemented")
154+
fn insert(&mut self, row: &Row) -> Result<(), EtcdFdwError> {
155+
let key_string = match row
156+
.cols
157+
.iter()
158+
.zip(row.cells.clone())
159+
.filter(|(name, _cell)| *name == "key")
160+
.last()
161+
{
162+
Some(x) => x.1.expect("The key column should be present").to_string(),
163+
None => return Err(EtcdFdwError::MissingColumn("key".to_string())),
164+
};
165+
let value_string = match row
166+
.cols
167+
.iter()
168+
.zip(row.cells.clone())
169+
.filter(|(name, _cell)| *name == "value")
170+
.last()
171+
{
172+
Some(x) => x.1.expect("The value column should be present").to_string(),
173+
None => return Err(EtcdFdwError::MissingColumn("value".to_string())),
174+
};
175+
let key = key_string.trim_matches(|x| x == '\'');
176+
let value = value_string.trim_matches(|x| x == '\'');
177+
178+
// See if key already exists. Error if it does
179+
match self.rt.block_on(self.client.get(key, None)) {
180+
Ok(x) => {
181+
if let Some(y) = x.kvs().first() {
182+
if y.key_str().expect("There should be a key string") == key {
183+
return Err(EtcdFdwError::KeyAlreadyExists(format!("{}", key)));
184+
}
185+
}
186+
}
187+
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
188+
}
189+
190+
match self
191+
.rt
192+
.block_on(self.client.put(key, value, Some(PutOptions::new())))
193+
{
194+
Ok(_) => Ok(()),
195+
Err(e) => return Err(EtcdFdwError::UpdateError(e.to_string())),
196+
}
133197
}
134198

135-
fn update(&mut self, _rowid: &Cell, _new_row: &Row) -> Result<(), EtcdFdwError> {
136-
todo!("Update is not yet implemented")
199+
fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), EtcdFdwError> {
200+
let key_string = rowid.to_string();
201+
let key = key_string.trim_matches(|x| x == '\'');
202+
203+
match self.rt.block_on(self.client.get(key, None)) {
204+
Ok(x) => {
205+
if let Some(y) = x.kvs().first() {
206+
if y.key_str().expect("There should be a key string") != key {
207+
return Err(EtcdFdwError::KeyDoesntExist(format!("{}", key)));
208+
}
209+
}
210+
}
211+
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
212+
}
213+
214+
let value_string = match new_row
215+
.cols
216+
.iter()
217+
.zip(new_row.cells.clone())
218+
.filter(|(name, _cell)| *name == "value")
219+
.last()
220+
{
221+
Some(x) => x.1.expect("The value column should be present").to_string(),
222+
None => return Err(EtcdFdwError::MissingColumn("value".to_string())),
223+
};
224+
let value = value_string.trim_matches(|x| x == '\'');
225+
226+
match self.rt.block_on(self.client.put(key, value, None)) {
227+
Ok(_) => Ok(()),
228+
Err(e) => return Err(EtcdFdwError::UpdateError(e.to_string())),
229+
}
137230
}
138231

139-
fn delete(&mut self, _rowid: &Cell) -> Result<(), EtcdFdwError> {
140-
todo!("Delete is not yet implemented")
232+
fn delete(&mut self, rowid: &Cell) -> Result<(), EtcdFdwError> {
233+
let key_string = rowid.to_string();
234+
let key = key_string.trim_matches(|x| x == '\'');
235+
236+
let delete_options = DeleteOptions::new();
237+
238+
match self.rt.block_on(self.client.get(key, None)) {
239+
Ok(x) => {
240+
if let Some(y) = x.kvs().first() {
241+
if y.key_str().expect("There should be a key string") != key {
242+
return Err(EtcdFdwError::KeyDoesntExist(format!("{}", key)));
243+
}
244+
}
245+
}
246+
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
247+
}
248+
249+
match self
250+
.rt
251+
.block_on(self.client.delete(key, Some(delete_options)))
252+
{
253+
Ok(x) => {
254+
if x.deleted() == 0 {
255+
return Err(EtcdFdwError::UpdateError(format!(
256+
"Deletion seemingly successful, but deleted count is {}",
257+
x.deleted()
258+
)));
259+
}
260+
Ok(())
261+
}
262+
Err(e) => Err(EtcdFdwError::UpdateError(e.to_string())),
263+
}
141264
}
142265

143266
// fn get_rel_size(
@@ -152,6 +275,7 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
152275
// }
153276

154277
fn end_modify(&mut self) -> Result<(), EtcdFdwError> {
155-
todo!()
278+
// This currently also does nothing
279+
Ok(())
156280
}
157281
}

0 commit comments

Comments
 (0)