-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcommittee.go
More file actions
107 lines (94 loc) · 3.33 KB
/
committee.go
File metadata and controls
107 lines (94 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT
package main
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/cmd/mailing-list-api/service"
internalService "github.com/linuxfoundation/lfx-v2-mailing-list-service/internal/service"
"github.com/linuxfoundation/lfx-v2-mailing-list-service/pkg/constants"
"github.com/nats-io/nats.go"
)
// handleCommitteeSync builds the committee sync service and attaches it to the
// committee member created/deleted/updated subjects as a queue subscriber.
//
// Setup and start happen in this single call (the original author notes this
// mirrors handleHTTPServer). It returns an error as soon as one subscription
// cannot be established; otherwise it registers a goroutine on wg that waits
// for ctx to be cancelled and logs the shutdown, then returns nil.
func handleCommitteeSync(ctx context.Context, wg *sync.WaitGroup) error {
	slog.InfoContext(ctx, "starting committee sync")

	// Resolve every dependency locally, in the same order as before.
	listReader := service.GrpsIOReader(ctx)
	orchestrator := service.GrpsIOWriterOrchestrator(ctx) // orchestrator is used so that messages get published
	membershipReader := service.GrpsIOReader(ctx)
	attributeRetriever := service.EntityAttributeRetriever(ctx)
	nc := service.GetNATSClient(ctx)

	syncService := internalService.NewCommitteeSyncService(
		listReader,
		orchestrator,
		membershipReader,
		attributeRetriever,
	)

	// Upper bound on how long a single event may take to process.
	const perMessageTimeout = 30 * time.Second

	// onEvent is shared by all subscriptions; it only captures ctx and syncService.
	onEvent := func(msg *nats.Msg) {
		// Once shutdown has begun, hand the message back instead of starting new work.
		if ctx.Err() != nil {
			slog.InfoContext(ctx, "rejecting message - service shutting down",
				"subject", msg.Subject)
			if nakErr := msg.Nak(); nakErr != nil {
				slog.ErrorContext(ctx, "failed to nak message during shutdown", "error", nakErr)
			}
			return
		}

		// Deliberately rooted at Background rather than ctx, so that a shutdown
		// arriving mid-flight does not cancel work that has already started.
		msgCtx, cancel := context.WithTimeout(context.Background(), perMessageTimeout)
		defer cancel()

		handleErr := syncService.HandleMessage(msgCtx, msg)
		if handleErr == nil {
			// Processed successfully: acknowledge and finish.
			if ackErr := msg.Ack(); ackErr != nil {
				slog.ErrorContext(msgCtx, "failed to ack message", "error", ackErr)
			}
			return
		}

		// Processing failed: log it and negatively acknowledge the message.
		slog.ErrorContext(msgCtx, "failed to process committee event, will retry",
			"error", handleErr,
			"subject", msg.Subject)
		if nakErr := msg.Nak(); nakErr != nil {
			slog.ErrorContext(msgCtx, "failed to nak message", "error", nakErr)
		}
	}

	// One queue subscription per committee event subject.
	for _, subject := range []string{
		constants.CommitteeMemberCreatedSubject,
		constants.CommitteeMemberDeletedSubject,
		constants.CommitteeMemberUpdatedSubject,
	} {
		if _, subErr := nc.QueueSubscribe(subject, constants.MailingListAPIQueue, onEvent); subErr != nil {
			return fmt.Errorf("failed to subscribe to %s: %w", subject, subErr)
		}

		slog.InfoContext(ctx, "subscribed to committee event",
			"subject", subject,
			"queue", constants.MailingListAPIQueue)
	}

	slog.InfoContext(ctx, "committee sync started successfully")

	// Shutdown watcher: keeps wg held until ctx is cancelled. The NATS client
	// itself is closed by the main shutdown path, so nothing is closed here.
	wg.Add(1)
	go func() {
		defer wg.Done()

		<-ctx.Done()
		slog.InfoContext(ctx, "shutting down committee sync")
	}()

	return nil
}