Skip to content

Commit c47647b

Browse files
authored
Limit/Offset Pushdown Support (#82)
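If an FDW implements `can_limit` and it returns True, the query has no WHERE clause, and the rest of the query can be pushed down in full (for example, it has no GROUP BY and any sort is pushed down completely rather than partially), multicorn2 can push LIMIT and OFFSET down to the FDW, passing them as the `limit` and `offset` arguments of `execute` and `explain`.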
1 parent 60d78ff commit c47647b

File tree

11 files changed

+1059
-14
lines changed

11 files changed

+1059
-14
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ TESTS = test-$(PYTHON_TEST_VERSION)/sql/multicorn_cache_invalidation.sql
123123
test-$(PYTHON_TEST_VERSION)/sql/multicorn_sequence_test.sql \
124124
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_date.sql \
125125
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_dict.sql \
126+
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_limit.sql \
126127
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_list.sql \
127128
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_sort.sql \
128129
test-$(PYTHON_TEST_VERSION)/sql/write_savepoints.sql \

flake.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@
156156
pythonVersion = pkgs.lib.versions.majorMinor test_python.version;
157157
isPython312OrHigher = pkgs.lib.versionAtLeast pythonVersion "3.12";
158158

159-
baseTestCount = if pkgs.lib.versionOlder pgMajorVersion "14" then 18 else 19;
159+
baseTestCount = if pkgs.lib.versionOlder pgMajorVersion "14" then 19 else 20;
160160
expectedTestCount = toString (baseTestCount - (if isPython312OrHigher then 1 else 0));
161161
in pkgs.stdenv.mkDerivation {
162162
name = "multicorn2-python-test-pg${test_postgresql.version}-py${test_python.version}";

python/multicorn/__init__.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,25 @@ def can_sort(self, sortkeys):
212212
"""
213213
return []
214214

215+
def can_limit(self, limit, offset):
216+
"""
217+
Method called from the planner to ask the FDW whether it supports LIMIT and OFFSET pushdown.
218+
219+
This method is only called if the entire query can be pushed down. For example, if the query has
220+
a GROUP BY clause, this method will not be called. Or, if only part of the sort is pushed down,
221+
this method will not be called and limit/offset will not be pushed down.
222+
223+
Currently, we do not support pushing down limit/offset if the query includes a WHERE clause (quals).
224+
225+
Args:
226+
limit (int or None): The limit to apply to the query, if any.
227+
offset (int or None): The offset to apply to the query, if any.
228+
229+
Return:
230+
True if the FDW can support both LIMIT and OFFSET pushdown, False otherwise.
231+
"""
232+
return False
233+
215234
def get_path_keys(self):
216235
u"""
217236
Method called from the planner to add additional Path to the planner.
@@ -269,7 +288,7 @@ def get_path_keys(self):
269288
"""
270289
return []
271290

272-
def explain(self, quals, columns, sortkeys=None, verbose=False):
291+
def explain(self, quals, columns, sortkeys=None, verbose=False, limit=None, offset=None):
273292
"""Hook called on explain.
274293
275294
The arguments are the same as the :meth:`execute`, with the addition of
@@ -280,7 +299,7 @@ def explain(self, quals, columns, sortkeys=None, verbose=False):
280299
"""
281300
return []
282301

283-
def execute(self, quals, columns, sortkeys=None):
302+
def execute(self, quals, columns, sortkeys=None, limit=None, offset=None):
284303
"""Execute a query in the foreign data wrapper.
285304
286305
This method is called at the first iteration.
@@ -313,6 +332,8 @@ def execute(self, quals, columns, sortkeys=None):
313332
should be in the sequence.
314333
sortkeys (list): A list of :class:`SortKey`
315334
that the FDW said it can enforce.
335+
limit (int or None): The limit to apply to the query, if any.
336+
offset (int or None): The offset to apply to the query, if any.
316337
317338
Returns:
318339
An iterable of python objects which can be converted back to PostgreSQL.

python/multicorn/testfdw.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from itertools import cycle
66
from datetime import datetime
77
from operator import itemgetter
8-
8+
from itertools import islice
99

1010
class TestForeignDataWrapper(ForeignDataWrapper):
1111

@@ -15,6 +15,12 @@ def __init__(self, options, columns):
1515
super(TestForeignDataWrapper, self).__init__(options, columns)
1616
self.columns = columns
1717
self.test_type = options.get('test_type', None)
18+
self.canlimit = options.get('canlimit', False)
19+
if isinstance(self.canlimit, str):
20+
self.canlimit = self.canlimit.lower() == 'true'
21+
self.cansort = options.get('cansort', True)
22+
if isinstance(self.cansort, str):
23+
self.cansort = self.cansort.lower() == 'true'
1824
self.test_subtype = options.get('test_subtype', None)
1925
self.tx_hook = options.get('tx_hook', False)
2026
self._modify_batch_size = int(options.get('modify_batch_size', 1))
@@ -79,7 +85,7 @@ def _as_generator(self, quals, columns):
7985
index)
8086
yield line
8187

82-
def execute(self, quals, columns, sortkeys=None):
88+
def execute(self, quals, columns, sortkeys=None, limit=None, offset=None):
8389
sortkeys = sortkeys or []
8490
log_to_postgres(str(sorted(quals)))
8591
log_to_postgres(str(sorted(columns)))
@@ -99,14 +105,15 @@ def execute(self, quals, columns, sortkeys=None):
99105
k = sortkeys[0];
100106
res = self._as_generator(quals, columns)
101107
if (self.test_type == 'sequence'):
102-
return sorted(res, key=itemgetter(k.attnum - 1),
108+
res = sorted(res, key=itemgetter(k.attnum - 1),
103109
reverse=k.is_reversed)
104110
else:
105-
return sorted(res, key=itemgetter(k.attname),
111+
res = sorted(res, key=itemgetter(k.attname),
106112
reverse=k.is_reversed)
107-
return self._as_generator(quals, columns)
113+
return res[offset:offset + limit] if offset else res[:limit]
114+
return islice(self._as_generator(quals, columns), offset, (offset or 0) + limit if limit else None)
108115

109-
def explain(self, quals, columns, sortkeys=None, verbose=False):
116+
def explain(self, quals, columns, sortkeys=None, verbose=False, limit=None, offset=None):
110117
if self.noisy_explain:
111118
log_to_postgres("EXPLAIN quals=%r" % (sorted(quals),))
112119
log_to_postgres("EXPLAIN columns=%r" % (sorted(columns),))
@@ -126,8 +133,13 @@ def get_path_keys(self):
126133
return []
127134

128135
def can_sort(self, sortkeys):
129-
# assume sort pushdown ok for all cols, in any order, any collation
130-
return sortkeys
136+
# assume sort pushdown ok only for first sort key
137+
if not self.cansort:
138+
return []
139+
return sortkeys[:1]
140+
141+
def can_limit(self, limit, offset):
142+
return self.canlimit
131143

132144
def update(self, rowid, newvalues):
133145
if self.test_type == 'nowrite':

0 commit comments

Comments
 (0)