Skip to content

Commit bfbf6cc

Browse files
committed
feat: support medium sync policy in CCR tasks
# Conflicts: # pkg/ccr/base/spec.go # pkg/ccr/job.go
1 parent 0fc0a66 commit bfbf6cc

File tree

12 files changed

+683
-57
lines changed

12 files changed

+683
-57
lines changed

pkg/ccr/base/spec.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,6 +1595,32 @@ func (s *Spec) GetCreateTableSql(tableName string) (string, error) {
15951595

15961596
return createSql, nil
15971597
}
1598+
func (s *Spec) ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
1599+
if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
1600+
log.Warnf("empty partition infos, skip modify partition property")
1601+
return nil
1602+
}
1603+
1604+
dbName := utils.FormatKeywordName(s.Database)
1605+
destTableName = utils.FormatKeywordName(destTableName)
1606+
1607+
for _, partitionInfo := range batchModifyPartitionsInfo.Infos {
1608+
if partitionInfo.DataProperty == nil || partitionInfo.DataProperty.StorageMedium == "" {
1609+
log.Warnf("partition %d has no storage medium, skip modify partition property", partitionInfo.PartitionId)
1610+
continue
1611+
}
1612+
1613+
sql := fmt.Sprintf("ALTER TABLE %s.%s MODIFY PARTITION %s SET (\"storage_medium\" = \"%s\")",
1614+
dbName, destTableName, utils.FormatKeywordName(partitionInfo.PartitionName), partitionInfo.DataProperty.StorageMedium)
1615+
1616+
log.Infof("modify partition property sql: %s", sql)
1617+
if err := s.Exec(sql); err != nil {
1618+
log.Warnf("modify partition %s property failed: %v", partitionInfo.PartitionName, err)
1619+
}
1620+
}
1621+
1622+
return nil
1623+
}
15981624

