Skip to content

Commit 7ec10e7

Browse files
committed
feat: add spring_push and spring_source_row_{from_json, close}
1 parent 4b3b52e commit 7ec10e7

4 files changed

Lines changed: 139 additions & 6 deletions

File tree

springql.h

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,15 @@ typedef void *SpringConfig;
5050
typedef void *SpringPipeline;
5151

5252
/**
53-
* Row object from an in memory queue.
53+
* Row object to pop from an in memory queue.
5454
*/
5555
typedef void *SpringSinkRow;
5656

57+
/**
58+
* Row object to push into an in memory queue.
59+
*/
60+
typedef void *SpringSourceRow;
61+
5762
/**
5863
* Returns default configuration.
5964
*
@@ -165,14 +170,50 @@ SpringSinkRow *spring_pop_non_blocking(const SpringPipeline *pipeline,
165170
bool *is_err);
166171

167172
/**
168-
* Frees heap occupied by a `SpringRow`.
173+
* Push a row into an in memory queue. This is a non-blocking function.
174+
*
175+
* # Returns
176+
*
177+
* - `Ok`: on success.
178+
* - `Unavailable`: queue named `queue` does not exist.
179+
*/
180+
enum SpringErrno spring_push(const SpringPipeline *pipeline,
181+
const char *queue,
182+
const SpringSourceRow *row);
183+
184+
/**
185+
* Create a source row from JSON string
186+
*
187+
* # Returns
188+
*
189+
* - non-NULL: Successfully created a row.
190+
* - NULL: Error occurred.
191+
*
192+
* # Errors
193+
*
194+
* - `InvalidFormat`: JSON string is invalid.
195+
*/
196+
SpringSourceRow *spring_source_row_from_json(const char *json);
197+
198+
/**
199+
* Frees heap occupied by a `SpringSourceRow`.
200+
*
201+
* # Returns
202+
*
203+
* - `Ok`: on success.
204+
* - `CNull`: `pipeline` is a NULL pointer.
205+
*/
206+
enum SpringErrno spring_source_row_close(SpringSourceRow *row);
207+
208+
/**
209+
* Frees heap occupied by a `SpringSinkRow`.
169210
*
170211
* # Returns
171212
*
172213
* - `Ok`: on success.
173214
* - `CNull`: `pipeline` is a NULL pointer.
174215
*/
175-
enum SpringErrno spring_row_close(SpringSinkRow *row);
216+
enum SpringErrno spring_sink_row_close(SpringSinkRow *row);
176217

