Skip to content

Commit 131196a

Browse files
committed
feat: support medium sync
1 parent 0a9d670 commit 131196a

File tree

12 files changed

+925
-52
lines changed

12 files changed

+925
-52
lines changed

pkg/ccr/base/spec.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,6 +1595,51 @@ func (s *Spec) GetCreateTableSql(tableName string) (string, error) {
15951595

15961596
return createSql, nil
15971597
}
1598+
func (s *Spec) ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
1599+
if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
1600+
log.Warnf("empty partition infos, skip modify partition property")
1601+
return nil
1602+
}
1603+
1604+
dbName := utils.FormatKeywordName(s.Database)
1605+
destTableName = utils.FormatKeywordName(destTableName)
1606+
1607+
var lastErr error
1608+
successCount := 0
1609+
for _, partitionInfo := range batchModifyPartitionsInfo.Infos {
1610+
if partitionInfo.DataProperty == nil || partitionInfo.DataProperty.StorageMedium == "" {
1611+
log.Warnf("partition %d has no storage medium, skip modify partition property", partitionInfo.PartitionId)
1612+
continue
1613+
}
1614+
1615+
sql := fmt.Sprintf("ALTER TABLE %s.%s MODIFY PARTITION %s SET (\"storage_medium\" = \"%s\")",
1616+
dbName, destTableName, utils.FormatKeywordName(partitionInfo.PartitionName), partitionInfo.DataProperty.StorageMedium)
1617+
1618+
log.Infof("modify partition property sql: %s", sql)
1619+
if err := s.Exec(sql); err != nil {
1620+
errMsg := err.Error()
1621+
// Skip if partition not found (partition may have been dropped)
1622+
if strings.Contains(errMsg, "does not exist") || strings.Contains(errMsg, "not found") {
1623+
log.Warnf("partition %s not found, skip: %v", partitionInfo.PartitionName, err)
1624+
continue
1625+
}
1626+
// For other errors, record and continue to try remaining partitions
1627+
log.Warnf("modify partition %s property failed: %v", partitionInfo.PartitionName, err)
1628+
lastErr = err
1629+
} else {
1630+
successCount++
1631+
}
1632+
}
1633+
1634+
// Return error if any partition modification failed (except partition not found)
1635+
if lastErr != nil {
1636+
return xerror.Wrapf(lastErr, xerror.Normal,
1637+
"modify partition storage medium failed, success: %d, total: %d",
1638+
successCount, len(batchModifyPartitionsInfo.Infos))
1639+
}
1640+
1641+
return nil
1642+
}
15981643

15991644
// Determine whether the error are network related, eg connection refused, connection reset, exposed from net packages.
16001645
func isNetworkRelated(err error) bool {

pkg/ccr/base/specer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type Specer interface {
6666
AddPartition(destTableName string, addPartition *record.AddPartition) error
6767
DropPartition(destTableName string, dropPartition *record.DropPartition) error
6868
RenamePartition(destTableName, oldPartition, newPartition string) error
69+
ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error
6970

7071
LightningIndexChange(tableAlias string, changes *record.ModifyTableAddOrDropInvertedIndices) error
7172
BuildIndex(tableAlias string, buildIndex *record.IndexChangeJob) error

pkg/ccr/handle/create_table.go

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package handle
22

33
import (
4+
"fmt"
5+
"regexp"
46
"strings"
57

68
"github.com/selectdb/ccr_syncer/pkg/ccr"
@@ -18,6 +20,70 @@ type CreateTableHandle struct {
1820
IdempotentJobHandle[*record.CreateTable]
1921
}
2022

23+
// Check if error message indicates storage medium or capacity related issues
24+
func isStorageMediumError(errMsg string) bool {
25+
// Doris returns "Failed to find enough backend" for storage/capacity issues
26+
return strings.Contains(strings.ToLower(errMsg), "failed to find enough backend")
27+
}
28+
29+
// Set specific property in CREATE TABLE SQL
30+
func setPropertyInCreateTableSql(createSql string, key string, value string) string {
31+
// Add property to PROPERTIES clause
32+
pattern := `(?i)(PROPERTIES\s*\(\s*)`
33+
replacement := fmt.Sprintf(`${1}"%s" = "%s", `, key, value)
34+
createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement)
35+
36+
// Clean up trailing comma if PROPERTIES was empty
37+
return ccr.FilterTailingCommaFromCreateTableSql(createSql)
38+
}
39+
40+
// Set specific storage_medium in CREATE TABLE SQL
41+
func setStorageMediumInCreateTableSql(createSql string, medium string) string {
42+
// Remove existing storage_medium first
43+
createSql = ccr.FilterStorageMediumFromCreateTableSql(createSql)
44+
return setPropertyInCreateTableSql(createSql, "storage_medium", medium)
45+
}
46+
47+
// Set specific medium_allocation_mode in CREATE TABLE SQL
48+
func setMediumAllocationModeInCreateTableSql(createSql string, mode string) string {
49+
// Remove existing medium_allocation_mode first
50+
createSql = ccr.FilterMediumAllocationModeFromCreateTableSql(createSql)
51+
return setPropertyInCreateTableSql(createSql, "medium_allocation_mode", mode)
52+
}
53+
54+
// Process CREATE TABLE SQL according to storage medium policy
55+
func processCreateTableSqlByMediumPolicy(j *ccr.Job, createTable *record.CreateTable) {
56+
storageMedium := j.StorageMedium
57+
mediumAllocationMode := j.MediumAllocationMode
58+
59+
// Process storage_medium
60+
switch storageMedium {
61+
case ccr.StorageMediumSameWithUpstream:
62+
// Keep upstream storage_medium unchanged
63+
log.Infof("using same_with_upstream storage medium, keeping original storage_medium")
64+
65+
case ccr.StorageMediumHDD:
66+
log.Infof("using hdd storage medium, setting storage_medium to hdd")
67+
createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "hdd")
68+
69+
case ccr.StorageMediumSSD:
70+
log.Infof("using ssd storage medium, setting storage_medium to ssd")
71+
createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "ssd")
72+
73+
default:
74+
log.Warnf("unknown storage medium: %s, falling back to filter storage_medium", storageMedium)
75+
if ccr.FeatureFilterStorageMedium {
76+
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql)
77+
}
78+
}
79+
80+
// Process medium_allocation_mode from CCR job parameter
81+
if mediumAllocationMode != "" {
82+
log.Infof("setting medium_allocation_mode to %s", mediumAllocationMode)
83+
createTable.Sql = setMediumAllocationModeInCreateTableSql(createTable.Sql, mediumAllocationMode)
84+
}
85+
}
86+
2187
func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *record.CreateTable) error {
2288
if j.SyncType != ccr.DBSync {
2389
return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType)
@@ -68,9 +134,8 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec
68134
}
69135
}
70136

