Skip to content

Commit d6b8f26

Browse files
authored
Unit testing & review comments (distributed-system-analysis#3608)
* Unit testing & review comments PBENCH-1315 The index-map fix was submitted without unit tests so we can get it deployed to resolve the immediate PostgreSQL resource problems. This follow-on adds unit testing (and some late review comments). This proved an "interesting" journey. I found several returns that don't really make sense: for example, the old delete/update logic reported a `Sync.update` failure as `CONFLICT` rather than `INTERNAL_SERVER_ERROR`. The fixture we use to install the Elasticsearch mock response and call the API needs to know whether to install the mock: it has to be right, as either installing the mock and not calling it, or calling it without the mock, will result in failure. The current logic around installation of the mock proved difficult to reconcile with the need to handle `INTERNAL_SERVER_ERROR` both *before* and *after* the call to Elasticsearch. As a result, I ended up massively refactoring the `query_api` fixture code to simplify the logic based on a three-state override (neutral, force, or suppress) in the new `expect_call` parameter. Incidentally, after the production server PostgreSQL recovery, I found that my report generator was tripping over datasets which had no operational status: I suspect this was due to some hole in intake (although I think that failure to create the `UPLOAD` operational status should have failed the intake, I'm not going to debug that today). I did, however, adjust the report generator to detect and report this case gracefully instead of failing with an f-string formatting error. I also added a summary line of rows, tables, and size for the SQL report.
1 parent a74911d commit d6b8f26

File tree

11 files changed

+803
-160
lines changed

11 files changed

+803
-160
lines changed

lib/pbench/cli/server/report.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -295,13 +295,17 @@ def report_sql():
295295
"""Report the SQL table storage statistics"""
296296

297297
watcher.update("inspecting SQL tables")
298+
table_count = 0
299+
row_count = 0
300+
row_size = 0
298301
click.echo("SQL storage report:")
299302
t_w = 20
300303
r_w = 10
301304
s_w = 10
302305
click.echo(f" {'Table':<{t_w}} {'Rows':<{r_w}} {'Storage':<{s_w}}")
303306
click.echo(f" {'':-<{t_w}} {'':-<{r_w}} {'':-<{s_w}}")
304307
for t in inspect(Database.db_session.get_bind()).get_table_names():
308+
table_count += 1
305309
(rows,) = next(
306310
Database.db_session.execute(statement=text(f"SELECT COUNT(*) FROM {t}"))
307311
)
@@ -312,6 +316,11 @@ def report_sql():
312316
)
313317
)
314318
click.echo(f" {t:<{t_w}} {rows:>{r_w},d} {humanize.naturalsize(size):>{s_w}}")
319+
row_count += rows
320+
row_size += size
321+
click.echo(
322+
f" Total of {row_count:,d} rows in {table_count:,d} tables, consuming {humanize.naturalsize(row_size)}"
323+
)
315324