177218
/**
178219
* Get a 2-byte integer column.

src/lib.rs

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod spring_errno;
1010
pub mod spring_last_err;
1111
mod spring_pipeline;
1212
mod spring_sink_row;
13+
mod spring_source_row;
1314

1415
use std::{
1516
ffi::{c_void, CStr},
@@ -25,6 +26,7 @@ use crate::{
2526
spring_last_err::{update_last_error, LastError},
2627
spring_pipeline::SpringPipeline,
2728
spring_sink_row::SpringSinkRow,
29+
spring_source_row::SpringSourceRow,
2830
};
2931
use ::springql::{error::SpringError, SpringPipeline as Pipeline};
3032

@@ -212,14 +214,72 @@ pub unsafe extern "C" fn spring_pop_non_blocking(
212214
}
213215
}
214216

215-
/// Frees heap occupied by a `SpringRow`.
217+
/// Push a row into an in memory queue. This is a non-blocking function.
218+
///
219+
/// # Returns
220+
///
221+
/// - `Ok`: on success.
222+
/// - `Unavailable`: queue named `queue` does not exist.
223+
#[no_mangle]
224+
pub unsafe extern "C" fn spring_push(
225+
pipeline: *const SpringPipeline,
226+
queue: *const c_char,
227+
row: *const SpringSourceRow,
228+
) -> SpringErrno {
229+
let pipeline = (*pipeline).as_pipeline();
230+
let queue = CStr::from_ptr(queue).to_string_lossy().into_owned();
231+
let source_row = (*row).to_row();
232+
let result = with_catch(|| pipeline.push(&queue, source_row));
233+
match result {
234+
Ok(()) => SpringErrno::Ok,
235+
Err(e) => e,
236+
}
237+
}
238+
239+
/// Create a source row from JSON string
240+
///
241+
/// # Returns
242+
///
243+
/// - non-NULL: Successfully created a row.
244+
/// - NULL: Error occurred.
245+
///
246+
/// # Errors
247+
///
248+
/// - `InvalidFormat`: JSON string is invalid.
249+
#[no_mangle]
250+
pub unsafe extern "C" fn spring_source_row_from_json(json: *const c_char) -> *mut SpringSourceRow {
251+
let json = CStr::from_ptr(json).to_string_lossy().into_owned();
252+
let res_source_row = with_catch(|| ::springql::SpringSourceRow::from_json(&json));
253+
match res_source_row {
254+
Ok(source_row) => SpringSourceRow::new(source_row).into_ptr(),
255+
Err(_) => ptr::null_mut(),
256+
}
257+
}
258+
259+
/// Frees heap occupied by a `SpringSourceRow`.
260+
///
261+
/// # Returns
262+
///
263+
/// - `Ok`: on success.
264+
/// - `CNull`: `pipeline` is a NULL pointer.
265+
#[no_mangle]
266+
pub extern "C" fn spring_source_row_close(row: *mut SpringSourceRow) -> SpringErrno {
267+
if row.is_null() {
268+
SpringErrno::CNull
269+
} else {
270+
SpringSourceRow::drop(row);
271+
SpringErrno::Ok
272+
}
273+
}
274+
275+
/// Frees heap occupied by a `SpringSinkRow`.
216276
///
217277
/// # Returns
218278
///
219279
/// - `Ok`: on success.
220280
/// - `CNull`: `pipeline` is a NULL pointer.
221281
#[no_mangle]
222-
pub extern "C" fn spring_row_close(row: *mut SpringSinkRow) -> SpringErrno {
282+
pub extern "C" fn spring_sink_row_close(row: *mut SpringSinkRow) -> SpringErrno {
223283
if row.is_null() {
224284
SpringErrno::CNull
225285
} else {

src/spring_sink_row.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
use ::springql::SpringSinkRow as SinkRow;
44

55
use std::{ffi::c_void, mem};
6-
/// Row object from an in memory queue.
6+
7+
/// Row object to pop from an in memory queue.
78
#[non_exhaustive]
89
#[repr(transparent)]
910
pub struct SpringSinkRow(*mut c_void);

src/spring_source_row.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// This file is part of https://github.com/SpringQL/SpringQL-client-c which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2+
3+
use ::springql::SpringSourceRow as SourceRow;
4+
5+
use std::{ffi::c_void, mem};
6+
7+
/// Row object to push into an in memory queue.
8+
#[non_exhaustive]
9+
#[repr(transparent)]
10+
pub struct SpringSourceRow(*mut c_void);
11+
12+
impl SpringSourceRow {
13+
pub fn new(source_row: SourceRow) -> Self {
14+
SpringSourceRow(unsafe { mem::transmute(Box::new(source_row)) })
15+
}
16+
17+
pub fn to_row(&self) -> SourceRow {
18+
unsafe { &*(self.0 as *const SourceRow) }.clone()
19+
}
20+
21+
pub fn drop(ptr: *mut SpringSourceRow) {
22+
let outer = unsafe { Box::from_raw(ptr) };
23+
let inner = unsafe { Box::from_raw(outer.0) };
24+
drop(inner);
25+
drop(outer);
26+
}
27+
28+
pub fn into_ptr(self) -> *mut SpringSourceRow {
29+
Box::into_raw(Box::new(self))
30+
}
31+
}

0 commit comments

Comments
 (0)