15991625
// Determine whether the error is network related, e.g. connection refused or connection reset, as exposed by the net packages.
16001626
func isNetworkRelated(err error) bool {

pkg/ccr/base/specer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type Specer interface {
6666
AddPartition(destTableName string, addPartition *record.AddPartition) error
6767
DropPartition(destTableName string, dropPartition *record.DropPartition) error
6868
RenamePartition(destTableName, oldPartition, newPartition string) error
69+
ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error
6970

7071
LightningIndexChange(tableAlias string, changes *record.ModifyTableAddOrDropInvertedIndices) error
7172
BuildIndex(tableAlias string, buildIndex *record.IndexChangeJob) error

pkg/ccr/handle/create_table.go

Lines changed: 175 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package handle
22

33
import (
4+
"fmt"
5+
"regexp"
46
"strings"
57

68
"github.com/selectdb/ccr_syncer/pkg/ccr"
@@ -18,6 +20,176 @@ type CreateTableHandle struct {
1820
IdempotentJobHandle[*record.CreateTable]
1921
}
2022

23+
// Check if error message indicates storage medium or capacity related issues
24+
func isStorageMediumError(errMsg string) bool {
25+
log.Infof("STORAGE_MEDIUM_DEBUG: Analyzing error message: %s", errMsg)
26+
27+
patterns := []string{
28+
"capExceedLimit",
29+
"Failed to find enough backend",
30+
"not enough backend",
31+
"storage medium",
32+
"storage_medium",
33+
"avail capacity",
34+
"disk space",
35+
"not enough space",
36+
"replication num",
37+
"replication tag",
38+
}
39+
40+
for _, pattern := range patterns {
41+
if strings.Contains(strings.ToLower(errMsg), strings.ToLower(pattern)) {
42+
log.Infof("STORAGE_MEDIUM_DEBUG: Found storage/capacity related pattern '%s' in error message", pattern)
43+
return true
44+
}
45+
}
46+
47+
log.Infof("STORAGE_MEDIUM_DEBUG: No storage/capacity related patterns found in error message")
48+
return false
49+
}
50+
51+
// Extract storage_medium from CREATE TABLE SQL
52+
func extractStorageMediumFromCreateTableSql(createSql string) string {
53+
pattern := `"storage_medium"\s*=\s*"([^"]*)"`
54+
re := regexp.MustCompile(pattern)
55+
matches := re.FindStringSubmatch(createSql)
56+
if len(matches) >= 2 {
57+
medium := strings.ToLower(matches[1])
58+
log.Infof("STORAGE_MEDIUM_DEBUG: Extracted storage medium: %s", medium)
59+
return medium
60+
}
61+
log.Infof("STORAGE_MEDIUM_DEBUG: No storage medium found in SQL")
62+
return ""
63+
}
64+
65+
// switchStorageMedium returns the storage medium to try after medium failed.
// "hdd" (compared case-insensitively) maps to "ssd"; every other input,
// including "ssd" and any non-standard value, maps to "hdd".
func switchStorageMedium(medium string) string {
	if strings.ToLower(medium) == "hdd" {
		return "ssd"
	}
	// "ssd" and anything unrecognised both fall back to hdd.
	return "hdd"
}
77+
78+
// Set specific storage_medium in CREATE TABLE SQL
79+
func setStorageMediumInCreateTableSql(createSql string, medium string) string {
80+
// Remove existing storage_medium first
81+
createSql = ccr.FilterStorageMediumFromCreateTableSql(createSql)
82+
83+
// Check if PROPERTIES clause exists
84+
propertiesPattern := `PROPERTIES\s*\(`
85+
if matched, _ := regexp.MatchString(propertiesPattern, createSql); matched {
86+
// Add storage_medium at the beginning of PROPERTIES
87+
pattern := `(PROPERTIES\s*\(\s*)`
88+
replacement := fmt.Sprintf(`${1}"storage_medium" = "%s", `, medium)
89+
createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement)
90+
} else {
91+
// Add entire PROPERTIES clause
92+
pattern := `(\s*)$`
93+
replacement := fmt.Sprintf(` PROPERTIES ("storage_medium" = "%s")`, medium)
94+
createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement)
95+
}
96+
97+
return createSql
98+
}
99+
100+
// Process CREATE TABLE SQL according to medium sync policy
101+
func processCreateTableSqlByMediumPolicy(j *ccr.Job, createTable *record.CreateTable) error {
102+
// Note: We need to access Job's medium sync policy and feature flags
103+
// For now, we'll implement basic logic based on what we know the Job should do
104+
105+
// Check if medium sync policy feature is enabled (we assume it's enabled for new handler)
106+
// This is a simplified version that handles the main cases
107+
mediumPolicy := j.MediumSyncPolicy
108+
109+
switch mediumPolicy {
110+
case ccr.MediumSyncPolicySameWithUpstream:
111+
// Keep upstream storage_medium unchanged
112+
log.Infof("using same_with_upstream policy, keeping original storage_medium")
113+
return nil
114+
115+
case ccr.MediumSyncPolicyHDD:
116+
// Force set to HDD
117+
log.Infof("using hdd policy, setting storage_medium to hdd")
118+
createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "hdd")
119+
return nil
120+
121+
default:
122+
log.Warnf("unknown medium sync policy: %s, falling back to filter storage_medium", mediumPolicy)
123+
if ccr.FeatureFilterStorageMedium {
124+
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql)
125+
}
126+
return nil
127+
}
128+
}
129+
130+
// Create table with medium retry mechanism
131+
func createTableWithMediumRetry(j *ccr.Job, createTable *record.CreateTable, srcDb string) error {
132+
originalSql := createTable.Sql
133+
log.Infof("STORAGE_MEDIUM_DEBUG: Starting create table with medium retry for table: %s", createTable.TableName)
134+
135+
// Process SQL according to medium policy
136+
if err := processCreateTableSqlByMediumPolicy(j, createTable); err != nil {
137+
return err
138+
}
139+
140+
// First attempt
141+
err := j.IDest.CreateTableOrView(createTable, srcDb)
142+
if err == nil {
143+
log.Infof("STORAGE_MEDIUM_DEBUG: Create table succeeded on first attempt")
144+
return nil
145+
}
146+
147+
log.Warnf("STORAGE_MEDIUM_DEBUG: First attempt failed: %s", err.Error())
148+
149+
// Check if it's storage related error and should retry
150+
if !isStorageMediumError(err.Error()) {
151+
log.Infof("STORAGE_MEDIUM_DEBUG: Not a storage related error, no retry")
152+
return err
153+
}
154+
155+
// Extract current medium and switch to the other one
156+
currentMedium := extractStorageMediumFromCreateTableSql(createTable.Sql)
157+
if currentMedium == "" {
158+
currentMedium = "ssd" // default
159+
}
160+
161+
switchedMedium := switchStorageMedium(currentMedium)
162+
log.Infof("STORAGE_MEDIUM_DEBUG: Switching from %s to %s", currentMedium, switchedMedium)
163+
164+
createTable.Sql = setStorageMediumInCreateTableSql(originalSql, switchedMedium)
165+
166+
// Second attempt with switched medium
167+
err = j.IDest.CreateTableOrView(createTable, srcDb)
168+
if err == nil {
169+
log.Infof("STORAGE_MEDIUM_DEBUG: Create table succeeded after switching to %s", switchedMedium)
170+
return nil
171+
}
172+
173+
log.Warnf("STORAGE_MEDIUM_DEBUG: Second attempt with %s also failed: %s", switchedMedium, err.Error())
174+
175+
// Final attempt: remove storage_medium if still storage related error
176+
if isStorageMediumError(err.Error()) {
177+
log.Infof("STORAGE_MEDIUM_DEBUG: Removing storage_medium for final attempt")
178+
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(originalSql)
179+
180+
err = j.IDest.CreateTableOrView(createTable, srcDb)
181+
if err == nil {
182+
log.Infof("STORAGE_MEDIUM_DEBUG: Create table succeeded after removing storage_medium")
183+
return nil
184+
}
185+
186+
log.Warnf("STORAGE_MEDIUM_DEBUG: Final attempt without storage_medium also failed: %s", err.Error())
187+
}
188+
189+
log.Errorf("STORAGE_MEDIUM_DEBUG: All attempts failed, returning final error")
190+
return err
191+
}
192+
21193
func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *record.CreateTable) error {
22194
if j.SyncType != ccr.DBSync {
23195
return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType)
@@ -68,12 +240,11 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec
68240
}
69241
}
70242

