Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 59 additions & 61 deletions ckanext/datastore/backend/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
_UPSERT = 'upsert'
_UPDATE = 'update'

MAX_BUCKETS = 60 # easy to reduce to 30, 20, 15, 12, 10, …

if not os.environ.get('DATASTORE_LOAD'):
ValidationError = toolkit.ValidationError # type: ignore
Expand Down Expand Up @@ -1645,86 +1646,75 @@ def search_data_buckets(context: Context, data_dict: dict[str, Any]):
FROM
{resource} {ts_query} {where}
),
data_{index} AS (
SELECT
width_bucket({column}, data_stats_{index}.min_val, data_stats_{index}.max_val, {num_buckets}) AS bucket,
count(*) AS freq
FROM
{resource},
data_stats_{index}
{ts_query}
{where}
GROUP BY
bucket
ORDER BY
bucket
)'''

sql_datetime_fmt = '''
data_stats_{index} AS (
SELECT
min(EXTRACT(EPOCH FROM {column})) AS min_val,
max(EXTRACT(EPOCH FROM {column})) AS max_val
FROM
{resource} {ts_query} {where}
edges_{index} AS (
SELECT DISTINCT
(data_stats_{index}.min_val + (
generate_series(0, {num_buckets})
* (data_stats_{index}.max_val - data_stats_{index}.min_val)
/ {num_buckets}
))::{ftype} e
FROM data_stats_{index}
ORDER BY e
),
data_{index} AS (
SELECT
width_bucket(EXTRACT(EPOCH FROM {column}), data_stats_{index}.min_val, data_stats_{index}.max_val, {num_buckets}) AS bucket,
count(*) AS freq
SELECT val, coalesce(freq, 0) freq
FROM
{resource},
data_stats_{index}
{ts_query}
{where}
GROUP BY
bucket
ORDER BY
bucket
unnest(array(select * from edges_{index})) with ordinality as val
FULL JOIN
(
SELECT
width_bucket({column}, array(select * from edges_{index})) AS bucket,
count(*) AS freq
FROM
{resource},
data_stats_{index}
{ts_query}
{where}
GROUP BY
bucket
) b
ON ordinality = bucket
)'''

data_dict['records'] = {}
with_queries = []
select_queries = []
params = {}
fid_map = []

index = 0
rfields = []
for fid, ftype in fields_types.items():
index += 1

if fid == '_id':
continue

sql_string = None

if ftype in ['int', 'int4',
'bigint', 'int8',
'decimal', 'numeric']:
'decimal', 'numeric',
'datetime', 'date',
'timestamp', 'timestamptz']:
sql_string = sql_number_fmt
elif ftype in ['datetime', 'date',
'timestamp', 'timestamptz']:
sql_string = sql_datetime_fmt

if not sql_string:
continue

rfields.append({'id': fid, 'type': ftype})

with_queries.append(
sql_string.format(
index=index,
index=len(rfields),
column=identifier(fid),
resource=identifier(resource_id),
ts_query=ts_query,
where=where_clause,
num_buckets=data_dict['buckets']))
num_buckets=data_dict['buckets'],
ftype=ftype,
))

# FIXME: can datastore have multiple columns with same name??
select_queries.append('''
array(SELECT freq FROM data_{index}) AS {column}'''.format(
index=index,
column=identifier(fid)))

fid_map.append(fid)
array(SELECT freq FROM data_{index}) AS freq_{index},
array(SELECT val FROM data_{index}) AS edge_{index}
'''.format(index=len(rfields))
)

for chunk in where_values:
params.update(chunk)
Expand All @@ -1735,19 +1725,27 @@ def search_data_buckets(context: Context, data_dict: dict[str, Any]):
with_statements=','.join(with_queries),
array_statments=','.join(select_queries))

results = context['connection'].execute(
result = context['connection'].execute(
sa.text(aggregated_sql_string),
params
).mappings().all()[0]

data_dict['buckets'] = dict(results)

data_dict['fields'] = _result_fields(
fields_types,
_get_field_info(context['connection'], data_dict['resource_id']),
datastore_helpers.get_list(data_dict.get('fields')))

return data_dict
).mappings().one()

for i, rf in enumerate(rfields, 1):
buckets = result[f'freq_{i}']
edges = result[f'edge_{i}']
rf['nulls'] = 0
if result[f'edge_{i}'][-1:] == [None]:
edges.pop()
rf['nulls'] = buckets.pop()
if edges == [None]: # all nulls returns two null edges
edges = []
buckets = []
# last value returned contains count exactly matching max value
# combine with bucket before
rf['buckets'] = buckets[:-2] + ([sum(buckets[-2:])] if buckets else [])
rf['edges'] = edges

return {'fields': rfields}


