## Problem
The `fhir_melt_all()` step in R-cds2db runs single-threaded and becomes the primary
bottleneck with large datasets. Related: #1094
At UKSH (FHIR server with ~168K consented patients, ~4K of them with encounters since 2025-01-01),
we measured the following timings:
### Melt Step (Step 5/8)
| Resource | Rows In | Rows Out | Time |
|---|---|---|---|
| Condition | 256,133 | 256,133 | 609s (~10 min) |
| Consent | 4,033 | 85,442 | 484s (~8 min) |
| DiagnosticReport | 35,776 | 967,926 | 9,196s (~2.5 hours) |
| Observation | ~2.8M | — | Did not complete (estimated 15+ hours) |
### Cron Job (`copy_raw_cds_in_to_db_log`, Step 2/8)
- 537K datasets: multiple hours (row-by-row PL/pgSQL)
- "set old calculated items" step: frequently stuck, requires manual semaphore reset
- Full pipeline requires constant monitoring and manual intervention
### Total Pipeline Time
- Step 1 (FHIR download): ~5-6 hours
- Cron job data transfer: 12+ hours
- Melt + type cast: hours (resource-dependent)
- Total: 2+ days for a single pipeline run that ultimately crashed without completing,
making daily runs and iterative debugging impossible
## Environment
- INTERPOLAR v1.5.1
- Server: 16 CPUs, 128 GB RAM; increasing RAM had no effect
- `MAX_CORES = 0` has no effect on the melt step (single-threaded)
## Workaround: SQL-based Melt
We reimplemented `fhir_melt_all()` in pure PostgreSQL as `melt_raw_table()`.
It mirrors R's iterative depth-by-depth bracket expansion and produces
identical output (verified against the R output). Performance comparison:
| Step | R | SQL |
|---|---|---|
| Melt 825K observations (→4.2M rows) | 2+ days, then R crashed (memory) | ~20 minutes |
| Melt 638 encounters | N/A | 1.2 seconds |
| Bulk copy to db_log (4.2M rows) | 7+ hours (not completed) | 21 minutes |
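For illustration, the expansion step can be seen on a toy cell (the data and column name are invented; the `' ~ '` separator and `[i.j]` index prefixes follow the raw-table format described below, and the `LATERAL` expansion mirrors what `melt_raw_table()` does for multi-valued columns):

```sql
-- Toy example (hypothetical data): one raw cell holding two indexed values,
-- expanded into one row per value via a LATERAL set-returning function.
SELECT idx.val
FROM (VALUES ('[1.1]completed ~ [2.1]entered-in-error')) AS raw(status_col),
     LATERAL regexp_split_to_table(raw.status_col, ' ~ ') AS idx(val);
-- yields two rows: '[1.1]completed' and '[2.1]entered-in-error'
```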
## Suggestion
- Parallelize `fhir_melt_all()` across resources (each resource is independent)
- Replace the row-by-row PL/pgSQL copy with a bulk `INSERT ... SELECT`
- Consider database-side melt as alternative to R in-memory melt
- Provide progress indicators for long-running steps
### SQL Workaround Implementation
#### 1. `melt_raw_table()` - replaces `fhir_melt_all()`
Mirrors R's iterative depth-by-depth bracket expansion in pure PostgreSQL.
Uses helper functions for bracket index extraction, depth stripping, and multi-value expansion.
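The six helpers are only named here; as a flavor of what they do, a hedged sketch of an index-stripping helper follows (the name suffix, signature, and regex are assumptions, not the actual implementation):

```sql
-- Hypothetical sketch: removes '[1.2.1]'-style bracket prefixes,
-- so '[1.1]8480-6' becomes '8480-6'. Not the actual helper body.
CREATE OR REPLACE FUNCTION fhir_rm_indices_sketch(v text)
RETURNS text AS $$
  SELECT regexp_replace(v, '\[[0-9.]+\]', '', 'g');
$$ LANGUAGE sql IMMUTABLE;
```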
```sql
-- Helper functions: extract_by_idx, get_all_indices, idx_at_depth,
-- strip_first_depth, extract_at_first_level, fhir_rm_indices
-- (6 small IMMUTABLE SQL functions - available on request)
CREATE OR REPLACE FUNCTION melt_raw_table(
    p_schema text, p_table text, p_prefix text
) RETURNS void AS $$
DECLARE
    grp text; grp_cols text[]; all_cols text[]; col text;
    max_depth int; cur_depth int; n_unique int;
    sql_str text; sel_parts text; arr_parts text; row_count bigint;
BEGIN
    EXECUTE 'DROP TABLE IF EXISTS public._melt_work';
    EXECUTE format('CREATE TABLE public._melt_work AS SELECT * FROM %I.%I', p_schema, p_table);
    -- Detect multi-valued columns (contain ' ~ ')
    -- Group by FHIR element (2nd underscore segment)
    -- For each group, iterate depth levels:
    --   if single first-level value: strip that depth
    --   if multiple: expand rows via LATERAL join
    -- Result left in public._melt_work
END;
$$ LANGUAGE plpgsql;
```

#### 2. `write_melted_to_typed()` - replaces R type casting
Strips bracket indices and casts to destination column types (timestamp, integer,
double, boolean, date, time). Handles FHIR partial dates (YYYY, YYYY-MM).
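The partial-date handling might look like the following standalone sketch (not the actual function body; padding partial dates to the first day/month is an assumption about the intended semantics):

```sql
-- Hypothetical sketch: pad FHIR partial dates (YYYY, YYYY-MM) to a
-- full date before casting. Values here are invented examples.
SELECT v,
       CASE
         WHEN v ~ '^\d{4}$'       THEN (v || '-01-01')::date
         WHEN v ~ '^\d{4}-\d{2}$' THEN (v || '-01')::date
         ELSE v::date
       END AS casted
FROM (VALUES ('2024'), ('2024-07'), ('2024-07-15')) AS t(v);
```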
```sql
CREATE OR REPLACE FUNCTION write_melted_to_typed(
    melted_table text, dest_schema text, dest_table text
) RETURNS bigint AS $$
-- Dynamically builds INSERT...SELECT with per-column type casting
-- Skips generated columns (e.g. hash_index_col)
-- Returns row count inserted
$$ LANGUAGE plpgsql;
```

#### 3. Bulk copy - replaces row-by-row PL/pgSQL cron job
```sql
-- Replaces copy_type_cds_in_to_db_log (row-by-row loop)
-- with a single INSERT...SELECT per table
DO $$
DECLARE tbl text; col_list text; row_cnt bigint;
BEGIN
  FOR tbl IN SELECT table_name FROM information_schema.tables
             WHERE table_schema = 'cds2db_in'
               AND table_name NOT LIKE '%\_raw'  -- \_ escapes the LIKE wildcard
  LOOP
    SELECT string_agg(column_name, ', ' ORDER BY ordinal_position) INTO col_list
      FROM information_schema.columns
     WHERE table_schema = 'db_log' AND table_name = tbl AND is_generated = 'NEVER';
    EXECUTE format('INSERT INTO db_log.%I (%s) SELECT %s FROM cds2db_in.%I',
                   tbl, col_list, col_list, tbl);
  END LOOP;
END $$;
```

Full implementations (with all helper functions) available on request.
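A possible end-to-end invocation of the two functions, assuming a raw table `cds2db_in.observation_raw` and a typed destination `cds2db_in.observation` (the table names and the prefix argument are illustrative, not from the actual pipeline):

```sql
-- Hypothetical usage; melt_raw_table() leaves its result in public._melt_work.
SELECT melt_raw_table('cds2db_in', 'observation_raw', 'obs');
SELECT write_melted_to_typed('public._melt_work', 'cds2db_in', 'observation');
```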