Skip to content

Add PostgreSQL WAL listener component#26

Merged
emsearcy merged 14 commits intomainfrom
feat/wal-listener
Jan 20, 2026
Merged

Add PostgreSQL WAL listener component#26
emsearcy merged 14 commits intomainfrom
feat/wal-listener

Conversation

@emsearcy
Copy link
Contributor

@emsearcy emsearcy commented Jan 7, 2026

Adds a PostgreSQL WAL listener component to the Helm chart for real-time database change streaming to NATS.

The WAL listener monitors PostgreSQL tables and publishes change events to NATS, complementing the existing Meltano ETL pipeline with real-time data synchronization capabilities.

🤖 Generated with GitHub Copilot (via Zed)

Issue: LFXV2-991

- Add wal-listener Deployment with ihippik/wal-listener container
- Add ConfigMap for table filter configuration (other config via env vars)
- Add Service for wal-listener health endpoints
- Configure database credentials using same secret as meltano-el-postgres
- Use environment variables with WAL_ prefix for wal-listener configuration
- Default monitored tables match meltano.yml tap-postgres configuration
- Rename deployment templates for consistency (app-deployment.yaml, wal-listener-*)
- Add wal-listener documentation to Helm chart README
- Document dynamic chart versioning in AGENTS.md
- Use correct health check endpoints (/healthz, /ready)

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Rename postgres secret from 'postgres-credentials-ad-hoc' to 'v1-platform-db-credentials'
- Rename 'Environment Variables' section to 'App Component' in README
- Move image, replicas, heimdall, and auth0 sections under app component
- Rename top-level 'nats' section to 'natsResources' for clarity
- Update all template references to use new nested paths

This refactoring improves chart organization by clearly separating:
- app: Main v1-sync-helper service configuration
- walListener: PostgreSQL WAL listener component configuration
- natsResources: NATS KV bucket resource management
- serviceAccount: Shared Kubernetes service account configuration

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Update intro to mention both Meltano and WAL listener
- Change Helm charts description to mention custom app service and WAL listener
- Remove outdated caveat about planned PostgreSQL realtime sync

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Add enableTLS and sslMode configuration to DatabaseCfg
- Update wal-listener configmap template with TLS fields
- Set enableTLS: false by default in values.yaml
- Support three SSL modes: require, verify-ca, verify-full
- Simplify TLS implementation without certificate file complexity

This enables encrypted PostgreSQL connections for wal-listener
deployments to resolve 'no encryption' connection errors.

