Skip to content

Commit db9f556

Browse files
authored
GH-41364: [GLib][Ruby] Allow passing thread pool to ExecutePlan (#48462)
### Rationale for this change

Aggregators like `first` and `last` are unusable in Ruby because they don't work when the execution plan is executed using multiple threads.

### What changes are included in this PR?

This adds the `ThreadPool` class to be able to create a thread pool with a single thread. This can then be passed when creating an `ExecuteContext`, which in turn can be passed when creating an `ExecutePlan`.

### Are these changes tested?

A Ruby test that shows that the `first` aggregator works.

### Are there any user-facing changes?

A new `GArrowThreadPool` class, and changed signatures of the functions `garrow_execute_context_new` and `garrow_execute_plan_new`. However, since the new arguments are nullable, it is backwards compatible for the Ruby API.

**This PR includes breaking changes to public APIs.**

* GitHub Issue: #41364

Authored-by: Sten Larsson <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
1 parent d2ec684 commit db9f556

File tree

9 files changed

+378
-15
lines changed

9 files changed

+378
-15
lines changed

c_glib/arrow-glib/arrow-glib.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <arrow-glib/decoder.h>
3434
#include <arrow-glib/enums.h>
3535
#include <arrow-glib/error.h>
36+
#include <arrow-glib/executor.h>
3637
#include <arrow-glib/expression.h>
3738
#include <arrow-glib/field.h>
3839
#include <arrow-glib/interval.h>

c_glib/arrow-glib/compute.cpp

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <arrow-glib/datum.hpp>
2727
#include <arrow-glib/enums.h>
2828
#include <arrow-glib/error.hpp>
29+
#include <arrow-glib/executor.hpp>
2930
#include <arrow-glib/expression.hpp>
3031
#include <arrow-glib/reader.hpp>
3132
#include <arrow-glib/record-batch.hpp>
@@ -294,52 +295,132 @@ garrow_compute_initialize(GError **error)
294295
return garrow::check(error, status, "[compute][initialize]");
295296
}
296297

297-
typedef struct GArrowExecuteContextPrivate_
298+
struct GArrowExecuteContextPrivate
298299
{
299-
arrow::compute::ExecContext context;
300-
} GArrowExecuteContextPrivate;
300+
std::shared_ptr<arrow::compute::ExecContext> context;
301+
GArrowExecutor *executor;
302+
};
301303

302304
G_DEFINE_TYPE_WITH_PRIVATE(GArrowExecuteContext, garrow_execute_context, G_TYPE_OBJECT)
303305

304306
#define GARROW_EXECUTE_CONTEXT_GET_PRIVATE(object) \
305307
static_cast<GArrowExecuteContextPrivate *>( \
306308
garrow_execute_context_get_instance_private(GARROW_EXECUTE_CONTEXT(object)))
307309

310+
enum {
311+
PROP_EXECUTOR = 1,
312+
};
313+
314+
static void
315+
garrow_execute_context_dispose(GObject *object)
316+
{
317+
auto priv = GARROW_EXECUTE_CONTEXT_GET_PRIVATE(object);
318+
319+
if (priv->executor) {
320+
g_object_unref(priv->executor);
321+
priv->executor = nullptr;
322+
}
323+
324+
G_OBJECT_CLASS(garrow_execute_context_parent_class)->dispose(object);
325+
}
326+
308327
static void
309328
garrow_execute_context_finalize(GObject *object)
310329
{
311330
auto priv = GARROW_EXECUTE_CONTEXT_GET_PRIVATE(object);
312-
priv->context.~ExecContext();
331+
priv->context.~shared_ptr();
313332
G_OBJECT_CLASS(garrow_execute_context_parent_class)->finalize(object);
314333
}
315334

