
Commit 10eb0d0

feat: 1325 add global and per feed configuration; integrate feeds download (#1363)
1 parent 225860b commit 10eb0d0

File tree

8 files changed: +128 -35 lines

shared.common.config_reader (new module; imported under this path elsewhere in this commit)

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
from typing import Optional, Any
from sqlalchemy.orm import Session
from shared.database.database import with_db_session
from shared.database_gen.sqlacodegen_models import ConfigKey, ConfigValueFeed


@with_db_session
def get_config_value(
    namespace: str,
    key: str,
    feed_id: Optional[str] = None,
    db_session: Session = None,
) -> Optional[Any]:
    """
    Retrieves a configuration value from the database using the provided session.

    It first looks for a feed-specific override. If a feed_id is provided and a
    value is found, it returns that value.

    If no feed-specific value is found or no feed_id is provided, it looks for
    the global default value in the `config_key` table.

    :param namespace: The namespace of the configuration key.
    :param key: The configuration key.
    :param feed_id: The optional feed id for a specific override.
    :param db_session: The SQLAlchemy session, injected by the `with_db_session` decorator.
    :return: The configuration value, or None if not found.
    """
    # 1. Try to get feed-specific value if feed_id is provided
    if feed_id:
        feed_config = (
            db_session.query(ConfigValueFeed.value)
            .filter(
                ConfigValueFeed.feed_id == feed_id,
                ConfigValueFeed.namespace == namespace,
                ConfigValueFeed.key == key,
            )
            .first()
        )
        if feed_config:
            return feed_config.value

    # 2. If not found or no feed_id, get the default value
    default_config = (
        db_session.query(ConfigKey.default_value).filter(ConfigKey.namespace == namespace, ConfigKey.key == key).first()
    )

    return default_config.default_value if default_config else None
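
A minimal usage sketch of the new helper. The with_db_session decorator injects db_session, so callers omit it; the reverse_geolocation / country_fallback names come from the comments in liquibase/changes/feat_1325.sql below, and the feed id is hypothetical:

from shared.common.config_reader import get_config_value

# Global default, read from config_key.default_value (None if the key is not registered)
country = get_config_value(namespace="reverse_geolocation", key="country_fallback")

# Per-feed override from config_value_feed, falling back to the global default
country = get_config_value(
    namespace="reverse_geolocation",
    key="country_fallback",
    feed_id="some-feed-id",  # hypothetical feed id
)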

functions-python/batch_process_dataset/src/main.py

Lines changed: 11 additions & 7 deletions
@@ -122,18 +122,20 @@ def create_dataset_stable_id(feed_stable_id, timestamp):
         """
         return f"{feed_stable_id}-{timestamp}"

-    def download_content(self, temporary_file_path):
+    def download_content(self, temporary_file_path, feed_id):
         """
         Downloads the content of a URL and return the hash of the file
         """
         file_hash = download_and_get_hash(
             self.producer_url,
-            temporary_file_path,
+            file_path=temporary_file_path,
+            feed_id=feed_id,
             authentication_type=self.authentication_type,
             api_key_parameter_name=self.api_key_parameter_name,
             credentials=self.feed_credentials,
             logger=self.logger,
         )
+        self.logger.info(f"hash is: {file_hash}")
         is_zip = zipfile.is_zipfile(temporary_file_path)
         return file_hash, is_zip

@@ -193,7 +195,7 @@ def upload_files_to_storage(
         )
         return blob, extracted_files

-    def upload_dataset(self, public=True) -> DatasetFile or None:
+    def upload_dataset(self, feed_id, public=True) -> DatasetFile or None:
         """
         Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
         <feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip

@@ -204,7 +206,7 @@ def upload_dataset(self, public=True) -> DatasetFile or None:
         try:
             self.logger.info("Accessing URL %s", self.producer_url)
             temp_file_path = self.generate_temp_filename()
-            file_sha256_hash, is_zip = self.download_content(temp_file_path)
+            file_sha256_hash, is_zip = self.download_content(temp_file_path, feed_id)
             if not is_zip:
                 self.logger.error(
                     f"[{self.feed_stable_id}] The downloaded file from {self.producer_url} is not a valid ZIP file."

@@ -417,12 +419,14 @@ def _get_unzipped_size(dataset_file):
         )

     @with_db_session
-    def process_from_producer_url(self, db_session) -> Optional[DatasetFile]:
+    def process_from_producer_url(
+        self, feed_id, db_session: Session
+    ) -> Optional[DatasetFile]:
         """
         Process the dataset and store new version in GCP bucket if any changes are detected
         :return: the DatasetFile object created
         """
-        dataset_file = self.upload_dataset()
+        dataset_file = self.upload_dataset(feed_id)

         if dataset_file is None:
             self.logger.info(f"[{self.feed_stable_id}] No database update required.")

@@ -543,7 +547,7 @@ def process_dataset(cloud_event: CloudEvent):
         if json_payload.get("use_bucket_latest", False):
             dataset_file = processor.process_from_bucket()
         else:
-            dataset_file = processor.process_from_producer_url()
+            dataset_file = processor.process_from_producer_url(json_payload["feed_id"])
     except Exception as e:
         # This makes sure the logger is initialized
         logger = get_logger("process_dataset", stable_id if stable_id else "UNKNOWN")
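
Note that process_from_producer_url now indexes json_payload["feed_id"] directly, so the triggering event must carry that field whenever use_bucket_latest is false. A sketch of the relevant payload shape; only feed_id and use_bucket_latest are confirmed by this diff, and the values are illustrative:

json_payload = {
    "feed_id": "some-feed-id",   # hypothetical value; now required by this change
    "use_bucket_latest": False,  # False routes to process_from_producer_url(feed_id)
}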

functions-python/batch_process_dataset/src/scripts/download_verifier.py

Lines changed: 5 additions & 4 deletions
@@ -33,7 +33,8 @@ def verify_download_content(producer_url: str):
     )
     tempfile = processor.generate_temp_filename()
     logging.info(f"Temp filename: {tempfile}")
-    file_hash, is_zip = processor.download_content(tempfile)
+    file_hash, is_zip = processor.download_content(tempfile, "feed_id")
+    logging.info(f"Downloaded file from {producer_url} is a valid ZIP file: {is_zip}")
     logging.info(f"File hash: {file_hash}")


@@ -48,7 +49,7 @@ def verify_upload_dataset(producer_url: str):
     """
     processor = DatasetProcessor(
         producer_url=producer_url,
-        feed_id="feed_id",
+        feed_id="feed_id_2126",
         feed_stable_id="feed_stable_id",
         execution_id=None,
         latest_hash="123",

@@ -59,7 +60,7 @@ def verify_upload_dataset(producer_url: str):
     )
     tempfile = processor.generate_temp_filename()
     logging.info(f"Temp filename: {tempfile}")
-    dataset_file = processor.upload_dataset(public=False)
+    dataset_file = processor.upload_dataset("feed_id_2126", False)
     logging.info(f"Dataset File: {dataset_file}")


@@ -72,6 +73,7 @@ def verify_upload_dataset(producer_url: str):
     # create working dir if not exists
     if not os.path.exists(os.environ["WORKING_DIR"]):
         os.makedirs(os.environ["WORKING_DIR"])
+
     server = create_server(
         host=HOST, port=PORT, in_memory=False, default_bucket=BUCKET_NAME
     )

@@ -80,7 +82,6 @@
         verify_download_content(producer_url=PRODUCER_URL)
         logging.info("Download content verification completed successfully.")
         verify_upload_dataset(producer_url=PRODUCER_URL)
-        verify_upload_dataset(producer_url=PRODUCER_URL)
     except Exception as e:
         logging.error(f"Error verifying download content: {e}")
     finally:

functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py

Lines changed: 6 additions & 6 deletions
@@ -73,7 +73,7 @@ def test_upload_dataset_diff_hash(
             test_hosted_public_url,
         )
         with patch.object(processor, "date", "mocked_timestamp"):
-            result = processor.upload_dataset()
+            result = processor.upload_dataset("feed_id")

         self.assertIsNotNone(result)
         mock_download_url_content.assert_called_once()

@@ -111,7 +111,7 @@ def test_upload_dataset_same_hash(
             test_hosted_public_url,
         )

-        result = processor.upload_dataset()
+        result = processor.upload_dataset("feed_id")

         self.assertIsNone(result)
         upload_files_to_storage.blob.assert_not_called()

@@ -143,7 +143,7 @@ def test_upload_dataset_not_zip(
             test_hosted_public_url,
         )

-        result = processor.upload_dataset()
+        result = processor.upload_dataset("feed_id")

         self.assertIsNone(result)
         upload_files_to_storage.blob.assert_not_called()

@@ -176,7 +176,7 @@ def test_upload_dataset_download_exception(
         )

         with self.assertRaises(Exception):
-            processor.upload_dataset()
+            processor.upload_dataset("feed_id")

     def test_upload_files_to_storage(self):
         bucket_name = "test-bucket"

@@ -256,7 +256,7 @@ def test_process(self, db_session):
         )
         db_url = os.getenv("TEST_FEEDS_DATABASE_URL", default=default_db_url)
         os.environ["FEEDS_DATABASE_URL"] = db_url
-        result = processor.process_from_producer_url()
+        result = processor.process_from_producer_url(feed_id)

         self.assertIsNotNone(result)
         self.assertEqual(result.file_sha256_hash, new_hash)

@@ -357,7 +357,7 @@ def test_process_no_change(self):

         processor.upload_dataset = MagicMock(return_value=None)
         processor.create_dataset_entities = MagicMock()
-        result = processor.process_from_producer_url()
+        result = processor.process_from_producer_url(feed_id)

         self.assertIsNone(result)
         processor.create_dataset_entities.assert_not_called()

functions-python/helpers/tests/test_helpers.py

Lines changed: 18 additions & 12 deletions
@@ -45,7 +45,8 @@ def test_create_bucket_already_exists(self, mock_storage_client):
         )
         mock_storage_client.return_value.create_bucket.assert_not_called()

-    def test_download_and_get_hash(self):
+    @patch("shared.common.config_reader.get_config_value", return_value=None)
+    def test_download_and_get_hash(self, mock_get_config):
         mock_binary_data = b"file content data"
         expected_hash = hashlib.sha256(mock_binary_data).hexdigest()
         file_path = "file_path"

@@ -65,7 +66,8 @@ def test_download_and_get_hash(self):
         if os.path.exists(file_path):
             os.remove(file_path)

-    def test_download_and_get_hash_auth_type_header(self):
+    @patch("shared.common.config_reader.get_config_value", return_value=None)
+    def test_download_and_get_hash_auth_type_header(self, mock_get_config):
         """
         Test the download_and_get_hash function for authentication type 2 (headers).
         This test verifies that the download_and_get_hash function correctly handles authentication type 2,

@@ -88,7 +90,11 @@ def test_download_and_get_hash_auth_type_header(self):
             "urllib3.PoolManager.request", return_value=mock_response
         ) as mock_request:
             result_hash = download_and_get_hash(
-                url, file_path, "sha256", 8192, 2, api_key_parameter_name, credentials
+                url=url,
+                file_path=file_path,
+                authentication_type=2,
+                api_key_parameter_name=api_key_parameter_name,
+                credentials=credentials,
             )

         self.assertEqual(

@@ -113,7 +119,8 @@ def test_download_and_get_hash_auth_type_header(self):
         if os.path.exists(file_path):
             os.remove(file_path)

-    def test_download_and_get_hash_auth_type_api_key(self):
+    @patch("shared.common.config_reader.get_config_value", return_value=None)
+    def test_download_and_get_hash_auth_type_api_key(self, mock_get_config):
         """
         Test the download_and_get_hash function for authentication type 1 (API key).
         """

@@ -137,13 +144,11 @@ def test_download_and_get_hash_auth_type_api_key(self):

         with patch("urllib3.PoolManager", return_value=mock_http):
             result_hash = download_and_get_hash(
-                base_url,
-                file_path,
-                "sha256",
-                8192,
-                1,
-                api_key_parameter_name,
-                credentials,
+                url=base_url,
+                file_path=file_path,
+                authentication_type=1,
+                api_key_parameter_name=api_key_parameter_name,
+                credentials=credentials,
             )

         self.assertEqual(

@@ -163,7 +168,8 @@ def test_download_and_get_hash_auth_type_api_key(self):
         if os.path.exists(file_path):
             os.remove(file_path)

-    def test_download_and_get_hash_exception(self):
+    @patch("shared.common.config_reader.get_config_value", return_value=None)
+    def test_download_and_get_hash_exception(self, mock_get_config):
         file_path = "test_file.txt"
         url = "https://test.com/"

functions-python/helpers/utils.py

Lines changed: 14 additions & 6 deletions
@@ -126,12 +126,15 @@ def download_and_get_hash(
     file_path,
     hash_algorithm="sha256",
     chunk_size=8192,
+    feed_id=None,
     authentication_type=0,
     api_key_parameter_name=None,
     credentials=None,
     logger=None,
     trusted_certs=False,  # If True, disables SSL verification
 ):
+    from shared.common.config_reader import get_config_value
+
     """
     Downloads the content of a URL and stores it in a file and returns the hash of the file
     """

@@ -145,13 +148,18 @@ def download_and_get_hash(
         ctx.load_default_certs()
         ctx.options |= 0x4  # ssl.OP_LEGACY_SERVER_CONNECT

+    headers = get_config_value(
+        namespace="feed_download", key="http_headers", feed_id=feed_id
+    )
+    if headers is None:
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
+            "AppleWebKit/537.36 (KHTML, like Gecko) "
+            "Chrome/126.0.0.0 Mobile Safari/537.36",
+            "Referer": url,
+        }
+
     # authentication_type == 1 -> the credentials are passed in the url
-    headers = {
-        "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
-        "AppleWebKit/537.36 (KHTML, like Gecko) "
-        "Chrome/126.0.0.0 Mobile Safari/537.36",
-        "Referer": url,
-    }
     # Careful, some URLs may already contain a query string
     # (e.g. http://api.511.org/transit/datafeeds?operator_id=CE)
     if authentication_type == 1 and api_key_parameter_name and credentials:
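
For context, a sketch of the lookup this change introduces, assuming get_config_value returns the stored jsonb deserialized into a dict (the URL, file path, and feed id are hypothetical):

# If a config_value_feed row exists for ("feed_download", "http_headers") and this
# feed, its jsonb value is used verbatim as the request headers, e.g. a mapping
# like {"User-Agent": "MobilityDataCatalog/1.0"} (illustrative value).
# If the lookup returns None, the hard-coded browser-like headers above apply.
file_hash = download_and_get_hash(
    url="https://example.com/gtfs.zip",
    file_path="/tmp/gtfs.zip",
    feed_id="some-feed-id",  # enables the per-feed header lookup
)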

liquibase/changelog.xml

Lines changed: 1 addition & 0 deletions
@@ -69,4 +69,5 @@
     <include file="changes/feat_1333.sql" relativeToChangelogFile="true"/>
     <include file="changes/feat_pt_152.sql" relativeToChangelogFile="true"/>
     <include file="changes/feat_fix_geolocation_circular_dep.sql" relativeToChangelogFile="true"/>
+    <include file="changes/feat_1325.sql" relativeToChangelogFile="true"/>
 </databaseChangeLog>

liquibase/changes/feat_1325.sql

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
ALTER TABLE feed ADD CONSTRAINT feed_stable_id_unique UNIQUE (stable_id);
-- 1. Catalog of allowed keys + optional global values
CREATE TABLE config_key (
    namespace     text NOT NULL,  -- e.g. 'reverse_geolocation'
    key           text NOT NULL,  -- e.g. 'country_fallback'
    description   text,
    default_value jsonb,          -- fallback if nothing else
    updated_at    timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (namespace, key)
);

-- 2. Per-feed overrides
CREATE TABLE config_value_feed (
    feed_id        varchar(255) NOT NULL,
    feed_stable_id varchar(255) NOT NULL,
    namespace      text NOT NULL,
    key            text NOT NULL,
    value          jsonb NOT NULL,
    updated_at     timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (feed_id, namespace, key),
    FOREIGN KEY (namespace, key) REFERENCES config_key(namespace, key) ON DELETE CASCADE
);

-- Helpful index
CREATE INDEX config_value_feed_gin ON config_value_feed USING GIN (value);
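
As an illustration (not part of this changeset), seed rows for the feed_download / http_headers key consumed by download_and_get_hash in functions-python/helpers/utils.py might look like this; the feed ids and header value are hypothetical:

-- Hypothetical seed data, not included in this migration:
INSERT INTO config_key (namespace, key, description, default_value)
VALUES ('feed_download', 'http_headers', 'HTTP headers sent when downloading a feed', NULL);

-- Per-feed override; the value must deserialize to a headers mapping
INSERT INTO config_value_feed (feed_id, feed_stable_id, namespace, key, value)
VALUES ('some-feed-id', 'some-stable-id', 'feed_download', 'http_headers',
        '{"User-Agent": "MobilityDataCatalog/1.0"}');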
