Skip to content

Commit 583676e

Browse files
Authored (author name missing from page extraction)
Merge pull request #25 from PySport/feat/add-cli-flag-to-disable-events
Add flag to CLI to disable EventBus
2 parents c1cc385 + b96a281 commit 583676e

File tree

6 files changed: +89 additions, -42 deletions

ingestify/application/loader.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,8 @@ def collect_and_run(
3535
provider: Optional[str] = None,
3636
source: Optional[str] = None,
3737
):
38-
# First collect all selectors, before discovering datasets
39-
selectors = {}
38+
ingestion_plans = []
4039
for ingestion_plan in self.ingestion_plans:
41-
logger.info(f"Determining selectors for {ingestion_plan}")
42-
4340
if provider is not None:
4441
if ingestion_plan.source.provider != provider:
4542
logger.info(
@@ -54,6 +51,13 @@ def collect_and_run(
5451
)
5552
continue
5653

54+
ingestion_plans.append(ingestion_plan)
55+
56+
# First collect all selectors, before discovering datasets
57+
selectors = {}
58+
for ingestion_plan in ingestion_plans:
59+
logger.info(f"Determining selectors for {ingestion_plan}")
60+
5761
static_selectors = [
5862
selector
5963
for selector in ingestion_plan.selectors

ingestify/cmdline.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,25 @@ def cli():
8888
help="Source - only run tasks for a single source",
8989
type=str,
9090
)
91+
@click.option(
92+
"--disable-events",
93+
"disable_events",
94+
required=False,
95+
help="Disable events - disable all event handlers",
96+
is_flag=True,
97+
type=bool,
98+
)
9199
def run(
92100
config_file: str,
93101
bucket: Optional[str],
94102
dry_run: Optional[bool],
95103
provider: Optional[str],
96104
source: Optional[str],
97105
debug: Optional[bool],
106+
disable_events: Optional[bool],
98107
):
99108
try:
100-
engine = get_engine(config_file, bucket)
109+
engine = get_engine(config_file, bucket, disable_events=disable_events)
101110
except ConfigurationError as e:
102111
if debug:
103112
raise

ingestify/domain/models/ingestion/ingestion_job.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def execute(
218218
# Process all items in batches. Yield a IngestionJobSummary per batch
219219

220220
logger.info("Finding metadata")
221-
with ingestion_job_summary.record_timing("get_dataset_collection"):
221+
with ingestion_job_summary.record_timing("get_dataset_collection_metadata"):
222222
dataset_collection_metadata = store.get_dataset_collection(
223223
dataset_type=self.ingestion_plan.dataset_type,
224224
provider=self.ingestion_plan.source.provider,
@@ -232,6 +232,7 @@ def execute(
232232
# 1. The discover_datasets returns a list, and the entire list can be processed at once
233233
# 2. The discover_datasets returns an iterator of batches, in this case we need to process each batch
234234
try:
235+
logger.info(f"Finding datasets for selector={self.selector}")
235236
with ingestion_job_summary.record_timing("find_datasets"):
236237
dataset_resources = self.ingestion_plan.source.find_datasets(
237238
dataset_type=self.ingestion_plan.dataset_type,
@@ -249,6 +250,8 @@ def execute(
249250
yield ingestion_job_summary
250251
return
251252

253+
logger.info("Starting tasks")
254+
252255
finish_task_timer = ingestion_job_summary.start_timing("tasks")
253256

254257
while True:
@@ -273,13 +276,16 @@ def execute(
273276
for dataset_resource in batch
274277
]
275278

276-
# Load all available datasets based on the discovered dataset identifiers
277-
dataset_collection = store.get_dataset_collection(
278-
dataset_type=self.ingestion_plan.dataset_type,
279-
# Assume all DatasetResources share the same provider
280-
provider=batch[0].provider,
281-
selector=dataset_identifiers,
282-
)
279+
logger.info(f"Searching for existing Datasets for DatasetResources")
280+
281+
with ingestion_job_summary.record_timing("get_dataset_collection"):
282+
# Load all available datasets based on the discovered dataset identifiers
283+
dataset_collection = store.get_dataset_collection(
284+
dataset_type=self.ingestion_plan.dataset_type,
285+
# Assume all DatasetResources share the same provider
286+
provider=batch[0].provider,
287+
selector=dataset_identifiers,
288+
)
283289

284290
skipped_datasets = 0
285291

ingestify/domain/models/ingestion/ingestion_job_summary.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def increase_skipped_datasets(self, skipped_datasets: int):
6666
self.skipped_datasets += skipped_datasets
6767

6868
def task_count(self):
69-
return len(self.task_summaries)
69+
return len(self.task_summaries) + self.skipped_datasets
7070

7171
def _set_ended(self):
7272
self.failed_tasks = len(
@@ -106,22 +106,22 @@ def output_report(self):
106106
f"\nIngestionJobSummary {self.state.value} in {format_duration(self.duration)}"
107107
)
108108
print("********************************")
109-
print(f"* - IngestionPlan:")
110-
print(f"* Source: {self.source_name}")
111-
print(f"* Provider: {self.provider}")
112-
print(f"* DatasetType: {self.dataset_type}")
113-
print(f"* - Selector: {self.selector}")
114-
print(f"* - Timings: ")
109+
print(f" - IngestionPlan:")
110+
print(f" Source: {self.source_name}")
111+
print(f" Provider: {self.provider}")
112+
print(f" DatasetType: {self.dataset_type}")
113+
print(f" - Selector: {self.selector}")
114+
print(f" - Timings: ")
115115
for timing in self.timings:
116-
print(f"* - {timing.name}: {format_duration(timing.duration)}")
116+
print(f" - {timing.name}: {format_duration(timing.duration)}")
117117
print(
118-
f"* - Tasks: {len(self.task_summaries)} - {(len(self.task_summaries) / self.duration.total_seconds()):.1f} tasks/sec"
118+
f" - Tasks: {len(self.task_summaries)} - {(len(self.task_summaries) / self.duration.total_seconds()):.1f} tasks/sec"
119119
)
120120

121-
print(f"* - Failed tasks: {self.failed_tasks}")
122-
print(f"* - Successful tasks: {self.successful_tasks}")
123-
print(f"* - Successful ignored tasks: {self.ignored_successful_tasks}")
124-
print(f"* - Skipped datasets: {self.skipped_datasets}")
121+
print(f" - Failed tasks: {self.failed_tasks}")
122+
print(f" - Successful tasks: {self.successful_tasks}")
123+
print(f" - Successful ignored tasks: {self.ignored_successful_tasks}")
124+
print(f" - Skipped datasets: {self.skipped_datasets}")
125125
print("********************************")
126126

127127
def __enter__(self):

ingestify/infra/store/dataset/sqlalchemy/repository.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import itertools
2+
import logging
23
import uuid
34
from typing import Optional, Union, List
45

@@ -17,7 +18,7 @@
1718
)
1819
from sqlalchemy.engine import make_url
1920
from sqlalchemy.exc import NoSuchModuleError
20-
from sqlalchemy.orm import Session
21+
from sqlalchemy.orm import Session, Query
2122

2223
from ingestify.domain import File, Revision
2324
from ingestify.domain.models import (
@@ -42,6 +43,8 @@
4243
task_summary_table,
4344
)
4445

46+
logger = logging.getLogger(__name__)
47+
4548

4649
def parse_value(v):
4750
try:
@@ -199,9 +202,6 @@ def _filter_query(
199202
if not selectors:
200203
raise ValueError("Selectors must contain at least one item")
201204

202-
attribute_keys = selectors[
203-
0
204-
].filtered_attributes.keys() # Assume all selectors have the same keys
205205
attribute_sets = {
206206
tuple(selector.filtered_attributes.items()) for selector in selectors
207207
}
@@ -303,6 +303,12 @@ def load_datasets(self, dataset_ids: list[str]) -> list[Dataset]:
303303
)
304304
return datasets
305305

306+
def _debug_query(self, q: Query):
307+
text_ = q.statement.compile(
308+
compile_kwargs={"literal_binds": True}, dialect=self.session.bind.dialect
309+
)
310+
logger.debug(f"Running query: {text_}")
311+
306312
def get_dataset_collection(
307313
self,
308314
bucket: str,
@@ -326,18 +332,33 @@ def apply_query_filter(query):
326332
dataset_query = apply_query_filter(
327333
self.session.query(dataset_table.c.dataset_id)
328334
)
335+
self._debug_query(dataset_query)
329336
dataset_ids = [row.dataset_id for row in dataset_query]
330337
datasets = self.load_datasets(dataset_ids)
338+
339+
dataset_collection_metadata = DatasetCollectionMetadata(
340+
last_modified=max(dataset.last_modified_at for dataset in datasets)
341+
if datasets
342+
else None,
343+
row_count=len(datasets),
344+
)
331345
else:
332346
datasets = []
333347

334-
metadata_result_row = apply_query_filter(
335-
self.session.query(
336-
func.max(dataset_table.c.last_modified_at).label("last_modified_at"),
337-
func.count().label("row_count"),
348+
metadata_result_query = apply_query_filter(
349+
self.session.query(
350+
func.max(dataset_table.c.last_modified_at).label(
351+
"last_modified_at"
352+
),
353+
func.count().label("row_count"),
354+
)
355+
)
356+
357+
self._debug_query(metadata_result_query)
358+
359+
dataset_collection_metadata = DatasetCollectionMetadata(
360+
*metadata_result_query.first()
338361
)
339-
).first()
340-
dataset_collection_metadata = DatasetCollectionMetadata(*metadata_result_row)
341362

342363
return DatasetCollection(dataset_collection_metadata, datasets)
343364

ingestify/main.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,9 @@ def get_event_subscriber_cls(key: str) -> Type[Subscriber]:
182182
return import_cls(key)
183183

184184

185-
def get_engine(config_file, bucket: Optional[str] = None) -> IngestionEngine:
185+
def get_engine(
186+
config_file, bucket: Optional[str] = None, disable_events: bool = False
187+
) -> IngestionEngine:
186188
config = parse_config(config_file, default_value="")
187189

188190
logger.info("Initializing sources")
@@ -201,11 +203,16 @@ def get_engine(config_file, bucket: Optional[str] = None) -> IngestionEngine:
201203

202204
# Setup an EventBus and wire some more components
203205
event_bus = EventBus()
204-
publisher = Publisher()
205-
for subscriber in config.get("event_subscribers", []):
206-
cls = get_event_subscriber_cls(subscriber["type"])
207-
publisher.add_subscriber(cls(store))
208-
event_bus.register(publisher)
206+
if not disable_events:
207+
# When we disable all events we don't register any publishers
208+
publisher = Publisher()
209+
for subscriber in config.get("event_subscribers", []):
210+
cls = get_event_subscriber_cls(subscriber["type"])
211+
publisher.add_subscriber(cls(store))
212+
event_bus.register(publisher)
213+
else:
214+
logger.info("Disabling all event handlers")
215+
209216
store.set_event_bus(event_bus)
210217

211218
ingestion_engine = IngestionEngine(

Comments (0)