335+
static void
336+
garrow_execute_context_set_property(GObject *object,
337+
guint prop_id,
338+
const GValue *value,
339+
GParamSpec *pspec)
340+
{
341+
auto priv = GARROW_EXECUTE_CONTEXT_GET_PRIVATE(object);
342+
343+
switch (prop_id) {
344+
case PROP_EXECUTOR:
345+
{
346+
priv->executor = GARROW_EXECUTOR(g_value_dup_object(value));
347+
auto arrow_executor = garrow_executor_get_raw(priv->executor);
348+
priv->context =
349+
std::make_shared<arrow::compute::ExecContext>(arrow::default_memory_pool(),
350+
arrow_executor.get());
351+
break;
352+
}
353+
default:
354+
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
355+
break;
356+
}
357+
}
358+
359+
static void
360+
garrow_execute_context_get_property(GObject *object,
361+
guint prop_id,
362+
GValue *value,
363+
GParamSpec *pspec)
364+
{
365+
auto priv = GARROW_EXECUTE_CONTEXT_GET_PRIVATE(object);
366+
367+
switch (prop_id) {
368+
case PROP_EXECUTOR:
369+
g_value_set_object(value, priv->executor);
370+
break;
371+
default:
372+
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
373+
break;
374+
}
375+
}
376+
316377
static void
317378
garrow_execute_context_init(GArrowExecuteContext *object)
318379
{
319380
auto priv = GARROW_EXECUTE_CONTEXT_GET_PRIVATE(object);
320-
new (&priv->context) arrow::compute::ExecContext(arrow::default_memory_pool(), nullptr);
381+
new (&priv->context) std::shared_ptr<arrow::compute::ExecContext>;
321382
}
322383

