Skip to content

Commit 276514d

Browse files
authored
Merge pull request #23 from linuxfoundation/ems/meltano-jobs
feat: Add Kubernetes Job manifests for Meltano operations
2 parents 89778a1 + f9a4a76 commit 276514d

File tree

16 files changed

+471
-13
lines changed

16 files changed

+471
-13
lines changed

AGENTS.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ LFX v1 Sources → Meltano → NATS KV → v1-sync-helper → LFX One APIs
2222
- **NATS KV Watcher****v1-sync-helper****LFX One Project/Committee Services**
2323
- **JWT Authentication** via Heimdall impersonation for secure API calls
2424
- **ID Mappings** stored in NATS KV bucket (`v1-mappings`)
25+
- **Data Encoding** supports both JSON and MessagePack formats with automatic detection
2526

2627
## Repository Structure
2728

@@ -122,6 +123,11 @@ lfx-v1-sync-helper/
122123
- **Non-ephemeral consumer** for reliability
123124
- **Load-balanced message processing** across instances
124125

126+
#### Data Format Handling
127+
- **Automatic Detection**: Tries MessagePack first, falls back to JSON
128+
- **Backward Compatible**: Can read both JSON and MessagePack encoded data
129+
- **Format Agnostic**: Processing logic unchanged regardless of encoding format
130+
125131
### Python ETL (Meltano)
126132

127133
#### Configuration Structure
@@ -134,6 +140,11 @@ lfx-v1-sync-helper/
134140
- **PostgreSQL**: Projects and committees data
135141
- **NATS KV**: Target for all extracted data
136142

143+
#### Data Format Support
144+
- **JSON** (default): Standard JSON encoding for record storage
145+
- **MessagePack**: Compact binary serialization with `msgpack: true` configuration
146+
- **Automatic Detection**: Both Go service and Python plugin automatically detect format when reading existing data
147+
137148
## CI/CD Integration
138149

139150
### GitHub Actions Workflows
@@ -175,6 +186,12 @@ lfx-v1-sync-helper/
175186
- Maintain `pyproject.toml` and `uv.lock` consistency
176187
- Environment-based configuration
177188

189+
### Data Serialization
190+
- **target-nats-kv** supports both JSON and MessagePack encoding
191+
- Set `msgpack: true` in Meltano configuration to enable MessagePack
192+
- Automatic format detection when reading existing data for compatibility
193+
- Go service handles both formats transparently
194+
178195
### Container Standards
179196
- Multi-stage builds for size optimization
180197
- Non-root execution for security

cmd/lfx-v1-sync-helper/handlers.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strings"
1111

1212
"github.com/nats-io/nats.go/jetstream"
13+
"github.com/vmihailenco/msgpack/v5"
1314
)
1415