316325
if not detailer:
317326
return
@@ -357,6 +366,7 @@ def report_states():
357366
index_pattern: re.Pattern = re.compile(r"^(\d+):(.*)$")
358367
index_errors = defaultdict(int)
359368
index_messages = defaultdict(str)
369+
ops_anomalies = 0
360370
operations = defaultdict(lambda: defaultdict(int))
361371
rows = Database.db_session.execute(
362372
statement=text(
@@ -366,22 +376,26 @@ def report_states():
366376
)
367377
for dataset, operation, state, message in rows:
368378
watcher.update(f"inspecting {dataset}:{operation}")
369-
operations[operation][state] += 1
370-
if state == "FAILED":
371-
detailer.error(f"{operation} {state} for {dataset}: {message!r}")
372-
if operation == "INDEX":
373-
match = index_pattern.match(message)
374-
if match:
375-
try:
376-
code = int(match.group(1))
377-
message = match.group(2)
378-
index_errors[code] += 1
379-
if code not in index_messages:
380-
index_messages[code] = message
381-
except Exception as e:
382-
detailer.error(
383-
f"{dataset} unexpected 'INDEX' error {message}: {str(e)!r}"
384-
)
379+
if operation is None:
380+
ops_anomalies += 1
381+
detailer.error(f"{dataset} doesn't have operational state")
382+
else:
383+
operations[operation][state] += 1
384+
if state == "FAILED":
385+
detailer.error(f"{operation} {state} for {dataset}: {message!r}")
386+
if operation == "INDEX":
387+
match = index_pattern.match(message)
388+
if match:
389+
try:
390+
code = int(match.group(1))
391+
message = match.group(2)
392+
index_errors[code] += 1
393+
if code not in index_messages:
394+
index_messages[code] = message
395+
except Exception as e:
396+
detailer.error(
397+
f"{dataset} unexpected 'INDEX' error {message}: {str(e)!r}"
398+
)
385399
click.echo("Operational states:")
386400
for name, states in operations.items():
387401
click.echo(f" {name} states:")
@@ -392,6 +406,8 @@ def report_states():
392406
click.echo(
393407
f" CODE {code:2d}: {count:>6,d} {index_messages[code]}"
394408
)
409+
if ops_anomalies:
410+
click.echo(f" {ops_anomalies} datasets are missing operational state")
395411

396412

397413
@click.command(name="pbench-report-generator")

lib/pbench/server/api/resources/query_apis/datasets/__init__.py

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,32 +9,13 @@
99
ApiParams,
1010
ApiSchema,
1111
ParamType,
12-
SchemaError,
1312
)
1413
from pbench.server.api.resources.query_apis import ElasticBase
1514
from pbench.server.database.models.datasets import Dataset, Metadata
1615
from pbench.server.database.models.index_map import IndexMap
1716
from pbench.server.database.models.templates import Template
1817

1918

20-
class MissingDatasetNameParameter(SchemaError):
21-
"""The subclass schema is missing the required "name" parameter required
22-
to locate a Dataset.
23-
24-
NOTE: This is a development error, not a client error, and will be raised
25-
when the API is initialized at server startup. Arguably, this could be an
26-
assert since it prevents launching the server.
27-
"""
28-
29-
def __init__(self, subclass_name: str, message: str):
30-
super().__init__()
31-
self.subclass_name = subclass_name
32-
self.message = message
33-
34-
def __str__(self) -> str:
35-
return f"API {self.subclass_name} is {self.message}"
36-
37-
3819
class IndexMapBase(ElasticBase):
3920
"""A base class for query apis that depends on the index map.
4021
@@ -133,17 +114,17 @@ def get_index(
133114
"""
134115

135116
archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE)
136-
if archive_only and ok_no_index:
137-
return ""
138-
139117
if archive_only:
118+
if ok_no_index:
119+
return ""
140120
raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled")
141121

142122
index_keys = list(IndexMap.indices(dataset, root_index_name))
143123

144124
if not index_keys:
145125
raise APIAbort(
146-
HTTPStatus.NOT_FOUND, f"Dataset has no {root_index_name!r} data"
126+
HTTPStatus.NOT_FOUND,
127+
f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data",
147128
)
148129

149130
indices = ",".join(index_keys)

lib/pbench/server/api/resources/query_apis/datasets/datasets.py

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from collections import defaultdict
12
from http import HTTPStatus
23

34
from flask import current_app, jsonify, Response
@@ -16,8 +17,9 @@
1617
ParamType,
1718
Schema,
1819
)
19-
from pbench.server.api.resources.query_apis import ApiContext, PostprocessError
20+
from pbench.server.api.resources.query_apis import ApiContext
2021
from pbench.server.api.resources.query_apis.datasets import IndexMapBase
22+
import pbench.server.auth.auth as Auth
2123
from pbench.server.cache_manager import CacheManager
2224
from pbench.server.database.models.audit import AuditReason, AuditStatus, AuditType
2325
from pbench.server.database.models.datasets import (
@@ -120,14 +122,13 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
120122

121123
dataset = context["dataset"]
122124
action = context["attributes"].action
123-
get_action = action == "get"
124125
context["action"] = action
125126
audit_attributes = {}
126127
access = None
127128
owner = None
128129
elastic_options = {"ignore_unavailable": "true"}
129130

130-
if not get_action:
131+
if action != "get":
131132
elastic_options["refresh"] = "true"
132133
operations = Operation.by_state(dataset, OperationState.WORKING)
133134
if context["attributes"].require_stable and operations:
@@ -147,7 +148,9 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
147148
OperationState.WORKING.name,
148149
e,
149150
)
150-
raise APIAbort(HTTPStatus.CONFLICT, "Unable to set operational state")
151+
raise APIInternalError(
152+
f"can't set {OperationState.WORKING.name} on {dataset.name}: {str(e)!r} "
153+
)
151154
context["sync"] = sync
152155
context["auditing"]["attributes"] = audit_attributes
153156
if action == "update":
@@ -156,6 +159,14 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
156159
if not access and not owner:
157160
raise MissingParameters(["access", "owner"])
158161

