Commit 93cf0ef

feat: add migrate_external_pointers_v2 helper
Added helper function for migrating external storage pointers when copying production data to _v2 schemas during git branch-based migration.

Function: migrate_external_pointers_v2()
- Converts BINARY(16) UUID → JSON metadata
- Points to existing files (no file copying required)
- Enables access to external data in _v2 test schemas
- Supports deferred external storage migration approach

Use case: When using the git branch workflow (main: 0.14.6, migrate-to-v2: 2.0), this function allows copied production data to access external storage without moving the actual blob files until production cutover.

Example:

    migrate_external_pointers_v2(
        schema='my_pipeline_v2',
        table='recording',
        attribute='signal',
        source_store='external-raw',
        dest_store='raw',
        copy_files=False  # Keep files in place
    )

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 119eb9b commit 93cf0ef
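
For orientation, a minimal usage sketch of the new helper (the schema, table, and store names mirror the example above and are illustrative; it also assumes the helper is importable from datajoint.migrate) showing how the returned summary can be checked after migrating pointers in a copied _v2 schema:

    # Usage sketch: call the helper on the copied _v2 schema and inspect the summary.
    from datajoint.migrate import migrate_external_pointers_v2

    result = migrate_external_pointers_v2(
        schema="my_pipeline_v2",
        table="recording",
        attribute="signal",
        source_store="external-raw",
        dest_store="raw",
        copy_files=False,  # keep blob files in place until production cutover
    )

    print(f"Migrated {result['rows_migrated']} pointers, copied {result['files_copied']} files")
    for err in result["errors"]:
        # Entries name UUIDs that had no match in the source ~external_* tracking table.
        print("WARNING:", err)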


src/datajoint/migrate.py

Lines changed: 225 additions & 0 deletions
@@ -2081,3 +2081,228 @@ def verify_schema_v20(
        result["compatible"] = False

    return result


def migrate_external_pointers_v2(
    schema: str,
    table: str,
    attribute: str,
    source_store: str,
    dest_store: str,
    copy_files: bool = False,
    connection=None,
) -> dict:
    """
    Migrate external storage pointers from 0.14.6 to 2.0 format.

    Converts BINARY(16) UUID references to JSON metadata format.
    Optionally copies blob files to new storage location.

    This is useful when copying production data to _v2 schemas and you need
    to access external storage attributes but don't want to move the files yet.

    Parameters
    ----------
    schema : str
        Schema name (e.g., 'my_pipeline_v2')
    table : str
        Table name
    attribute : str
        External attribute name (e.g., 'signal')
    source_store : str
        0.14.6 store name (e.g., 'external-raw')
    dest_store : str
        2.0 store name (e.g., 'raw')
    copy_files : bool, optional
        If True, copy blob files to new location.
        If False (default), JSON points to existing files.
    connection : Connection, optional
        Database connection. If None, uses default connection.

    Returns
    -------
    dict
        - rows_migrated: int - number of pointers migrated
        - files_copied: int - number of files copied (if copy_files=True)
        - errors: list - any errors encountered

    Examples
    --------
    >>> # Migrate pointers without moving files
    >>> result = migrate_external_pointers_v2(
    ...     schema='my_pipeline_v2',
    ...     table='recording',
    ...     attribute='signal',
    ...     source_store='external-raw',
    ...     dest_store='raw',
    ...     copy_files=False
    ... )
    >>> print(f"Migrated {result['rows_migrated']} pointers")

    Notes
    -----
    This function:

    1. Reads BINARY(16) UUID from the table column
    2. Looks up the file in the ~external_{source_store} table
    3. Creates JSON metadata with the file path
    4. Optionally copies the file to the new store location
    5. Updates the column with the JSON metadata

    The JSON format is:

        {
            "path": "schema/table/key_hash/file.ext",
            "size": 12345,
            "hash": null,
            "ext": ".dat",
            "is_dir": false,
            "timestamp": "2025-01-14T10:30:00+00:00"
        }
    """
    import json
    import os
    from datetime import datetime, timezone

    from . import conn as get_conn
    from .settings import get_store_spec

    if connection is None:
        connection = get_conn()

    logger.info(
        f"Migrating external pointers: {schema}.{table}.{attribute} "
        f"({source_store} → {dest_store})"
    )

    # Get source store specification (0.14.6)
    # Note: this assumes the old external tracking table exists
    external_table = f"~external_{source_store}"

    # Check if the external tracking table exists
    check_query = """
        SELECT COUNT(*) FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        """
    exists = connection.query(check_query, args=(schema, external_table)).fetchone()[0]

    if not exists:
        raise DataJointError(
            f"External tracking table {schema}.{external_table} not found. "
            f"Cannot migrate external pointers from 0.14.6 format."
        )

    # Get the destination store spec for path construction
    dest_spec = get_store_spec(dest_store)

    result = {
        "rows_migrated": 0,
        "files_copied": 0,
        "errors": [],
    }

    # Query rows that have a non-null external attribute
    query = f"""
        SELECT * FROM `{schema}`.`{table}`
        WHERE `{attribute}` IS NOT NULL
        """
    rows = connection.query(query).fetchall()

    # Get column info to identify the UUID column
    col_query = """
        SELECT ORDINAL_POSITION, COLUMN_NAME
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
        ORDER BY ORDINAL_POSITION
        """
    columns = connection.query(col_query, args=(schema, table)).fetchall()
    col_names = [col[1] for col in columns]

    # Find the attribute column index
    try:
        attr_idx = col_names.index(attribute)
    except ValueError:
        raise DataJointError(f"Attribute {attribute} not found in {schema}.{table}")

    for row in rows:
        uuid_bytes = row[attr_idx]

        if uuid_bytes is None:
            continue

        # Look up file info in the external tracking table
        lookup_query = f"""
            SELECT hash, size, timestamp, filepath
            FROM `{schema}`.`{external_table}`
            WHERE hash = %s
            """
        file_info = connection.query(lookup_query, args=(uuid_bytes,)).fetchone()

        if file_info is None:
            result["errors"].append(
                f"External file not found for UUID: {uuid_bytes.hex()}"
            )
            continue

        hash_hex, size, timestamp, filepath = file_info

        # Build JSON metadata; extract the extension from the file path
        ext = os.path.splitext(filepath)[1] if filepath else ""

        metadata = {
            "path": filepath,
            "size": size,
            "hash": hash_hex.hex() if hash_hex else None,
            "ext": ext,
            "is_dir": False,
            "timestamp": timestamp.isoformat()
            if timestamp
            else datetime.now(timezone.utc).isoformat(),
        }

        # Update the row with JSON metadata: build the WHERE clause from primary keys
        pk_columns = []
        pk_values = []

        # Get primary key info
        pk_query = """
            SELECT COLUMN_NAME
            FROM information_schema.KEY_COLUMN_USAGE
            WHERE TABLE_SCHEMA = %s
              AND TABLE_NAME = %s
              AND CONSTRAINT_NAME = 'PRIMARY'
            ORDER BY ORDINAL_POSITION
            """
        pk_cols = connection.query(pk_query, args=(schema, table)).fetchall()

        for pk_col in pk_cols:
            pk_name = pk_col[0]
            pk_idx = col_names.index(pk_name)
            pk_columns.append(pk_name)
            pk_values.append(row[pk_idx])

        # Build the UPDATE statement
        where_parts = [f"`{col}` = %s" for col in pk_columns]
        where_clause = " AND ".join(where_parts)

        update_query = f"""
            UPDATE `{schema}`.`{table}`
            SET `{attribute}` = %s
            WHERE {where_clause}
            """
        connection.query(update_query, args=(json.dumps(metadata), *pk_values))

        result["rows_migrated"] += 1

        # Copy the file if requested
        if copy_files:
            # TODO: Implement file copying using fsspec.
            # This requires knowing the source and destination store locations.
            logger.warning(
                "File copying not yet implemented in migrate_external_pointers_v2"
            )

    logger.info(
        f"Migrated {result['rows_migrated']} external pointers "
        f"for {schema}.{table}.{attribute}"
    )

    return result
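
The copy_files=True branch above is left as a TODO. One possible shape for it, sketched here purely as an illustration (the store-root parameters, the helper name, and the use of fsspec for remote-capable I/O are assumptions, not part of this commit):

    # Hypothetical sketch of the deferred copy step; not part of this commit.
    # Assumes each store spec exposes a root path or URL that fsspec can open.
    import posixpath

    import fsspec


    def _copy_external_file(src_root: str, dest_root: str, relative_path: str) -> None:
        """Copy one external blob from the source store root to the destination root."""
        src = posixpath.join(src_root, relative_path)
        dest = posixpath.join(dest_root, relative_path)
        dest_fs, dest_path = fsspec.core.url_to_fs(dest)
        dest_fs.makedirs(posixpath.dirname(dest_path), exist_ok=True)
        with fsspec.open(src, "rb") as rf, fsspec.open(dest, "wb") as wf:
            # Read/write in one shot; stream in chunks instead for very large blobs.
            wf.write(rf.read())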