1516
// shouldSkipSync checks if the record was last modified by this service and
@@ -51,11 +52,17 @@ func kvHandler(entry jetstream.KeyValueEntry) {
5152
func handleKVPut(ctx context.Context, entry jetstream.KeyValueEntry) {
5253
key := entry.Key()
5354

54-
// Parse the JSON data
55+
// Parse the data (try JSON first, then msgpack)
5556
var v1Data map[string]any
5657
if err := json.Unmarshal(entry.Value(), &v1Data); err != nil {
57-
logger.With(errKey, err, "key", key).ErrorContext(ctx, "failed to unmarshal KV entry data")
58-
return
58+
// JSON failed, try msgpack
59+
if msgErr := msgpack.Unmarshal(entry.Value(), &v1Data); msgErr != nil {
60+
logger.With(errKey, err, "msgpack_error", msgErr, "key", key).ErrorContext(ctx, "failed to unmarshal KV entry data as JSON or msgpack")
61+
return
62+
}
63+
logger.With("key", key).DebugContext(ctx, "successfully unmarshalled msgpack data")
64+
} else {
65+
logger.With("key", key).DebugContext(ctx, "successfully unmarshalled JSON data")
5966
}
6067

6168
// Check if we should skip this sync operation.

cmd/lfx-v1-sync-helper/handlers_users.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func updateUserAlternateEmails(ctx context.Context, userSfid, emailSfid string,
125125

126126
// If we get here, there was an unexpected error.
127127
logger.With("error", saveErr, "key", mappingKey, "attempt", attempt).
128-
ErrorContext(ctx, "unexpected error during save operation")
128+
WarnContext(ctx, "unexpected error during save operation")
129129

130130
if attempt == maxRetries {
131131
logger.With("key", mappingKey, "maxRetries", maxRetries).

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/nats-io/nats.go v1.47.0
1515
github.com/patrickmn/go-cache v2.1.0+incompatible
1616
github.com/teambition/rrule-go v1.8.2
17+
github.com/vmihailenco/msgpack/v5 v5.4.1
1718
goa.design/goa/v3 v3.23.2
1819
golang.org/x/oauth2 v0.33.0
1920
)
@@ -36,6 +37,7 @@ require (
3637
github.com/nats-io/nuid v1.0.1 // indirect
3738
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
3839
github.com/segmentio/asm v1.2.1 // indirect
40+
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
3941
go.devnw.com/structs v1.0.0 // indirect
4042
golang.org/x/crypto v0.45.0 // indirect
4143
golang.org/x/sys v0.38.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
6565
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
6666
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
6767
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
68+
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
69+
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
70+
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
71+
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
6872
go.devnw.com/structs v1.0.0 h1:FFkBoBOkapCdxFEIkpOZRmMOMr9b9hxjKTD3bJYl9lk=
6973
go.devnw.com/structs v1.0.0/go.mod h1:wHBkdQpNeazdQHszJ2sxwVEpd8zGTEsKkeywDLGbrmg=
7074
goa.design/goa/v3 v3.23.2 h1:i/JWSoD6lLc9O7ckm/+5N5lKw0mzgRPI5KZHmN7wF50=

manifests/meltano-dragon-job.yaml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# Copyright The Linux Foundation and each contributor to LFX.
2+
# SPDX-License-Identifier: MIT
3+
#
4+
# Summon a dragon ... Kubernetes style!
5+
#
6+
# PLEASE NOTE: this is a fun sample invocation, but it also emits numerous
7+
# non-JSON "log" lines into our centralized logging platform.
8+
---
9+
apiVersion: batch/v1
10+
kind: Job
11+
metadata:
12+
name: meltano-dragon-job
13+
namespace: v1-sync-helper
14+
labels:
15+
app.kubernetes.io/name: lfx-v1-sync-helper
16+
app.kubernetes.io/component: meltano
17+
spec:
18+
ttlSecondsAfterFinished: 600 # Clean up job after 10 minutes
19+
backoffLimit: 3
20+
template:
21+
metadata:
22+
labels:
23+
app.kubernetes.io/name: lfx-v1-sync-helper
24+
app.kubernetes.io/component: meltano
25+
spec:
26+
restartPolicy: Never
27+
containers:
28+
- name: meltano
29+
image: ghcr.io/linuxfoundation/lfx-v1-sync-helper/meltano:latest
30+
imagePullPolicy: Always
31+
workingDir: /app/meltano
32+
args: ["dragon"]
33+
resources:
34+
requests:
35+
memory: "512Mi"
36+
cpu: "250m"
37+
limits:
38+
memory: "2Gi"
39+
cpu: "1000m"
40+
# Health check - Meltano jobs don't typically have health endpoints
41+
# but we can check if the process is running
42+
livenessProbe:
43+
exec:
44+
command: ["pgrep", "-f", "meltano"]
45+
initialDelaySeconds: 30
46+
periodSeconds: 60
47+
failureThreshold: 3
48+
# Use service account with appropriate permissions for AWS
49+
serviceAccountName: v1-sync-helper-sa

manifests/meltano-el-postgres.yaml

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright The Linux Foundation and each contributor to LFX.
2+
# SPDX-License-Identifier: MIT
3+
---
4+
apiVersion: batch/v1
5+
kind: Job
6+
metadata:
7+
name: meltano-el-postgres
8+
namespace: v1-sync-helper
9+
labels:
10+
app.kubernetes.io/name: lfx-v1-sync-helper
11+
app.kubernetes.io/component: meltano
12+
spec:
13+
ttlSecondsAfterFinished: 3600 # Clean up job after 1 hour
14+
backoffLimit: 3
15+
template:
16+
metadata:
17+
labels:
18+
app.kubernetes.io/name: lfx-v1-sync-helper
19+
app.kubernetes.io/component: meltano
20+
spec:
21+
restartPolicy: Never
22+
containers:
23+
- name: meltano
24+
image: ghcr.io/linuxfoundation/lfx-v1-sync-helper/meltano:v0.4.0
25+
imagePullPolicy: Always
26+
workingDir: /app/meltano
27+
args:
28+
- el
29+
# Catalog from mounted ConfigMap
30+
- "--catalog"
31+
- "/catalogs/tap-postgres/catalog.json"
32+
# State ID for this el job
33+
- "--state-id"
34+
- platform-db-incremental
35+
# Source
36+
- tap-postgres
37+
# Target
38+
- target-nats-kv
39+
env:
40+
# Environment (prod/staging/dev per meltano.yml "environments" list)
41+
- name: MELTANO_ENVIRONMENT
42+
value: "prod"
43+
# S3 state backend (prod/staging/dev suffix from OpenTofu workspace names)
44+
- name: MELTANO_STATE_BACKEND_URI
45+
value: "s3://lfx-v2-meltano-state-prod"
46+
# AWS configuration for DynamoDB access
47+
- name: AWS_DEFAULT_REGION
48+
value: "us-west-2"
49+
- name: AWS_REGION
50+
value: "us-west-2"
51+
# Postgres configuration
52+
- name: TAP_POSTGRES_HOST
53+
valueFrom:
54+
secretKeyRef:
55+
name: postgres-credentials-ad-hoc
56+
key: host
57+
optional: true
58+
- name: TAP_POSTGRES_PORT
59+
value: "5432"
60+
- name: TAP_POSTGRES_DATABASE
61+
value: "sfdc"
62+
- name: TAP_POSTGRES_USER
63+
valueFrom:
64+
secretKeyRef:
65+
name: postgres-credentials-ad-hoc
66+
key: username
67+
optional: true
68+
- name: TAP_POSTGRES_PASSWORD
69+
valueFrom:
70+
secretKeyRef:
71+
name: postgres-credentials-ad-hoc
72+
key: password
73+
optional: true
74+
# Target NATS KV configuration
75+
- name: TARGET_NATS_KV_URL
76+
value: "nats://lfx-platform-nats.lfx.svc.cluster.local:4222"
77+
- name: TARGET_NATS_KV_BUCKET
78+
value: "v1-objects"
79+
- name: TARGET_NATS_KV_REFRESH_MODE
80+
value: "newer"
81+
- name: TARGET_NATS_KV_VALIDATE_RECORDS
82+
# (true for PostgreSQL; false for DynamoDB)
83+
value: "true"
84+
- name: TARGET_NATS_KV_MSGPACK
85+
value: "true"
86+
resources:
87+
requests:
88+
memory: "512Mi"
89+
cpu: "250m"
90+
limits:
91+
memory: "2Gi"
92+
cpu: "1000m"
93+
# Health check - Meltano jobs don't typically have health endpoints
94+
# but we can check if the process is running
95+
livenessProbe:
96+
exec:
97+
command: ["pgrep", "-f", "meltano"]
98+
initialDelaySeconds: 30
99+
periodSeconds: 60
100+
failureThreshold: 3
101+
volumeMounts:
102+
- name: tap-postgres-catalog
103+
mountPath: /catalogs/tap-postgres
104+
readOnly: true
105+
volumes:
106+
- name: tap-postgres-catalog
107+
configMap:
108+
name: tap-postgres-catalog
109+
# Use service account with appropriate permissions for AWS
110+
serviceAccountName: v1-sync-helper-sa

manifests/meltano-list-state.yaml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright The Linux Foundation and each contributor to LFX.
2+
# SPDX-License-Identifier: MIT
3+
---
4+
apiVersion: batch/v1
5+
kind: Job
6+
metadata:
7+
name: meltano-list-state
8+
namespace: v1-sync-helper
9+
labels:
10+
app.kubernetes.io/name: lfx-v1-sync-helper
11+
app.kubernetes.io/component: meltano
12+
spec:
13+
ttlSecondsAfterFinished: 600 # Clean up job after 10 minutes
14+
backoffLimit: 3
15+
template:
16+
metadata:
17+
labels:
18+
app.kubernetes.io/name: lfx-v1-sync-helper
19+
app.kubernetes.io/component: meltano
20+
spec:
21+
restartPolicy: Never
22+
containers:
23+
- name: meltano
24+
image: ghcr.io/linuxfoundation/lfx-v1-sync-helper/meltano:latest
25+
imagePullPolicy: Always
26+
workingDir: /app/meltano
27+
args: ["state", "list"]
28+
env:
29+
# Environment (prod/staging/dev per meltano.yml "environments" list)
30+
- name: MELTANO_ENVIRONMENT
31+
value: "prod"
32+
# S3 state backend (prod/staging/dev suffix from OpenTofu workspace names)
33+
- name: MELTANO_STATE_BACKEND_URI
34+
value: "s3://lfx-v2-meltano-state-prod"
35+
# AWS configuration for DynamoDB access
36+
- name: AWS_DEFAULT_REGION
37+
value: "us-west-2"
38+
- name: AWS_REGION
39+
value: "us-west-2"
40+
resources:
41+
requests:
42+
memory: "512Mi"
43+
cpu: "250m"
44+
limits:
45+
memory: "2Gi"
46+
cpu: "1000m"
47+
# Health check - Meltano jobs don't typically have health endpoints
48+
# but we can check if the process is running
49+
livenessProbe:
50+
exec:
51+
command: ["pgrep", "-f", "meltano"]
52+
initialDelaySeconds: 30
53+
periodSeconds: 60
54+
failureThreshold: 3
55+
# Use service account with appropriate permissions for AWS
56+
serviceAccountName: v1-sync-helper-sa

0 commit comments

Comments
 (0)