Skip to content

Commit 06974d9

Browse files
authored
feat: Ability to set a marker and roll back a buffer. General Python testing improvements. (#17)
1 parent c2d46ea commit 06974d9

File tree

10 files changed

+344
-98
lines changed

10 files changed

+344
-98
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,6 @@ ffi = ["insecure_skip_verify"]
4242
# Auto-generate the header. This is for dev-debugging-diffing only.
4343
# A hand-crafted header is easier on the eyes.
4444
gen_h = ["ffi", "cbindgen"]
45+
46+
# Auto-generate a Cython .pxd file.
47+
gen_cython = ["ffi", "cbindgen"]

build.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,26 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
191191
bindings.write_to_file("include/questdb/ilp/line_sender.h");
192192
}
193193

194+
#[cfg(feature = "gen_cython")]
195+
{
196+
let crate_dir = std::env::var("CARGO_MANIFEST_DIR")?;
197+
198+
let config = cbindgen::Config {
199+
language: cbindgen::Language::Cython,
200+
documentation: false,
201+
cython: cbindgen::CythonConfig {
202+
header: Some("questdb/ilp/line_sender.h".to_owned()),
203+
cimports: std::collections::BTreeMap::new()},
204+
..Default::default()
205+
};
206+
207+
let bindings = cbindgen::Builder::new()
208+
.with_crate(crate_dir)
209+
.with_config(config)
210+
.generate()?;
211+
bindings.write_to_file("cython/questdb/ilp/line_sender.pxd");
212+
}
213+
194214
json_tests::build()?;
195215

196216
Ok(())

cpp_test/test_line_sender.cpp

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,4 +607,46 @@ TEST_CASE("Test timestamp column.")
607607

608608
CHECK(server.recv() == 1);
609609
CHECK(server.msgs()[0] == exp);
610+
}
611+
612+
TEST_CASE("Test Marker")
613+
{
614+
questdb::ilp::line_sender_buffer buffer;
615+
buffer.clear_marker();
616+
buffer.clear_marker();
617+
618+
buffer.set_marker();
619+
buffer.table("test");
620+
CHECK(buffer.peek() == "test");
621+
CHECK(buffer.size() == 4);
622+
623+
buffer.rewind_to_marker();
624+
CHECK(buffer.peek() == "");
625+
CHECK(buffer.size() == 0);
626+
627+
// Can't rewind, no marker set: Cleared by `rewind_to_marker`.
628+
CHECK_THROWS_AS(buffer.rewind_to_marker(), questdb::ilp::line_sender_error);
629+
630+
buffer.table("a").symbol("b", "c");
631+
CHECK_THROWS_AS(buffer.set_marker(), questdb::ilp::line_sender_error);
632+
CHECK_THROWS_AS(buffer.rewind_to_marker(), questdb::ilp::line_sender_error);
633+
CHECK(buffer.peek() == "a,b=c");
634+
635+
buffer.at_now();
636+
CHECK(buffer.peek() == "a,b=c\n");
637+
638+
buffer.set_marker();
639+
buffer.clear_marker();
640+
buffer.clear_marker();
641+
CHECK_THROWS_AS(buffer.rewind_to_marker(), questdb::ilp::line_sender_error);
642+
buffer.set_marker();
643+
buffer.table("d").symbol("e", "f");
644+
CHECK(buffer.peek() == "a,b=c\nd,e=f");
645+
646+
buffer.rewind_to_marker();
647+
CHECK(buffer.peek() == "a,b=c\n");
648+
649+
buffer.clear();
650+
CHECK(buffer.peek() == "");
651+
CHECK_THROWS_AS(buffer.rewind_to_marker(), questdb::ilp::line_sender_error);
610652
}

include/questdb/ilp/line_sender.h

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ typedef enum line_sender_error_code
7575
LINESENDER_API
7676
line_sender_error_code line_sender_error_get_code(const line_sender_error*);
7777

78-
/** ASCII encoded error message. Never returns NULL. */
78+
/**
79+
* UTF-8 encoded error message. Never returns NULL.
80+
* The `len_out` argument is set to the number of bytes in the string.
81+
* The string is NOT null-terminated.
82+
*/
7983
LINESENDER_API
8084
const char* line_sender_error_msg(const line_sender_error*, size_t* len_out);
8185

@@ -86,7 +90,10 @@ void line_sender_error_free(line_sender_error*);
8690

8791
/////////// Preparing strings and names
8892

