1+ """Deduplicate charging equipment per report group/serial combination
2+
3+ Revision ID: 3d7b65a9d2ef
4+ Revises: a1b2c3d4e5f1
5+ Create Date: 2026-02-01 09:00:00.000000
6+
7+ """
8+
9+ from alembic import op
10+ import sqlalchemy as sa
11+ import logging
12+
13+ # revision identifiers, used by Alembic.
14+ revision = "3d7b65a9d2ef"
15+ down_revision = "a1b2c3d4e5f1"
16+ branch_labels = None
17+ depends_on = None
18+
19+
20+ def upgrade () -> None :
21+ """
22+ Remove duplicate charging equipment rows that share the same serial number
23+ within compliance report groups that have multiple report versions.
24+ Priority for retention is Validated > Submitted > Draft to ensure that the
25+ most mature equipment record survives per serial number.
26+ """
27+ conn = op .get_bind ()
28+ logger = logging .getLogger ("alembic.runtime.migration" )
29+
30+ conn .execute (
31+ sa .text (
32+ """
33+ CREATE TABLE IF NOT EXISTS charging_equipment_snapshot_3d7b65a9d2ef
34+ (LIKE charging_equipment INCLUDING ALL);
35+ """
36+ )
37+ )
38+ conn .execute (sa .text ("TRUNCATE TABLE charging_equipment_snapshot_3d7b65a9d2ef;" ))
39+ conn .execute (
40+ sa .text (
41+ """
42+ INSERT INTO charging_equipment_snapshot_3d7b65a9d2ef
43+ SELECT * FROM charging_equipment;
44+ """
45+ )
46+ )
47+
48+ dedup_cte = """
49+ WITH multi_report_groups AS (
50+ SELECT compliance_report_group_uuid
51+ FROM compliance_report
52+ GROUP BY compliance_report_group_uuid
53+ HAVING COUNT(DISTINCT compliance_report_id) > 1
54+ ),
55+ equipment_candidates AS (
56+ SELECT DISTINCT
57+ mrg.compliance_report_group_uuid,
58+ ce.charging_equipment_id,
59+ UPPER(TRIM(ce.serial_number)) AS normalized_serial,
60+ ces.status AS equipment_status
61+ FROM multi_report_groups mrg
62+ JOIN compliance_report cr
63+ ON cr.compliance_report_group_uuid = mrg.compliance_report_group_uuid
64+ JOIN compliance_report_charging_equipment crce
65+ ON crce.compliance_report_id = cr.compliance_report_id
66+ JOIN charging_equipment ce
67+ ON ce.charging_equipment_id = crce.charging_equipment_id
68+ JOIN charging_equipment_status ces
69+ ON ces.charging_equipment_status_id = ce.status_id
70+ WHERE ce.action_type != 'DELETE'
71+ ),
72+ ranked_equipment AS (
73+ SELECT
74+ compliance_report_group_uuid,
75+ charging_equipment_id,
76+ normalized_serial,
77+ equipment_status,
78+ ROW_NUMBER() OVER (
79+ PARTITION BY compliance_report_group_uuid, normalized_serial
80+ ORDER BY
81+ CASE equipment_status
82+ WHEN 'Validated' THEN 1
83+ WHEN 'Submitted' THEN 2
84+ WHEN 'Draft' THEN 3
85+ ELSE 4
86+ END,
87+ charging_equipment_id
88+ ) AS retention_rank
89+ FROM equipment_candidates
90+ WHERE normalized_serial IS NOT NULL
91+ AND normalized_serial <> ''
92+ ),
93+ duplicates AS (
94+ SELECT
95+ compliance_report_group_uuid,
96+ charging_equipment_id
97+ FROM ranked_equipment
98+ WHERE retention_rank > 1
99+ )
100+ """
101+
102+ log_rows = conn .execute (
103+ sa .text (
104+ dedup_cte
105+ + """
106+ SELECT
107+ crce.compliance_report_id,
108+ COUNT(DISTINCT duplicates.charging_equipment_id) AS deleted_equipment_count
109+ FROM duplicates
110+ JOIN compliance_report_charging_equipment crce
111+ ON crce.charging_equipment_id = duplicates.charging_equipment_id
112+ GROUP BY crce.compliance_report_id
113+ ORDER BY crce.compliance_report_id
114+ """
115+ )
116+ ).fetchall ()
117+
118+ if log_rows :
119+ for row in log_rows :
120+ logger .info (
121+ "Deleting %s charging equipment rows linked to compliance_report_id=%s" ,
122+ row .deleted_equipment_count ,
123+ row .compliance_report_id ,
124+ )
125+ else :
126+ logger .info ("No duplicate charging equipment rows detected for deletion." )
127+
128+ conn .execute (
129+ sa .text (
130+ dedup_cte
131+ + """
132+ DELETE FROM charging_equipment ce
133+ USING duplicates d
134+ WHERE ce.charging_equipment_id = d.charging_equipment_id;
135+ """
136+ )
137+ )
138+
139+
140+ def downgrade () -> None :
141+ """
142+ Restore rows removed by this migration using the snapshot table while
143+ preserving any new or updated records created after the upgrade.
144+ """
145+ op .execute (
146+ """
147+ DO $$
148+ BEGIN
149+ IF EXISTS (
150+ SELECT 1
151+ FROM information_schema.tables
152+ WHERE table_schema = 'public'
153+ AND table_name = 'charging_equipment_snapshot_3d7b65a9d2ef'
154+ ) THEN
155+ INSERT INTO charging_equipment
156+ SELECT * FROM charging_equipment_snapshot_3d7b65a9d2ef
157+ ON CONFLICT (charging_equipment_id) DO NOTHING;
158+ END IF;
159+ END;
160+ $$;
161+ """
162+ )
0 commit comments