|
9 | 9 | "fmt" |
10 | 10 | "math" |
11 | 11 | "net" |
12 | | - "reflect" |
13 | 12 | "slices" |
14 | 13 | "strconv" |
15 | 14 | "strings" |
@@ -293,13 +292,12 @@ func (br *BackendReader) readFromBackends(ctx context.Context, excludeZones []st |
293 | 292 | return |
294 | 293 | } |
295 | 294 | br.metric2History(mf, addr) |
296 | | - result := br.history2Value(addr) |
297 | | - br.mergeQueryResult(result, addr) |
298 | 295 | }, nil, br.lg) |
299 | 296 | }(addr) |
300 | 297 | } |
301 | 298 | br.wgp.Wait() |
302 | 299 |
|
| 300 | + br.history2QueryResult() |
303 | 301 | if err := br.marshalHistory(addrs); err != nil { |
304 | 302 | br.lg.Error("marshal backend history failed", zap.Any("addrs", addrs), zap.Error(err)) |
305 | 303 | } |
@@ -369,87 +367,58 @@ func (br *BackendReader) metric2History(mfs map[string]*dto.MetricFamily, backen |
369 | 367 | } |
370 | 368 | } |
371 | 369 |
|
372 | | -// history2Value converts the history to results for all rules of one backend. |
373 | | -// E.g. return the metrics for 1 minute as a matrix |
374 | | -func (br *BackendReader) history2Value(backend string) map[string]model.Value { |
375 | | - results := make(map[string]model.Value, len(br.queryRules)) |
376 | | - labels := map[model.LabelName]model.LabelValue{LabelNameInstance: model.LabelValue(backend)} |
| 370 | +// history2QueryResult generates new query results from the history. |
| 371 | +func (br *BackendReader) history2QueryResult() { |
| 372 | + now := monotime.Now() |
377 | 373 | br.Lock() |
378 | 374 | defer br.Unlock() |
379 | 375 |
|
| 376 | + queryResults := make(map[string]QueryResult, len(br.queryRules)) |
380 | 377 | for ruleKey, rule := range br.queryRules { |
381 | 378 | ruleHistory := br.history[ruleKey] |
382 | 379 | if len(ruleHistory) == 0 { |
383 | 380 | continue |
384 | 381 | } |
385 | | - beHistory := ruleHistory[backend] |
386 | | - if len(beHistory.Step2History) == 0 { |
387 | | - continue |
388 | | - } |
389 | 382 |
|
| 383 | + var value model.Value |
390 | 384 | switch rule.ResultType { |
391 | 385 | case model.ValVector: |
392 | | - // vector indicates returning the latest pair |
393 | | - lastPair := beHistory.Step2History[len(beHistory.Step2History)-1] |
394 | | - results[ruleKey] = model.Vector{{Value: lastPair.Value, Timestamp: lastPair.Timestamp, Metric: labels}} |
395 | | - case model.ValMatrix: |
396 | | - // matrix indicates returning the history |
397 | | - // copy a slice to avoid data race |
398 | | - pairs := make([]model.SamplePair, len(beHistory.Step2History)) |
399 | | - copy(pairs, beHistory.Step2History) |
400 | | - results[ruleKey] = model.Matrix{{Values: pairs, Metric: labels}} |
401 | | - default: |
402 | | - br.lg.Error("unsupported value type", zap.String("value type", rule.ResultType.String())) |
403 | | - } |
404 | | - } |
405 | | - return results |
406 | | -} |
407 | | - |
408 | | -// mergeQueryResult merges the result of one backend into the final result. |
409 | | -func (br *BackendReader) mergeQueryResult(backendValues map[string]model.Value, backend string) { |
410 | | - now := monotime.Now() |
411 | | - br.Lock() |
412 | | - defer br.Unlock() |
413 | | - for ruleKey, value := range backendValues { |
414 | | - result := br.queryResults[ruleKey] |
415 | | - result.UpdateTime = now |
416 | | - if result.Value == nil || reflect.ValueOf(result.Value).IsNil() { |
417 | | - result.Value = value |
418 | | - br.queryResults[ruleKey] = result |
419 | | - continue |
420 | | - } |
421 | | - switch result.Value.Type() { |
422 | | - case model.ValVector: |
423 | | - idx := -1 |
424 | | - for i, v := range result.Value.(model.Vector) { |
425 | | - if v.Metric[LabelNameInstance] == model.LabelValue(backend) { |
426 | | - idx = i |
427 | | - break |
| 386 | + results := make([]*model.Sample, 0, len(ruleHistory)) |
| 387 | + for backend, beHistory := range ruleHistory { |
| 388 | + if len(beHistory.Step2History) == 0 { |
| 389 | + continue |
428 | 390 | } |
| 391 | + labels := map[model.LabelName]model.LabelValue{LabelNameInstance: model.LabelValue(backend)} |
| 392 | + // vector indicates returning the latest pair |
| 393 | + lastPair := beHistory.Step2History[len(beHistory.Step2History)-1] |
| 394 | + results = append(results, &model.Sample{Value: lastPair.Value, Timestamp: lastPair.Timestamp, Metric: labels}) |
429 | 395 | } |
430 | | - if idx >= 0 { |
431 | | - result.Value.(model.Vector)[idx] = value.(model.Vector)[0] |
432 | | - } else { |
433 | | - result.Value = append(result.Value.(model.Vector), value.(model.Vector)[0]) |
434 | | - } |
| 396 | + value = model.Vector(results) |
435 | 397 | case model.ValMatrix: |
436 | | - idx := -1 |
437 | | - for i, v := range result.Value.(model.Matrix) { |
438 | | - if v.Metric[LabelNameInstance] == model.LabelValue(backend) { |
439 | | - idx = i |
440 | | - break |
| 398 | + results := make([]*model.SampleStream, 0, len(ruleHistory)) |
| 399 | + for backend, beHistory := range ruleHistory { |
| 400 | + if len(beHistory.Step2History) == 0 { |
| 401 | + continue |
441 | 402 | } |
| 403 | + labels := map[model.LabelName]model.LabelValue{LabelNameInstance: model.LabelValue(backend)} |
| 404 | + // matrix indicates returning the history |
| 405 | + // copy a slice to avoid data race |
| 406 | + pairs := make([]model.SamplePair, len(beHistory.Step2History)) |
| 407 | + copy(pairs, beHistory.Step2History) |
| 408 | + results = append(results, &model.SampleStream{Values: pairs, Metric: labels}) |
442 | 409 | } |
443 | | - if idx >= 0 { |
444 | | - result.Value.(model.Matrix)[idx] = value.(model.Matrix)[0] |
445 | | - } else { |
446 | | - result.Value = append(result.Value.(model.Matrix), value.(model.Matrix)[0]) |
447 | | - } |
| 410 | + value = model.Matrix(results) |
448 | 411 | default: |
449 | | - br.lg.Error("unsupported value type", zap.Stringer("value type", result.Value.Type())) |
| 412 | + br.lg.Error("unsupported value type", zap.String("value type", rule.ResultType.String())) |
| 413 | + } |
| 414 | + |
| 415 | + queryResults[ruleKey] = QueryResult{ |
| 416 | + Value: value, |
| 417 | + UpdateTime: now, |
450 | 418 | } |
451 | | - br.queryResults[ruleKey] = result |
452 | 419 | } |
| 420 | + |
| 421 | + br.queryResults = queryResults |
453 | 422 | } |
454 | 423 |
|
455 | 424 | // purgeHistory purges the expired or useless history values, otherwise the memory grows infinitely. |
@@ -499,21 +468,12 @@ func (br *BackendReader) readFromOwner(ctx context.Context, ownerAddr string) er |
499 | 468 | if err := json.Unmarshal(resp, &newHistory); err != nil { |
500 | 469 | return err |
501 | 470 | } |
502 | | - backends := make(map[string]struct{}) |
503 | | - for _, ruleHistory := range newHistory { |
504 | | - for backend := range ruleHistory { |
505 | | - backends[backend] = struct{}{} |
506 | | - } |
507 | | - } |
508 | 471 |
|
509 | 472 | // If this instance becomes the owner in the next round, it can reuse the history. |
510 | 473 | br.mergeHistory(newHistory) |
511 | 474 |
|
512 | | - // Update query result for the updated backends. |
513 | | - for backend := range backends { |
514 | | - result := br.history2Value(backend) |
515 | | - br.mergeQueryResult(result, backend) |
516 | | - } |
| 475 | + // Generate query result for all backends. |
| 476 | + br.history2QueryResult() |
517 | 477 | return nil |
518 | 478 | } |
519 | 479 |
|
@@ -544,6 +504,8 @@ func (br *BackendReader) mergeHistory(newHistory map[string]map[string]backendHi |
544 | 504 | ruleHistory[backend] = backendHistory |
545 | 505 | } |
546 | 506 | } |
| 507 | + // avoid that the stale history is returned to other members when it just becomes the owner |
| 508 | + br.marshalledHistory = nil |
547 | 509 | } |
548 | 510 |
|
549 | 511 | // marshalHistory marshals the backends that are read by this owner. The marshaled data will be returned to other members. |
|
0 commit comments