Skip to content

Commit b5b5814

Browse files
committed
Initial implementation
1 parent 60d78ff commit b5b5814

File tree

9 files changed

+462
-13
lines changed

9 files changed

+462
-13
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ TESTS = test-$(PYTHON_TEST_VERSION)/sql/multicorn_cache_invalidation.sql
123123
test-$(PYTHON_TEST_VERSION)/sql/multicorn_sequence_test.sql \
124124
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_date.sql \
125125
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_dict.sql \
126+
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_limit.sql \
126127
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_list.sql \
127128
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_sort.sql \
128129
test-$(PYTHON_TEST_VERSION)/sql/write_savepoints.sql \

flake.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@
156156
pythonVersion = pkgs.lib.versions.majorMinor test_python.version;
157157
isPython312OrHigher = pkgs.lib.versionAtLeast pythonVersion "3.12";
158158

159-
baseTestCount = if pkgs.lib.versionOlder pgMajorVersion "14" then 18 else 19;
159+
baseTestCount = if pkgs.lib.versionOlder pgMajorVersion "14" then 19 else 20;
160160
expectedTestCount = toString (baseTestCount - (if isPython312OrHigher then 1 else 0));
161161
in pkgs.stdenv.mkDerivation {
162162
name = "multicorn2-python-test-pg${test_postgresql.version}-py${test_python.version}";

python/multicorn/__init__.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,21 @@ def can_sort(self, sortkeys):
212212
"""
213213
return []
214214

215+
def can_limit(self, limit, offset):
216+
"""
217+
Method called from the planner to ask the FDW whether it supports LIMIT pushdown.
218+
This method is only called if the rest of the query can be pushed down including the sort and quals. For example,
219+
if the query has a GROUP BY clause, this method will not be called.
220+
221+
Args:
222+
limit (int or None): The limit to apply to the query.
223+
offset (int or None): The offset to apply to the query.
224+
225+
Return:
226+
True if the FDW can support both LIMIT and OFFSET pushdown, Falseotherwise.
227+
"""
228+
return False
229+
215230
def get_path_keys(self):
216231
u"""
217232
Method called from the planner to add additional Path to the planner.
@@ -269,7 +284,7 @@ def get_path_keys(self):
269284
"""
270285
return []
271286

272-
def explain(self, quals, columns, sortkeys=None, verbose=False):
287+
def explain(self, quals, columns, sortkeys=None, verbose=False, limit=None, offset=None):
273288
"""Hook called on explain.
274289
275290
The arguments are the same as the :meth:`execute`, with the addition of
@@ -280,7 +295,7 @@ def explain(self, quals, columns, sortkeys=None, verbose=False):
280295
"""
281296
return []
282297

283-
def execute(self, quals, columns, sortkeys=None):
298+
def execute(self, quals, columns, sortkeys=None, limit=None, offset=None):
284299
"""Execute a query in the foreign data wrapper.
285300
286301
This method is called at the first iteration.
@@ -313,6 +328,8 @@ def execute(self, quals, columns, sortkeys=None):
313328
should be in the sequence.
314329
sortkeys (list): A list of :class:`SortKey`
315330
that the FDW said it can enforce.
331+
limit (int): The limit to apply to the query.
332+
offset (int): The offset to apply to the query.
316333
317334
Returns:
318335
An iterable of python objects which can be converted back to PostgreSQL.

python/multicorn/testfdw.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from itertools import cycle
66
from datetime import datetime
77
from operator import itemgetter
8-
8+
from itertools import islice
99

1010
class TestForeignDataWrapper(ForeignDataWrapper):
1111

@@ -15,6 +15,12 @@ def __init__(self, options, columns):
1515
super(TestForeignDataWrapper, self).__init__(options, columns)
1616
self.columns = columns
1717
self.test_type = options.get('test_type', None)
18+
self.canlimit = options.get('canlimit', False)
19+
if isinstance(self.canlimit, str):
20+
self.canlimit = self.canlimit.lower() == 'true'
21+
self.cansort = options.get('cansort', True)
22+
if isinstance(self.cansort, str):
23+
self.cansort = self.cansort.lower() == 'true'
1824
self.test_subtype = options.get('test_subtype', None)
1925
self.tx_hook = options.get('tx_hook', False)
2026
self._modify_batch_size = int(options.get('modify_batch_size', 1))
@@ -79,7 +85,7 @@ def _as_generator(self, quals, columns):
7985
index)
8086
yield line
8187

