Skip to content

Commit 47e2360

Browse files
jhfclaude
and committed
feat: Add --dataset flag to concurrent worker test
Supports both BRREG selection (~29K rows) and full downloads (~1M rows) datasets via `--dataset selection` (default) or `--dataset downloads`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7b5ce95 commit 47e2360

File tree

1 file changed

+60
-18
lines changed

1 file changed

+60
-18
lines changed

test/test_concurrent_worker.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,47 @@ def drop_all_test_databases():
238238
conn.close()
239239

240240

241-
def setup_test_data():
242-
"""Setup test data like test 401 (Norway BRREG selection ~29K rows)"""
243-
log_print(f"\n{BLUE}Setting up test data (like test 401)...{NC}")
241+
# ============================================================================
# Dataset configurations
# ============================================================================

# Registry of selectable test datasets, keyed by the value accepted by the
# --dataset CLI flag.  Each entry provides:
#   description      -- human-readable summary (surfaced in the --help text)
#   definition_year  -- year suffix used to select the BRREG
#                       create-import-definition-*.sql files and the
#                       brreg_*_{year} import_definition slugs
#   lu_slug / es_slug -- import_job slugs for the legal-unit / establishment
#                        jobs (also used to derive the *_upload table names)
#   lu_csv / es_csv   -- workspace-relative CSV paths for LU / ES source data
DATASETS = {
    "selection": {
        "description": "BRREG selection (~29K rows, like test 401)",
        "definition_year": "2024",
        "lu_slug": "import_hovedenhet_concurrent",
        "es_slug": "import_underenhet_concurrent",
        "lu_csv": "samples/norway/legal_unit/enheter-selection.csv",
        "es_csv": "samples/norway/establishment/underenheter-selection.csv",
    },
    "downloads": {
        "description": "BRREG full downloads (~1M rows, like test 403)",
        "definition_year": "2025",
        "lu_slug": "import_hovedenhet_2025",
        "es_slug": "import_underenhet_2025",
        # Full downloads are not checked in; they must be fetched from BRREG
        # and placed under tmp/ before the test can run.
        "lu_csv": "tmp/enheter.csv",
        "es_csv": "tmp/underenheter_filtered.csv",
    },
}
263+
264+
265+
def setup_test_data(dataset="selection"):
266+
"""Setup test data from the specified dataset."""
267+
ds = DATASETS[dataset]
268+
year = ds["definition_year"]
269+
270+
log_print(f"\n{BLUE}Setting up test data: {ds['description']}...{NC}")
271+
272+
# Check that CSV files exist
273+
for label, path in [("LU", ds["lu_csv"]), ("ES", ds["es_csv"])]:
274+
full_path = WORKSPACE / path
275+
if not full_path.exists():
276+
log_print(f"{RED}ERROR: {label} CSV not found: {full_path}{NC}", "error")
277+
if dataset == "downloads":
278+
log_print(f" Download from BRREG and place in tmp/", "error")
279+
sys.exit(1)
280+
size_mb = full_path.stat().st_size / (1024 * 1024)
281+
log_print(f" {label} CSV: {path} ({size_mb:.1f} MB)")
244282

245283
# Run setup files via psql
246284
log_print(" Running test/setup.sql...")
@@ -249,25 +287,25 @@ def setup_test_data():
249287
log_print(" Running samples/norway/getting-started.sql...")
250288
run_psql_file("samples/norway/getting-started.sql")
251289

252-
log_print(" Running import definition for hovedenhet...")
253-
run_psql_file("samples/norway/brreg/create-import-definition-hovedenhet-2024.sql")
290+
log_print(f" Running import definition for hovedenhet ({year})...")
291+
run_psql_file(f"samples/norway/brreg/create-import-definition-hovedenhet-{year}.sql")
254292

255-
log_print(" Running import definition for underenhet...")
256-
run_psql_file("samples/norway/brreg/create-import-definition-underenhet-2024.sql")
293+
log_print(f" Running import definition for underenhet ({year})...")
294+
run_psql_file(f"samples/norway/brreg/create-import-definition-underenhet-{year}.sql")
257295

