Skip to content

Commit 9c898cb

Browse files
authored
Add upload statistics to report (distributed-system-analysis#3615)
* Add upload statistics to report. Present dataset statistics in various buckets (by creation or upload date), including "this year", "this month", "this week", and "today", along with per-year/month/day-of-month/hour breakdowns just for fun.
1 parent 4a35b7e commit 9c898cb

File tree

2 files changed

+193
-46
lines changed

2 files changed

+193
-46
lines changed

lib/pbench/cli/server/reindex.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,10 @@ def worker(options: dict[str, Any]):
211211
# Defer index-map deletion outside of the SQL generator loop to avoid
212212
# breaking the SQLAlchemy cursor -- and we don't want to enable indexing
213213
# until after we've removed the old index map.
214+
if to_delete:
215+
click.echo(f"Deleting indexed data for {len(to_delete):,d} datasets")
216+
if to_sync:
217+
click.echo(f"Enabling indexing for {len(to_sync):,d} datasets")
214218
for dataset in to_delete:
215219
IndexMap.delete(dataset)
216220
for dataset in to_sync:
@@ -241,8 +245,9 @@ def worker(options: dict[str, Any]):
241245
@click.option("--id", type=str, multiple=True, help="Select dataset by resource ID")
242246
@click.option(
243247
"--indexing",
244-
type=click.Choice(["enable", "disable"], case_sensitive=False),
245-
help="Enable or disable the Pbench Server indexer for future uploads",
248+
type=click.Choice(["enable", "disable", "show"], case_sensitive=False),
249+
help="Enable or disable the Pbench Server indexer for future uploads, "
250+
"or show the current state",
246251
)
247252
@click.option(
248253
"--list", default=False, is_flag=True, help="Show dataset indexing status"
@@ -295,18 +300,27 @@ def reindex(context: object, **kwargs):
295300
kwargs["_es"] = (es_url, ca_bundle)
296301
kwargs["_logger"] = logger
297302

298-
# Check whether to enable or disable automatic indexing on upload.
303+
# Check whether to enable, disable, or display automatic indexing
304+
# on upload.
299305
indexing = kwargs.get("indexing")
300306
if indexing:
301-
state = indexing == "enable"
302-
detailer.message(f"{indexing} upload indexing")
303-
ServerSetting.set(key=OPTION_SERVER_INDEXING, value=state)
307+
if indexing == "show":
308+
setting = ServerSetting.get(
309+
key=OPTION_SERVER_INDEXING, use_default=True
310+
)
311+
state = setting.value
312+
else:
313+
state = indexing == "enable"
314+
detailer.message(f"{indexing} upload indexing")
315+
ServerSetting.set(key=OPTION_SERVER_INDEXING, value=state)
316+
abled = "enabled" if state else "disabled"
317+
click.echo(f"Indexing of new datasets is {abled}")
304318

305319
# Operate on individual datasets if selected
306320
if (SELECTORS | OPERATORS) & set(k for k, v in kwargs.items() if v):
307321
verifier.status("updating selected datasets")
308322
worker(kwargs)
309-
else:
323+
elif not indexing:
310324
click.echo("nothing to do", err=True)
311325
rv = 0
312326
except Exception as exc:

lib/pbench/cli/server/report.py

Lines changed: 172 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import re
66
import shutil
77
import time
8-
from typing import Optional, Union
8+
from typing import Any, Iterator, Optional, Union
99

1010
import click
1111
import humanize
12-
from sqlalchemy import inspect, select, text
12+
from sqlalchemy import cast, inspect, Row, select, text
1313

1414
from pbench.cli import pass_cli_context
1515
from pbench.cli.server import config_setup, Detail, Verify, Watch
@@ -18,6 +18,7 @@
1818
from pbench.server import BadConfig
1919
from pbench.server.cache_manager import CacheManager
2020
from pbench.server.database.database import Database
21+
from pbench.server.database.models import TZDateTime
2122
from pbench.server.database.models.audit import Audit, AuditStatus
2223
from pbench.server.database.models.datasets import Dataset, Metadata
2324
from pbench.server.database.models.index_map import IndexMap
@@ -34,6 +35,26 @@
3435
# SQL "chunk size"
3536
SQL_CHUNK = 2000
3637

38+
# Translate datetime.datetime.month (1 - 12) into a name; index 0 is a
# placeholder so a month number can index the tuple directly.
MONTHS = (
    "00",
    "Jan", "Feb", "Mar",
    "Apr", "May", "Jun",
    "Jul", "Aug", "Sep",
    "Oct", "Nov", "Dec",
)

# Translate datetime.datetime.weekday() (0 - 6) into a name
DAYS_OF_WEEK = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
57+
3758
detailer: Optional[Detail] = None
3859
watcher: Optional[Watch] = None
3960
verifier: Optional[Verify] = None
@@ -308,6 +329,126 @@ def report_cache(tree: CacheManager):
308329
)
309330

310331

332+
def columnize(
    items: dict[int, int],
    width: int = 80,
    ifmt: str = "4d",
    cfmt: str = ">8,d",
    lookup: Optional[list[str]] = None,
):
    """Pack "key: value" cells onto lines to minimize vertical space.

    Keys are emitted in sorted order; the current line is flushed
    whenever appending another cell would reach the target width.

    Args:
        items: dictionary of items to report as "key: value"
        width: width of line to fill (default 80)
        ifmt: format string for key
        cfmt: format string for value
        lookup: list of string values to represent key
    """
    buffer = ""
    for key, value in sorted(items.items()):
        if lookup:
            # Best-effort translation: report a bad key but keep going.
            try:
                label = lookup[key]
            except Exception as exc:
                click.echo(f"{key} from {lookup}: {str(exc)!r}", err=True)
                label = str(key)
        else:
            label = key
        cell = f" {label:{ifmt}}: {value:{cfmt}}"
        if len(buffer) + len(cell) >= width:
            click.echo(buffer)
            buffer = cell
        else:
            buffer += cell
    if buffer:
        click.echo(buffer)
363+
364+
365+
def summarize_dates(rows: Iterator[Row], width: int = 80):
    """Report bucketed statistics over a stream of datetime rows.

    Emits "recent activity" counts (this year, this month, this week,
    today) followed by histograms of totals by year, month of year, day
    of month, day of week, and hour of day, columnized to the given
    width. Rows whose first column is not a datetime are reported via
    the detailer and skipped.

    Args:
        rows: iterator of single-column rows, each holding a datetime
        width: output line width passed through to columnize()
    """
    by_year: defaultdict[int, int] = defaultdict(int)
    by_month: defaultdict[int, int] = defaultdict(int)
    by_day: defaultdict[int, int] = defaultdict(int)
    by_weekday: defaultdict[int, int] = defaultdict(int)
    by_hour: defaultdict[int, int] = defaultdict(int)

    # Reference boundaries for the "recent" buckets, all UTC.
    start_of_day = datetime.datetime.now(datetime.timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    start_of_month = start_of_day.replace(day=1)
    start_of_year = start_of_month.replace(month=1)
    week_ago = start_of_day - datetime.timedelta(days=7)

    in_year = in_month = in_week = in_today = 0

    for row in rows:
        stamp: datetime.datetime = row[0]
        if not isinstance(stamp, datetime.datetime):
            detailer.message(f"Got non-datetime row {row}")
            continue
        by_year[stamp.year] += 1
        by_month[stamp.month] += 1
        by_day[stamp.day] += 1
        by_weekday[stamp.weekday()] += 1
        by_hour[stamp.hour] += 1

        # NOTE(review): assumes streamed values are timezone-aware like
        # the UTC boundaries; a naive datetime here would raise on >=.
        if stamp >= start_of_year:
            in_year += 1
        if stamp >= start_of_month:
            in_month += 1
        if stamp >= week_ago:
            in_week += 1
        if stamp >= start_of_day:
            in_today += 1

    click.echo(f" {in_year:,d} this year ({start_of_year:%Y})")
    click.echo(f" {in_month:,d} this month ({start_of_month:%B %Y})")
    click.echo(
        f" {in_week:,d} this week ({week_ago:%B %d} to {start_of_day:%B %d})"
    )
    click.echo(f" {in_today:,d} today ({start_of_day:%d %B %Y})")

    click.echo(" Total by year:")
    columnize(by_year, width)
    click.echo(" Total by month of year:")
    columnize(by_month, width, ifmt="s", lookup=MONTHS)
    click.echo(" Total by day of month:")
    columnize(by_day, width, ifmt="02d")
    click.echo(" Total by day of week:")
    columnize(by_weekday, width, ifmt="s", lookup=DAYS_OF_WEEK)
    click.echo(" Total by hour of day:")
    columnize(by_hour, width, ifmt="02d")
419+
420+
421+
def report_creation(options: dict[str, Any]):
    """Report dataset statistics bucketed by dataset creation date.

    The creation date is taken from the ["pbench", "date"] key of each
    dataset's "metalog" metadata row and cast to a datetime column.

    Args:
        options: CLI option dictionary; the "width" option is used
    """
    watcher.update("analyzing upload patterns")

    # Extract the metalog creation timestamp as a datetime expression.
    created = cast(Metadata.value["pbench", "date"].as_string(), TZDateTime)
    query = Database.db_session.query(created).filter(Metadata.key == "metalog")
    # Stream results in chunks rather than materializing the full set.
    rows = query.execution_options(stream_results=True).yield_per(SQL_CHUNK)
    click.echo("Dataset statistics by creation date:")
    summarize_dates(rows, options.get("width"))
436+
437+
438+
def report_uploads(options: dict[str, Any]):
    """Report dataset statistics bucketed by dataset upload date.

    Args:
        options: CLI option dictionary; the "width" option is used
    """
    watcher.update("analyzing upload patterns")

    # Stream the upload timestamps in chunks to bound memory use.
    query = Database.db_session.query(Dataset.uploaded)
    rows = query.execution_options(stream_results=True).yield_per(SQL_CHUNK)
    click.echo("Dataset statistics by upload date:")
    summarize_dates(rows, options.get("width"))
450+
451+
311452
def report_audit():
312453
"""Report audit log statistics."""
313454

@@ -521,78 +662,70 @@ def report_states():
521662
@click.option(
522663
"--states", "-S", default=False, is_flag=True, help="Display operational states"
523664
)
665+
@click.option(
666+
"--statistics",
667+
type=click.Choice(["creation", "upload"], case_sensitive=False),
668+
help="Show upload statistics",
669+
)
524670
@click.option(
525671
"--verify", "-v", default=False, is_flag=True, help="Display intermediate messages"
526672
)
673+
@click.option("--width", type=int, default=80, help="Set output width")
527674
@common_options
528-
def report(
529-
context: object,
530-
all: bool,
531-
archive: bool,
532-
audit: bool,
533-
backup: bool,
534-
cache: bool,
535-
detail: bool,
536-
errors: bool,
537-
progress: float,
538-
sql: bool,
539-
states: bool,
540-
verify: bool,
541-
):
675+
def report(context: object, **kwargs):
542676
"""
543677
Report statistics and problems in the SQL and on-disk representation of
544678
Pbench datasets.
545679
\f
546680
547681
Args:
548682
context: click context
549-
all: report all statistics
550-
archive: report archive statistics
551-
audit: report audit log statistics
552-
backup: report backup statistics
553-
cache: report cache statistics
554-
detail: provide additional per-file diagnostics
555-
errors: show individual file errors
556-
sql: report SQL statistics
557-
states: report operational states
558-
verify: Report internal status
683+
kwargs: click options
559684
"""
560685
logger = None
561686

562687
global detailer, verifier, watcher
563-
detailer = Detail(detail, errors)
564-
verifier = Verify(verify)
565-
watcher = Watch(progress)
688+
detailer = Detail(kwargs.get("detail"), kwargs.get("errors"))
689+
verifier = Verify(kwargs.get("verify"))
690+
watcher = Watch(kwargs.get("progress"))
691+
rv = 0
566692

567693
try:
568694
config = config_setup(context)
569695
logger = get_pbench_logger("pbench-report-generator", config)
570696
cache_m = CacheManager(config, logger)
571-
if any((all, archive, backup)):
697+
if any((kwargs.get("all"), kwargs.get("archive"), kwargs.get("backup"))):
572698
verifier.status("starting discovery")
573699
watcher.update("discovering archive tree")
574700
cache_m.full_discovery(search=False)
575701
watcher.update("processing reports")
576702
verifier.status("finished discovery")
577-
if all or archive:
703+
if kwargs.get("all") or kwargs.get("archive"):
578704
report_archive(cache_m)
579-
if all or backup:
705+
if kwargs.get("all") or kwargs.get("backup"):
580706
report_backup(cache_m)
581-
if all or cache:
707+
if kwargs.get("all") or kwargs.get("cache"):
582708
report_cache(cache_m)
583-
if all or audit:
709+
stats = kwargs.get("statistics")
710+
if stats:
711+
if stats == "creation":
712+
report_creation(kwargs)
713+
elif stats == "upload":
714+
report_uploads(kwargs)
715+
else:
716+
click.echo(f"Unexpected statistics option {stats}", err=True)
717+
rv = 1
718+
if kwargs.get("all") or kwargs.get("audit"):
584719
report_audit()
585-
if all or sql:
720+
if kwargs.get("all") or kwargs.get("sql"):
586721
report_sql()
587-
if all or states:
722+
if kwargs.get("all") or kwargs.get("states"):
588723
report_states()
589724
watcher.update("done")
590-
591-
rv = 0
592725
except Exception as exc:
593726
if logger:
594727
logger.exception("An error occurred discovering the file tree: {}", exc)
595-
if verify:
728+
if kwargs.get("verify"):
596729
raise
597730
click.secho(exc, err=True, bg="red")
598731
rv = 2 if isinstance(exc, BadConfig) else 1

0 commit comments

Comments
 (0)