Skip to content

Commit bf5bce3

Browse files
committed
box: release array and schema in box_insert_arrow
According to the documentation to the Apache Arrow C data interface: > Consumers MUST call a base structure’s release callback when they > won’t be using it anymore[^1] `box_insert_arrow()` is clearly a consumer so it must move or release the input Arrow array and schema to conform to the specification while currently it does not. This patch fixes this issue. Note that for performance reasons (to avoid extra copying), the engine callback that implements batch insertion should be allowed to move the input Arrow array and schema to a private location. To allow for that, we have to move the code that encodes the input Arrow in the IPC format for xlog from `txn_add_redo()` to `space_execute_insert_arrow()`. Closes tarantool#11338 [^1]: https://arrow.apache.org/docs/format/CDataInterface.html#release-callback-semantics-for-consumers NO_DOC=bug fix NO_CHANGELOG=batch insertion isn't implemented in CE
1 parent ac36c76 commit bf5bce3

File tree

8 files changed

+52
-33
lines changed

8 files changed

+52
-33
lines changed

perf/lua/column_insert_module.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,6 @@ insert_batch_lua_func(struct lua_State *L)
276276
batch_row_count, sparse_mode);
277277
if (box_insert_arrow(space_id, &array, &schema) != 0)
278278
return luaT_error(L);
279-
schema.release(&schema);
280-
array.release(&array);
281279
}
282280
return 0;
283281
}

src/box/box.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4134,7 +4134,12 @@ box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
41344134
request.space_id = space_id;
41354135
request.arrow_array = array;
41364136
request.arrow_schema = schema;
4137-
return box_process1(&request, NULL);
4137+
int rc = box_process1(&request, NULL);
4138+
if (array->release != NULL)
4139+
array->release(array);
4140+
if (schema->release != NULL)
4141+
schema->release(schema);
4142+
return rc;
41384143
}
41394144

41404145
/**

src/box/box.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ struct ArrowSchema;
683683
* If a column is nullable in space format, it can be omitted. All non-nullable
684684
* columns (including primary key parts) must be present in the batch.
685685
*
686-
* This function does not release neither `array` nor `schema`.
686+
* Both `array` and `schema` are moved or released, even on failure.
687687
*
688688
* \param space_id space identifier
689689
* \param array input data in ArrowArray format

src/box/lua/index.c

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -437,12 +437,7 @@ lbox_insert_arrow(lua_State *L)
437437
struct ArrowSchema schema;
438438
if (arrow_ipc_decode(&array, &schema, data, data + len) != 0)
439439
return luaT_error(L);
440-
int rc = box_insert_arrow(space_id, &array, &schema);
441-
if (schema.release != NULL)
442-
schema.release(&schema);
443-
if (array.release != NULL)
444-
array.release(&array);
445-
if (rc != 0)
440+
if (box_insert_arrow(space_id, &array, &schema) != 0)
446441
return luaT_error(L);
447442
return 0;
448443
}

src/box/space.c

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,16 +1363,26 @@ space_execute_insert_arrow(struct space *space, struct txn *txn,
13631363
int rc;
13641364
struct region *gc = &fiber()->gc;
13651365
size_t gc_svp = region_used(gc);
1366-
bool do_ipc_decode = request->arrow_ipc != NULL;
13671366
struct ArrowArray *array = request->arrow_array;
13681367
struct ArrowSchema *schema = request->arrow_schema;
13691368

1370-
if (do_ipc_decode) {
1369+
if (request->arrow_ipc == NULL) {
1370+
assert(array != NULL);
1371+
assert(schema != NULL);
1372+
assert(request->arrow_ipc_end == NULL);
1373+
struct region *txn_region = tx_region_acquire(txn);
1374+
rc = arrow_ipc_encode(array, schema, txn_region,
1375+
&request->arrow_ipc,
1376+
&request->arrow_ipc_end);
1377+
tx_region_release(txn, TX_ALLOC_SYSTEM);
1378+
if (rc != 0)
1379+
goto eof;
1380+
} else {
13711381
assert(array == NULL);
13721382
assert(schema == NULL);
13731383
assert(request->arrow_ipc_end != NULL);
1374-
array = xregion_alloc_object(&fiber()->gc, struct ArrowArray);
1375-
schema = xregion_alloc_object(&fiber()->gc, struct ArrowSchema);
1384+
array = xregion_alloc_object(gc, struct ArrowArray);
1385+
schema = xregion_alloc_object(gc, struct ArrowSchema);
13761386
rc = arrow_ipc_decode(array, schema, request->arrow_ipc,
13771387
request->arrow_ipc_end);
13781388
if (rc != 0)
@@ -1381,12 +1391,10 @@ space_execute_insert_arrow(struct space *space, struct txn *txn,
13811391

13821392
rc = space->vtab->execute_insert_arrow(space, txn, array, schema);
13831393

1384-
if (do_ipc_decode) {
1385-
assert(array->release != NULL);
1386-
assert(schema->release != NULL);
1394+
if (array->release != NULL)
13871395
array->release(array);
1396+
if (schema->release != NULL)
13881397
schema->release(schema);
1389-
}
13901398
eof:
13911399
region_truncate(gc, gc_svp);
13921400
return rc;

src/box/space.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ struct space_vtab {
7171
int (*execute_update)(struct space *, struct txn *,
7272
struct request *, struct tuple **result);
7373
int (*execute_upsert)(struct space *, struct txn *, struct request *);
74+
/**
75+
* Executes a batch insert request.
76+
*
77+
* The implementation may move or release the input array and schema.
78+
*/
7479
int (*execute_insert_arrow)(struct space *space, struct txn *txn,
7580
struct ArrowArray *array,
7681
struct ArrowSchema *schema);