162+
if owner and owner != dataset.owner_id:
163+
auth_user = Auth.token_auth.current_user()
164+
if not auth_user.is_admin():
165+
raise APIAbort(
166+
HTTPStatus.FORBIDDEN,
167+
"ADMIN role is required to change dataset ownership",
168+
)
169+
159170
if access:
160171
audit_attributes["access"] = access
161172
context["access"] = access
@@ -167,12 +178,16 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
167178
else:
168179
owner = dataset.owner_id
169180

170-
# Get the Elasticsearch indices occupied by the dataset. If there are
171-
# none, return with an empty query to disable the Elasticsearch call.
181+
# Get the Elasticsearch indices occupied by the dataset.
182+
#
183+
# We postprocess UPDATE and DELETE even without any indexed documents
184+
# in order to update the Dataset object, so tell get_index not to fail
185+
# in that case, and return an empty query to disable the Elasticsearch
186+
# call.
172187
#
173188
# It's important that all context fields required for postprocessing
174189
# of unindexed datasets have been set before this!
175-
indices = self.get_index(dataset, ok_no_index=(not get_action))
190+
indices = self.get_index(dataset, ok_no_index=(action != "get"))
176191
context["indices"] = indices
177192
if not indices:
178193
return {}
@@ -195,11 +210,12 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT:
195210
}
196211

