Skip to content

Commit d270e3c

Browse files
authored
Merge pull request #25 from linuxfoundation/feat/replace-serialization-libs-with-msgspec
Replace serialization libraries with msgspec for better Decimal handling
2 parents 276514d + de116c2 commit d270e3c

File tree

6 files changed

+250
-243
lines changed

6 files changed

+250
-243
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Copyright The Linux Foundation and each contributor to LFX.
2+
# SPDX-License-Identifier: MIT
3+
---
4+
apiVersion: batch/v1
5+
kind: CronJob
6+
metadata:
7+
name: meltano-el-postgres-cronjob
8+
namespace: v1-sync-helper
9+
labels:
10+
app.kubernetes.io/name: lfx-v1-sync-helper
11+
app.kubernetes.io/component: meltano
12+
spec:
13+
schedule: "*/5 * * * *" # Every 5 minutes
14+
concurrencyPolicy: Forbid # Prevent concurrent execution
15+
successfulJobsHistoryLimit: 3
16+
failedJobsHistoryLimit: 3
17+
jobTemplate:
18+
metadata:
19+
labels:
20+
app.kubernetes.io/name: lfx-v1-sync-helper
21+
app.kubernetes.io/component: meltano
22+
spec:
23+
ttlSecondsAfterFinished: 3600 # Clean up job after 1 hour
24+
backoffLimit: 1
25+
template:
26+
metadata:
27+
labels:
28+
app.kubernetes.io/name: lfx-v1-sync-helper
29+
app.kubernetes.io/component: meltano
30+
spec:
31+
restartPolicy: Never
32+
containers:
33+
- name: meltano
34+
image: ghcr.io/linuxfoundation/lfx-v1-sync-helper/meltano:v0.4.2
35+
imagePullPolicy: Always
36+
workingDir: /app/meltano
37+
args:
38+
- el
39+
# Catalog from mounted ConfigMap
40+
- "--catalog"
41+
- "/catalogs/tap-postgres/catalog.json"
42+
# State ID for this el job
43+
- "--state-id"
44+
- platform-db-incremental
45+
# Source
46+
- tap-postgres
47+
# Target
48+
- target-nats-kv
49+
env:
50+
# Environment (prod/staging/dev per meltano.yml "environments" list)
51+
- name: MELTANO_ENVIRONMENT
52+
value: "prod"
53+
# S3 state backend (prod/staging/dev suffix from OpenTofu workspace names)
54+
- name: MELTANO_STATE_BACKEND_URI
55+
value: "s3://lfx-v2-meltano-state-prod"
56+
# AWS region configuration (used for the S3 state backend; this job reads from Postgres, not DynamoDB)
57+
- name: AWS_DEFAULT_REGION
58+
value: "us-west-2"
59+
- name: AWS_REGION
60+
value: "us-west-2"
61+
# Postgres configuration
62+
- name: TAP_POSTGRES_HOST
63+
valueFrom:
64+
secretKeyRef:
65+
name: postgres-credentials-ad-hoc
66+
key: host
67+
optional: true
68+
- name: TAP_POSTGRES_PORT
69+
value: "5432"
70+
- name: TAP_POSTGRES_DATABASE
71+
value: "sfdc"
72+
- name: TAP_POSTGRES_USER
73+
valueFrom:
74+
secretKeyRef:
75+
name: postgres-credentials-ad-hoc
76+
key: username
77+
optional: true
78+
- name: TAP_POSTGRES_PASSWORD
79+
valueFrom:
80+
secretKeyRef:
81+
name: postgres-credentials-ad-hoc
82+
key: password
83+
optional: true
84+
# Target NATS KV configuration
85+
- name: TARGET_NATS_KV_URL
86+
value: "nats://lfx-platform-nats.lfx.svc.cluster.local:4222"
87+
- name: TARGET_NATS_KV_BUCKET
88+
value: "v1-objects"
89+
- name: TARGET_NATS_KV_REFRESH_MODE
90+
value: "newer"
91+
- name: TARGET_NATS_KV_VALIDATE_RECORDS
92+
# (true for PostgreSQL; false for DynamoDB)
93+
value: "true"
94+
- name: TARGET_NATS_KV_MSGPACK
95+
value: "true"
96+
resources:
97+
requests:
98+
memory: "512Mi"
99+
cpu: "250m"
100+
limits:
101+
memory: "2Gi"
102+
cpu: "1000m"
103+
# Health check - Meltano jobs don't typically have health endpoints
104+
# but we can check if the process is running
105+
livenessProbe:
106+
exec:
107+
command: ["pgrep", "-f", "meltano"]
108+
initialDelaySeconds: 30
109+
periodSeconds: 60
110+
failureThreshold: 3
111+
volumeMounts:
112+
- name: tap-postgres-catalog
113+
mountPath: /catalogs/tap-postgres
114+
readOnly: true
115+
volumes:
116+
- name: tap-postgres-catalog
117+
configMap:
118+
name: tap-postgres-catalog
119+
# Use service account with appropriate permissions for AWS
120+
serviceAccountName: v1-sync-helper-sa

manifests/meltano-el-postgres.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ spec:
2121
restartPolicy: Never
2222
containers:
2323
- name: meltano
24-
image: ghcr.io/linuxfoundation/lfx-v1-sync-helper/meltano:v0.4.0
24+
image: ghcr.io/linuxfoundation/lfx-v1-sync-helper/meltano:v0.4.2
2525
imagePullPolicy: Always
2626
workingDir: /app/meltano
2727
args:

meltano/load/target-nats-kv/README.md

Lines changed: 0 additions & 111 deletions
This file was deleted.