82-
def execute(self, quals, columns, sortkeys=None):
88+
def execute(self, quals, columns, sortkeys=None, limit=None, offset=None):
8389
sortkeys = sortkeys or []
8490
log_to_postgres(str(sorted(quals)))
8591
log_to_postgres(str(sorted(columns)))
@@ -99,14 +105,15 @@ def execute(self, quals, columns, sortkeys=None):
99105
k = sortkeys[0];
100106
res = self._as_generator(quals, columns)
101107
if (self.test_type == 'sequence'):
102-
return sorted(res, key=itemgetter(k.attnum - 1),
108+
res = sorted(res, key=itemgetter(k.attnum - 1),
103109
reverse=k.is_reversed)
104110
else:
105-
return sorted(res, key=itemgetter(k.attname),
111+
res = sorted(res, key=itemgetter(k.attname),
106112
reverse=k.is_reversed)
107-
return self._as_generator(quals, columns)
113+
return res[offset:offset + limit] if offset else res[:limit]
114+
return islice(self._as_generator(quals, columns), offset, (offset or 0) + limit if limit else None)
108115

109-
def explain(self, quals, columns, sortkeys=None, verbose=False):
116+
def explain(self, quals, columns, sortkeys=None, verbose=False, limit=None, offset=None):
110117
if self.noisy_explain:
111118
log_to_postgres("EXPLAIN quals=%r" % (sorted(quals),))
112119
log_to_postgres("EXPLAIN columns=%r" % (sorted(columns),))
@@ -127,8 +134,13 @@ def get_path_keys(self):
127134

128135
def can_sort(self, sortkeys):
129136
# assume sort pushdown ok for all cols, in any order, any collation
137+
if not self.cansort:
138+
return []
130139
return sortkeys
131140

141+
def can_limit(self, limit, offset):
142+
return self.canlimit
143+
132144
def update(self, rowid, newvalues):
133145
if self.test_type == 'nowrite':
134146
super(TestForeignDataWrapper, self).update(rowid, newvalues)

src/multicorn.c

Lines changed: 187 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ static void multicornGetForeignRelSize(PlannerInfo *root,
5656
static void multicornGetForeignPaths(PlannerInfo *root,
5757
RelOptInfo *baserel,
5858
Oid foreigntableid);
59+
static void multicornGetForeignUpperPaths(PlannerInfo *root,
60+
UpperRelationKind stage,
61+
RelOptInfo *input_rel,
62+
RelOptInfo *output_rel,
63+
void *extra);
64+
5965
static ForeignScan *multicornGetForeignPlan(PlannerInfo *root,
6066
RelOptInfo *baserel,
6167
Oid foreigntableid,
@@ -119,6 +125,11 @@ static void multicorn_xact_callback(XactEvent event, void *arg);
119125
void *serializePlanState(MulticornPlanState * planstate);
120126
MulticornExecState *initializeExecState(void *internal_plan_state);
121127

128+
static void add_foreign_final_paths(PlannerInfo *root,
129+
RelOptInfo *input_rel,
130+
RelOptInfo *final_rel,
131+
FinalPathExtraData *extra);
132+
122133
/* Hash table mapping oid to fdw instances */
123134
HTAB *InstancesHash;
124135

@@ -174,6 +185,7 @@ multicorn_handler(PG_FUNCTION_ARGS)
174185
/* Plan phase */
175186
fdw_routine->GetForeignRelSize = multicornGetForeignRelSize;
176187
fdw_routine->GetForeignPaths = multicornGetForeignPaths;
188+
fdw_routine->GetForeignUpperPaths = multicornGetForeignUpperPaths;
177189
fdw_routine->GetForeignPlan = multicornGetForeignPlan;
178190
fdw_routine->ExplainForeignScan = multicornExplainForeignScan;
179191

@@ -359,7 +371,7 @@ multicornGetForeignPaths(PlannerInfo *root,
359371
/* Try to find parameterized paths */
360372
pathes = findPaths(root, baserel, possiblePaths, planstate->startupCost,
361373
planstate, apply_pathkeys, deparsed_pathkeys);
362-
374+
363375
/* Add a simple default path */
364376
pathes = lappend(pathes, create_foreignscan_path(root, baserel,
365377
NULL, /* default pathtarget */
@@ -403,6 +415,11 @@ multicornGetForeignPaths(PlannerInfo *root,
403415
{
404416
ForeignPath *newpath;
405417

418+
MulticornPathState *pathstate = (MulticornPathState *)calloc(1, sizeof(MulticornPathState));
419+
pathstate->pathkeys = deparsed_pathkeys;
420+
pathstate->limit = -1;
421+
pathstate->offset = -1;
422+
406423
newpath = create_foreignscan_path(root, baserel,
407424
NULL, /* default pathtarget */
408425
path->path.rows,
@@ -415,7 +432,7 @@ multicornGetForeignPaths(PlannerInfo *root,
415432
#if PG_VERSION_NUM >= 170000
416433
NULL,
417434
#endif
418-
(void *) deparsed_pathkeys);
435+
(void *)pathstate);
419436

420437
newpath->path.param_info = path->path.param_info;
421438
add_path(baserel, (Path *) newpath);
@@ -424,6 +441,150 @@ multicornGetForeignPaths(PlannerInfo *root,
424441
errorCheck();
425442
}
426443

444+
/*
445+
* multicornGetForeignUpperPaths
446+
* Add paths for post-join operations like aggregation, grouping etc. if
447+
* corresponding operations are safe to push down.
448+
*
449+
* Right now, we only support limit/offset pushdown. We'll add others later.
450+
*/
451+
static void multicornGetForeignUpperPaths(PlannerInfo *root,
452+
UpperRelationKind stage,
453+
RelOptInfo *input_rel,
454+
RelOptInfo *output_rel,
455+
void *extra)
456+
{
457+
switch (stage)
458+
{
459+
case UPPERREL_FINAL:
460+
add_foreign_final_paths(root, input_rel, output_rel, (FinalPathExtraData *)extra);
461+
break;
462+
default:
463+
break;
464+
}
465+
}
466+
467+
/*
468+
* add_foreign_final_paths
469+
* Add foreign paths for performing the final processing remotely.
470+
*
471+
* Given input_rel contains the source-data Paths. The paths are added to the
472+
* given final_rel.
473+
*/
474+
static void
475+
add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
476+
RelOptInfo *final_rel,
477+
FinalPathExtraData *extra)
478+
{
479+
Query *parse = root->parse;
480+
MulticornPathState *pathstate;
481+
MulticornPlanState *planstate;
482+
ForeignPath *final_path;
483+
int limitCount = -1;
484+
int limitOffset = -1;
485+
Path *cheapest_path;
486+
List *deparsed_pathkeys = NIL;
487+
List *applied_pathkeys = NIL;
488+
ListCell *lc;
489+
490+
/* No work if there is no need to add a LIMIT node */
491+
if (!extra->limit_needed)
492+
return;
493+
494+
/* We only support limits for SELECT commands */
495+
if (parse->commandType != CMD_SELECT)
496+
return;
497+
498+
/* We do not support pushing down FETCH FIRST .. WITH TIES */
499+
if (parse->limitOption == LIMIT_OPTION_WITH_TIES)
500+
return;
501+
502+
/* only push down constant LIMITs... */
503+
if ((parse->limitCount && !IsA(parse->limitCount, Const)) || (parse->limitOffset && !IsA(parse->limitOffset, Const)))
504+
return;
505+
506+
/* ... which are not NULL */
507+
if((parse->limitCount && ((Const *)parse->limitCount)->constisnull) || (parse->limitOffset && ((Const *)parse->limitOffset)->constisnull))
508+
return;
509+
510+
/* Extract the limit and offset */
511+
if (parse->limitCount)
512+
limitCount = DatumGetInt32(((Const *)parse->limitCount)->constvalue);
513+
514+
if (parse->limitOffset)
515+
limitOffset = DatumGetInt32(((Const *)parse->limitOffset)->constvalue);
516+
517+
/* Get the current input_rel and it's planstate */
518+
planstate = input_rel->fdw_private;
519+
// TODO: Maybe this isn't needed if we handle the previous stages correctly?
520+
if (!planstate)
521+
{
522+
for (int i = 1; i < root->simple_rel_array_size; i++)
523+
{
524+
RelOptInfo *rel = root->simple_rel_array[i];
525+
if (rel && rel->reloptkind == RELOPT_BASEREL)
526+
{
527+
planstate = rel->fdw_private;
528+
input_rel->fdw_private = planstate;
529+
input_rel = rel;
530+
break;
531+
}
532+
}
533+
}
534+
535+
/* Extract pathkeys from the cheapest path's fdw_private if it exists */
536+
cheapest_path = input_rel->cheapest_total_path;
537+
if (cheapest_path && IsA(cheapest_path, ForeignPath))
538+
{
539+
ForeignPath *foreign_path = (ForeignPath *)cheapest_path;
540+
if (foreign_path->fdw_private)
541+
{
542+
MulticornPathState *input_pathstate = (MulticornPathState *)foreign_path->fdw_private;
543+
deparsed_pathkeys = input_pathstate->pathkeys;
544+
}
545+
}
546+
547+
/* Extract the pathkeys from the input_rel */
548+
foreach(lc, input_rel->pathlist)
549+
{
550+
Path *path = (Path *) lfirst(lc);
551+
if (IsA(path, ForeignPath))
552+
{
553+
ForeignPath *fpath = (ForeignPath *) path;
554+
if (fpath->path.pathkeys != NIL)
555+
applied_pathkeys = fpath->path.pathkeys;
556+
}
557+
}
558+
559+
/* We only support limit/offset if the sort is completely pushed down */
560+
if (!pathkeys_contained_in(root->sort_pathkeys, applied_pathkeys))
561+
return;
562+
563+
/* Check if Python FWD can push down the LIMIT/OFFSET */
564+
if (!canLimit(planstate, limitCount, limitOffset))
565+
return;
566+
567+
/* Create foreign final path with the correct number of rows, and include state for limit/offset pushdown */
568+
pathstate = (MulticornPathState *)calloc(1, sizeof(MulticornPathState));
569+
pathstate->pathkeys = deparsed_pathkeys;
570+
pathstate->limit = limitCount;
571+
pathstate->offset = limitOffset;
572+
final_path = create_foreign_upper_path(root,
573+
input_rel,
574+
root->upper_targets[UPPERREL_FINAL],
575+
limitCount,
576+
planstate->startupCost,
577+
limitCount * planstate->width,
578+
NULL, /* pathkeys will be applied in the input_rel */
579+
NULL, /* no extra plan */
580+
#if PG_VERSION_NUM >= 170000
581+
NULL, /* no fdw_restrictinfo list */
582+
#endif
583+
(void*)pathstate);
584+
/* and add it to the final_rel */
585+
add_path(final_rel, (Path *) final_path);
586+
}
587+
427588
/*
428589
* multicornGetForeignPlan
429590
* Create a ForeignScan plan node for scanning the foreign table
@@ -458,7 +619,21 @@ multicornGetForeignPlan(PlannerInfo *root,
458619
&planstate->qual_list);
459620
}
460621
}
461-
planstate->pathkeys = (List *) best_path->fdw_private;
622+
623+
if (best_path->fdw_private)
624+
{
625+
MulticornPathState *pathstate = (MulticornPathState *) best_path->fdw_private;
626+
planstate->pathkeys = pathstate->pathkeys;
627+
planstate->limit = pathstate->limit;
628+
planstate->offset = pathstate->offset;
629+
}
630+
else
631+
{
632+
planstate->pathkeys = NIL;
633+
planstate->limit = -1;
634+
planstate->offset = -1;
635+
}
636+
462637
return make_foreignscan(tlist,
463638
scan_clauses,
464639
scan_relid,
@@ -1162,13 +1337,19 @@ serializePlanState(MulticornPlanState * state)
11621337
List *result = NULL;
11631338

11641339
result = lappend(result, makeConst(INT4OID,
1165-
-1, InvalidOid, 4, Int32GetDatum(state->numattrs), false, true));
1340+
-1, InvalidOid, 4, Int32GetDatum(state->numattrs), false, true));
11661341
result = lappend(result, makeConst(INT4OID,
11671342
-1, InvalidOid, 4, Int32GetDatum(state->foreigntableid), false, true));
11681343
result = lappend(result, state->target_list);
11691344

11701345
result = lappend(result, serializeDeparsedSortGroup(state->pathkeys));
11711346

1347+
result = lappend(result, makeConst(INT4OID,
1348+
-1, InvalidOid, 4, Int32GetDatum(state->limit), false, true));
1349+
1350+
result = lappend(result, makeConst(INT4OID,
1351+
-1, InvalidOid, 4, Int32GetDatum(state->offset), false, true));
1352+
11721353
return result;
11731354
}
11741355

@@ -1195,5 +1376,7 @@ initializeExecState(void *internalstate)
11951376
execstate->cinfos = palloc0(sizeof(ConversionInfo *) * attnum);
11961377
execstate->values = palloc(attnum * sizeof(Datum));
11971378
execstate->nulls = palloc(attnum * sizeof(bool));
1379+
execstate->limit = DatumGetInt32(((Const*)list_nth(values,4))->constvalue);
1380+
execstate->offset = DatumGetInt32(((Const*)list_nth(values,5))->constvalue);
11981381
return execstate;
11991382
}

0 commit comments

Comments
 (0)