Note this is NOT in the upstream wal-listener container (see PR #66).
Depending on the progress of that PR, may need to use a forked
repository and container deployment and override the container image in
the Helm chart values to use our custom build.

This commit also makes a minor change to the Postgres column used for incremental
Meltano syncs: in the Salesforce world, lastmodifieddate is for user
changes, while systemmodstamp is for all updates, including both user
changes and noninteractive system account changes; therefore you would
not expect lastmodifieddate to ever be later than systemmodstamp. In our
LFX decoupled ecosystem, it seems this is *mostly* true: there are
unfortunately plenty of records where lastmodifieddate is later than
systemmodstamp, but many many more where systemmodstamp is later than
lastmodifieddate. Therefore neither is perfect, but systemmodstamp seems
to be the better choice, and we will rely on WAL replication or
monthly Meltano full-refreshes for the edge cases.

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Add stream_wal_listener configuration to natsResources in values.yaml
- Create JetStream stream named 'wal_listener' subscribing to 'wal_listener.*'
- Default 2-week retention (336h) with unlimited storage and messages
- Use S2 compression by default with limits-based retention policy
- Proper naming: k8s resource uses hyphens, NATS stream uses underscores
- Rename nats-kv-buckets.yaml to nats-resources.yaml for better organization

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
… synchronization

- Add new ingest_wal.go with WAL event processing for INSERT, UPDATE, DELETE operations
- Enable WAL listener consumer in main.go with proper JetStream pull consumer configuration
- Implement smart upsert logic that only updates if systemmodstamp OR lastmodifieddate is newer
- Add configurable JSON/MessagePack encoding support (USE_MSGPACK environment variable)
- Use PostgreSQL commit timestamps for _sdc_extracted_at and _sdc_deleted_at fields
- Support dynamic key generation using {schema}-{table}.{sfid} format
- Add proper boolean parsing for environment variables (true/yes/t/y/1)
- Include data validation and comprehensive logging for monitoring
- Remove old placeholder code, and drop the unnecessary "queue" (delivery group) from the pull consumer configuration

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Move KV watch mock abstractions to new kv_watcher.go file
- Replace manual Fetch loops with Consume method for both KV and WAL consumers
- Add inline error handlers matching nats.Connect style
- Implement message-level retry logic with NAK/ACK handling:
  - Retry on KV revision mismatches (JSErrCodeStreamWrongLastSequence)
  - Retry on failed creates (ErrKeyExists)
  - Random sleep before retry to reduce collision chances
- Collapse walIngestHandler retry logic to reduce main() complexity
- Add graceful shutdown timing with debug logs around gracefulCloseWG
- Proper consumer draining sequence: consumers → context → NATS connection

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Fix revive errors for unused consCtx parameters in main.go
- Remove Agency and Country field references for committee service v0.2.19+ compatibility
- Clean up unnecessary fmt.Sprintf calls and unused variables/functions
- Update dependencies to latest versions

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Convert static manifest to parameterized Helm template
- Replace direct environment variables with structured parameters
- Build Meltano arguments dynamically in correct order
- Remove hardcoded production values and ad-hoc secret references
- Use consistent secret naming patterns matching app service
- Make catalog ConfigMap usage optional and user-provided
- Default to dev environment with proper S3 bucket naming
- Hardcode workingDir and livenessProbe for simplicity
- Follow Helm best practices with proper labeling and conditionals

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
…roved mappings

- Add tombstone functionality using 🪦 emoji, instead of deleting the
  v1->v2 mappings (also update docs)
- Implement deletion handlers for projects, committees, and committee members
- Update upsert handlers to skip tombstoned mappings instead of treating as creates
- Consolidate v1 principal extraction for both upserts and soft deletes
- Enhance v2 committee-member mappings with new compound value formats
  (needed for deletions)
- Add reverse mappings for all object types, using compound values where
  needed
- Change Meltano cronjob Helm defaults to use JSON v1-objects encoding
  instead of messagepack for consistency with the similar configuration
  for the v1-sync-helper WAL ingestion, which also defaults to JSON

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
Added component: app label to both selector and pod template in app
deployment to create unique selectors. This prevents kubectl logs
deploy/lfx-v1-sync-helper-app from incorrectly matching wal-listener
pods due to overlapping label selectors.

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
- Add etag fetching and IfMatch header to deleteCommittee function
- Add etag fetching and IfMatch header to deleteCommitteeMember function
- Add etag fetching and IfMatch header to deleteProject function
- Ensures all delete operations handle conditional requests properly
- Follows same pattern as existing update operations

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
@emsearcy emsearcy marked this pull request as ready for review January 16, 2026 21:30
@emsearcy emsearcy requested a review from a team as a code owner January 16, 2026 21:30
Copilot AI review requested due to automatic review settings January 16, 2026 21:30
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a PostgreSQL WAL (Write-Ahead Log) listener component to enable real-time database change streaming to NATS, complementing the existing Meltano ETL pipeline. The changes include significant refactoring of the sync helper service to support deletion operations, improved error handling with retry logic, and restructured Helm charts to accommodate multiple components.

Changes:

  • Adds PostgreSQL WAL listener deployment with configurable filtering and NATS publishing
  • Implements deletion handling (both hard and soft deletes) with tombstoning for mappings
  • Refactors consumer patterns from manual fetch loops to the JetStream Consume API with proper error handling
  • Updates dependencies across Go modules and Python packages

Reviewed changes

Copilot reviewed 29 out of 32 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
go.mod, go.sum Updates Go dependencies to newer versions
uv.lock, pyproject.toml Updates Python dependencies including Meltano 4.0.6→4.0.8
meltano/meltano.yml Adds default incremental replication configuration
meltano/load/target-nats-kv/uv.lock Updates target plugin dependencies
cmd/lfx-v1-sync-helper/main.go Refactors consumer patterns and adds WAL listener subscription
cmd/lfx-v1-sync-helper/kv_watcher.go Extracts KV message handling logic into separate file
cmd/lfx-v1-sync-helper/ingest_wal.go New file implementing WAL event processing
cmd/lfx-v1-sync-helper/handlers*.go Adds deletion handlers and retry logic
cmd/lfx-v1-sync-helper/config.go Adds boolean environment variable parsing and UseMsgpack config
cmd/lfx-v1-sync-helper/client_*.go Adds delete operations for projects and committees
charts/lfx-v1-sync-helper/templates/*.yaml Restructures templates for multi-component deployment
charts/lfx-v1-sync-helper/values.yaml Reorganizes values for app, WAL listener, and Meltano components
manifests/*.yaml Removes standalone manifests (replaced by Helm chart)
README.md, AGENTS.md Updates documentation for new components

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +423 to +440
func isRevisionMismatchError(err error) bool {
// Attempt direct JetStreamError comparison.
if jsErr, ok := err.(jetstream.JetStreamError); ok {
if apiErr := jsErr.APIError(); apiErr != nil {
return apiErr.ErrorCode == jetstream.JSErrCodeStreamWrongLastSequence
}
}

// Check for NATS error strings containing the expected error codes.
errStr := err.Error()
if strings.Contains(errStr, "err_code=10071") ||
strings.Contains(errStr, "wrong last sequence") ||
strings.Contains(errStr, "key exists") {
return true
}

return false
}
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function duplicates error detection logic that already exists in handlers_users.go (used in handleAlternateEmailUpdate). Consider extracting this to a shared utility function to avoid code duplication and ensure consistent error handling across the codebase.

Copilot uses AI. Check for mistakes.
Comment on lines +32 to +36
host: localhost
port: {{ .Values.walListener.config.database.port }}
name: {{ .Values.walListener.config.database.name | quote }}
user: placeholder
password: placeholder
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The database host is hardcoded to "localhost" and credentials use placeholder values in the config file. While these are overridden by environment variables, this configuration could cause confusion or be accidentally used. Consider using template variables or adding clear comments explaining the values are overridden.

Suggested change
host: localhost
port: {{ .Values.walListener.config.database.port }}
name: {{ .Values.walListener.config.database.name | quote }}
user: placeholder
password: placeholder
# NOTE: The database connection details below are placeholders and are
# overridden by environment variables; these values are not used in production.
host: "OVERRIDDEN_BY_ENV"
port: {{ .Values.walListener.config.database.port }}
name: {{ .Values.walListener.config.database.name | quote }}
user: "OVERRIDDEN_BY_ENV"
password: "OVERRIDDEN_BY_ENV"

Copilot uses AI. Check for mistakes.
andrest50
andrest50 previously approved these changes Jan 16, 2026
- Add tombstoneMarker constant to centralize tombstone character definition
- Replace emoji "🪦" with string "!del" for better compatibility
- Update tombstoneMapping() and isTombstonedMapping() functions to use the constant
- Improves code maintainability and eliminates emoji encoding issues

🤖 Generated with [GitHub Copilot](https://github.com/features/copilot) (via Zed)

Signed-off-by: Eric Searcy <eric@linuxfoundation.org>
@emsearcy emsearcy merged commit 30c24a4 into main Jan 20, 2026
3 checks passed
@emsearcy emsearcy deleted the feat/wal-listener branch January 20, 2026 19:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants