|
| 1 | +"""Deduplicate charging equipment rows per group/version |
| 2 | +
|
| 3 | +Revision ID: c7e4d9a1b2f6 |
| 4 | +Revises: a2b3c4d5e6f7 |
| 5 | +Create Date: 2026-03-10 12:00:00.000000 |
| 6 | +""" |
| 7 | + |
| 8 | +import logging |
| 9 | + |
| 10 | +import sqlalchemy as sa |
| 11 | +from alembic import op |
| 12 | +from lcfs.db.dependencies import ( |
| 13 | + execute_sql_sections, |
| 14 | + find_and_read_sql_file, |
| 15 | + parse_sql_sections, |
| 16 | +) |
| 17 | + |
| 18 | +# revision identifiers, used by Alembic. |
| 19 | +revision = "c7e4d9a1b2f6" |
| 20 | +down_revision = "a2b3c4d5e6f7" |
| 21 | +branch_labels = None |
| 22 | +depends_on = None |
| 23 | + |
| 24 | +SECTIONS_TO_EXECUTE = [ |
| 25 | + "FSE Reporting Base View", |
| 26 | + "FSE Reporting Base Preferred View", |
| 27 | +] |
| 28 | + |
| 29 | + |
| 30 | +def upgrade() -> None: |
| 31 | + """ |
| 32 | + Remove duplicate charging_equipment rows that share the same group_uuid and |
| 33 | + version for versioned records above v1. The most recent physical row is |
| 34 | + retained based on update_date/create_date, with charging_equipment_id as a |
| 35 | + deterministic tie-breaker. |
| 36 | + """ |
| 37 | + conn = op.get_bind() |
| 38 | + logger = logging.getLogger("alembic.runtime.migration") |
| 39 | + |
| 40 | + # Snapshot tables are temporary rollback aids and should be dropped manually |
| 41 | + # after 30 days once the migration has been validated in all environments. |
| 42 | + conn.execute( |
| 43 | + sa.text("DROP TABLE IF EXISTS charging_equipment_snapshot_3d7b65a9d2ef;") |
| 44 | + ) |
| 45 | + conn.execute( |
| 46 | + sa.text( |
| 47 | + """ |
| 48 | + CREATE TABLE IF NOT EXISTS charging_equipment_snapshot_c7e4d9a1b2f6 |
| 49 | + (LIKE charging_equipment INCLUDING ALL); |
| 50 | + """ |
| 51 | + ) |
| 52 | + ) |
| 53 | + conn.execute( |
| 54 | + sa.text( |
| 55 | + """ |
| 56 | + COMMENT ON TABLE charging_equipment_snapshot_c7e4d9a1b2f6 IS |
| 57 | + 'Temporary rollback snapshot for migration c7e4d9a1b2f6. Drop after 30 days.'; |
| 58 | + """ |
| 59 | + ) |
| 60 | + ) |
| 61 | + conn.execute( |
| 62 | + sa.text("TRUNCATE TABLE charging_equipment_snapshot_c7e4d9a1b2f6;") |
| 63 | + ) |
| 64 | + conn.execute( |
| 65 | + sa.text( |
| 66 | + """ |
| 67 | + INSERT INTO charging_equipment_snapshot_c7e4d9a1b2f6 |
| 68 | + SELECT * FROM charging_equipment; |
| 69 | + """ |
| 70 | + ) |
| 71 | + ) |
| 72 | + |
| 73 | + dedup_cte = """ |
| 74 | + WITH ranked_duplicates AS ( |
| 75 | + SELECT |
| 76 | + charging_equipment_id, |
| 77 | + group_uuid, |
| 78 | + version, |
| 79 | + ROW_NUMBER() OVER ( |
| 80 | + PARTITION BY group_uuid, version |
| 81 | + ORDER BY |
| 82 | + COALESCE(update_date, create_date) DESC NULLS LAST, |
| 83 | + charging_equipment_id DESC |
| 84 | + ) AS retention_rank |
| 85 | + FROM charging_equipment |
| 86 | + WHERE group_uuid IS NOT NULL |
| 87 | + AND version > 1 |
| 88 | + ), |
| 89 | + duplicates AS ( |
| 90 | + SELECT charging_equipment_id, group_uuid, version |
| 91 | + FROM ranked_duplicates |
| 92 | + WHERE retention_rank > 1 |
| 93 | + ) |
| 94 | + """ |
| 95 | + |
| 96 | + log_rows = conn.execute( |
| 97 | + sa.text( |
| 98 | + dedup_cte |
| 99 | + + """ |
| 100 | + SELECT |
| 101 | + group_uuid, |
| 102 | + version, |
| 103 | + COUNT(*) AS deleted_row_count |
| 104 | + FROM duplicates |
| 105 | + GROUP BY group_uuid, version |
| 106 | + ORDER BY group_uuid, version; |
| 107 | + """ |
| 108 | + ) |
| 109 | + ).fetchall() |
| 110 | + |
| 111 | + if log_rows: |
| 112 | + for row in log_rows: |
| 113 | + logger.info( |
| 114 | + "Deleting %s duplicate charging_equipment rows for group_uuid=%s version=%s", |
| 115 | + row.deleted_row_count, |
| 116 | + row.group_uuid, |
| 117 | + row.version, |
| 118 | + ) |
| 119 | + else: |
| 120 | + logger.info( |
| 121 | + "No duplicate charging_equipment rows detected for identical group_uuid/version pairs." |
| 122 | + ) |
| 123 | + |
| 124 | + deactivated_rows = conn.execute( |
| 125 | + sa.text( |
| 126 | + """ |
| 127 | + WITH ranked_report_equipment AS ( |
| 128 | + SELECT |
| 129 | + crce.charging_equipment_compliance_id, |
| 130 | + crce.compliance_report_id, |
| 131 | + ce.group_uuid, |
| 132 | + ce.version, |
| 133 | + ROW_NUMBER() OVER ( |
| 134 | + PARTITION BY crce.compliance_report_id, ce.group_uuid |
| 135 | + ORDER BY |
| 136 | + ce.version DESC, |
| 137 | + crce.charging_equipment_version DESC, |
| 138 | + crce.charging_equipment_compliance_id DESC |
| 139 | + ) AS version_rank |
| 140 | + FROM compliance_report_charging_equipment crce |
| 141 | + JOIN charging_equipment ce |
| 142 | + ON ce.charging_equipment_id = crce.charging_equipment_id |
| 143 | + WHERE ce.group_uuid IS NOT NULL |
| 144 | + ) |
| 145 | + UPDATE compliance_report_charging_equipment crce |
| 146 | + SET is_active = FALSE |
| 147 | + FROM ranked_report_equipment rre |
| 148 | + WHERE crce.charging_equipment_compliance_id = rre.charging_equipment_compliance_id |
| 149 | + AND rre.version_rank > 1 |
| 150 | + AND crce.is_active IS DISTINCT FROM FALSE |
| 151 | + RETURNING crce.compliance_report_id, rre.group_uuid, rre.version; |
| 152 | + """ |
| 153 | + ) |
| 154 | + ).fetchall() |
| 155 | + |
| 156 | + if deactivated_rows: |
| 157 | + for row in deactivated_rows: |
| 158 | + logger.info( |
| 159 | + "Set is_active=false for older compliance_report_charging_equipment row on compliance_report_id=%s group_uuid=%s version=%s", |
| 160 | + row.compliance_report_id, |
| 161 | + row.group_uuid, |
| 162 | + row.version, |
| 163 | + ) |
| 164 | + |
| 165 | + conn.execute( |
| 166 | + sa.text( |
| 167 | + dedup_cte |
| 168 | + + """ |
| 169 | + DELETE FROM compliance_report_charging_equipment crce |
| 170 | + USING duplicates d |
| 171 | + WHERE crce.charging_equipment_id = d.charging_equipment_id; |
| 172 | + """ |
| 173 | + ) |
| 174 | + ) |
| 175 | + |
| 176 | + conn.execute( |
| 177 | + sa.text( |
| 178 | + dedup_cte |
| 179 | + + """ |
| 180 | + DELETE FROM charging_equipment ce |
| 181 | + USING duplicates d |
| 182 | + WHERE ce.charging_equipment_id = d.charging_equipment_id; |
| 183 | + """ |
| 184 | + ) |
| 185 | + ) |
| 186 | + |
| 187 | + op.execute( |
| 188 | + """ |
| 189 | + CREATE INDEX IF NOT EXISTS idx_charging_equipment_group_uuid_version_id |
| 190 | + ON charging_equipment (group_uuid, version DESC, charging_equipment_id DESC); |
| 191 | + """ |
| 192 | + ) |
| 193 | + op.execute( |
| 194 | + """ |
| 195 | + CREATE INDEX IF NOT EXISTS idx_charging_site_group_uuid_org_version |
| 196 | + ON charging_site (group_uuid, organization_id, version DESC); |
| 197 | + """ |
| 198 | + ) |
| 199 | + op.execute( |
| 200 | + """ |
| 201 | + CREATE INDEX IF NOT EXISTS idx_crce_org_report_active_equipment_version |
| 202 | + ON compliance_report_charging_equipment ( |
| 203 | + organization_id, |
| 204 | + compliance_report_id, |
| 205 | + is_active, |
| 206 | + charging_equipment_id, |
| 207 | + charging_equipment_version |
| 208 | + ); |
| 209 | + """ |
| 210 | + ) |
| 211 | + op.execute( |
| 212 | + """ |
| 213 | + CREATE INDEX IF NOT EXISTS idx_cpo_level_user_use_lookup |
| 214 | + ON charging_power_output ( |
| 215 | + level_of_equipment_id, |
| 216 | + end_user_type_id, |
| 217 | + end_use_type_id |
| 218 | + ); |
| 219 | + """ |
| 220 | + ) |
| 221 | + |
| 222 | + content = find_and_read_sql_file(sqlFile="metabase.sql") |
| 223 | + sections = parse_sql_sections(content) |
| 224 | + execute_sql_sections(sections, SECTIONS_TO_EXECUTE) |
| 225 | + |
| 226 | + |
| 227 | +def downgrade() -> None: |
| 228 | + """ |
| 229 | + Restore rows removed by this migration from the snapshot table while leaving |
| 230 | + any newer rows intact. |
| 231 | + """ |
| 232 | + op.execute( |
| 233 | + """ |
| 234 | + DO $$ |
| 235 | + BEGIN |
| 236 | + IF EXISTS ( |
| 237 | + SELECT 1 |
| 238 | + FROM information_schema.tables |
| 239 | + WHERE table_schema = 'public' |
| 240 | + AND table_name = 'charging_equipment_snapshot_c7e4d9a1b2f6' |
| 241 | + ) THEN |
| 242 | + INSERT INTO charging_equipment |
| 243 | + SELECT * FROM charging_equipment_snapshot_c7e4d9a1b2f6 |
| 244 | + ON CONFLICT (charging_equipment_id) DO NOTHING; |
| 245 | + END IF; |
| 246 | + END; |
| 247 | + $$; |
| 248 | + """ |
| 249 | + ) |
| 250 | + op.execute("DROP VIEW IF EXISTS v_fse_reporting_base_pref;") |
| 251 | + op.execute("DROP VIEW IF EXISTS v_fse_reporting_base;") |
| 252 | + op.execute("DROP INDEX IF EXISTS idx_cpo_level_user_use_lookup;") |
| 253 | + op.execute("DROP INDEX IF EXISTS idx_crce_org_report_active_equipment_version;") |
| 254 | + op.execute("DROP INDEX IF EXISTS idx_charging_site_group_uuid_org_version;") |
| 255 | + op.execute("DROP INDEX IF EXISTS idx_charging_equipment_group_uuid_version_id;") |
0 commit comments