258296
# Create import jobs
259297
log_print(" Creating import jobs...")
260-
run_psql("""
298+
run_psql(f"""
261299
BEGIN;
262300
263301
CALL test.set_user_from_email('test.admin@statbus.org');
264302
265303
-- Create LU import job (uploaded FIRST = higher priority)
266304
WITH def_he AS (
267-
SELECT id FROM public.import_definition WHERE slug = 'brreg_hovedenhet_2024'
305+
SELECT id FROM public.import_definition WHERE slug = 'brreg_hovedenhet_{year}'
268306
)
269307
INSERT INTO public.import_job (definition_id, slug, default_valid_from, default_valid_to, description, user_id)
270-
SELECT def_he.id, 'import_hovedenhet_concurrent', '2025-01-01'::date, 'infinity'::date, 'Concurrent test LU',
308+
SELECT def_he.id, '{ds["lu_slug"]}', '2025-01-01'::date, 'infinity'::date, 'Concurrent test LU',
271309
(SELECT id FROM public.user WHERE email = 'test.admin@statbus.org')
272310
FROM def_he
273311
ON CONFLICT (slug) DO NOTHING;
@@ -276,21 +314,21 @@ def setup_test_data():
276314
""")
277315

278316
# Load LU data FIRST (priority by upload order)
279-
log_print(" Loading LU CSV data (enheter-selection.csv)...")
280-
run_psql("\\copy public.import_hovedenhet_concurrent_upload FROM 'samples/norway/legal_unit/enheter-selection.csv' WITH CSV HEADER")
317+
log_print(f" Loading LU CSV data ({ds['lu_csv']})...")
318+
run_psql(f"\\copy public.{ds['lu_slug']}_upload FROM '{ds['lu_csv']}' WITH CSV HEADER")
281319

282320
# Create ES import job SECOND
283-
run_psql("""
321+
run_psql(f"""
284322
BEGIN;
285323
286324
CALL test.set_user_from_email('test.admin@statbus.org');
287325
288326
-- Create ES import job (uploaded SECOND = lower priority)
289327
WITH def_ue AS (
290-
SELECT id FROM public.import_definition WHERE slug = 'brreg_underenhet_2024'
328+
SELECT id FROM public.import_definition WHERE slug = 'brreg_underenhet_{year}'
291329
)
292330
INSERT INTO public.import_job (definition_id, slug, default_valid_from, default_valid_to, description, user_id)
293-
SELECT def_ue.id, 'import_underenhet_concurrent', '2025-01-01'::date, 'infinity'::date, 'Concurrent test ES',
331+
SELECT def_ue.id, '{ds["es_slug"]}', '2025-01-01'::date, 'infinity'::date, 'Concurrent test ES',
294332
(SELECT id FROM public.user WHERE email = 'test.admin@statbus.org')
295333
FROM def_ue
296334
ON CONFLICT (slug) DO NOTHING;
@@ -299,8 +337,8 @@ def setup_test_data():
299337
""")
300338

301339
# Load ES data SECOND
302-
log_print(" Loading ES CSV data (underenheter-selection.csv)...")
303-
run_psql("\\copy public.import_underenhet_concurrent_upload FROM 'samples/norway/establishment/underenheter-selection.csv' WITH CSV HEADER")
340+
log_print(f" Loading ES CSV data ({ds['es_csv']})...")
341+
run_psql(f"\\copy public.{ds['es_slug']}_upload FROM '{ds['es_csv']}' WITH CSV HEADER")
304342

305343
# Show task state
306344
log_print(f"\n{BLUE}Test data loaded. Task states:{NC}")
@@ -532,6 +570,10 @@ def list_test_databases():
532570
help="Delete test database after run")
533571
parser.add_argument("--list", action="store_true",
534572
help="List existing test databases and exit")
573+
parser.add_argument("--dataset", choices=list(DATASETS.keys()), default="selection",
574+
help="Dataset to use: " + ", ".join(
575+
f"{k} ({v['description']})" for k, v in DATASETS.items()
576+
) + " (default: selection)")
535577
parser.add_argument("--cleanup-all", action="store_true",
536578
help="Drop ALL test_concurrent_* databases and exit")
537579
args = parser.parse_args()
@@ -549,7 +591,7 @@ def list_test_databases():
549591
if not args.skip_setup:
550592
create_isolated_db()
551593
atexit.register(lambda: drop_isolated_db(force=args.cleanup))
552-
setup_test_data()
594+
setup_test_data(args.dataset)
553595
else:
554596
env = pg_env()
555597
TEST_DB = env.get("PGDATABASE", "statbus")

0 commit comments

Comments (0)