|
9 | 9 | "context" |
10 | 10 | gosql "database/sql" |
11 | 11 | "fmt" |
| 12 | + "math/rand" |
| 13 | + "os" |
| 14 | + "strconv" |
12 | 15 | "strings" |
13 | 16 | "time" |
14 | 17 |
|
@@ -39,6 +42,11 @@ type jobDetails struct { |
39 | 42 | highWaterTimestamp hlc.Timestamp // high watermark timestamp |
40 | 43 | } |
41 | 44 |
|
| 45 | +// configSetter defines a callback function used by parseConfigs to apply each |
| 46 | +// parsed sink and its associated percentage. This allows custom handling of |
| 47 | +// each config entry during parsing (e.g., storing in a map or validating). |
| 48 | +type configSetter func(key string, value int) error |
| 49 | + |
42 | 50 | // getJobsUpdatedWithPayload fetches additional changefeed payload details for specific jobs from the database. |
43 | 51 | // This returns a new slice of jobDetails with the updated payload details without mutating the one in input. |
44 | 52 | func getJobsUpdatedWithPayload( |
@@ -338,10 +346,93 @@ func createChangefeed( |
338 | 346 | return err |
339 | 347 | } |
340 | 348 |
|
| 349 | +// ParseConfigs exported for testing |
| 350 | +func ParseConfigs(config string, cb configSetter) error { |
| 351 | + if config == "" { |
| 352 | + return fmt.Errorf("config string cannot be empty") |
| 353 | + } |
| 354 | + |
| 355 | + configsArr := strings.Split(config, ",") |
| 356 | + totalCount := 0 |
| 357 | + |
| 358 | + for _, c := range configsArr { |
| 359 | + parts := strings.Split(c, ":") |
| 360 | + if len(parts) != 2 { |
| 361 | + return fmt.Errorf("invalid config format: %s", c) |
| 362 | + } |
| 363 | + |
| 364 | + key := parts[0] |
| 365 | + valueStr := parts[1] |
| 366 | + |
| 367 | + value, err := strconv.Atoi(valueStr) |
| 368 | + if err != nil { |
| 369 | + return fmt.Errorf("invalid percentage value in config '%s'", c) |
| 370 | + } |
| 371 | + |
| 372 | + if value < 0 || value > 100 { |
| 373 | + return fmt.Errorf("percentage value out of range in config '%s': must be between 0 and 100", c) |
| 374 | + } |
| 375 | + |
| 376 | + err = cb(key, value) |
| 377 | + if err != nil { |
| 378 | + return err |
| 379 | + } |
| 380 | + |
| 381 | + totalCount += value |
| 382 | + } |
| 383 | + |
| 384 | + if totalCount != 100 { |
| 385 | + return fmt.Errorf("all sinks must sum to 100%%, but total is %d%%", totalCount) |
| 386 | + } |
| 387 | + |
| 388 | + return nil |
| 389 | +} |
| 390 | + |
| 391 | +func selectSink(sinks map[string]int) string { |
| 392 | + choice := rand.Intn(100) |
| 393 | + sum := 0 |
| 394 | + for sink, pct := range sinks { |
| 395 | + sum += pct |
| 396 | + if choice < sum { |
| 397 | + return sink |
| 398 | + } |
| 399 | + } |
| 400 | + return "null" // Default fallback if no valid selection |
| 401 | +} |
| 402 | + |
341 | 403 | // getSinkConfigs returns the sink uri along with the options for creating the changefeed. |
342 | 404 | // this will be extended later for more sinks |
343 | 405 | func getSinkConfigs(_ context.Context, _ []*jobDetails) (string, []string, error) { |
344 | | - return "null://", make([]string, 0), nil |
| 406 | + sinkConfigEnv := os.Getenv("SINK_CONFIG") |
| 407 | + if sinkConfigEnv == "" { |
| 408 | + return "null://", []string{}, nil // Default to null sink if env is not set |
| 409 | + } |
| 410 | + |
| 411 | + sinks := make(map[string]int) |
| 412 | + uris := make(map[string]string) |
| 413 | + |
| 414 | + err := ParseConfigs(sinkConfigEnv, func(sink string, value int) error { |
| 415 | + sinks[sink] = value |
| 416 | + uriEnv := fmt.Sprintf("SINK_CONFIG_%s", strings.ToUpper(sink)) |
| 417 | + uri, exists := os.LookupEnv(uriEnv) |
| 418 | + if !exists { |
| 419 | + return fmt.Errorf("environment variable %s not found for sink %s", uriEnv, sink) |
| 420 | + } |
| 421 | + uris[sink] = uri |
| 422 | + return nil |
| 423 | + }) |
| 424 | + if err != nil { |
| 425 | + // Default to null sink on parsing error |
| 426 | + return "null://", []string{}, nil //nolint:returnerrcheck |
| 427 | + } |
| 428 | + |
| 429 | + selectedSink := selectSink(sinks) |
| 430 | + selectedURI, exists := uris[selectedSink] |
| 431 | + if !exists { |
| 432 | + return "null://", []string{}, nil // Default to null sink if selection fails |
| 433 | + } |
| 434 | + |
| 435 | + return selectedURI, []string{}, nil |
345 | 436 | } |
346 | 437 |
|
347 | 438 | // calculateScanOption determines whether the new changefeed should have an initial scan based on existing jobs. |
|
0 commit comments