89-
/** Non-owning validated UTF-8 encoded string. */
93+
/**
94+
* Non-owning validated UTF-8 encoded string.
95+
* The string need not be null-terminated.
96+
*/
9097
typedef struct line_sender_utf8
9198
{
9299
// Don't initialize fields directly.
@@ -100,7 +107,7 @@ typedef struct line_sender_utf8
100107
*
101108
* @param[out] str The object to be initialized.
102109
* @param[in] len Length in bytes of the buffer.
103-
* @param[in] buf UTF-8 encoded buffer.
110+
* @param[in] buf UTF-8 encoded buffer. Need not be null-terminated.
104111
* @param[out] err_out Set on error.
105112
* @return true on success, false on error.
106113
*/
@@ -124,7 +131,10 @@ line_sender_utf8 line_sender_utf8_assert(size_t len, const char* buf);
124131
#define QDB_UTF8_LITERAL(literal) \
125132
line_sender_utf8_assert(sizeof(literal) - 1, (literal))
126133

127-
/** Non-owning validated table, symbol or column name. UTF-8 encoded. */
134+
/**
135+
* Non-owning validated table, symbol or column name. UTF-8 encoded.
136+
* Need not be null-terminated.
137+
*/
128138
typedef struct line_sender_table_name
129139
{
130140
// Don't initialize fields directly.
@@ -139,7 +149,7 @@ typedef struct line_sender_table_name
139149
*
140150
* @param[out] name The object to be initialized.
141151
* @param[in] len Length in bytes of the buffer.
142-
* @param[in] buf UTF-8 encoded buffer.
152+
* @param[in] buf UTF-8 encoded buffer. Need not be null-terminated.
143153
* @param[out] err_out Set on error.
144154
* @return true on success, false on error.
145155
*/
@@ -166,7 +176,10 @@ line_sender_table_name line_sender_table_name_assert(
166176
#define QDB_TABLE_NAME_LITERAL(literal) \
167177
line_sender_table_name_assert(sizeof(literal) - 1, (literal))
168178

169-
/** Non-owning validated table, symbol or column name. UTF-8 encoded. */
179+
/**
180+
* Non-owning validated table, symbol or column name. UTF-8 encoded.
181+
* Need not be null-terminated.
182+
*/
170183
typedef struct line_sender_column_name
171184
{
172185
// Don't initialize fields directly.
@@ -181,7 +194,7 @@ typedef struct line_sender_column_name
181194
*
182195
* @param[out] name The object to be initialized.
183196
* @param[in] len Length in bytes of the buffer.
184-
* @param[in] buf UTF-8 encoded buffer.
197+
* @param[in] buf UTF-8 encoded buffer. Need not be null-terminated.
185198
* @param[out] err_out Set on error.
186199
* @return true on success, false on error.
187200
*/
@@ -236,6 +249,7 @@ line_sender_buffer* line_sender_buffer_clone(const line_sender_buffer* buffer);
236249
/**
237250
* Pre-allocate to ensure the buffer has enough capacity for at least the
238251
* specified additional byte count. This may be rounded up.
252+
* This does not allocate if such additional capacity is already satisfied.
239253
* See: `capacity`.
240254
*/
241255
LINESENDER_API
@@ -247,6 +261,32 @@ void line_sender_buffer_reserve(
247261
LINESENDER_API
248262
size_t line_sender_buffer_capacity(const line_sender_buffer* buffer);
249263

264+
/**
265+
* Mark a rewind point.
266+
* This allows undoing accumulated changes to the buffer for one or more
267+
* rows by calling `rewind_to_marker`.
268+
* Any previous marker will be discarded.
269+
* Once the marker is no longer needed, call `clear_marker`.
270+
*/
271+
LINESENDER_API
272+
bool line_sender_buffer_set_marker(
273+
line_sender_buffer* buffer,
274+
line_sender_error** err_out);
275+
276+
/**
277+
* Undo all changes since the last `set_marker` call.
278+
* As a side-effect, this also clears the marker.
279+
*/
280+
LINESENDER_API
281+
bool line_sender_buffer_rewind_to_marker(
282+
line_sender_buffer* buffer,
283+
line_sender_error** err_out);
284+
285+
/** Discard the marker. */
286+
LINESENDER_API
287+
void line_sender_buffer_clear_marker(
288+
line_sender_buffer* buffer);
289+
250290
/**
251291
* Remove all accumulated data and prepare the buffer for new lines.
252292
* This does not affect the buffer's capacity.

include/questdb/ilp/line_sender.hpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,13 @@ namespace questdb::ilp
317317
return *this;
318318
}
319319

320+
/**
321+
* Pre-allocate to ensure the buffer has enough capacity for at least
322+
* the specified additional byte count. This may be rounded up.
323+
* This does not allocate if such additional capacity is already
324+
* satisfied.
325+
* See: `capacity`.
326+
*/
320327
void reserve(size_t additional)
321328
{
322329
may_init();
@@ -357,6 +364,28 @@ namespace questdb::ilp
357364
}
358365
}
359366

367+
void set_marker()
368+
{
369+
may_init();
370+
line_sender_error::wrapped_call(
371+
::line_sender_buffer_set_marker, _impl);
372+
}
373+
374+
void rewind_to_marker()
375+
{
376+
may_init();
377+
line_sender_error::wrapped_call(
378+
::line_sender_buffer_rewind_to_marker, _impl);
379+
}
380+
381+
void clear_marker() noexcept
382+
{
383+
if (_impl)
384+
{
385+
::line_sender_buffer_clear_marker(_impl);
386+
}
387+
}
388+
360389
void clear() noexcept
361390
{
362391
if (_impl)

src/ffi.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,17 @@ pub unsafe extern "C" fn line_sender_error_get_code(
134134
(&*error).0.code().into()
135135
}
136136

137-
/// ASCII encoded error message. Never returns NULL.
137+
/// UTF-8 encoded error message. Never returns NULL.
138+
/// The `len_out` argument is set to the number of bytes in the string.
139+
/// The string is NOT null-terminated.
138140
#[no_mangle]
139141
pub unsafe extern "C" fn line_sender_error_msg(
140142
error: *const line_sender_error,
141143
len_out: *mut size_t) -> *const c_char
142144
{
143145
let msg: &str = &(&*error).0.msg;
144146
*len_out = msg.len();
145-
msg.as_ptr() as *mut i8
147+
msg.as_ptr() as *mut c_char
146148
}
147149

148150
/// Clean up the error.
@@ -154,6 +156,7 @@ pub unsafe extern "C" fn line_sender_error_free(error: *mut line_sender_error) {
154156
}
155157

156158
/// Non-owning validated UTF-8 encoded string.
159+
/// The string need not be null-terminated.
157160
#[repr(C)]
158161
#[derive(Debug, Copy, Clone)]
159162
pub struct line_sender_utf8 {
@@ -259,7 +262,7 @@ unsafe fn unwrap_utf8(
259262
///
260263
/// @param[out] str The object to be initialized.
261264
/// @param[in] len Length in bytes of the buffer.
262-
/// @param[in] buf UTF-8 encoded buffer.
265+
/// @param[in] buf UTF-8 encoded buffer. Need not be null-terminated.
263266
/// @param[out] err_out Set on error.
264267
/// @return true on success, false on error.
265268
#[no_mangle]
@@ -298,6 +301,7 @@ pub unsafe extern "C" fn line_sender_utf8_assert(
298301
}
299302

300303
/// Non-owning validated table name. UTF-8 encoded.
304+
/// Need not be null-terminated.
301305
#[repr(C)]
302306
#[derive(Debug, Copy, Clone)]
303307
pub struct line_sender_table_name
@@ -317,6 +321,7 @@ impl line_sender_table_name {
317321
}
318322

319323
/// Non-owning validated symbol or column name. UTF-8 encoded.
324+
/// Need not be null-terminated.
320325
#[repr(C)]
321326
#[derive(Debug, Copy, Clone)]
322327
pub struct line_sender_column_name
@@ -340,7 +345,7 @@ impl line_sender_column_name {
340345
///
341346
/// @param[out] name The object to be initialized.
342347
/// @param[in] len Length in bytes of the buffer.
343-
/// @param[in] buf UTF-8 encoded buffer.
348+
/// @param[in] buf UTF-8 encoded buffer. Need not be null-terminated.
344349
/// @param[out] err_out Set on error.
345350
/// @return true on success, false on error.
346351
#[no_mangle]
@@ -387,7 +392,7 @@ pub unsafe extern "C" fn line_sender_table_name_assert(
387392
///
388393
/// @param[out] name The object to be initialized.
389394
/// @param[in] len Length in bytes of the buffer.
390-
/// @param[in] buf UTF-8 encoded buffer.
395+
/// @param[in] buf UTF-8 encoded buffer. Need not be null-terminated.
391396
/// @param[out] err_out Set on error.
392397
/// @return true on success, false on error.
393398
#[no_mangle]
@@ -634,6 +639,42 @@ pub unsafe extern "C" fn line_sender_buffer_capacity(
634639
unwrap_buffer(buffer).capacity()
635640
}
636641

642+
/// Mark a rewind point.
643+
/// This allows undoing accumulated changes to the buffer for one or more
644+
/// rows by calling `rewind_to_marker`.
645+
/// Any previous marker will be discarded.
646+
/// Once the marker is no longer needed, call `clear_marker`.
647+
#[no_mangle]
648+
pub unsafe extern "C" fn line_sender_buffer_set_marker(
649+
buffer: *mut line_sender_buffer,
650+
err_out: *mut *mut line_sender_error) -> bool
651+
{
652+
let buffer = unwrap_buffer_mut(buffer);
653+
bubble_err_to_c!(err_out, buffer.set_marker());
654+
true
655+
}
656+
657+
/// Undo all changes since the last `set_marker` call.
658+
/// As a side-effect, this also clears the marker.
659+
#[no_mangle]
660+
pub unsafe extern "C" fn line_sender_buffer_rewind_to_marker(
661+
buffer: *mut line_sender_buffer,
662+
err_out: *mut *mut line_sender_error) -> bool
663+
{
664+
let buffer = unwrap_buffer_mut(buffer);
665+
bubble_err_to_c!(err_out, buffer.rewind_to_marker());
666+
true
667+
}
668+
669+
/// Discard the marker.
670+
#[no_mangle]
671+
pub unsafe extern "C" fn line_sender_buffer_clear_marker(
672+
buffer: *mut line_sender_buffer)
673+
{
674+
let buffer = unwrap_buffer_mut(buffer);
675+
buffer.clear_marker();
676+
}
677+
637678
/// Remove all accumulated data and prepare the buffer for new lines.
638679
/// This does not affect the buffer's capacity.
639680
#[no_mangle]

0 commit comments

Comments
 (0)