Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20220331105802-5ac69661755c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0=
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
17 changes: 17 additions & 0 deletions server/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ type ServiceSafePoint struct {
SafePoint uint64 `json:"safe_point"`
}

// ServiceGroupGCSafePoint is gcWorker's safepoint for specific service group
type ServiceGroupGCSafePoint struct {
ServiceGroupID string `json:"service_group_id"`
SafePoint uint64 `json:"safe_point"`
}

// GCSafePointStorage defines the storage operations on the GC safe point.
type GCSafePointStorage interface {
LoadGCSafePoint() (uint64, error)
Expand All @@ -42,6 +48,17 @@ type GCSafePointStorage interface {
LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error)
SaveServiceGCSafePoint(ssp *ServiceSafePoint) error
RemoveServiceGCSafePoint(serviceID string) error

LoadAllServiceGroups() ([]string, error)
// Service safe point interfaces.
SaveServiceSafePointByServiceGroup(serviceGroupID string, ssp *ServiceSafePoint) error
LoadServiceSafePointByServiceGroup(serviceGroupID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePointByServiceGroup(serviceGroupID string, now time.Time) (*ServiceSafePoint, error)
RemoveServiceSafePointByServiceGroup(serviceGroupID, serviceID string) error
// GC safe point interfaces.
SaveGCSafePointByServiceGroup(gcSafePoint *ServiceGroupGCSafePoint) error
LoadGCSafePointByServiceGroup(serviceGroupID string) (*ServiceGroupGCSafePoint, error)
LoadAllServiceGroupGCSafePoints() ([]*ServiceGroupGCSafePoint, error)
}

var _ GCSafePointStorage = (*StorageEndpoint)(nil)
Expand Down
154 changes: 154 additions & 0 deletions server/storage/endpoint/gc_service_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"encoding/json"
"math"
"time"

"github.com/pingcap/errors"
"go.etcd.io/etcd/clientv3"
)

// Predefine service groups. More service groups would come from "Multi-tenant".
const (
// ServiceGroupRawKVDefault is service group ID for RawKV.
ServiceGroupRawKVDefault = "default_rawkv"
)

// LoadAllServiceGroups returns a list of all service group IDs.
// We have only predefine service groups by now.
// More service groups would come from "Multi-tenant".
func (se *StorageEndpoint) LoadAllServiceGroups() ([]string, error) {
serviceGroupIDs := []string{
ServiceGroupRawKVDefault,
}

return serviceGroupIDs, nil
}

// SaveServiceSafePointByServiceGroup saves service safe point under given service group.
func (se *StorageEndpoint) SaveServiceSafePointByServiceGroup(serviceGroupID string, ssp *ServiceSafePoint) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
key := GCServiceSafePointPathByServiceGroup(serviceGroupID, ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
}
return se.Save(key, string(value))
}

// LoadServiceSafePointByServiceGroup reads ServiceSafePoint for the given service group and service name.
// Return nil if no safepoint not exist.
func (se *StorageEndpoint) LoadServiceSafePointByServiceGroup(serviceGroupID, serviceID string) (*ServiceSafePoint, error) {
value, err := se.Load(GCServiceSafePointPathByServiceGroup(serviceGroupID, serviceID))
if err != nil || value == "" {
return nil, err
}
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(value), ssp); err != nil {
return nil, err
}
return ssp, nil
}

// LoadMinServiceSafePointByServiceGroup returns the minimum safepoint for the given service group.
// Note that gc worker safe point are store separately.
// If no service safe point exist for the given service group or all the service safe points just expired, return nil.
func (se *StorageEndpoint) LoadMinServiceSafePointByServiceGroup(serviceGroupID string, now time.Time) (*ServiceSafePoint, error) {
prefix := GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}

// remove expired safe points.
if ssp.ExpiredAt < now.Unix() {
se.Remove(key)
continue
}

if ssp.SafePoint < min.SafePoint {
min = ssp
}
}

if min.SafePoint == math.MaxUint64 {
// no service safe point or all of them are expired.
return nil, nil
}

// successfully found a valid min safe point.
return min, nil
}