323384
static void
324385
garrow_execute_context_class_init(GArrowExecuteContextClass *klass)
325386
{
326387
auto gobject_class = G_OBJECT_CLASS(klass);
327388

389+
gobject_class->dispose = garrow_execute_context_dispose;
328390
gobject_class->finalize = garrow_execute_context_finalize;
391+
gobject_class->set_property = garrow_execute_context_set_property;
392+
gobject_class->get_property = garrow_execute_context_get_property;
393+
394+
GParamSpec *spec;
395+
/**
396+
* GArrowExecuteContext:executor:
397+
*
398+
* The executor for execution.
399+
*
400+
* Since: 23.0.0
401+
*/
402+
spec = g_param_spec_object(
403+
"executor",
404+
"Executor",
405+
"The executor for execution",
406+
GARROW_TYPE_EXECUTOR,
407+
static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
408+
g_object_class_install_property(gobject_class, PROP_EXECUTOR, spec);
329409
}
330410

331411
/**
332412
* garrow_execute_context_new:
413+
* @executor: (nullable): A #GArrowExecutor or %NULL.
333414
*
334415
* Returns: A newly created #GArrowExecuteContext.
335416
*
336417
* Since: 1.0.0
337418
*/
338419
GArrowExecuteContext *
339-
garrow_execute_context_new(void)
420+
garrow_execute_context_new(GArrowExecutor *executor)
340421
{
341-
auto execute_context = g_object_new(GARROW_TYPE_EXECUTE_CONTEXT, NULL);
342-
return GARROW_EXECUTE_CONTEXT(execute_context);
422+
return GARROW_EXECUTE_CONTEXT(
423+
g_object_new(GARROW_TYPE_EXECUTE_CONTEXT, "executor", executor, nullptr));
343424
}
344425

345426
typedef struct GArrowFunctionOptionsPrivate_
@@ -1898,6 +1979,7 @@ garrow_execute_plan_class_init(GArrowExecutePlanClass *klass)
18981979

18991980
/**
19001981
* garrow_execute_plan_new:
1982+
* @context: (nullable): A #GArrowExecuteContext or %NULL.
19011983
* @error: (nullable): Return location for a #GError or %NULL.
19021984
*
19031985
* Returns: (nullable): A newly created #GArrowExecutePlan on success,
@@ -1906,9 +1988,15 @@ garrow_execute_plan_class_init(GArrowExecutePlanClass *klass)
19061988
* Since: 6.0.0
19071989
*/
19081990
GArrowExecutePlan *
1909-
garrow_execute_plan_new(GError **error)
1991+
garrow_execute_plan_new(GArrowExecuteContext *context, GError **error)
19101992
{
1911-
auto arrow_plan_result = arrow::acero::ExecPlan::Make();
1993+
arrow::Result<std::shared_ptr<arrow::acero::ExecPlan>> arrow_plan_result;
1994+
if (context) {
1995+
auto arrow_context = garrow_execute_context_get_raw(context);
1996+
arrow_plan_result = arrow::acero::ExecPlan::Make(*arrow_context);
1997+
} else {
1998+
arrow_plan_result = arrow::acero::ExecPlan::Make();
1999+
}
19122000
if (garrow::check(error, arrow_plan_result, "[execute-plan][new]")) {
19132001
return GARROW_EXECUTE_PLAN(
19142002
g_object_new(GARROW_TYPE_EXECUTE_PLAN, "plan", &(*arrow_plan_result), NULL));
@@ -7207,7 +7295,7 @@ arrow::compute::ExecContext *
72077295
garrow_execute_context_get_raw(GArrowExecuteContext *context)
72087296
{
72097297
auto priv = GARROW_EXECUTE_CONTEXT_GET_PRIVATE(context);
7210-
return &priv->context;
7298+
return priv->context.get();
72117299
}
72127300

72137301
GArrowFunctionOptions *

c_glib/arrow-glib/compute.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <arrow-glib/compute-definition.h>
2323
#include <arrow-glib/datum.h>
24+
#include <arrow-glib/executor.h>
2425
#include <arrow-glib/reader.h>
2526

2627
G_BEGIN_DECLS
@@ -40,7 +41,7 @@ struct _GArrowExecuteContextClass
4041

4142
GARROW_AVAILABLE_IN_1_0
4243
GArrowExecuteContext *
43-
garrow_execute_context_new(void);
44+
garrow_execute_context_new(GArrowExecutor *executor);
4445

4546
GARROW_AVAILABLE_IN_7_0
4647
gboolean
@@ -322,7 +323,7 @@ struct _GArrowExecutePlanClass
322323

323324
GARROW_AVAILABLE_IN_6_0
324325
GArrowExecutePlan *
325-
garrow_execute_plan_new(GError **error);
326+
garrow_execute_plan_new(GArrowExecuteContext *context, GError **error);
326327
GARROW_AVAILABLE_IN_6_0
327328
GArrowExecuteNode *
328329
garrow_execute_plan_build_node(GArrowExecutePlan *plan,

c_glib/arrow-glib/executor.cpp

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <arrow-glib/error.hpp>
21+
#include <arrow-glib/executor.hpp>
22+
23+
#include <arrow/util/thread_pool.h>
24+
25+
G_BEGIN_DECLS
26+
27+
/**
28+
* SECTION: executor
29+
* @section_id: executor-classes
30+
* @title: Executor classes
31+
* @include: arrow-glib/arrow-glib.h
32+
*
33+
* #GArrowExecutor is the base class for executor implementations.
34+
*
35+
* #GArrowThreadPool is a class for thread pool management.
36+
*/
37+
38+
struct GArrowExecutorPrivate
39+
{
40+
std::shared_ptr<arrow::internal::Executor> executor;
41+
};
42+
43+
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE(GArrowExecutor, garrow_executor, G_TYPE_OBJECT)
44+
45+
#define GARROW_EXECUTOR_GET_PRIVATE(obj) \
46+
static_cast<GArrowExecutorPrivate *>( \
47+
garrow_executor_get_instance_private(GARROW_EXECUTOR(obj)))
48+
49+
enum {
50+
PROP_EXECUTOR = 1,
51+
};
52+
53+
static void
54+
garrow_executor_init(GArrowExecutor *object)
55+
{
56+
auto priv = GARROW_EXECUTOR_GET_PRIVATE(object);
57+
new (&priv->executor) std::shared_ptr<arrow::internal::Executor>;
58+
}
59+
60+
static void
61+
garrow_executor_finalize(GObject *object)
62+
{
63+
auto priv = GARROW_EXECUTOR_GET_PRIVATE(object);
64+
priv->executor.~shared_ptr();
65+
G_OBJECT_CLASS(garrow_executor_parent_class)->finalize(object);
66+
}
67+
68+
static void
69+
garrow_executor_set_property(GObject *object,
70+
guint prop_id,
71+
const GValue *value,
72+
GParamSpec *pspec)
73+
{
74+
auto priv = GARROW_EXECUTOR_GET_PRIVATE(GARROW_EXECUTOR(object));
75+
76+
switch (prop_id) {
77+
case PROP_EXECUTOR:
78+
priv->executor = *static_cast<std::shared_ptr<arrow::internal::Executor> *>(
79+
g_value_get_pointer(value));
80+
break;
81+
default:
82+
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
83+
break;
84+
}
85+
}
86+
87+
static void
88+
garrow_executor_class_init(GArrowExecutorClass *klass)
89+
{
90+
auto gobject_class = G_OBJECT_CLASS(klass);
91+
92+
gobject_class->finalize = garrow_executor_finalize;
93+
gobject_class->set_property = garrow_executor_set_property;
94+
95+
GParamSpec *spec;
96+
spec = g_param_spec_pointer(
97+
"executor",
98+
"Executor",
99+
"The raw std::shared_ptr<arrow::internal::Executor> *",
100+
static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
101+
g_object_class_install_property(gobject_class, PROP_EXECUTOR, spec);
102+
}
103+
104+
G_DEFINE_TYPE(GArrowThreadPool, garrow_thread_pool, GARROW_TYPE_EXECUTOR)
105+
106+
static void
107+
garrow_thread_pool_init(GArrowThreadPool *object)
108+
{
109+
}
110+
111+
static void
112+
garrow_thread_pool_class_init(GArrowThreadPoolClass *klass)
113+
{
114+
}
115+
116+
/**
117+
* garrow_thread_pool_new:
118+
* @n_threads: The number of threads in the pool.
119+
* @error: (nullable): Return location for a #GError or %NULL.
120+
*
121+
* Returns: (nullable): A newly created #GArrowThreadPool on success,
122+
* %NULL on error.
123+
*
124+
* Since: 23.0.0
125+
*/
126+
GArrowThreadPool *
garrow_thread_pool_new(guint n_threads, GError **error)
{
  auto arrow_thread_pool_result = arrow::internal::ThreadPool::Make(n_threads);
  if (!garrow::check(error, arrow_thread_pool_result, "[thread-pool][new]")) {
    return nullptr;
  }

  /* The "executor" property is read back as a
   * `std::shared_ptr<arrow::internal::Executor> *`, so hand over a pointer
   * to exactly that type. Passing the address of a
   * `std::shared_ptr<arrow::internal::ThreadPool>` would reinterpret one
   * shared_ptr instantiation as another. */
  std::shared_ptr<arrow::internal::Executor> arrow_executor = *arrow_thread_pool_result;

  /* The property setter copies the shared_ptr, so pointing at this local
   * for the duration of g_object_new() is sufficient. */
  return GARROW_THREAD_POOL(
    g_object_new(GARROW_TYPE_THREAD_POOL, "executor", &arrow_executor, nullptr));
}
139+
140+
G_END_DECLS
141+
142+
std::shared_ptr<arrow::internal::Executor>
143+
garrow_executor_get_raw(GArrowExecutor *executor)
144+
{
145+
if (!executor)
146+
return nullptr;
147+
148+
auto priv = GARROW_EXECUTOR_GET_PRIVATE(executor);
149+
return priv->executor;
150+
}

0 commit comments

Comments
 (0)