-
Notifications
You must be signed in to change notification settings - Fork 6.1k
br: fix region scan checker #64530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
br: fix region scan checker #64530
Changes from 2 commits
ae98b79
a05629b
be47443
c4f24b4
e11ca24
3dcbd40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -152,7 +152,7 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi | |
| return leftRegions | ||
| } | ||
|
|
||
| func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { | ||
| func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo, limitted bool) error { | ||
| // current pd can't guarantee the consistency of returned regions | ||
| if len(regions) == 0 { | ||
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s", | ||
|
|
@@ -165,7 +165,7 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro | |
| regions[0].Region.Id, | ||
| redact.Key(regions[0].Region.StartKey), redact.Key(startKey), | ||
| regions[0].Region.RegionEpoch.String()) | ||
| } else if len(regions[len(regions)-1].Region.EndKey) != 0 && | ||
| } else if !limitted && len(regions[len(regions)-1].Region.EndKey) != 0 && | ||
| bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 { | ||
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, | ||
| "last region %d's endKey(%s) < endKey(%s), region epoch: %s", | ||
|
|
@@ -179,11 +179,19 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro | |
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, | ||
| "region %d's leader is nil", cur.Region.Id) | ||
| } | ||
| if cur.Leader.StoreId == 0 { | ||
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, | ||
| "region %d's leader's store id is 0", cur.Region.Id) | ||
| } | ||
| for _, r := range regions[1:] { | ||
| if r.Leader == nil { | ||
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, | ||
| "region %d's leader is nil", r.Region.Id) | ||
| } | ||
| if r.Leader.StoreId == 0 { | ||
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, | ||
| "region %d's leader's store id is 0", r.Region.Id) | ||
| } | ||
| if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) { | ||
| return errors.Annotatef(berrors.ErrPDBatchScanRegion, | ||
| "region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s", | ||
|
|
@@ -197,6 +205,34 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro | |
| return nil | ||
| } | ||
|
|
||
| func scanRegionsLimitWithRetry( | ||
| ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, mustLeader bool, | ||
| ) ([]*RegionInfo, bool, error) { | ||
| var ( | ||
| batch []*RegionInfo | ||
| err error | ||
| ) | ||
| _ = utils.WithRetry(ctx, func() error { | ||
| defer func() { mustLeader = mustLeader || err != nil }() | ||
| if mustLeader { | ||
| batch, err = client.ScanRegions(ctx, startKey, endKey, limit) | ||
| } else { | ||
| batch, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle()) | ||
| } | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if err = checkRegionConsistency(startKey, endKey, batch, true); err != nil { | ||
| log.Warn("failed to scan region, retrying", | ||
| logutil.ShortError(err), | ||
| zap.Int("regionLength", len(batch))) | ||
| return err | ||
| } | ||
| return nil | ||
| }, NewWaitRegionOnlineBackoffer()) | ||
| return batch, mustLeader, err | ||
| } | ||
|
|
||
| // PaginateScanRegion scan regions with a limit pagination and return all regions | ||
| // at once. The returned regions are continuous and cover the key range. If not, | ||
| // or meet errors, it will retry internally. | ||
|
|
@@ -210,24 +246,17 @@ func PaginateScanRegion( | |
|
|
||
| var ( | ||
| lastRegions []*RegionInfo | ||
| lastErr error | ||
| err error | ||
| mustLeader = false | ||
| backoffer = NewWaitRegionOnlineBackoffer() | ||
| ) | ||
| _ = utils.WithRetry(ctx, func() error { | ||
| var err error | ||
| defer func() { | ||
| lastErr = err | ||
| }() | ||
| defer func() { mustLeader = true }() | ||
| regions := make([]*RegionInfo, 0, 16) | ||
| scanStartKey := startKey | ||
| for { | ||
| var batch []*RegionInfo | ||
| if lastErr != nil { | ||
| batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit) | ||
| } else { | ||
| batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit, opt.WithAllowFollowerHandle()) | ||
| } | ||
|
|
||
| batch, mustLeader, err = scanRegionsLimitWithRetry(ctx, client, scanStartKey, endKey, limit, mustLeader) | ||
| if err != nil { | ||
| err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s", | ||
| redact.Key(scanStartKey), err.Error()) | ||
|
|
@@ -252,7 +281,7 @@ func PaginateScanRegion( | |
| } | ||
| lastRegions = regions | ||
|
|
||
| if err = checkRegionConsistency(startKey, endKey, regions); err != nil { | ||
| if err = checkRegionConsistency(startKey, endKey, regions, false); err != nil { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We also need to check the consistency between adjacent batches, not only the check here. Maybe this duplicate check doesn't consume too many resources? |
||
| log.Warn("failed to scan region, retrying", | ||
| logutil.ShortError(err), | ||
| zap.Int("regionLength", len(regions))) | ||
|
|
@@ -261,7 +290,7 @@ func PaginateScanRegion( | |
| return nil | ||
| }, backoffer) | ||
|
|
||
| return lastRegions, lastErr | ||
| return lastRegions, err | ||
| } | ||
|
|
||
| // checkPartRegionConsistency only checks the continuity of regions and the first region consistency. | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment for `limited`?