def _execute_single_statement_copy_to(
Expand Down
5 changes: 0 additions & 5 deletions ckanext/datastore/logic/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,11 +777,6 @@ def datastore_search_buckets(context: Context, data_dict: dict[str, Any]):
:rtype: A dictionary with the following keys
:param fields: fields/columns and their extra metadata
:type fields: list of dictionaries
:param filters: query filters
:type filters: list of dictionaries
:param buckets: dict of matching results
:type buckets: dict of field ids and bucketed data

'''
backend = DatastoreBackend.get_active_backend()
schema = context.get('schema', dsschema.datastore_search_buckets_schema())
Expand Down
6 changes: 5 additions & 1 deletion ckanext/datastore/logic/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ def datastore_search_buckets_schema() -> Schema:
'resource_id': [not_missing, not_empty, unicode_safe],
'id': [ignore_missing],
'q': [ignore_missing, unicode_or_json_validator],
'buckets': [default(12), int_validator],
'buckets': [
default(12),
natural_number_validator,
limit_to_configured_maximum('ckan.datastore.search.buckets_max', 300),
],
'plain': [ignore_missing, boolean_validator],
'filters': [ignore_missing, json_validator],
'language': [ignore_missing, unicode_safe],
Expand Down
92 changes: 92 additions & 0 deletions ckanext/datastore/tests/test_histogram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# encoding: utf-8

import pytest
import sqlalchemy as sa
import sqlalchemy.orm as orm
from datetime import date, datetime
from unittest import mock

import ckan.logic as logic
import ckan.model as model
import ckan.plugins as p
import ckan.tests.factories as factories
import ckan.tests.helpers as helpers
from ckan.lib.helpers import url_for
import ckanext.datastore.backend.postgres as db
from ckanext.datastore.tests.helpers import extract


@pytest.mark.ckan_config("ckan.plugins", "datastore")
@pytest.mark.usefixtures("with_plugins", "with_request_context")
def test_histograms():
    """Bucketed histograms for int, numeric, date and timestamp columns.

    Creates a datastore resource with six columns, then checks that
    ``datastore_search_buckets`` with ``buckets=4`` returns the expected
    bucket counts, edges and null counts for every supported column type.
    """
    resource = factories.Resource(url_type="datastore")

    # One tuple per record: (short, nums, day). "one" is the constant 1 in
    # every row, "all_null" is never populated, and "ts" mirrors "days".
    row_data = [
        (2, -4, "2026-01-01"),
        (2, 20, "2026-01-03"),
        (2, 10, "2026-02-01"),
        (8, 15, "2026-01-06"),
        (9, 15, "2026-02-01"),
        (3, -1, "2026-01-18"),
    ]
    helpers.call_action(
        "datastore_create",
        resource_id=resource["id"],
        fields=[
            {"id": "all_null", "type": "int"},
            {"id": "one", "type": "numeric"},
            {"id": "short", "type": "int"},
            {"id": "nums", "type": "numeric"},
            {"id": "days", "type": "date"},
            {"id": "ts", "type": "timestamp"},
        ],
        records=[
            {"one": 1, "short": s, "nums": n, "days": d, "ts": d}
            for s, n, d in row_data
        ],
    )

    results = helpers.call_action(
        "datastore_search_buckets",
        resource_id=resource["id"],
        buckets=4,
    )

    expected = [
        # a column of only NULLs: no buckets, no edges, all rows counted
        # as nulls
        {"id": "all_null", "buckets": [], "edges": [], "nulls": 6, "type": "int4"},
        # min == max collapses to a single bucket with a single edge
        {"id": "one", "buckets": [6], "edges": [1], "nulls": 0, "type": "numeric"},
        {
            "id": "short",
            "buckets": [3, 1, 0, 2],
            "edges": [2, 3, 5, 7, 9],
            "nulls": 0,
            "type": "int4",
        },
        {
            "id": "nums",
            "buckets": [2, 0, 1, 3],
            "edges": [-4, 2, 8, 14, 20],
            "nulls": 0,
            "type": "numeric",
        },
        {
            "id": "days",
            "buckets": [3, 0, 1, 2],
            "edges": [
                date(2026, 1, 1),
                date(2026, 1, 8),
                date(2026, 1, 16),
                date(2026, 1, 24),
                date(2026, 2, 1),
            ],
            "nulls": 0,
            "type": "date",
        },
        {
            "id": "ts",
            "buckets": [3, 0, 1, 2],
            "edges": [
                datetime(2026, 1, 1, 0),
                datetime(2026, 1, 8, 18),
                datetime(2026, 1, 16, 12),
                datetime(2026, 1, 24, 6),
                datetime(2026, 2, 1, 0),
            ],
            "nulls": 0,
            "type": "timestamp",
        },
    ]
    assert results["fields"] == expected