-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathmain.go
More file actions
167 lines (132 loc) · 4.16 KB
/
main.go
File metadata and controls
167 lines (132 loc) · 4.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0
// Package main is the entry point for the reconciler service.
package main
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
corev1 "github.com/agntcy/dir/api/core/v1"
"github.com/agntcy/dir/reconciler/config"
"github.com/agntcy/dir/reconciler/service"
"github.com/agntcy/dir/server/database"
"github.com/agntcy/dir/server/store/oci"
"github.com/agntcy/dir/utils/logging"
)
const (
// defaultHealthPort is the default port for the health check endpoint.
defaultHealthPort = ":8080"
// healthCheckTimeout is the timeout for health check operations.
healthCheckTimeout = 5 * time.Second
)
var logger = logging.Logger("reconciler")
func main() {
if err := run(); err != nil {
logger.Error("Reconciler failed", "error", err)
os.Exit(1)
}
}
//nolint:wrapcheck,cyclop
func run() error {
logger.Info("Starting reconciler service")
// Load configuration
cfg, err := config.LoadConfig()
if err != nil {
return err
}
// Initialize OASF validator for record validation
if cfg.SchemaURL != "" {
if err := corev1.InitializeValidator(cfg.SchemaURL); err != nil {
return fmt.Errorf("failed to initialize OASF validator: %w", err)
}
logger.Info("OASF validator initialized", "schema_url", cfg.SchemaURL)
} else {
logger.Warn("OASF schema URL not configured, record validation will be skipped")
}
// Create database connection
db, err := database.New(cfg.Database)
if err != nil {
return err
}
defer db.Close()
// Create OCI store for accessing the local registry
store, err := oci.New(cfg.LocalRegistry)
if err != nil {
return err
}
// Create ORAS repository client for registry operations (e.g., listing tags)
repo, err := oci.NewORASRepository(cfg.LocalRegistry)
if err != nil {
return err
}
// Create service with all tasks registered
svc, err := service.New(cfg, db, store, repo)
if err != nil {
return err
}
// Create context that listens for signals
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start health check server with database and store readiness check
healthServer := startHealthServer(func(ctx context.Context) bool {
return db.IsReady(ctx) && store.IsReady(ctx)
})
// Start the service
if err := svc.Start(ctx); err != nil {
return err
}
// Wait for termination signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-sigCh
logger.Info("Received signal, shutting down", "signal", sig)
// Cancel context to stop tasks
cancel()
// Stop the service
if err := svc.Stop(); err != nil {
logger.Error("Failed to stop service gracefully", "error", err)
}
// Shutdown health server
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), healthCheckTimeout)
defer shutdownCancel()
if err := healthServer.Shutdown(shutdownCtx); err != nil {
logger.Error("Failed to shutdown health server", "error", err)
}
logger.Info("Reconciler service stopped")
return nil
}
// startHealthServer starts a simple HTTP health check server.
func startHealthServer(readinessCheck func(ctx context.Context) bool) *http.Server {
mux := http.NewServeMux()
// Liveness probe - always returns OK if the process is running
mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
})
// Readiness probe - checks database connectivity
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout)
defer cancel()
if readinessCheck(ctx) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
})
port := os.Getenv("HEALTH_PORT")
if port == "" {
port = defaultHealthPort
}
server := &http.Server{Addr: port, Handler: mux, ReadHeaderTimeout: healthCheckTimeout}
go func() {
logger.Info("Starting health check server", "address", port)
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("Health check server error", "error", err)
}
}()
return server
}