 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, error, warn};
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
+use std::slice::from_ref;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU8, Ordering};
+use std::time::Duration;
+use tempfile::TempDir;
+
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
@@ -30,14 +41,6 @@ use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTabl
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
 use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
-use arrow_schema::SchemaRef;
-use log::{debug, error, warn};
-use parking_lot::{Mutex, RwLock};
-use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
-use std::sync::Arc;
-use std::time::Duration;
-use tempfile::TempDir;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -143,12 +146,20 @@ impl<'a> TableScan<'a> {
     }
 }
 
+/// Poll mode for `LogScanner` - ensures `poll()` and `poll_batches()` are not mixed.
+const POLL_MODE_UNSET: u8 = 0;
+const POLL_MODE_RECORDS: u8 = 1;
+const POLL_MODE_BATCHES: u8 = 2;
+
 pub struct LogScanner {
     table_path: TablePath,
     table_id: i64,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
     log_fetcher: LogFetcher,
+    /// Tracks whether the scanner is in records mode (`poll`) or batches mode (`poll_batches`).
+    /// Once set, the mode cannot be changed; this prevents data loss from mixing polling methods.
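+    ///
+    /// An illustrative sketch of this invariant (assumes an existing `scanner: LogScanner`
+    /// in an async context; not compiled as a doctest):
+    /// ```ignore
+    /// scanner.poll(Duration::from_millis(100)).await?; // first call locks in records mode
+    /// let res = scanner.poll_batches(Duration::from_millis(100)).await;
+    /// assert!(res.is_err()); // switching the same scanner to batches mode is rejected
+    /// ```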
+    poll_mode: AtomicU8,
 }
 
 impl LogScanner {
@@ -171,10 +182,32 @@ impl LogScanner {
                 log_scanner_status.clone(),
                 projected_fields,
             )?,
+            poll_mode: AtomicU8::new(POLL_MODE_UNSET),
         })
     }
 
     pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+        // Check and set poll mode to prevent mixing with poll_batches
+        match self.poll_mode.compare_exchange(
+            POLL_MODE_UNSET,
+            POLL_MODE_RECORDS,
+            Ordering::AcqRel,
+            Ordering::Acquire,
+        ) {
+            Ok(_) => { /* First call, set to records mode */ }
+            Err(POLL_MODE_RECORDS) => { /* Already in records mode, ok */ }
+            Err(POLL_MODE_BATCHES) => {
+                return Err(Error::IllegalState {
+                    message: "Cannot call poll() after poll_batches(). Mixing polling methods causes data loss. Create a new scanner to switch methods.".to_string(),
+                });
+            }
+            Err(invalid) => {
+                return Err(Error::IllegalState {
+                    message: format!("Invalid poll mode state: {}", invalid),
+                });
+            }
+        }
+
         let start = std::time::Instant::now();
         let deadline = start + timeout;
 
@@ -257,6 +290,77 @@ impl LogScanner {
         // Collect completed fetches from buffer
         self.log_fetcher.collect_fetches()
     }
+
+    /// Poll for Arrow `RecordBatch`es directly.
+    ///
+    /// This is more efficient than `poll()` when you need batch-level access for analytics.
+    ///
+    /// - This method does not expose per-record offsets or timestamps.
+    ///   Use `poll()` if you need that metadata.
+    /// - **Do not mix `poll()` and `poll_batches()` on the same scanner.**
+    ///   Calling `poll_batches()` after `poll()` (or vice versa) returns an error
+    ///   to prevent data loss. Create a new scanner if you need to switch methods.
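+    ///
+    /// A minimal usage sketch (assumes an existing `scanner: LogScanner` obtained from a
+    /// `TableScan` and an async caller; not compiled as a doctest):
+    /// ```ignore
+    /// // Poll Arrow batches for up to one second; an empty Vec means the timeout expired.
+    /// let batches = scanner.poll_batches(Duration::from_secs(1)).await?;
+    /// for batch in &batches {
+    ///     println!("rows in batch: {}", batch.num_rows());
+    /// }
+    /// ```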
+    pub async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+        // Check and set poll mode to prevent mixing with poll
+        match self.poll_mode.compare_exchange(
+            POLL_MODE_UNSET,
+            POLL_MODE_BATCHES,
+            Ordering::AcqRel,
+            Ordering::Acquire,
+        ) {
+            Ok(_) => { /* First call, set to batches mode */ }
+            Err(POLL_MODE_BATCHES) => { /* Already in batches mode, ok */ }
+            Err(POLL_MODE_RECORDS) => {
+                return Err(Error::IllegalState {
+                    message: "Cannot call poll_batches() after poll(). Mixing polling methods causes data loss. Create a new scanner to switch methods.".to_string(),
+                });
+            }
+            Err(invalid) => {
+                return Err(Error::IllegalState {
+                    message: format!("Invalid poll mode state: {}", invalid),
+                });
+            }
+        }
+
+        let start = std::time::Instant::now();
+        let deadline = start + timeout;
+
+        loop {
+            let batches = self.poll_for_batches().await?;
+
+            if !batches.is_empty() {
+                self.log_fetcher.send_fetches().await?;
+                return Ok(batches);
+            }
+
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                return Ok(Vec::new());
+            }
+
+            let remaining = deadline - now;
+            let has_data = self
+                .log_fetcher
+                .log_fetch_buffer
+                .await_not_empty(remaining)
+                .await;
+
+            if !has_data {
+                return Ok(Vec::new());
+            }
+        }
+    }
+
+    async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+        let result = self.log_fetcher.collect_batches()?;
+        if !result.is_empty() {
+            return Ok(result);
+        }
+
+        self.log_fetcher.send_fetches().await?;
+        self.log_fetcher.collect_batches()
+    }
 }
 
 struct LogFetcher {
@@ -719,6 +823,134 @@ impl LogFetcher {
         }
     }
 
+    /// Collect completed fetches as `RecordBatch`es.
+    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+        // Limit memory usage with both batch count and byte size constraints:
+        // at most 100 batches per poll, with a soft cap of ~64MB in total bytes.
+        const MAX_BATCHES: usize = 100;
+        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut batches_remaining = MAX_BATCHES;
+        let mut bytes_consumed: usize = 0;
+
+        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            match next_in_line {
+                None => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(ref f) if f.is_consumed() => {
+                    // Mirrors the `None` arm: the in-line fetch is exhausted, so advance
+                    // to the next completed fetch from the buffer.
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(mut next_fetch) => {
+                    let batches =
+                        self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
+                    let batch_count = batches.len();
+
+                    if !batches.is_empty() {
+                        // Track bytes consumed (soft cap - may exceed it by one fetch)
+                        let batch_bytes: usize =
+                            batches.iter().map(|b| b.get_array_memory_size()).sum();
+                        bytes_consumed += batch_bytes;
+
+                        result.extend(batches);
+                        batches_remaining = batches_remaining.saturating_sub(batch_count);
+                    }
+
+                    if !next_fetch.is_consumed() {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(next_fetch));
+                    }
+                }
+            }
+        }
+
+        Ok(result)
+    }
913+
914+ fn fetch_batches_from_fetch (
915+ & self ,
916+ next_in_line_fetch : & mut Box < dyn CompletedFetch > ,
917+ max_batches : usize ,
918+ ) -> Result < Vec < RecordBatch > > {
919+ let table_bucket = next_in_line_fetch. table_bucket ( ) . clone ( ) ;
920+ let current_offset = self . log_scanner_status . get_bucket_offset ( & table_bucket) ;
921+
922+ if current_offset. is_none ( ) {
923+ warn ! (
924+ "Ignoring fetched batches for {:?} since bucket was unsubscribed" ,
925+ table_bucket
926+ ) ;
927+ next_in_line_fetch. drain ( ) ;
928+ return Ok ( Vec :: new ( ) ) ;
929+ }
930+
931+ let current_offset = current_offset. unwrap ( ) ;
932+ let fetch_offset = next_in_line_fetch. next_fetch_offset ( ) ;
933+
934+ if fetch_offset == current_offset {
935+ let batches = next_in_line_fetch. fetch_batches ( max_batches) ?;
936+ let next_fetch_offset = next_in_line_fetch. next_fetch_offset ( ) ;
937+
938+ if next_fetch_offset > current_offset {
939+ self . log_scanner_status
940+ . update_offset ( & table_bucket, next_fetch_offset) ;
941+ }
942+
943+ Ok ( batches)
944+ } else {
945+ warn ! (
946+ "Ignoring fetched batches for {:?} at offset {}, expected {}" ,
947+ table_bucket, fetch_offset, current_offset
948+ ) ;
949+ next_in_line_fetch. drain ( ) ;
950+ Ok ( Vec :: new ( ) )
951+ }
952+ }
953+
722954 async fn prepare_fetch_log_requests ( & self ) -> HashMap < i32 , FetchLogRequest > {
723955 let mut fetch_log_req_for_buckets = HashMap :: new ( ) ;
724956 let mut table_id = None ;