Skip to content

Commit ea03516

Browse files
committed
ensure proper ownership in conversion
1 parent c27764a commit ea03516

File tree

1 file changed

+45
-36
lines changed

1 file changed

+45
-36
lines changed

r/src/convert_array_stream.c

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,46 @@
2626
#include "convert.h"
2727
#include "schema.h"
2828

29+
static int convert_next(SEXP converter_xptr, struct ArrowArrayStream* stream,
30+
SEXP schema_xptr, int64_t* n_batches) {
31+
SEXP array_xptr = PROTECT(nanoarrow_array_owning_xptr());
32+
struct ArrowArray* array = nanoarrow_output_array_from_xptr(array_xptr);
33+
34+
// Fetch the next array
35+
int result = ArrowArrayStreamGetNext(stream, array, NULL);
36+
if (result != NANOARROW_OK) {
37+
Rf_error("ArrowArrayStream::get_next(): %s", ArrowArrayStreamGetLastError(stream));
38+
}
39+
40+
// Check if the stream is finished
41+
if (array->release == NULL) {
42+
UNPROTECT(1);
43+
return 0;
44+
}
45+
46+
// Bump the batch counter
47+
(*n_batches)++;
48+
49+
// Set the schema of the allocated array and pass it to the converter
50+
R_SetExternalPtrTag(array_xptr, schema_xptr);
51+
if (nanoarrow_converter_set_array(converter_xptr, array_xptr) != NANOARROW_OK) {
52+
nanoarrow_converter_stop(converter_xptr);
53+
}
54+
55+
// After set_array, the converter is responsible for the array_xptr
56+
UNPROTECT(1);
57+
58+
// Materialize the array into the converter
59+
int64_t n_materialized =
60+
nanoarrow_converter_materialize_n(converter_xptr, array->length);
61+
if (n_materialized != array->length) {
62+
Rf_error("Expected to materialize %ld values in batch %ld but materialized %ld",
63+
(long)array->length, (long)(*n_batches), (long)n_materialized);
64+
}
65+
66+
return 1;
67+
}
68+
2969
SEXP nanoarrow_c_convert_array_stream(SEXP array_stream_xptr, SEXP ptype_sexp,
3070
SEXP size_sexp, SEXP n_sexp) {
3171
struct ArrowArrayStream* array_stream =
@@ -58,49 +98,18 @@ SEXP nanoarrow_c_convert_array_stream(SEXP array_stream_xptr, SEXP ptype_sexp,
5898
nanoarrow_converter_stop(converter_xptr);
5999
}
60100

61-
SEXP array_xptr = PROTECT(nanoarrow_array_owning_xptr());
62-
struct ArrowArray* array = nanoarrow_output_array_from_xptr(array_xptr);
63-
64101
int64_t n_batches = 0;
65-
int64_t n_materialized = 0;
66-
if (n > 0) {
67-
result = ArrowArrayStreamGetNext(array_stream, array, NULL);
68-
n_batches++;
69-
if (result != NANOARROW_OK) {
70-
Rf_error("ArrowArrayStream::get_next(): %s",
71-
ArrowArrayStreamGetLastError(array_stream));
102+
do {
103+
if (n_batches >= n) {
104+
break;
72105
}
73-
74-
while (array->release != NULL) {
75-
if (nanoarrow_converter_set_array(converter_xptr, array_xptr) != NANOARROW_OK) {
76-
nanoarrow_converter_stop(converter_xptr);
77-
}
78-
79-
n_materialized = nanoarrow_converter_materialize_n(converter_xptr, array->length);
80-
if (n_materialized != array->length) {
81-
Rf_error("Expected to materialize %ld values in batch %ld but materialized %ld",
82-
(long)array->length, (long)n_batches, (long)n_materialized);
83-
}
84-
85-
if (n_batches >= n) {
86-
break;
87-
}
88-
89-
array->release(array);
90-
result = ArrowArrayStreamGetNext(array_stream, array, NULL);
91-
n_batches++;
92-
if (result != NANOARROW_OK) {
93-
Rf_error("ArrowArrayStream::get_next(): %s",
94-
ArrowArrayStreamGetLastError(array_stream));
95-
}
96-
}
97-
}
106+
} while (convert_next(converter_xptr, array_stream, schema_xptr, &n_batches));
98107

99108
if (nanoarrow_converter_finalize(converter_xptr) != NANOARROW_OK) {
100109
nanoarrow_converter_stop(converter_xptr);
101110
}
102111

103112
SEXP result_sexp = PROTECT(nanoarrow_converter_release_result(converter_xptr));
104-
UNPROTECT(4);
113+
UNPROTECT(3);
105114
return result_sexp;
106115
}

0 commit comments

Comments
 (0)