@@ -1,17 +1,15 @@
-import logging
 from aind_data_access_api.rds_tables import RDSCredentials, Client
 from aind_data_access_api.document_db import MetadataDbClient
 import pandas as pd
 import panel as pn
-
-# Configure logging
-logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
-logger = logging.getLogger(__name__)
+from aind_metadata_upgrader.upgrade import Upgrade
 
 # Redshift settings
 REDSHIFT_SECRETS = "/aind/prod/redshift/credentials/readwrite"
 RDS_TABLE_NAME = "metadata_upgrade_status_prod"
 
+pn.extension('tabulator')
+
 
 extra_columns = {
     "_id": 1,
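
Note: pn.extension('tabulator') has to run before any Tabulator component is rendered, or the widget's JS assets are never loaded. The widgets themselves are outside this diff, so the snippet below is only a minimal sketch of the pattern, assuming the dashboard shows the merged frame with pn.widgets.Tabulator (hypothetical usage, not code from this commit):

    import pandas as pd
    import panel as pn

    # Load the Tabulator JS assets once, up front.
    pn.extension('tabulator')

    # Hypothetical display step, not part of this diff.
    df = pd.DataFrame({"v1_id": ["abc"], "status": ["upgraded"]})
    pn.widgets.Tabulator(df).servable()
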
@@ -23,24 +21,44 @@
 
 @pn.cache()
 def get_extra_col_df():
+    print("Retrieving extra columns from DocDB...")
     client = MetadataDbClient(
         host="api.allenneuraldynamics.org",
         version="v1",
     )
-    records = client.retrieve_docdb_records(
+
+    all_records = client.retrieve_docdb_records(
         filter_query={},
-        projection=extra_columns,
+        projection={"_id": 1},
         limit=0,
     )
+    all_ids = [record["_id"] for record in all_records]
+
+    # Batch by 100 to avoid excessively large queries
+    batch_size = 100
+
+    records = []
+    for start_idx in range(0, len(all_ids), batch_size):
+        end_idx = min(start_idx + batch_size, len(all_ids))
+        print(f"Retrieving records {start_idx} to {end_idx}...")
+        batch_ids = all_ids[start_idx:end_idx]
+        filter_query = {"_id": {"$in": batch_ids}}
+        batch_records = client.retrieve_docdb_records(
+            filter_query=filter_query,
+            projection=extra_columns,
+            limit=0,
+        )
+        records.extend(batch_records)
+
     for i, record in enumerate(records):
         data_description = record.get("data_description", {})
         if data_description:
             record["data_level"] = data_description.get("data_level", None)
             record["project_name"] = data_description.get("project_name", None)
             record.pop("data_description")
 
-        record.pop("_id")
         records[i] = record
+    print(f"Retrieved {len(records)} records from DocDB.")
     return pd.DataFrame(records)
 
 
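
Note: the batching loop is the core of this change. A first cheap query projects only _id, and the heavier extra_columns projection is then fetched 100 ids at a time through Mongo-style $in filters, so no single DocDB query grows unboundedly. The same pattern written generically, as a sketch: chunked is a hypothetical helper, while client, all_ids, and extra_columns are the names from the diff above.

    from itertools import islice

    def chunked(seq, size):
        # Hypothetical helper: yield successive `size`-item batches of `seq`.
        it = iter(seq)
        while batch := list(islice(it, size)):
            yield batch

    records = []
    for batch_ids in chunked(all_ids, 100):
        records.extend(
            client.retrieve_docdb_records(
                filter_query={"_id": {"$in": batch_ids}},
                projection=extra_columns,
                limit=0,
            )
        )
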
@@ -47,11 +65,13 @@
 @pn.cache()
 def get_redshift_table():
+    print("Connecting to Redshift RDS...")
     rds_client = Client(
         credentials=RDSCredentials(
             aws_secrets_name=REDSHIFT_SECRETS,
         ),
     )
     df = rds_client.read_table(RDS_TABLE_NAME)
+    print(f"Retrieved {len(df)} records from Redshift table.")
     return df
 
 
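
Note: both loaders are wrapped in @pn.cache(), which memoizes a function's return value per distinct set of arguments; with no arguments, each body runs at most once per server process, and every later call (including the ones inside get_data) returns the cached DataFrame. A minimal sketch of that behavior, using a toy function rather than anything from this commit:

    import panel as pn

    @pn.cache()
    def expensive():
        print("computed")  # printed only on the first call
        return 42

    expensive()  # runs the body
    expensive()  # served from the cache; nothing is printed

    # To force a refresh, clear Panel's caches, e.g.:
    pn.state.clear_caches()
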
@@ -58,12 +78,12 @@
 @pn.cache()
 def get_data():
-    logger.info("Loading extra columns from DocDB...")
+    print("Loading extra columns from DocDB...")
     extra_col_df = get_extra_col_df()
-    logger.info("Loading Redshift table...")
+    print("Loading Redshift table...")
     df = get_redshift_table()
     if df is None or len(df) == 0:
         return pn.pane.Markdown("**Table is empty or could not be read**")
-    logger.info("Merging extra columns...")
+    print("Merging extra columns...")
     df = df.merge(extra_col_df, how="left", left_on="v1_id", right_on="_id")
     return df
 
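
Note: the left merge keeps every row of the Redshift status table and attaches data_level / project_name wherever a DocDB document with a matching _id exists; rows without a match get NaN, and both key columns (v1_id and _id) remain in the result. A toy illustration with made-up values:

    import pandas as pd

    redshift = pd.DataFrame({"v1_id": ["a", "b"], "status": ["upgraded", "failed"]})
    docdb = pd.DataFrame({"_id": ["a"], "data_level": ["raw"], "project_name": ["P1"]})

    merged = redshift.merge(docdb, how="left", left_on="v1_id", right_on="_id")
    # Row "b" has no DocDB match, so its data_level and project_name are NaN.
    print(merged)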