@@ -1150,11 +1150,38 @@ void ColumnWriterImpl::FlushBufferedDataPages() {
11501150// ----------------------------------------------------------------------
11511151// TypedColumnWriter
11521152
1153+ // DoInBatches for non-repeated columns
11531154template <typename Action, typename GetBufferedRows>
1154- inline void DoInBatches (const int16_t * def_levels, const int16_t * rep_levels,
1155- int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
1156- bool pages_change_on_record_boundaries, Action&& action,
1157- GetBufferedRows&& curr_page_buffered_rows) {
1155+ inline void DoInBatchesNonRepeated (int64_t num_levels, int64_t batch_size,
1156+ int64_t max_rows_per_page, Action&& action,
1157+ GetBufferedRows&& curr_page_buffered_rows) {
1158+ int64_t offset = 0 ;
1159+ while (offset < num_levels) {
1160+ int64_t page_buffered_rows = curr_page_buffered_rows ();
1161+ ARROW_DCHECK_LE (page_buffered_rows, max_rows_per_page);
1162+
1163+ // Every record contains only one level.
1164+ int64_t max_batch_size = std::min (batch_size, num_levels - offset);
1165+ max_batch_size = std::min (max_batch_size, max_rows_per_page - page_buffered_rows);
1166+ int64_t end_offset = offset + max_batch_size;
1167+
1168+ ARROW_DCHECK_LE (offset, end_offset);
1169+ ARROW_DCHECK_LE (end_offset, num_levels);
1170+
1171+ // Always check page limit for non-repeated columns.
1172+ action (offset, end_offset - offset, /* check_page_limit=*/ true );
1173+
1174+ offset = end_offset;
1175+ }
1176+ }
1177+
1178+ // DoInBatches for repeated columns
1179+ template <typename Action, typename GetBufferedRows>
1180+ inline void DoInBatchesRepeated (const int16_t * def_levels, const int16_t * rep_levels,
1181+ int64_t num_levels, int64_t batch_size,
1182+ int64_t max_rows_per_page,
1183+ bool pages_change_on_record_boundaries, Action&& action,
1184+ GetBufferedRows&& curr_page_buffered_rows) {
11581185 int64_t offset = 0 ;
11591186 while (offset < num_levels) {
11601187 int64_t max_batch_size = std::min (batch_size, num_levels - offset);
@@ -1164,25 +1191,17 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
11641191 int64_t page_buffered_rows = curr_page_buffered_rows ();
11651192 ARROW_DCHECK_LE (page_buffered_rows, max_rows_per_page);
11661193
1167- if (!rep_levels) {
1168- // If rep_levels is null, then we are writing a non-repeated column.
1169- // In this case, every record contains only one level.
1170- max_batch_size = std::min (max_batch_size, max_rows_per_page - page_buffered_rows);
1171- end_offset = offset + max_batch_size;
1172- check_page_limit_end_offset = end_offset;
1173- } else {
1174- // Iterate rep_levels to find the shortest sequence that ends before a record
1175- // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size
1176- for (int64_t i = offset; i < num_levels; ++i) {
1177- if (rep_levels[i] == 0 ) {
1178- // Use the beginning of last record to check page limit.
1179- check_page_limit_end_offset = i;
1180- if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
1181- end_offset = i;
1182- break ;
1183- }
1184- page_buffered_rows += 1 ;
1194+ // Iterate rep_levels to find the shortest sequence that ends before a record
1195+ // boundary (i.e. rep_levels == 0) with a size no less than max_batch_size
1196+ for (int64_t i = offset; i < num_levels; ++i) {
1197+ if (rep_levels[i] == 0 ) {
1198+ // Use the beginning of last record to check page limit.
1199+ check_page_limit_end_offset = i;
1200+ if (i - offset >= max_batch_size || page_buffered_rows >= max_rows_per_page) {
1201+ end_offset = i;
1202+ break ;
11851203 }
1204+ page_buffered_rows += 1 ;
11861205 }
11871206 }
11881207
@@ -1212,6 +1231,22 @@ inline void DoInBatches(const int16_t* def_levels, const int16_t* rep_levels,
12121231 }
12131232}
12141233
1234+ template <typename Action, typename GetBufferedRows>
1235+ inline void DoInBatches (const int16_t * def_levels, const int16_t * rep_levels,
1236+ int64_t num_levels, int64_t batch_size, int64_t max_rows_per_page,
1237+ bool pages_change_on_record_boundaries, Action&& action,
1238+ GetBufferedRows&& curr_page_buffered_rows) {
1239+ if (!rep_levels) {
1240+ DoInBatchesNonRepeated (num_levels, batch_size, max_rows_per_page,
1241+ std::forward<Action>(action),
1242+ std::forward<GetBufferedRows>(curr_page_buffered_rows));
1243+ } else {
1244+ DoInBatchesRepeated (def_levels, rep_levels, num_levels, batch_size, max_rows_per_page,
1245+ pages_change_on_record_boundaries, std::forward<Action>(action),
1246+ std::forward<GetBufferedRows>(curr_page_buffered_rows));
1247+ }
1248+ }
1249+
12151250namespace {
12161251
12171252bool DictionaryDirectWriteSupported (const ::arrow::Array& array) {
0 commit comments