Skip to content

Commit 6b4d299

Browse files
[Arrow] Refactor ack logic to handle record batch autochunking. (#85)
* [Arrow] Refactor ack logic to handle record batch autochunking. * Update next changelog.
1 parent 7c3e36d commit 6b4d299

File tree

5 files changed

+412
-114
lines changed

5 files changed

+412
-114
lines changed

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
### Bug Fixes
1919

20+
- **[Experimental] Record-based acknowledgment tracking for Arrow Flight streams**: Added cumulative record counting to support proper ack tracking and correct recovery when batches are auto-chunked.
2021

2122
### Documentation
2223

sdk/src/arrow_metadata.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,18 @@ pub struct FlightAckMetadata {
6666
/// (schema validation, table access) but no batches acked yet.
6767
/// The SDK waits for this signal during stream creation.
6868
pub ack_up_to_offset: OffsetId,
69+
/// Cumulative count of records durably stored up to this acknowledgment.
70+
pub ack_up_to_records: u64,
6971
}
7072

7173
impl FlightAckMetadata {
7274
/// Create new acknowledgement metadata.
7375
#[allow(dead_code)]
74-
pub fn new(ack_up_to_offset: OffsetId) -> Self {
75-
Self { ack_up_to_offset }
76+
pub fn new(ack_up_to_offset: OffsetId, ack_up_to_records: u64) -> Self {
77+
Self {
78+
ack_up_to_offset,
79+
ack_up_to_records,
80+
}
7681
}
7782

7883
/// Create a "stream ready" signal indicating successful stream setup.
@@ -81,6 +86,7 @@ impl FlightAckMetadata {
8186
pub fn stream_ready() -> Self {
8287
Self {
8388
ack_up_to_offset: STREAM_READY_OFFSET,
89+
ack_up_to_records: 0,
8490
}
8591
}
8692

@@ -129,10 +135,11 @@ mod tests {
129135
}
130136

131137
#[test]
132-
fn test_flight_ack_metadata_parse() {
133-
let json = r#"{"ack_up_to_offset": 99}"#;
138+
fn test_flight_ack_metadata_parse_with_records() {
139+
let json = r#"{"ack_up_to_offset": 99, "ack_up_to_records": 5000}"#;
134140
let parsed = FlightAckMetadata::from_bytes(json.as_bytes()).unwrap();
135141
assert_eq!(parsed.ack_up_to_offset, 99);
142+
assert_eq!(parsed.ack_up_to_records, 5000);
136143
}
137144

138145
#[test]

0 commit comments

Comments
 (0)