11"""Data Loader CLI STAC_API Ingestion Tool."""
2- import json
2+
33import os
4+ from typing import Any
45
56import click
6- import requests
7+ import orjson
8+ from httpx import Client
79
810
9- def load_data (data_dir , filename ) :
11+ def load_data (filepath : str ) -> dict [ str , Any ] :
1012 """Load json data from a file within the specified data directory."""
11- filepath = os .path .join (data_dir , filename )
12- if not os .path .exists (filepath ):
13+ try :
14+ with open (filepath , "rb" ) as file :
15+ return orjson .loads (file .read ())
16+ except FileNotFoundError as e :
1317 click .secho (f"File not found: { filepath } " , fg = "red" , err = True )
14- raise click .Abort ()
15- with open (filepath ) as file :
16- return json .load (file )
18+ raise click .Abort () from e
1719
1820
19- def load_collection (base_url , collection_id , data_dir ) :
21+ def load_collection (client : Client , collection_id : str , data_dir : str ) -> None :
2022 """Load a STAC collection into the database."""
21- collection = load_data (data_dir , "collection.json" )
23+ collection = load_data (os . path . join ( data_dir , "collection.json" ) )
2224 collection ["id" ] = collection_id
23- try :
24- resp = requests .post (f"{ base_url } /collections" , json = collection )
25- if resp .status_code == 200 or resp .status_code == 201 :
26- click .echo (f"Status code: { resp .status_code } " )
27- click .echo (f"Added collection: { collection ['id' ]} " )
28- elif resp .status_code == 409 :
29- click .echo (f"Status code: { resp .status_code } " )
30- click .echo (f"Collection: { collection ['id' ]} already exists" )
31- else :
32- click .echo (f"Status code: { resp .status_code } " )
33- click .echo (
34- f"Error writing { collection ['id' ]} collection. Message: { resp .text } "
35- )
36- except requests .ConnectionError :
37- click .secho ("Failed to connect" , fg = "red" , err = True )
25+ resp = client .post ("/collections" , json = collection )
26+ if resp .status_code == 200 or resp .status_code == 201 :
27+ click .echo (f"Status code: { resp .status_code } " )
28+ click .echo (f"Added collection: { collection ['id' ]} " )
29+ elif resp .status_code == 409 :
30+ click .echo (f"Status code: { resp .status_code } " )
31+ click .echo (f"Collection: { collection ['id' ]} already exists" )
32+ else :
33+ click .echo (f"Status code: { resp .status_code } " )
34+ click .echo (f"Error writing { collection ['id' ]} collection. Message: { resp .text } " )
3835
3936
40- def load_items (base_url , collection_id , use_bulk , data_dir ):
37+ def load_items (
38+ client : Client , collection_id : str , use_bulk : bool , data_dir : str
39+ ) -> None :
4140 """Load STAC items into the database based on the method selected."""
42- # Attempt to dynamically find a suitable feature collection file
43- feature_files = [
44- file
45- for file in os .listdir (data_dir )
46- if file .endswith (".json" ) and file != "collection.json"
47- ]
48- if not feature_files :
41+ with os .scandir (data_dir ) as entries :
42+ # Attempt to dynamically find a suitable feature collection file
43+ # Use the first found feature collection file
44+ feature_file = next (
45+ (
46+ entry .path
47+ for entry in entries
48+ if entry .is_file ()
49+ and entry .name .endswith (".json" )
50+ and entry .name != "collection.json"
51+ ),
52+ None ,
53+ )
54+
55+ if feature_file is None :
4956 click .secho (
5057 "No feature collection files found in the specified directory." ,
5158 fg = "red" ,
5259 err = True ,
5360 )
5461 raise click .Abort ()
55- feature_collection_file = feature_files [
56- 0
57- ] # Use the first found feature collection file
58- feature_collection = load_data (data_dir , feature_collection_file )
5962
60- load_collection (base_url , collection_id , data_dir )
63+ feature_collection = load_data (feature_file )
64+
65+ load_collection (client , collection_id , data_dir )
6166 if use_bulk :
62- load_items_bulk_insert (base_url , collection_id , feature_collection , data_dir )
67+ load_items_bulk_insert (client , collection_id , feature_collection )
6368 else :
64- load_items_one_by_one (base_url , collection_id , feature_collection , data_dir )
69+ load_items_one_by_one (client , collection_id , feature_collection )
6570
6671
67- def load_items_one_by_one (base_url , collection_id , feature_collection , data_dir ):
72+ def load_items_one_by_one (
73+ client : Client , collection_id : str , feature_collection : dict [str , Any ]
74+ ) -> None :
6875 """Load STAC items into the database one by one."""
6976 for feature in feature_collection ["features" ]:
70- try :
71- feature ["collection" ] = collection_id
72- resp = requests .post (
73- f"{ base_url } /collections/{ collection_id } /items" , json = feature
74- )
75- if resp .status_code == 200 :
76- click .echo (f"Status code: { resp .status_code } " )
77- click .echo (f"Added item: { feature ['id' ]} " )
78- elif resp .status_code == 409 :
79- click .echo (f"Status code: { resp .status_code } " )
80- click .echo (f"Item: { feature ['id' ]} already exists" )
81- except requests .ConnectionError :
82- click .secho ("Failed to connect" , fg = "red" , err = True )
83-
84-
85- def load_items_bulk_insert (base_url , collection_id , feature_collection , data_dir ):
86- """Load STAC items into the database via bulk insert."""
87- try :
88- for i , _ in enumerate (feature_collection ["features" ]):
89- feature_collection ["features" ][i ]["collection" ] = collection_id
90- resp = requests .post (
91- f"{ base_url } /collections/{ collection_id } /items" , json = feature_collection
92- )
77+ feature ["collection" ] = collection_id
78+ resp = client .post (f"/collections/{ collection_id } /items" , json = feature )
9379 if resp .status_code == 200 :
9480 click .echo (f"Status code: { resp .status_code } " )
95- click .echo ("Bulk inserted items successfully." )
96- elif resp .status_code == 204 :
97- click .echo (f"Status code: { resp .status_code } " )
98- click .echo ("Bulk update successful, no content returned." )
81+ click .echo (f"Added item: { feature ['id' ]} " )
9982 elif resp .status_code == 409 :
10083 click .echo (f"Status code: { resp .status_code } " )
101- click .echo ("Conflict detected, some items might already exist." )
102- except requests .ConnectionError :
103- click .secho ("Failed to connect" , fg = "red" , err = True )
84+ click .echo (f"Item: { feature ['id' ]} already exists" )
85+
86+
87+ def load_items_bulk_insert (
88+ client : Client , collection_id : str , feature_collection : dict [str , Any ]
89+ ) -> None :
90+ """Load STAC items into the database via bulk insert."""
91+ for feature in feature_collection ["features" ]:
92+ feature ["collection" ] = collection_id
93+ resp = client .post (f"/collections/{ collection_id } /items" , json = feature_collection )
94+ if resp .status_code == 200 :
95+ click .echo (f"Status code: { resp .status_code } " )
96+ click .echo ("Bulk inserted items successfully." )
97+ elif resp .status_code == 204 :
98+ click .echo (f"Status code: { resp .status_code } " )
99+ click .echo ("Bulk update successful, no content returned." )
100+ elif resp .status_code == 409 :
101+ click .echo (f"Status code: { resp .status_code } " )
102+ click .echo ("Conflict detected, some items might already exist." )
104103
105104
106105@click .command ()
@@ -117,9 +116,10 @@ def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir
117116 default = "sample_data/" ,
118117 help = "Directory containing collection.json and feature collection file" ,
119118)
120- def main (base_url , collection_id , use_bulk , data_dir ) :
119+ def main (base_url : str , collection_id : str , use_bulk : bool , data_dir : str ) -> None :
121120 """Load STAC items into the database."""
122- load_items (base_url , collection_id , use_bulk , data_dir )
121+ with Client (base_url = base_url ) as client :
122+ load_items (client , collection_id , use_bulk , data_dir )
123123
124124
125125if __name__ == "__main__" :
0 commit comments