|
17 | 17 | "collections": Param( |
18 | 18 | default=None, |
19 | 19 | type=["null", "array"], |
20 | | - description="List of collection IDs to tag (optional if catalog_endpoint is provided)" |
21 | | - ), |
22 | | - "catalog_endpoint": Param( |
23 | | - default=None, |
24 | | - type=["null", "string"], |
25 | | - description="STAC catalog endpoint URL to fetch all collections from (optional if collections is provided)" |
| 20 | + description="List of collection IDs to tag" |
26 | 21 | ), |
27 | 22 | "tenant": Param(default=None, type="string", description="Tenant ID to tag the collection with"), |
28 | 23 | "tenant_field": Param( |
|
42 | 37 | Tags existing collections with tenant information by updating their properties field. |
43 | 38 |
|
44 | 39 | This pipeline: |
45 | | -1. Fetches existing collections from the STAC catalog (either from a list or from a catalog endpoint) |
| 40 | +1. Fetches existing collections from the STAC catalog |
46 | 41 | 2. Updates each collection's properties with tenant tags |
47 | 42 | 3. Re-ingests the updated collections |
48 | 43 |
|
|
51 | 46 | **Required Parameters:** |
52 | 47 | - `tenant` (string): Tenant ID to tag collections with |
53 | 48 |
|
54 | | -**Collection Source (provide one of the following):** |
| 49 | +**Collection Source:** |
55 | 50 | - `collections` (array of strings): List of collection IDs to tag |
56 | | -- `catalog_endpoint` (string): STAC catalog endpoint URL (e.g., `https://dev.openveda.cloud/api/stac/collections`) to fetch all collections from |
57 | | -
|
58 | 51 | **Optional Parameters:** |
59 | 52 | - `tenant_field` (string): Properties key to write tenant into (default: `eic:tenant`) |
60 | 53 | - `properties` (object): Additional properties to add/update on collections |
|
78 | 71 | } |
79 | 72 | ``` |
80 | 73 |
|
81 | | -**Tag all collections from a catalog:** |
82 | | -```json |
83 | | -{ |
84 | | - "catalog_endpoint": "https://dev.openveda.cloud/api/stac/collections", |
85 | | - "tenant": "tenant-123" |
86 | | -} |
87 | | -``` |
88 | 74 |
|
89 | 75 | **With additional properties:** |
90 | 76 | ```json |
|
109 | 95 |
|
110 | 96 | @task() |
111 | 97 | def get_collection_ids(ti=None): |
112 | | - """Extract and validate collection IDs from configuration or fetch from catalog endpoint""" |
| 98 | + """Extract and validate collection IDs from configuration""" |
113 | 99 | try: |
114 | 100 | config = ti.dag_run.conf |
115 | 101 | collections = config.get("collections") |
116 | | - catalog_endpoint = config.get("catalog_endpoint") |
117 | 102 | tenant = config.get("tenant") |
118 | 103 |
|
119 | 104 | logger.info(f"Starting collection ID validation. Tenant: {tenant}") |
120 | 105 |
|
121 | | - # If catalog_endpoint is provided, fetch all collections |
122 | | - if catalog_endpoint: |
123 | | - logger.info(f"Fetching all collections from catalog endpoint: {catalog_endpoint}") |
124 | | - |
125 | | - try: |
126 | | - response = requests.get(catalog_endpoint, timeout=30) |
127 | | - response.raise_for_status() |
128 | | - except requests.exceptions.RequestException as e: |
129 | | - error_msg = f"Failed to fetch collections from catalog endpoint {catalog_endpoint}: {str(e)}" |
130 | | - logger.error(error_msg) |
131 | | - raise ValueError(error_msg) from e |
132 | | - |
133 | | - try: |
134 | | - catalog_data = response.json() |
135 | | - except (ValueError, requests.exceptions.JSONDecodeError) as json_error: |
136 | | - error_msg = f"Failed to parse JSON response from catalog endpoint {catalog_endpoint}" |
137 | | - logger.error(error_msg) |
138 | | - raise ValueError(error_msg) from json_error |
139 | | - |
140 | | - # Extract collection IDs |
141 | | - if "collections" in catalog_data: |
142 | | - collections = [coll.get("id") for coll in catalog_data["collections"] if coll.get("id")] |
143 | | - else: |
144 | | - error_msg = f"Unexpected response format from catalog endpoint {catalog_endpoint}" |
145 | | - logger.error(error_msg) |
146 | | - raise ValueError(error_msg) |
147 | | - |
148 | | - if not collections: |
149 | | - error_msg = f"No collections found at catalog endpoint {catalog_endpoint}" |
150 | | - logger.error(error_msg) |
151 | | - raise ValueError(error_msg) |
152 | | - |
153 | | - logger.info(f"Found {len(collections)} collections from catalog endpoint") |
154 | | - |
155 | 106 | if not collections: |
156 | | - error_msg = "Either 'collections' list or 'catalog_endpoint' must be provided in DAG configuration" |
| 107 | + error_msg = "The 'collections' list must be provided in DAG configuration" |
157 | 108 | logger.error(error_msg) |
158 | 109 | raise ValueError(error_msg) |
159 | 110 |
|
|
0 commit comments