Skip to content

Commit 273e79e

Browse files
committed
Don't delete checkpoint when other steps fail
1 parent c433dbf commit 273e79e

File tree

5 files changed

+55
-4
lines changed

5 files changed

+55
-4
lines changed

nodestream/pipeline/extractors/extractor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ async def start(self, context: StepContext):
2424
await self.resume_from_checkpoint(checkpoint)
2525

2626
async def finish(self, context: StepContext):
27-
context.debug("Clearing checkpoint for extractor since extractor is finished.")
28-
context.object_store.delete(CHECKPOINT_OBJECT_KEY)
27+
if not context.pipeline_encountered_fatal_error:
28+
context.debug(
29+
"Clearing checkpoint for extractor since extractor is finished."
30+
)
31+
context.object_store.delete(CHECKPOINT_OBJECT_KEY)
2932

3033
async def make_checkpoint(self) -> T:
3134
return None

nodestream/pipeline/object_storage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ def delete(self, key: str):
309309
self.store.delete(key)
310310

311311

312-
class S3ObjectStore(ObjectStore):
312+
class S3ObjectStore(ObjectStore, alias="s3"):
313313
__slots__ = ("client", "bucket_name")
314314

315315
def __init__(self, bucket_name: str, **client_factory_args):

nodestream/pipeline/progress_reporter.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class PipelineProgressReporter:
3737
on_start_callback: Callable[[], None] = field(default=no_op)
3838
on_finish_callback: Callable[[Metrics], None] = field(default=no_op)
3939
on_fatal_error_callback: Callable[[Exception], None] = field(default=no_op)
40+
encountered_fatal_error: bool = field(default=False)
41+
42+
def on_fatal_error(self, exception: Exception):
43+
self.encountered_fatal_error = True
44+
self.on_fatal_error_callback(exception)
4045

4146
@classmethod
4247
def for_testing(cls, results_list: list) -> "PipelineProgressReporter":

nodestream/pipeline/step.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ def __init__(
2626
self.index = index
2727
self.object_store = object_store
2828

29+
@property
30+
def pipeline_encountered_fatal_error(self) -> bool:
31+
"""Whether the pipeline has encountered a fatal error."""
32+
return self.reporter.encountered_fatal_error
33+
2934
def report_error(
3035
self,
3136
message: str,
@@ -51,7 +56,7 @@ def report_error(
5156
stack_info=True,
5257
)
5358
if fatal:
54-
self.reporter.on_fatal_error_callback(exception)
59+
self.reporter.on_fatal_error(exception)
5560
else:
5661
Metrics.get().increment(Metric.NON_FATAL_ERRORS)
5762

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import pytest
2+
3+
from nodestream.pipeline import Pipeline
4+
from nodestream.pipeline.extractors import IterableExtractor
5+
from nodestream.pipeline.object_storage import ObjectStore
6+
from nodestream.pipeline.progress_reporter import PipelineProgressReporter
7+
from nodestream.pipeline.step import Step
8+
9+
10+
class ExplodesAfter(Step):
11+
def __init__(self, explode_after: int):
12+
self.count = 0
13+
self.explode_after = explode_after
14+
15+
async def process_record(self, record, context):
16+
self.count += 1
17+
if self.count > self.explode_after:
18+
raise Exception("Exploded!")
19+
yield record
20+
21+
22+
@pytest.mark.asyncio
23+
async def test_snapshot_handling_during_errors(mocker):
24+
extractor = IterableExtractor.range(0, 100000000)
25+
kaboom = ExplodesAfter(5000)
26+
storage = mocker.Mock(spec=ObjectStore)
27+
storage.namespaced.return_value = storage
28+
pipeline = Pipeline((extractor, kaboom), 1000, storage)
29+
30+
def raise_fatal_error(e):
31+
# raise e
32+
pass
33+
34+
await pipeline.run(
35+
PipelineProgressReporter(on_fatal_error_callback=raise_fatal_error)
36+
)
37+
storage.delete.assert_not_called()
38+
assert storage.put_picklable.call_count == 6

0 commit comments

Comments
 (0)