Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/lfx-v1-sync-helper/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ apiVersion: v2
name: lfx-v1-sync-helper
description: LFX Platform v1 Sync Helper chart
type: application
version: 0.3.3
version: 0.3.4
24 changes: 0 additions & 24 deletions charts/lfx-v1-sync-helper/templates/nats-consumer.yaml

This file was deleted.

28 changes: 0 additions & 28 deletions charts/lfx-v1-sync-helper/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,34 +100,6 @@ nats:
# compression is a boolean to determine if the KV bucket should be compressed
compression: true

# consumer is the configuration for the NATS JetStream consumer that processes v1-objects KV bucket changes
consumer:
# creation is a boolean to determine if the consumer should be created via the helm chart
# set it to false if you want to use an existing consumer
creation: true
# keep is a boolean to determine if the consumer should be preserved during helm uninstall
# set it to false if you want the consumer to be deleted when the chart is uninstalled
keep: true
# name is the name of the consumer
name: v1-sync-helper-kv-consumer
# streamName is the name of the JetStream stream that contains the KV bucket
streamName: KV_v1-objects

# ackPolicy determines how messages are acknowledged
ackPolicy: explicit
# deliverPolicy determines which messages to deliver
# uncomment "lastPerSubject" when it's fixed upstream in NACK
# deliverPolicy: lastPerSubject
deliverPolicy: all
# maxDeliver is the maximum number of delivery attempts
maxDeliver: 3
# ackWait is how long to wait for an acknowledgment
ackWait: 30s
# maxAckPending is the maximum number of unacknowledged messages
maxAckPending: 1000
# filterSubject allows filtering specific keys (use ">" for all)
filterSubject: "$KV.v1-objects.>"

# heimdall is the configuration for JWT impersonation of Heimdall-authorized
# principals for v1 data ingest.
heimdall:
Expand Down
10 changes: 4 additions & 6 deletions v1-sync-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,15 @@ func main() {
streamName := "KV_v1-objects"

consumer, err := jsContext.CreateOrUpdateConsumer(ctx, streamName, jetstream.ConsumerConfig{
Name: consumerName,
Durable: consumerName,
// Uncomment when ready in the chart (see comments in charts/lfx-v1-sync-helper/values.yaml)
// DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
DeliverPolicy: jetstream.DeliverAllPolicy,
Name: consumerName,
Durable: consumerName,
DeliverPolicy: jetstream.DeliverLastPerSubjectPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: "$KV.v1-objects.>",
MaxDeliver: 3,
AckWait: 30 * time.Second,
MaxAckPending: 1000,
Description: "Pull consumer for v1-sync-helper to process v1-objects KV bucket changes with load balancing",
Description: "durable/shared KV bucket watcher for v1-sync-helper pods",
})
if err != nil {
logger.With(errKey, err, "consumer", consumerName, "stream", streamName).Error("error creating JetStream pull consumer")
Expand Down