// RemoveServiceSafePointByServiceGroup removes a service safe point.
func (se *StorageEndpoint) RemoveServiceSafePointByServiceGroup(serviceGroupID, serviceID string) error {
key := GCServiceSafePointPathByServiceGroup(serviceGroupID, serviceID)
return se.Remove(key)
}

// SaveGCSafePointByServiceGroup saves GCSafePoint under given service group.
func (se *StorageEndpoint) SaveGCSafePointByServiceGroup(gcSafePoint *ServiceGroupGCSafePoint) error {
safePoint, err := json.Marshal(gcSafePoint)
if err != nil {
return err
}
return se.Save(gcSafePointPathByServiceGroup(gcSafePoint.ServiceGroupID), string(safePoint))
}

// LoadGCSafePointByServiceGroup reads GCSafePoint for the given service group.
// return nil if no safepoint not exist.
func (se *StorageEndpoint) LoadGCSafePointByServiceGroup(serviceGroupID string) (*ServiceGroupGCSafePoint, error) {
value, err := se.Load(gcSafePointPathByServiceGroup(serviceGroupID))
if err != nil || value == "" {
return nil, err
}
gcSafePoint := &ServiceGroupGCSafePoint{}
if err := json.Unmarshal([]byte(value), gcSafePoint); err != nil {
return nil, err
}
return gcSafePoint, nil
}

// LoadAllServiceGroupGCSafePoints returns two slices of ServiceGroupIDs and their corresponding safe points.
func (se *StorageEndpoint) LoadAllServiceGroupGCSafePoints() ([]*ServiceGroupGCSafePoint, error) {
prefix := gcServiceGroupGCSafePointPrefixPath()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
_, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
safePoints := make([]*ServiceGroupGCSafePoint, 0, len(values))
for _, value := range values {
gcSafePoint := &ServiceGroupGCSafePoint{}
if err := json.Unmarshal([]byte(value), gcSafePoint); err != nil {
return nil, err
}
safePoints = append(safePoints, gcSafePoint)
}
return safePoints, nil
}
27 changes: 27 additions & 0 deletions server/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
customScheduleConfigPath = "scheduler_config"
gcWorkerServiceSafePointID = "gc_worker"
minResolvedTS = "min_resolved_ts"

gcServiceGroupGCSafePointPath = "gc_servicegroup/gc_safepoint"
gcServiceGroupServiceSafePointPath = "gc_servicegroup/service_safepoint"
)

// AppendToRootPath appends the given key to the rootPath.
Expand Down Expand Up @@ -99,6 +102,30 @@ func gcSafePointServicePath(serviceID string) string {
return path.Join(gcSafePointPath(), "service", serviceID)
}

// gcSafePointPathByServiceGroup returns the path of the gc safe point of speicified service group.
// Path: /gc_servicegroup/gc_safepoint/$service_group_id
func gcSafePointPathByServiceGroup(serviceGroupID string) string {
return path.Join(gcServiceGroupGCSafePointPath, serviceGroupID)
}

// GCServiceSafePointPrefixPathByServiceGroup returns the prefix path of the service safe point of speicified service group.
// Path: /gc_servicegroup/service_safepoint/$service_group_id
func GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID string) string {
return path.Join(gcServiceGroupServiceSafePointPath, serviceGroupID) + "/"
}

// GCServiceSafePointPathByServiceGroup returns the path of a service's safe point of speicified service group.
// Path: /gc_servicegroup/service_safepoint/$service_group_id/$service_id
func GCServiceSafePointPathByServiceGroup(serviceGroupID, serviceID string) string {
return path.Join(GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID), serviceID)
}

// gcServiceGroupGCSafePointPrefixPath returns the prefix path of gc safe point for all service groups.
// Path: /gc_servicegroup/gc_safepoint/
func gcServiceGroupGCSafePointPrefixPath() string {
return gcServiceGroupGCSafePointPath + "/"
}

// MinResolvedTSPath returns the min resolved ts path
func MinResolvedTSPath() string {
return path.Join(clusterPath, minResolvedTS)
Expand Down
Loading