-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.go
More file actions
455 lines (382 loc) · 13.2 KB
/
main.go
File metadata and controls
455 lines (382 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
package main
import (
"context"
"flag"
"fmt"
"log"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/brunoscheufler/gopherconuk25/cli"
"github.com/brunoscheufler/gopherconuk25/constants"
"github.com/brunoscheufler/gopherconuk25/proxy"
"github.com/brunoscheufler/gopherconuk25/restapi"
"github.com/brunoscheufler/gopherconuk25/store"
"github.com/brunoscheufler/gopherconuk25/telemetry"
)
// Config holds all configuration parameters for running the application
type Config struct {
// CLI configuration
CLIMode bool
Theme string
Port string
LogLevel string
// Proxy configuration
ProxyMode bool
ProxyID int
ProxyPort int
// Load generator configuration
EnableLoadGen bool
AccountCount int
NotesPerAccount int
RequestsPerMin int
}
// AppConfig groups common application dependencies to reduce parameter lists
type AppConfig struct {
AccountStore store.AccountStore
NoteStore store.NoteStore
DeploymentController *proxy.DeploymentController
Telemetry *telemetry.Telemetry
}
func main() {
cliMode := flag.Bool("cli", false, "Run in CLI mode with TUI")
theme := flag.String("theme", "dark", "Theme for CLI mode (dark or light)")
port := flag.String("port", constants.DefaultPort, "Port to run the HTTP server on")
logLevel := flag.String("log-level", "", "Log level (DEBUG, INFO, WARN, ERROR). Defaults to DEBUG")
// Proxy flags
proxyMode := flag.Bool("proxy", false, "Run as data proxy")
proxyID := flag.Int("proxy-id", 0, "proxy ID to use (required with --proxy)")
proxyPort := flag.Int("proxy-port", 0, "Port for data proxy (required with --proxy)")
// Load generator flags
enableLoadGen := flag.Bool("gen", false, "Enable load generator")
accountCount := flag.Int("concurrency", 5, "Number of accounts for load generator")
notesPerAccount := flag.Int("notes-per-account", 3, "Number of notes per account for load generator")
requestsPerMin := flag.Int("rpm", 60, "Requests per minute for load generator")
// Clean flag
clean := flag.Bool("clean", false, "Delete the .data directory before starting")
flag.Parse()
// Handle clean flag if specified
if *clean {
dataDir := ".data"
if err := os.RemoveAll(dataDir); err != nil {
log.Printf("Warning: Failed to remove %s directory: %v", dataDir, err)
} else {
log.Printf("Cleaned %s directory", dataDir)
}
}
config := Config{
CLIMode: *cliMode,
Theme: *theme,
Port: *port,
LogLevel: *logLevel,
ProxyMode: *proxyMode,
ProxyPort: *proxyPort,
ProxyID: *proxyID,
EnableLoadGen: *enableLoadGen,
AccountCount: *accountCount,
NotesPerAccount: *notesPerAccount,
RequestsPerMin: *requestsPerMin,
}
if err := Run(config); err != nil {
log.Fatal(err)
}
}
// checkPortAvailable checks if the given port is available for binding
func checkPortAvailable(port string) error {
listener, err := net.Listen("tcp", port)
if err != nil {
return fmt.Errorf("port %s is not available: %w", port, err)
}
listener.Close()
return nil
}
// checkServerHealth validates that the server is ready by calling /healthz
func checkServerHealth(port string) error {
client := &http.Client{Timeout: constants.HealthCheckTimeout}
url := fmt.Sprintf("http://localhost%s/healthz", port)
// Retry health check with configured retries and intervals
for i := 0; i < constants.MaxHealthCheckRetries; i++ {
resp, err := client.Get(url)
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
return nil
}
}
time.Sleep(constants.HealthCheckRetryInterval)
}
return fmt.Errorf("server health check failed after retries")
}
// ApplicationComponents holds the initialized components needed to run the application
type ApplicationComponents struct {
AccountStore store.AccountStore
NoteStore store.NoteStore
DeploymentController *proxy.DeploymentController
Telemetry *telemetry.Telemetry
HTTPServer *http.Server
Simulator *Simulator
}
// initializeStores creates and initializes the account and note stores
func initializeStores(tel *telemetry.Telemetry) (store.AccountStore, store.NoteStore, *proxy.DeploymentController, error) {
// Create account store first
accountStore, err := store.NewAccountStore(store.DefaultStoreOptions("accounts", tel.GetLogger()))
if err != nil {
return nil, nil, nil, fmt.Errorf("could not create account store: %w", err)
}
// Create deployment controller with telemetry and account store
deploymentController := proxy.NewDeploymentController(tel, accountStore)
// Perform initial deployment
if err := deploymentController.Deploy(); err != nil {
accountStore.Close() // Clean up account store if deployment fails
return nil, nil, nil, fmt.Errorf("could not perform initial deployment: %w", err)
}
// Use deployment controller as note store
noteStore := deploymentController
return accountStore, noteStore, deploymentController, nil
}
// setupTelemetry creates and starts the telemetry system
func setupTelemetry(cliMode bool, logLevel string) *telemetry.Telemetry {
var options []telemetry.TelemetryOption
if cliMode {
options = append(options, telemetry.WithCLIMode(true))
}
if logLevel != "" {
options = append(options, telemetry.WithLogLevel(logLevel))
}
tel := telemetry.New(options...)
tel.SetupGlobalLogger()
tel.Start()
return tel
}
// createSimulator creates a load generator simulator if enabled
func createSimulator(config Config, tel *telemetry.Telemetry, port string) *Simulator {
if !config.EnableLoadGen {
return nil
}
simOptions := SimulatorOptions{
AccountCount: config.AccountCount,
NotesPerAccount: config.NotesPerAccount,
RequestsPerMin: config.RequestsPerMin,
ServerPort: port,
}
return NewSimulator(tel, simOptions)
}
// preparePort ensures the port has the correct format and checks availability
func preparePort(port string) (string, error) {
// Ensure port has colon prefix
if len(port) == 0 || port[0] != ':' {
port = ":" + port
}
// Check if port is available before doing any other work
if err := checkPortAvailable(port); err != nil {
return "", err
}
return port, nil
}
// initializeApplication sets up all application components
func initializeApplication(config Config) (*ApplicationComponents, error) {
port, err := preparePort(config.Port)
if err != nil {
return nil, err
}
// Create telemetry first so it can be passed to all components
tel := setupTelemetry(config.CLIMode, config.LogLevel)
accountStore, noteStore, deploymentController, err := initializeStores(tel)
if err != nil {
return nil, err
}
deploymentController.StartInstrument()
appConfig := &AppConfig{
AccountStore: accountStore,
NoteStore: noteStore,
DeploymentController: deploymentController,
Telemetry: tel,
}
httpServer := createHTTPServer(appConfig, port)
simulator := createSimulator(config, tel, port)
return &ApplicationComponents{
AccountStore: accountStore,
NoteStore: noteStore,
DeploymentController: deploymentController,
Telemetry: tel,
HTTPServer: httpServer,
Simulator: simulator,
}, nil
}
func Run(config Config) error {
// Handle proxy mode first
if config.ProxyMode {
if config.ProxyPort == 0 {
return fmt.Errorf("--proxy-port is required when using --proxy")
}
tel := setupTelemetry(false, config.LogLevel)
return runDataProxy(config.ProxyID, config.ProxyPort, tel.GetLogger())
}
components, err := initializeApplication(config)
if err != nil {
return err
}
if config.CLIMode {
options := cli.CLIOptions{
Theme: config.Theme,
}
// Run both HTTP server and CLI concurrently
appConfig := &AppConfig{
AccountStore: components.AccountStore,
NoteStore: components.NoteStore,
DeploymentController: components.DeploymentController,
Telemetry: components.Telemetry,
}
return runWithCLI(components.HTTPServer, appConfig, options, components.Simulator)
}
// Otherwise run the HTTP server only
return runHTTPServer(components.HTTPServer, components.Simulator, components.Telemetry)
}
// runDataProxy starts a data proxy server on the specified port
func runDataProxy(id int, port int, logger *slog.Logger) error {
// Create context that cancels on signals
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
// Create data proxy with notes shard
dataProxy, err := proxy.NewDataProxy(id, port, logger)
if err != nil {
return fmt.Errorf("failed to create data proxy: %w", err)
}
// Run the proxy server
return dataProxy.Run(ctx)
}
func runWithCLI(httpServer *http.Server, appConfig *AppConfig, options cli.CLIOptions, simulator *Simulator) error {
logger := appConfig.Telemetry.GetLogger()
// Start server first, then validate health before starting CLI
serverError := make(chan error, 1)
go func() {
logger.Info("Server starting", "addr", httpServer.Addr)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
serverError <- err
}
}()
// Wait for server to be healthy before starting CLI
logger.Info("Waiting for server health check...")
if err := checkServerHealth(httpServer.Addr); err != nil {
return fmt.Errorf("server failed health check: %w", err)
}
logger.Info("Server health check passed, starting CLI...")
// Start load generator if provided
if simulator != nil {
go func() {
if err := simulator.Start(); err != nil {
logger.Error("Load generator failed to start", "error", err)
}
}()
}
// Start CLI in foreground
cliError := make(chan error, 1)
go func() {
cliError <- cli.RunCLI(appConfig.AccountStore, appConfig.NoteStore, appConfig.Telemetry, appConfig.DeploymentController, options)
}()
// Wait for either server error or CLI exit
select {
case err := <-serverError:
return fmt.Errorf("server failed: %w", err)
case err := <-cliError:
// CLI exited, give it a moment to fully close, then switch logging to stderr
time.Sleep(100 * time.Millisecond)
appConfig.Telemetry.SwitchToStderr()
logger = appConfig.Telemetry.GetLogger() // Update logger reference
// Update simulator's logger reference if it exists
if simulator != nil {
simulator.UpdateLogger()
}
// CLI exited, shutdown server gracefully
logger.Info("CLI exited, initiating graceful shutdown...")
// Stop load generator first
if simulator != nil {
logger.Info("Stopping load generator...")
simulator.Stop()
}
logger.Info("Shutting down HTTP server...")
ctx, cancel := context.WithTimeout(context.Background(), constants.GracefulShutdownTimeout)
defer cancel()
if shutdownErr := httpServer.Shutdown(ctx); shutdownErr != nil {
logger.Error("Server shutdown failed", "error", shutdownErr)
return fmt.Errorf("server shutdown failed: %w", shutdownErr)
}
logger.Info("Application shutdown complete")
return err // Return the CLI error (if any)
}
}
func runHTTPServer(httpServer *http.Server, simulator *Simulator, tel *telemetry.Telemetry) error {
// Set up signal handling for graceful shutdown
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
stopError := make(chan error, 1)
go func() {
<-stop
// Stop load generator on shutdown signal
if simulator != nil {
simulator.Stop()
}
stopError <- nil // Normal shutdown signal
}()
return runServer(httpServer, stopError, simulator, tel)
}
func runServer(httpServer *http.Server, shutdownTrigger <-chan error, simulator *Simulator, tel *telemetry.Telemetry) error {
logger := tel.GetLogger()
// Start HTTP server in background
serverError := make(chan error, 1)
go func() {
logger.Info("Server starting", "addr", httpServer.Addr)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
serverError <- err
}
}()
// Start load generator if provided (after server starts)
if simulator != nil {
go func() {
// Wait a moment for server to be ready
time.Sleep(constants.LoadGenStartupDelay)
if err := simulator.Start(); err != nil {
logger.Error("Load generator failed to start", "error", err)
}
}()
}
// Wait for either server error or shutdown trigger
select {
case err := <-serverError:
return fmt.Errorf("server failed to start: %w", err)
case err := <-shutdownTrigger:
// Shutdown triggered (CLI exit or signal)
logger.Info("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), constants.GracefulShutdownTimeout)
defer cancel()
if shutdownErr := httpServer.Shutdown(ctx); shutdownErr != nil {
return fmt.Errorf("server shutdown failed: %w", shutdownErr)
}
return err // Return the original trigger error (if any)
}
}
func createHTTPServer(appConfig *AppConfig, port string) *http.Server {
server := restapi.NewServer(
restapi.WithAccountStore(appConfig.AccountStore),
restapi.WithNoteStore(appConfig.NoteStore),
restapi.WithDeploymentController(appConfig.DeploymentController),
restapi.WithTelemetry(appConfig.Telemetry),
)
mux := http.NewServeMux()
server.SetupRoutes(mux)
return &http.Server{
Addr: port,
Handler: server.LoggingMiddleware(mux),
}
}