src/box/txn.c

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
#include "session.h"
4343
#include "wal_ext.h"
4444
#include "rmean.h"
45-
#include "arrow_ipc.h"
4645

4746
double too_long_threshold;
4847

@@ -280,21 +279,8 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
280279
if (space != NULL && space->wal_ext != NULL)
281280
space_wal_ext_process_request(space->wal_ext, stmt, request);
282281
struct region *txn_region = tx_region_acquire(txn);
283-
if (request->arrow_array != NULL) {
284-
assert(request->arrow_schema != NULL);
285-
assert(request->arrow_ipc == NULL);
286-
assert(request->arrow_ipc_end == NULL);
287-
if (arrow_ipc_encode(
288-
request->arrow_array, request->arrow_schema,
289-
txn_region, &request->arrow_ipc,
290-
&request->arrow_ipc_end) != 0) {
291-
tx_region_release(txn, TX_ALLOC_SYSTEM);
292-
return -1;
293-
}
294-
}
295282
xrow_encode_dml(request, txn_region, row->body, &row->bodycnt);
296283
tx_region_release(txn, TX_ALLOC_SYSTEM);
297-
txn_region = NULL;
298284
stmt->row = row;
299285
return 0;
300286
}

test/app-tap/module_api.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3364,19 +3364,41 @@ test_fiber_basic_api(lua_State *L)
33643364
return 1;
33653365
}
33663366

3367+
static void
3368+
arrow_schema_destroy(struct ArrowSchema *schema)
3369+
{
3370+
schema->release = NULL;
3371+
}
3372+
3373+
static void
3374+
arrow_array_destroy(struct ArrowArray *array)
3375+
{
3376+
array->release = NULL;
3377+
}
3378+
33673379
static int
33683380
test_box_insert_arrow(struct lua_State *L)
33693381
{
33703382
fail_unless(lua_gettop(L) == 1);
33713383
fail_unless(lua_isnumber(L, 1));
33723384
uint32_t space_id = lua_tointeger(L, 1);
3385+
33733386
struct ArrowSchema schema;
3374-
struct ArrowArray array;
33753387
memset(&schema, 0, sizeof(schema));
3388+
schema.format = "+s";
3389+
schema.release = arrow_schema_destroy;
3390+
3391+
struct ArrowArray array;
33763392
memset(&array, 0, sizeof(array));
3393+
array.n_buffers = 1;
3394+
const void *buffers[1] = {NULL};
3395+
array.buffers = buffers;
3396+
array.release = arrow_array_destroy;
33773397

33783398
int rc = box_insert_arrow(space_id, &array, &schema);
33793399
fail_unless(rc == -1);
3400+
fail_unless(array.release == NULL);
3401+
fail_unless(schema.release == NULL);
33803402
check_diag("ClientError", "memtx does not support arrow format");
33813403
lua_pushboolean(L, 1);
33823404
return 1;

0 commit comments

Comments
 (0)