Skip to content

Commit 7e0dc2a

Browse files
committed
fix: review state management
1 parent 4c90779 commit 7e0dc2a

File tree

5 files changed

+61
-35
lines changed

5 files changed

+61
-35
lines changed

docker-compose.yaml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,8 @@ services:
66
MINIO_ROOT_USER: minioadmin
77
MINIO_ROOT_PASSWORD: minioadmin
88
ports:
9-
- "19000:9000"
10-
- "19001:9001"
9+
- "9000:9000"
10+
- "9001:9001"
1111
volumes:
1212
- minio_data:/data
1313
healthcheck:

meltano.yml

Lines changed: 16 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -31,23 +31,23 @@ plugins:
3131
# - centre
3232
# - edition
3333

34-
- path: ./data/icon-d2*.grib2
35-
table_name: icon_d2
36-
bboxes:
37-
- [46, 11, 45, 12]
38-
skip_past: true
39-
# ignore_fields:
40-
# - ensemble
41-
# - grid_type
42-
# - centre
43-
# - edition
34+
# - path: ./data/icon-d2*.grib2
35+
# table_name: icon_d2
36+
# bboxes:
37+
# - [46, 11, 45, 12]
38+
# skip_past: true
39+
# # ignore_fields:
40+
# # - ensemble
41+
# # - grid_type
42+
# # - centre
43+
# # - edition
4444

45-
# - path: s3://local-data/test.grib
46-
# ignore_fields:
47-
# - ensemble
48-
# - grid_type
49-
# - centre
50-
# - edition
45+
- path: s3://local-data/*.grib
46+
ignore_fields:
47+
- ensemble
48+
- grid_type
49+
- centre
50+
- edition
5151

5252
loaders:
5353
- name: target-jsonl

tap_grib/client.py

Lines changed: 21 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -112,8 +112,7 @@ def __init__(
112112
self,
113113
tap,
114114
name: str,
115-
*,
116-
file_path: str | None,
115+
file_path: str | None = None,
117116
primary_keys: list[str] | None = None,
118117
skip_past_reference: str | None = None,
119118
skip_past: bool | None = False,
@@ -122,6 +121,9 @@ def __init__(
122121
bboxes: list[tuple[float, float, float, float]] | None = None,
123122
**kwargs,
124123
):
124+
125+
super().__init__(tap=tap, name=name, **kwargs)
126+
125127
self.file_path = file_path
126128
self.extra_files = extra_files or ([file_path] if file_path else [])
127129
self.primary_keys = primary_keys or [
@@ -157,10 +159,9 @@ def __init__(
157159
raise ValueError(f"Cannot ignore core fields: {', '.join(sorted(invalid))}")
158160
self.ignore_fields = ignore_fields
159161

160-
# now call parent init with only tap/name/kwargs
161-
super().__init__(tap=tap, name=name, **kwargs)
162+
# super().__init__(tap=tap, name=name, **kwargs)
162163

163-
self.state_partitioning_keys = [SDC_FILENAME]
164+
# self.state_partitioning_keys = [SDC_FILENAME]
164165
self.replication_key = SDC_INCREMENTAL_KEY
165166
self.forced_replication_method = "INCREMENTAL"
166167

@@ -210,12 +211,27 @@ def get_records(
210211
dict[str, t.Any] | tuple[dict[t.Any, t.Any], dict[t.Any, t.Any] | None]
211212
]:
212213

214+
start_mtime: datetime | None = self.get_starting_timestamp(context)
215+
if start_mtime and start_mtime.tzinfo is None:
216+
start_mtime = start_mtime.replace(tzinfo=timezone.utc)
217+
elif start_mtime:
218+
start_mtime = start_mtime.astimezone(timezone.utc)
219+
213220
for path in self.extra_files:
214221
self.logger.info(f"[{self.name}] Streaming records from {path}")
215222
storage = Storage(path)
216223
info = storage.describe(path)
217224
mtime = info.mtime
218225

226+
if start_mtime is not None and mtime is not None and mtime <= start_mtime:
227+
self.logger.info(
228+
"Skipping %s (mtime=%s <= bookmark=%s)",
229+
path,
230+
mtime,
231+
start_mtime,
232+
)
233+
continue
234+
219235
filename = info.path
220236
partition_context = {SDC_FILENAME: filename}
221237
last_bookmark = self.get_starting_replication_key_value(partition_context)
@@ -273,15 +289,6 @@ def get_records(
273289
if lats.size == 0:
274290
continue
275291

276-
for msg in grbs:
277-
try:
278-
lats, lons, vals = _extract_grid(msg)
279-
except Exception as e:
280-
self.logger.warning(f"Skipping message: {e}")
281-
continue
282-
if lats.size == 0:
283-
continue
284-
285292
# safe datetime extraction
286293
valid_dt = getattr(msg, "validDate", None)
287294
if valid_dt is None:

tap_grib/tap.py

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -111,8 +111,15 @@ def _parse_bboxes(
111111

112112
def default_stream_name(self, pattern: str) -> str:
113113
base = os.path.splitext(os.path.basename(pattern))[0]
114-
safe = re.sub(r"[^0-9a-zA-Z]+", "_", base)
115-
return safe.strip("_").lower()
114+
115+
# sanitize
116+
safe = re.sub(r"[^0-9a-zA-Z]+", "_", base).strip("_").lower()
117+
118+
# Fallback if empty
119+
if not safe:
120+
safe = "grib_stream"
121+
122+
return safe
116123

117124
def discover_streams(self) -> list[Stream]:
118125
"""Discover a single stream per path pattern (merging all matching files)."""

taskfile.yaml

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,21 @@
11
version: "3"
22

33
tasks:
4+
run:
5+
env:
6+
S3_ACCESS_KEY_ID: minioadmin
7+
S3_SECRET_ACCESS_KEY: minioadmin
8+
S3_BUCKET: local-data
9+
S3_ENDPOINT_URL: http://localhost:9000
10+
cmds:
11+
- uv run meltano run tap-grib target-jsonl {{.CLI_ARGS}}
12+
13+
run:refresh:
14+
cmds:
15+
- task run -- --full-refresh
16+
417
release:
518
cmds:
6-
# - uv lock --upgrade-package mqtt-ingestor
719
- uv run semantic-release -v version --no-vcs-release
820
- git push
921
- git push --tags

0 commit comments

Comments
 (0)