71-
if ccr.FeatureFilterStorageMedium {
72-
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql)
73-
}
243+
// storage_medium is no longer filtered here; createTableWithMediumRetry applies the medium sync policy.
74244
createTable.Sql = ccr.FilterDynamicPartitionStoragePolicyFromCreateTableSql(createTable.Sql)
75245

76-
if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
246+
// Use new create table function with medium retry mechanism
247+
if err := createTableWithMediumRetry(j, createTable, j.Src.Database); err != nil {
77248
errMsg := err.Error()
78249
if strings.Contains(errMsg, "Can not found function") {
79250
log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package handle
2+
3+
import (
4+
"github.com/selectdb/ccr_syncer/pkg/ccr"
5+
"github.com/selectdb/ccr_syncer/pkg/ccr/record"
6+
festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice"
7+
"github.com/selectdb/ccr_syncer/pkg/xerror"
8+
log "github.com/sirupsen/logrus"
9+
)
10+
11+
func init() {
12+
ccr.RegisterJobHandle[*record.BatchModifyPartitionsInfo](festruct.TBinlogType_MODIFY_PARTITIONS, &ModifyPartitionsHandle{})
13+
}
14+
15+
// ModifyPartitionsHandle is the job handle registered for MODIFY_PARTITIONS
// binlogs, whose payload is a *record.BatchModifyPartitionsInfo.
type ModifyPartitionsHandle struct {
16+
// The modify partitions binlog is idempotent
17+
IdempotentJobHandle[*record.BatchModifyPartitionsInfo]
18+
}
19+
20+
func (h *ModifyPartitionsHandle) Handle(j *ccr.Job, commitSeq int64, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
21+
// TODO: custom by medium_sync_policy
22+
if !ccr.FeatureMediumSyncPolicy || j.MediumSyncPolicy == "hdd" {
23+
log.Warnf("skip modify partitions for FeatureMediumSyncPolicy off or medium_sync_policy is hdd")
24+
return nil
25+
}
26+
27+
// Safety check: ensure we have partition infos to process
28+
if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
29+
return xerror.Errorf(xerror.Normal, "batch modify partitions info is empty or nil")
30+
}
31+
32+
// Get table ID from the first partition info (all partitions should belong to the same table)
33+
tableId := batchModifyPartitionsInfo.GetTableId()
34+
if tableId <= 0 {
35+
return xerror.Errorf(xerror.Normal, "invalid table ID: %d", tableId)
36+
}
37+
38+
// Check if it's a materialized view table
39+
if isAsyncMv, err := j.IsMaterializedViewTable(tableId); err != nil {
40+
return err
41+
} else if isAsyncMv {
42+
log.Warnf("skip modify partitions for materialized view table %d", tableId)
43+
return nil
44+
}
45+
46+
// Get destination table name
47+
destTableName, err := j.GetDestNameBySrcId(tableId)
48+
if err != nil {
49+
return err
50+
}
51+
52+
// Get the source cluster meta information and supplement the partition name information
53+
srcMeta := j.GetSrcMeta()
54+
if err := batchModifyPartitionsInfo.EnrichWithPartitionNames(srcMeta); err != nil {
55+
log.Errorf("failed to enrich partition names from source meta: %v", err)
56+
return err
57+
}
58+
59+
// Call spec layer method directly
60+
return j.Dest.ModifyPartitionProperty(destTableName, batchModifyPartitionsInfo)
61+
}

0 commit comments

Comments
 (0)