meltano/load/target-nats-kv/pyproject.toml

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ requires-python = ">=3.11"
99
dependencies = [
1010
"adjust-precision-for-schema>=0.3.4",
1111
"jsonschema>=2.6.0",
12-
"msgpack>=1.1.0",
12+
"msgspec>=0.20.0",
1313
"nats-py>=2.10.0",
14+
"simplejson>=3.17.0",
1415
"singer-python>=6.1.1",
1516
]
1617

@@ -38,7 +39,3 @@ follow_untyped_imports = true
3839
[[tool.mypy.overrides]]
3940
module = ["adjust_precision_for_schema.*"]
4041
follow_untyped_imports = true
41-
42-
[[tool.mypy.overrides]]
43-
module = ["msgpack.*"]
44-
follow_untyped_imports = true

meltano/load/target-nats-kv/src/target_nats_kv/__init__.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
from pathlib import Path
1313

1414
import jsonschema
15-
import msgpack
15+
import msgspec
1616
import nats
17-
import simplejson as json
17+
import simplejson
1818
import singer
1919
from adjust_precision_for_schema import adjust_decimal_precision_for_schema
20-
from jsonschema import Draft4Validator
2120
from nats.js.kv import KeyValue
2221
from singer.messages import (
2322
ActivateVersionMessage,
@@ -33,7 +32,8 @@ def emit_state(state: dict | None) -> None:
3332
"""Emit the state to stdout in JSON format."""
3433
if state is None:
3534
return
36-
line = json.dumps(state)
35+
state_encoder = msgspec.json.Encoder(decimal_format="number")
36+
line = state_encoder.encode(state).decode("utf-8")
3737
logger.debug("Emitting state %s", line)
3838
sys.stdout.write(f"{line}\n")
3939
sys.stdout.flush()
@@ -71,12 +71,16 @@ async def persist_messages(
7171
schemas: dict[str, dict] = {}
7272
key_properties: dict[str, list[str]] = {}
7373
bookmarks: dict[str, (list[str] | None)] = {}
74-
validators: dict[str, Draft4Validator] = {}
74+
validators: dict[str, jsonschema.Draft4Validator] = {}
75+
json_encoder = msgspec.json.Encoder(decimal_format="number")
76+
msgpack_encoder = msgspec.msgpack.Encoder(decimal_format="number")
77+
json_decoder = msgspec.json.Decoder()
78+
msgpack_decoder = msgspec.msgpack.Decoder()
7579

7680
for message in next_singer_message():
7781
try:
7882
o = singer.parse_message(message)
79-
except json.JSONDecodeError:
83+
except simplejson.JSONDecodeError:
8084
logger.error("Unable to parse: %s", repr(message))
8185
raise
8286

@@ -184,11 +188,10 @@ async def persist_messages(
184188
# we don't know what format was used when the data was
185189
# originally stored.
186190
try:
187-
current_record = msgpack.unpackb(current.value, raw=False)
188-
except (msgpack.exceptions.ExtraData, ValueError):
191+
current_record = msgpack_decoder.decode(current.value)
192+
except (msgspec.DecodeError, ValueError):
189193
# Fall back to JSON if msgpack decoding fails.
190-
current_value = current.value.decode("utf-8")
191-
current_record = json.loads(current_value)
194+
current_record = json_decoder.decode(current.value)
192195

193196
# Check if the record was deleted.
194197
if "_sdc_deleted_at" in current_record:
@@ -246,9 +249,9 @@ async def persist_messages(
246249
if should_update:
247250
# Update with revision
248251
if use_msgpack:
249-
value = msgpack.packb(o.record)
252+
value = msgpack_encoder.encode(o.record)
250253
else:
251-
value = json.dumps(o.record).encode("utf-8")
254+
value = json_encoder.encode(o.record)
252255
await kv_client.update(
253256
key=key,
254257
value=value,
@@ -269,9 +272,9 @@ async def persist_messages(
269272
except nats.js.errors.KeyNotFoundError:
270273
# Key doesn't exist, create it.
271274
if use_msgpack:
272-
value = msgpack.packb(o.record)
275+
value = msgpack_encoder.encode(o.record)
273276
else:
274-
value = json.dumps(o.record).encode("utf-8")
277+
value = json_encoder.encode(o.record)
275278
await kv_client.create(
276279
key=key,
277280
value=value,
@@ -280,9 +283,9 @@ async def persist_messages(
280283
# User has requested "full" sync, so use "put" without
281284
# data checks.
282285
if use_msgpack:
283-
value = msgpack.packb(o.record)
286+
value = msgpack_encoder.encode(o.record)
284287
else:
285-
value = json.dumps(o.record).encode("utf-8")
288+
value = json_encoder.encode(o.record)
286289
await kv_client.put(
287290
key=key,
288291
value=value,
@@ -296,8 +299,8 @@ async def persist_messages(
296299
stream = o.stream
297300
schemas[stream] = o.schema
298301
adjust_decimal_precision_for_schema(schemas[stream])
302+
validators[stream] = jsonschema.Draft4Validator(o.schema)
299303
bookmarks[stream] = o.bookmark_properties
300-
validators[stream] = Draft4Validator(o.schema)
301304
key_properties[stream] = o.key_properties
302305
elif isinstance(o, ActivateVersionMessage):
303306
logger.warning(
@@ -358,8 +361,9 @@ def main() -> None:
358361
args = parser.parse_args()
359362

360363
if args.config:
361-
with open(args.config) as input_json:
362-
config = json.load(input_json)
364+
config_decoder = msgspec.json.Decoder()
365+
with open(args.config, "rb") as input_json:
366+
config = config_decoder.decode(input_json.read())
363367
else:
364368
config = {}
365369

0 commit comments

Comments
 (0)