Commit bf5a79c

fix: table sync should reset the shadow index map (#662)

if the table is restored by a full backup & restore.

1 parent 055d1ce commit bf5a79c

4 files changed: +45 −11 lines changed

pkg/ccr/job.go

Lines changed: 8 additions & 4 deletions

@@ -921,13 +921,17 @@ func (j *Job) partialSync() error {
 	j.progress.TableMapping[tableId] = destTable.Id
 	j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "")
 case TableSync:
-	commitSeq, ok := j.progress.TableCommitSeqMap[j.Src.TableId]
-	if !ok {
-		return xerror.Errorf(xerror.Normal, "table id %d, commit seq not found", j.Src.TableId)
-	}
+	var commitSeq int64
 	if len(partitions) > 0 {
 		// Only commit partition commit seq map, instead of the entire table commit seq.
 		commitSeq = j.progress.CommitSeq
+	} else if seq, ok := j.progress.TableCommitSeqMap[j.Src.TableId]; !ok {
+		return xerror.Errorf(xerror.Normal, "table id %d, commit seq not found", j.Src.TableId)
+	} else {
+		commitSeq = seq
+		// The entire table has been restored, clear the partition commit seq map and shadow index.
+		j.progress.PartitionCommitSeqMap = nil
+		j.progress.ShadowIndexes = nil
 	}
 	j.Dest.TableId = destTable.Id
 	j.progress.TableMapping = nil
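
For context, here is a minimal, self-contained sketch of the branch this hunk introduces. The SyncProgress struct and pickCommitSeq helper below are hypothetical simplifications for illustration only, not the actual types in pkg/ccr:

package main

import "fmt"

// SyncProgress is a hypothetical stand-in for the job progress state
// touched by the fix.
type SyncProgress struct {
	CommitSeq             int64
	TableCommitSeqMap     map[int64]int64
	PartitionCommitSeqMap map[int64]int64
	ShadowIndexes         map[int64]int64
}

// pickCommitSeq mirrors the fixed logic: a partial (partition-level) restore
// keeps the per-partition state, while a full table restore falls back to the
// table commit seq and clears the now-stale partition map and shadow indexes.
func pickCommitSeq(p *SyncProgress, tableId int64, partitions []string) (int64, error) {
	var commitSeq int64
	if len(partitions) > 0 {
		// Only the listed partitions were restored; keep partition state.
		commitSeq = p.CommitSeq
	} else if seq, ok := p.TableCommitSeqMap[tableId]; !ok {
		return 0, fmt.Errorf("table id %d, commit seq not found", tableId)
	} else {
		commitSeq = seq
		// The entire table was restored, so partition-level progress and
		// shadow index mappings no longer describe the destination table.
		p.PartitionCommitSeqMap = nil
		p.ShadowIndexes = nil
	}
	return commitSeq, nil
}

func main() {
	p := &SyncProgress{
		CommitSeq:         100,
		TableCommitSeqMap: map[int64]int64{42: 90},
		ShadowIndexes:     map[int64]int64{1: 2},
	}
	seq, err := pickCommitSeq(p, 42, nil) // nil partitions => full table restore
	fmt.Println(seq, err, p.ShadowIndexes) // 90 <nil> map[]
}

Before the fix, a full restore took the table commit seq but left the old ShadowIndexes map in place, which is what the commit title calls out.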

pkg/rpc/rpc_factory.go

Lines changed: 2 additions & 2 deletions

@@ -66,12 +66,12 @@ func (rf *RpcFactory) NewFeRpc(spec *base.Spec) (IFeRpc, error) {
 	rf.feRpcsLock.Lock()
 	if feRpc, ok := rf.feRpcs[key]; ok {
 		rf.feRpcsLock.Unlock()
-		log.Debugf("RpcFactory: reused cached FeRpc for %s (cache hit)", key)
+		log.Tracef("RpcFactory: reused cached FeRpc for %s (cache hit)", key)
 		return feRpc, nil
 	}
 	rf.feRpcsLock.Unlock()

-	log.Debugf("RpcFactory: creating new FeRpc for %s (cache miss)", key)
+	log.Tracef("RpcFactory: creating new FeRpc for %s (cache miss)", key)
 	feRpc, err := NewFeRpc(spec)
 	if err != nil {
 		return nil, err

regression-test/suites/db_sync_dep/index/drop_bf/test_dsd_index_drop_bf.groovy

Lines changed: 10 additions & 0 deletions

@@ -149,4 +149,14 @@ suite("test_dsd_index_drop_bf") {
     assertTrue(helper.checkShowTimesOf(
         """ select * from ${tableName} where username = 'post_drop_unique' """,
         has_count(1), 30, "target"))
+
+    // Insert some more data to verify downstream is still functional
+    sql """
+        INSERT INTO ${tableName} VALUES
+        (2, 200, "final_user_1", "final_data_1"),
+        (2, 201, "final_user_2", "final_data_2")
+        """
+    assertTrue(helper.checkShowTimesOf(
+        """ select * from ${tableName} """,
+        has_count(insert_num + 5), 60, "target"))
 }

regression-test/suites/table_sync_dep/index/drop_bf/test_tsd_index_drop_bf.groovy

Lines changed: 25 additions & 5 deletions

@@ -108,22 +108,32 @@ suite("test_tsd_index_drop_bf") {
         ALTER TABLE ${tableName}
         SET ("bloom_filter_columns" = "")
         """
+
+    // Insert more data in parallel, to create the shadow index.
+    def num_values = insert_num
+    insert_num *= 2
+    for (int index = num_values; index < insert_num; index++) {
+        sql """
+            INSERT INTO ${tableName} VALUES (1, ${index}, 'user_${index}', 'data_${index}')
+            """
+    }
     sql "sync"

     // Verify bloom filter was removed upstream
     assertTrue(helper.checkShowTimesOf("""
         SHOW CREATE TABLE ${context.dbName}.${tableName}
-        """,
+        """,
         noBloomFilter, 30, "sql"))

     // 4. Insert more data after dropping bloom filter
     sql """
         INSERT INTO ${tableName} VALUES
-        (1, 100, "post_drop_user_1", "extra_data_1"),
-        (1, 101, "post_drop_user_2", "extra_data_2"),
-        (1, 102, "post_drop_unique", "extra_data_3")
+        (2, 100, "post_drop_user_1", "extra_data_1"),
+        (2, 101, "post_drop_user_2", "extra_data_2"),
+        (2, 102, "post_drop_unique", "extra_data_3")
         """
     sql "sync"
+    insert_num += 3

     // Verify query still works without bloom filter
     assertTrue(helper.checkShowTimesOf(

@@ -137,7 +147,7 @@ suite("test_tsd_index_drop_bf") {
     // Check all data is synced
     assertTrue(helper.checkShowTimesOf(
         """ select * from ${tableName} """,
-        has_count(insert_num + 3), 60, "target"))
+        has_count(insert_num), 60, "target"))

     // Check bloom filter is removed downstream
     assertTrue(helper.checkShowTimesOf("""

@@ -149,4 +159,14 @@ suite("test_tsd_index_drop_bf") {
     assertTrue(helper.checkShowTimesOf(
         """ select * from ${tableName} where username = 'post_drop_unique' """,
         has_count(1), 30, "target"))
+
+    // Insert some more data to verify downstream is still functional
+    sql """
+        INSERT INTO ${tableName} VALUES
+        (3, 200, "final_user_1", "final_data_1"),
+        (3, 201, "final_user_2", "final_data_2")
+        """
+    assertTrue(helper.checkShowTimesOf(
+        """ select * from ${tableName} """,
+        has_count(insert_num + 2), 60, "target"))
 }