71-
if ccr.FeatureFilterStorageMedium {
72-
createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql)
73-
}
137+
// Process SQL according to storage medium policy
138+
processCreateTableSqlByMediumPolicy(j, createTable)
74139
createTable.Sql = ccr.FilterDynamicPartitionStoragePolicyFromCreateTableSql(createTable.Sql)
75140

76141
if ccr.FeatureOverrideReplicationNum() && j.ReplicationNum > 0 {
@@ -79,26 +144,49 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec
79144

80145
if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil {
81146
errMsg := err.Error()
147+
148+
// Skip unsupported features
82149
if strings.Contains(errMsg, "Can not found function") {
83150
log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg)
84151
return nil
85-
} else if strings.Contains(errMsg, "Can not find resource") {
152+
}
153+
if strings.Contains(errMsg, "Can not find resource") {
86154
log.Warnf("skip creating table/view for the resource is not supported yet: %s", errMsg)
87155
return nil
88-
} else if createTable.IsCreateView() && strings.Contains(errMsg, "Unknown column") {
156+
}
157+
158+
// Trigger partial snapshot for recoverable errors
159+
if createTable.IsCreateView() && strings.Contains(errMsg, "Unknown column") {
89160
log.Warnf("create view but the column is not found, trigger partial snapshot, commit seq: %d, msg: %s",
90161
commitSeq, errMsg)
91162
replace := false // new view no need to replace
92163
isView := true
93164
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
94165
}
95-
if len(createTable.TableName) > 0 && ccr.IsSessionVariableRequired(errMsg) { // ignore doris 2.0.3
166+
if len(createTable.TableName) > 0 && ccr.IsSessionVariableRequired(errMsg) {
96167
log.Infof("a session variable is required to create table %s, force partial snapshot, commit seq: %d, msg: %s",
97168
createTable.TableName, commitSeq, errMsg)
98169
replace := false // new table no need to replace
99170
isView := false
100171
return j.NewPartialSnapshot(createTable.TableId, createTable.TableName, nil, replace, isView)
101172
}
173+
174+
// Storage medium related error: pause job and require manual intervention
175+
if isStorageMediumError(errMsg) {
176+
log.Errorf("create table %s failed due to storage medium issue, job will be paused. "+
177+
"Current storage_medium=%s. Please check target cluster resources or update storage_medium via API. Error: %s",
178+
createTable.TableName, j.StorageMedium, errMsg)
179+
return xerror.Panicf(xerror.Normal,
180+
"Create table failed: storage medium issue for table %s. "+
181+
"Current storage_medium=%s. Possible causes:\n"+
182+
"1. Storage medium (%s) not available on target cluster\n"+
183+
"2. Insufficient disk capacity\n"+
184+
"3. Replication number exceeds available BE nodes\n"+
185+
"Please check target cluster configuration or update storage_medium via /update_storage_medium API. "+
186+
"Original error: %s",
187+
createTable.TableName, j.StorageMedium, j.StorageMedium, errMsg)
188+
}
189+
102190
return xerror.Wrapf(err, xerror.Normal, "create table %d", createTable.TableId)
103191
}
104192

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package handle
2+
3+
import (
4+
"strings"
5+
6+
"github.com/selectdb/ccr_syncer/pkg/ccr"
7+
"github.com/selectdb/ccr_syncer/pkg/ccr/record"
8+
festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice"
9+
"github.com/selectdb/ccr_syncer/pkg/xerror"
10+
log "github.com/sirupsen/logrus"
11+
)
12+
13+
func init() {
14+
ccr.RegisterJobHandle[*record.BatchModifyPartitionsInfo](festruct.TBinlogType_MODIFY_PARTITIONS, &ModifyPartitionsHandle{})
15+
}
16+
17+
type ModifyPartitionsHandle struct {
18+
// The modify partitions binlog is idempotent
19+
IdempotentJobHandle[*record.BatchModifyPartitionsInfo]
20+
}
21+
22+
// Filter partitions that have storage medium changes and are not temporary partitions
23+
func filterStorageMediumChanges(infos []*record.ModifyPartitionInfo) []*record.ModifyPartitionInfo {
24+
filtered := make([]*record.ModifyPartitionInfo, 0)
25+
for _, info := range infos {
26+
// Skip temporary partitions (they are not synced)
27+
if info.IsTempPartition {
28+
log.Debugf("skip temporary partition %d", info.PartitionId)
29+
continue
30+
}
31+
// Only process partitions with storage medium specified
32+
if info.DataProperty != nil && info.DataProperty.StorageMedium != "" {
33+
filtered = append(filtered, info)
34+
}
35+
}
36+
return filtered
37+
}
38+
39+
func (h *ModifyPartitionsHandle) Handle(j *ccr.Job, commitSeq int64, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
40+
// Skip if using fixed storage medium (hdd/ssd)
41+
// Only process when using same_with_upstream storage medium
42+
if j.StorageMedium == ccr.StorageMediumHDD ||
43+
j.StorageMedium == ccr.StorageMediumSSD {
44+
log.Infof("skip modify partitions for storage_medium is fixed to %s", j.StorageMedium)
45+
return nil
46+
}
47+
48+
// Safety check: ensure we have partition infos to process
49+
if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
50+
log.Warnf("batch modify partitions info is empty or nil, skip")
51+
return nil
52+
}
53+
54+
// Filter to only process storage medium changes
55+
filteredInfos := filterStorageMediumChanges(batchModifyPartitionsInfo.Infos)
56+
if len(filteredInfos) == 0 {
57+
log.Infof("no storage medium changes in modify partitions binlog, skip")
58+
return nil
59+
}
60+
log.Infof("processing %d partition storage medium changes out of %d total modifications",
61+
len(filteredInfos), len(batchModifyPartitionsInfo.Infos))
62+
63+
// Update to use filtered infos
64+
batchModifyPartitionsInfo.Infos = filteredInfos
65+
66+
// Get table ID from the first partition info (all partitions should belong to the same table)
67+
tableId := batchModifyPartitionsInfo.GetTableId()
68+
if tableId <= 0 {
69+
log.Warnf("invalid table ID: %d, skip modify partitions", tableId)
70+
return nil
71+
}
72+
73+
// Check if it's a materialized view table
74+
if isAsyncMv, err := j.IsMaterializedViewTable(tableId); err != nil {
75+
return err
76+
} else if isAsyncMv {
77+
log.Infof("skip modify partitions for materialized view table %d", tableId)
78+
return nil
79+
}
80+
81+
// Get destination table name
82+
destTableName, err := j.GetDestNameBySrcId(tableId)
83+
if err != nil {
84+
errMsg := err.Error()
85+
// If table not found in mapping, it may not be synced yet or already dropped
86+
if strings.Contains(errMsg, "not found") || strings.Contains(errMsg, "does not exist") {
87+
log.Warnf("table %d not found in dest, skip modify partitions: %v", tableId, err)
88+
return nil
89+
}
90+
// Other errors (network, etc.): return error to retry
91+
return xerror.Wrapf(err, xerror.Normal, "failed to get dest table name for table %d", tableId)
92+
}
93+
94+
// Execute modify partition property
95+
// Note: ModifyPartition only updates FE metadata, actual BE storage migration is async
96+
// So this SQL won't fail due to backend resource issues
97+
if err := j.Dest.ModifyPartitionProperty(destTableName, batchModifyPartitionsInfo); err != nil {
98+
// Return error to let job framework retry (network issues, etc.)
99+
return xerror.Wrapf(err, xerror.Normal, "modify partition storage medium failed for table %s", destTableName)
100+
}
101+
102+
log.Infof("successfully modified storage medium for %d partitions in table %s",
103+
len(filteredInfos), destTableName)
104+
return nil
105+
}

0 commit comments

Comments
 (0)