Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ TESTS = test-$(PYTHON_TEST_VERSION)/sql/multicorn_cache_invalidation.sql
test-$(PYTHON_TEST_VERSION)/sql/multicorn_sequence_test.sql \
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_date.sql \
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_dict.sql \
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_limit.sql \
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_list.sql \
test-$(PYTHON_TEST_VERSION)/sql/multicorn_test_sort.sql \
test-$(PYTHON_TEST_VERSION)/sql/write_savepoints.sql \
Expand Down
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
pythonVersion = pkgs.lib.versions.majorMinor test_python.version;
isPython312OrHigher = pkgs.lib.versionAtLeast pythonVersion "3.12";

baseTestCount = if pkgs.lib.versionOlder pgMajorVersion "14" then 18 else 19;
baseTestCount = if pkgs.lib.versionOlder pgMajorVersion "14" then 19 else 20;
expectedTestCount = toString (baseTestCount - (if isPython312OrHigher then 1 else 0));
in pkgs.stdenv.mkDerivation {
name = "multicorn2-python-test-pg${test_postgresql.version}-py${test_python.version}";
Expand Down
25 changes: 23 additions & 2 deletions python/multicorn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,25 @@ def can_sort(self, sortkeys):
"""
return []

def can_limit(self, limit, offset):
"""
Method called from the planner to ask the FDW whether it supports LIMIT and OFFSET pushdown.

This method is only called if the entire query can be pushed down. For example, if the query has
a GROUP BY clause, this method will not be called. Or, if only part of the sort is pushed down,
this method will not be called and limit/offset will not be pushed down.

Currently, we do not support pushing down limit/offset if the query includes a WHERE clause (quals).

Args:
limit (int or None): The limit to apply to the query, if any.
offset (int or None): The offset to apply to the query, if any.

Return:
True if the FDW can support both LIMIT and OFFSET pushdown, False otherwise.
"""
return False

def get_path_keys(self):
u"""
Method called from the planner to add additional Path to the planner.
Expand Down Expand Up @@ -269,7 +288,7 @@ def get_path_keys(self):
"""
return []

def explain(self, quals, columns, sortkeys=None, verbose=False):
def explain(self, quals, columns, sortkeys=None, verbose=False, limit=None, offset=None):
"""Hook called on explain.

The arguments are the same as the :meth:`execute`, with the addition of
Expand All @@ -280,7 +299,7 @@ def explain(self, quals, columns, sortkeys=None, verbose=False):
"""
return []

def execute(self, quals, columns, sortkeys=None):
def execute(self, quals, columns, sortkeys=None, limit=None, offset=None):
"""Execute a query in the foreign data wrapper.

This method is called at the first iteration.
Expand Down Expand Up @@ -313,6 +332,8 @@ def execute(self, quals, columns, sortkeys=None):
should be in the sequence.
sortkeys (list): A list of :class:`SortKey`
that the FDW said it can enforce.
limit (int or None): The limit to apply to the query, if any.
offset (int or None): The offset to apply to the query, if any.

Returns:
An iterable of python objects which can be converted back to PostgreSQL.
Expand Down
28 changes: 20 additions & 8 deletions python/multicorn/testfdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from itertools import cycle
from datetime import datetime
from operator import itemgetter

from itertools import islice

class TestForeignDataWrapper(ForeignDataWrapper):

Expand All @@ -15,6 +15,12 @@ def __init__(self, options, columns):
super(TestForeignDataWrapper, self).__init__(options, columns)
self.columns = columns
self.test_type = options.get('test_type', None)
self.canlimit = options.get('canlimit', False)
if isinstance(self.canlimit, str):
self.canlimit = self.canlimit.lower() == 'true'
self.cansort = options.get('cansort', True)
if isinstance(self.cansort, str):
self.cansort = self.cansort.lower() == 'true'
self.test_subtype = options.get('test_subtype', None)
self.tx_hook = options.get('tx_hook', False)
self._modify_batch_size = int(options.get('modify_batch_size', 1))
Expand Down Expand Up @@ -79,7 +85,7 @@ def _as_generator(self, quals, columns):
index)
yield line

def execute(self, quals, columns, sortkeys=None):
def execute(self, quals, columns, sortkeys=None, limit=None, offset=None):
sortkeys = sortkeys or []
log_to_postgres(str(sorted(quals)))
log_to_postgres(str(sorted(columns)))
Expand All @@ -99,14 +105,15 @@ def execute(self, quals, columns, sortkeys=None):
k = sortkeys[0];
res = self._as_generator(quals, columns)
if (self.test_type == 'sequence'):
return sorted(res, key=itemgetter(k.attnum - 1),
res = sorted(res, key=itemgetter(k.attnum - 1),
reverse=k.is_reversed)
else:
return sorted(res, key=itemgetter(k.attname),
res = sorted(res, key=itemgetter(k.attname),
reverse=k.is_reversed)
return self._as_generator(quals, columns)
return res[offset:offset + limit] if offset else res[:limit]
return islice(self._as_generator(quals, columns), offset, (offset or 0) + limit if limit else None)

def explain(self, quals, columns, sortkeys=None, verbose=False):
def explain(self, quals, columns, sortkeys=None, verbose=False, limit=None, offset=None):
if self.noisy_explain:
log_to_postgres("EXPLAIN quals=%r" % (sorted(quals),))
log_to_postgres("EXPLAIN columns=%r" % (sorted(columns),))
Expand All @@ -126,8 +133,13 @@ def get_path_keys(self):
return []

def can_sort(self, sortkeys):
# assume sort pushdown ok for all cols, in any order, any collation
return sortkeys
# assume sort pushdown ok only for first sort key
if not self.cansort:
return []
return sortkeys[:1]

def can_limit(self, limit, offset):
return self.canlimit

def update(self, rowid, newvalues):
if self.test_type == 'nowrite':
Expand Down
Loading
Loading