197212
if action == "update":
198-
painless = "ctx._source.authorization=params.authorization"
199-
script_params = {"authorization": {"access": access, "owner": owner}}
200-
script = {"source": painless, "lang": "painless", "params": script_params}
201213
json["path"] = f"{indices}/_update_by_query"
202-
json["kwargs"]["json"]["script"] = script
214+
json["kwargs"]["json"]["script"] = {
215+
"source": "ctx._source.authorization=params.authorization",
216+
"lang": "painless",
217+
"params": {"authorization": {"access": access, "owner": owner}},
218+
}
203219
elif action == "get":
204220
json["path"] = f"{indices}/_search"
205221
elif action == "delete":
@@ -232,32 +248,31 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
232248
current_app.logger.info("POSTPROCESS {}: {}", dataset.name, es_json)
233249
failures = 0
234250
if action == "get":
251+
count = None
252+
hits = []
235253
try:
236254
count = es_json["hits"]["total"]["value"]
255+
hits = es_json["hits"]["hits"]
237256
if int(count) == 0:
238-
current_app.logger.info("No data returned by Elasticsearch")
239-
return jsonify([])
257+
raise APIInternalError(
258+
f"Elasticsearch returned no matches for {dataset.name}"
259+
)
240260
except KeyError as e:
241-
raise PostprocessError(
242-
HTTPStatus.BAD_REQUEST,
243-
f"Can't find Elasticsearch match data {e} in {es_json!r}",
261+
raise APIInternalError(
262+
f"Can't find Elasticsearch match data for {dataset.name} ({e}) in {es_json!r}",
244263
)
245264
except ValueError as e:
246-
raise PostprocessError(
247-
HTTPStatus.BAD_REQUEST,
248-
f"Elasticsearch hit count {count!r} value: {e}",
265+
raise APIInternalError(
266+
f"Elasticsearch bad hit count {count!r} for {dataset.name}: {e}",
249267
)
250-
results = []
251-
for hit in es_json["hits"]["hits"]:
252-
s = hit["_source"]
253-
s["id"] = hit["_id"]
254-
results.append(s)
255-
268+
results = defaultdict(int)
269+
for hit in hits:
270+
results[hit["_index"]] += 1
256271
return jsonify(results)
257272
else:
258273
if es_json:
259274
fields = ("deleted", "updated", "total", "version_conflicts")
260-
results = {f: es_json[f] if f in es_json else None for f in fields}
275+
results = {f: es_json.get(f, 0) for f in fields}
261276
failures = len(es_json["failures"]) if "failures" in es_json else 0
262277
results["failures"] = failures
263278
else:
@@ -282,7 +297,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
282297
dataset.update()
283298
except Exception as e:
284299
raise APIInternalError(
285-
f"unable to update dataset {dataset.name}: {str(e)!r}"
300+
f"Unable to update dataset {dataset.name}: {str(e)!r}"
286301
) from e
287302
elif action == "delete":
288303
try:
@@ -295,7 +310,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
295310
del context["sync"]
296311
except Exception as e:
297312
raise APIInternalError(
298-
f"unable to delete dataset {dataset.name}: {str(e)!r}"
313+
f"Unable to delete dataset {dataset.name}: {str(e)!r}"
299314
) from e
300315

301316
# The DELETE API removes the "sync" context on success to signal
@@ -312,7 +327,9 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
312327
auditing["attributes"] = {"message": str(e)}
313328
auditing["status"] = AuditStatus.WARNING
314329
auditing["reason"] = AuditReason.INTERNAL
315-
raise APIInternalError(f"Unexpected sync unlock error '{e}'") from e
330+
raise APIInternalError(
331+
f"Unexpected sync error {dataset.name} {str(e)!r}"
332+
) from e
316333

317334
# Return the summary document as the success response, or abort with an
318335
# internal error if we tried to operate on Elasticsearch documents but
@@ -323,9 +340,9 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
323340
auditing["reason"] = AuditReason.INTERNAL
324341
auditing["attributes"][
325342
"message"
326-
] = f"Unable to {context['attributes'].action} some indexed documents"
343+
] = f"Unable to {action} some indexed documents"
327344
raise APIInternalError(
328-
f"Failed to {context['attributes'].action} any of {results['total']} "
345+
f"Failed to {action} any of {results['total']} "
329346
f"Elasticsearch documents: {es_json}"
330347
)
331348
elif sync:

lib/pbench/server/api/resources/query_apis/datasets/datasets_detail.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,17 @@
66
from pbench.server.api.resources import (
77
APIAbort,
88
ApiAuthorizationType,
9+
APIInternalError,
910
ApiMethod,
1011
ApiParams,
1112
ApiSchema,
1213
Parameter,
1314
ParamType,
1415
Schema,
1516
)
16-
from pbench.server.api.resources.query_apis import ApiContext, PostprocessError
17+
from pbench.server.api.resources.query_apis import ApiContext
1718
from pbench.server.api.resources.query_apis.datasets import IndexMapBase
18-
from pbench.server.database.models.datasets import (
19-
Dataset,
20-
DatasetNotFound,
21-
Metadata,
22-
MetadataError,
23-
)
19+
from pbench.server.database.models.datasets import Metadata, MetadataError
2420

2521

2622
class DatasetsDetail(IndexMapBase):
@@ -122,18 +118,17 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
122118
}
123119
]
124120
"""
121+
122+
dataset = context["dataset"]
125123
hits = es_json["hits"]["hits"]
126124

127125
# NOTE: we're expecting just one. We're matching by just the
128-
# dataset name, which ought to be unique.
126+
# dataset resource ID, which ought to be unique.
129127
if len(hits) == 0:
130-
raise PostprocessError(
131-
HTTPStatus.BAD_REQUEST, "The specified dataset has gone missing"
132-
)
128+
raise APIInternalError(f"Dataset {dataset.name} run document is missing")
133129
elif len(hits) > 1:
134-
raise PostprocessError(
135-
HTTPStatus.BAD_REQUEST, "Too many hits for a unique query"
136-
)
130+
raise APIInternalError(f"Dataset {dataset.name} has multiple run documents")
131+
137132
src = hits[0]["_source"]
138133

139134
# We're merging the "run" and "@metadata" sub-documents into
@@ -147,12 +142,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response:
147142
}
148143

149144
try:
150-
dataset = Dataset.query(resource_id=(src["run"]["id"]))
151145
m = self._get_dataset_metadata(dataset, context["metadata"])
152-
except DatasetNotFound:
153-
raise APIAbort(
154-
HTTPStatus.BAD_REQUEST, f"Dataset {src['run']['id']} not found"
155-
)
156146
except MetadataError as e:
157147
raise APIAbort(HTTPStatus.BAD_REQUEST, str(e))
158148

0 commit comments

Comments
 (0)