Skip to content

Commit 234e36c

Browse files
committed
[Enhancement] add support for restore to ccr
1 parent 5d3bc2b commit 234e36c

File tree

8 files changed

+146
-12
lines changed

8 files changed

+146
-12
lines changed

fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ private void restore(Repository repository, Database db, RestoreStmt stmt) throw
570570
env, repository.getId());
571571
}
572572

573-
env.getEditLog().logRestoreJob(restoreJob);
573+
env.getEditLog().logRestoreJob(restoreJob, null);
574574

575575
// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
576576
addBackupOrRestoreJob(db.getId(), restoreJob);
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.backup;
19+
20+
import org.apache.doris.persist.gson.GsonUtils;
21+
22+
import com.google.common.collect.Maps;
23+
import com.google.gson.annotations.SerializedName;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.stream.Collectors;
28+
29+
public class RestoreBinLogInfo {
30+
// currently we are sending only DB and table info.
31+
// partitions level restore not possible since, there can be
32+
// race condition when two partition recover and ccr-syncer try to sync it.
33+
@SerializedName(value = "dbId")
34+
private long dbId;
35+
@SerializedName(value = "dbName")
36+
private String dbName;
37+
@SerializedName(value = "tableInfo")
38+
// map of tableId and TableName.
39+
private Map<Long, String> tableInfo = Maps.newHashMap();
40+
41+
/*
42+
* constuctor
43+
*/
44+
public RestoreBinLogInfo(long dbId, String dbName) {
45+
this.dbId = dbId;
46+
this.dbName = dbName;
47+
}
48+
49+
public void addTableInfo(long tableId, String tableName) {
50+
tableInfo.put(tableId, tableName);
51+
}
52+
53+
public long getDbId() {
54+
return dbId;
55+
}
56+
57+
public List<Long> getTableIdList() {
58+
return tableInfo.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
59+
}
60+
61+
public String toJson() {
62+
return GsonUtils.GSON.toJson(this);
63+
}
64+
65+
public static RestoreBinLogInfo fromJson(String json) {
66+
return GsonUtils.GSON.fromJson(json, RestoreBinLogInfo.class);
67+
}
68+
}

fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,7 +1638,7 @@ private void waitingAllSnapshotsFinished() {
16381638
snapshotFinishedTime = System.currentTimeMillis();
16391639
state = RestoreJobState.DOWNLOAD;
16401640

1641-
env.getEditLog().logRestoreJob(this);
1641+
env.getEditLog().logRestoreJob(this, null);
16421642
LOG.info("finished making snapshots. {}", this);
16431643
return;
16441644
}
@@ -1981,7 +1981,7 @@ private void waitingAllDownloadFinished() {
19811981
// backupMeta is useless now
19821982
backupMeta = null;
19831983

1984-
env.getEditLog().logRestoreJob(this);
1984+
env.getEditLog().logRestoreJob(this, null);
19851985
LOG.info("finished to download. {}", this);
19861986
}
19871987

@@ -2036,6 +2036,7 @@ private Status allTabletCommitted(boolean isReplay) {
20362036
return new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
20372037
}
20382038

2039+
RestoreBinLogInfo restoreBinLogInfo = new RestoreBinLogInfo(dbId, db.getName());
20392040
// replace the origin tables in atomic.
20402041
if (isAtomicRestore) {
20412042
Status st = atomicReplaceOlapTables(db, isReplay);
@@ -2097,6 +2098,7 @@ private Status allTabletCommitted(boolean isReplay) {
20972098
}
20982099

20992100
if (!isReplay) {
2101+
restoredTbls.stream().forEach(tbl -> restoreBinLogInfo.addTableInfo(tbl.getId(), tbl.getName()));
21002102
restoredPartitions.clear();
21012103
restoredTbls.clear();
21022104
restoredResources.clear();
@@ -2111,7 +2113,7 @@ private Status allTabletCommitted(boolean isReplay) {
21112113
finishedTime = System.currentTimeMillis();
21122114
state = RestoreJobState.FINISHED;
21132115

2114-
env.getEditLog().logRestoreJob(this);
2116+
env.getEditLog().logRestoreJob(this, restoreBinLogInfo);
21152117
}
21162118

21172119
LOG.info("job is finished. is replay: {}. {}", isReplay, this);
@@ -2383,7 +2385,7 @@ private void cancelInternal(boolean isReplay) {
23832385
finishedTime = System.currentTimeMillis();
23842386
state = RestoreJobState.CANCELLED;
23852387
// log
2386-
env.getEditLog().logRestoreJob(this);
2388+
env.getEditLog().logRestoreJob(this, null);
23872389

23882390
LOG.info("finished to cancel restore job. current state: {}. is replay: {}. {}",
23892391
curState.name(), isReplay, this);

fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.alter.AlterJobV2;
2121
import org.apache.doris.alter.IndexChangeJob;
22+
import org.apache.doris.backup.RestoreBinLogInfo;
2223
import org.apache.doris.catalog.Database;
2324
import org.apache.doris.catalog.Env;
2425
import org.apache.doris.common.Config;
@@ -328,6 +329,17 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
328329
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
329330
}
330331

332+
public void addRestoreInfo(RestoreBinLogInfo info, long commitSeq) {
333+
long dbId = info.getDbId();
334+
List<Long> tableIds = info.getTableIdList();
335+
long timestamp = -1;
336+
TBinlogType type = TBinlogType.RESTORE_INFO;
337+
String data = info.toJson();
338+
339+
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
340+
}
341+
342+
331343
public void addTableRename(TableInfo info, long commitSeq) {
332344
long dbId = info.getDbId();
333345
List<Long> tableIds = Lists.newArrayList();

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.analysis.UserIdentity;
2424
import org.apache.doris.backup.BackupJob;
2525
import org.apache.doris.backup.Repository;
26+
import org.apache.doris.backup.RestoreBinLogInfo;
2627
import org.apache.doris.backup.RestoreJob;
2728
import org.apache.doris.binlog.AddPartitionRecord;
2829
import org.apache.doris.binlog.CreateTableRecord;
@@ -1689,8 +1690,12 @@ public void logAlterRepository(Repository repo) {
16891690
logEdit(OperationType.OP_ALTER_REPOSITORY, repo);
16901691
}
16911692

1692-
public void logRestoreJob(RestoreJob job) {
1693-
logEdit(OperationType.OP_RESTORE_JOB, job);
1693+
public void logRestoreJob(RestoreJob job, RestoreBinLogInfo binInfo) {
1694+
long logId = logEdit(OperationType.OP_RESTORE_JOB, job);
1695+
if (binInfo != null) {
1696+
LOG.info("log restore info, logId:{}, infos: {}", logId, binInfo.toJson());
1697+
Env.getCurrentEnv().getBinlogManager().addRestoreInfo(binInfo, logId);
1698+
}
16941699
}
16951700

16961701
public void logUpdateUserProperty(UserPropertyInfo propertyInfo) {

gensrc/thrift/FrontendService.thrift

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,7 +1199,8 @@ enum TBinlogType {
11991199
RENAME_ROLLUP = 21,
12001200
RENAME_PARTITION = 22,
12011201
DROP_ROLLUP = 23,
1202-
1202+
RECOVER_INFO = 24,
1203+
RESTORE_INFO = 25,
12031204
// Keep some IDs for allocation so that when new binlog types are added in the
12041205
// future, the changes can be picked back to the old versions without breaking
12051206
// compatibility.
@@ -1215,9 +1216,7 @@ enum TBinlogType {
12151216
// MODIFY_XXX = 17,
12161217
// MIN_UNKNOWN = 18,
12171218
// UNKNOWN_3 = 19,
1218-
MIN_UNKNOWN = 24,
1219-
UNKNOWN_9 = 25,
1220-
UNKNOWN_10 = 26,
1219+
MIN_UNKNOWN = 26,
12211220
UNKNOWN_11 = 27,
12221221
UNKNOWN_12 = 28,
12231222
UNKNOWN_13 = 29,

regression-test/conf/regression-conf.groovy

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,3 +259,20 @@ lakesoulMinioEndpoint="*******"
259259
metaServiceToken = "greedisgood9999"
260260
instanceId = "default_instance_id"
261261
multiClusterInstance = "default_instance_id"
262+
263+
// JDBC configuration
264+
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
265+
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9190/?"
266+
jdbcUser = "root"
267+
jdbcPassword = ""
268+
feSourceThriftAddress = "127.0.0.1:9020"
269+
feTargetThriftAddress = "127.0.0.1:9020"
270+
syncerAddress = "127.0.0.1:9190"
271+
feSyncerUser = "root"
272+
feSyncerPassword = ""
273+
feHttpAddress = "127.0.0.1:8330"
274+
// CCR configuration
275+
ccrDownstreamUrl = "jdbc:mysql://127.0.0.1:9030/?"
276+
ccrDownstreamUser = "root"
277+
ccrDownstreamPassword = ""
278+
ccrDownstreamFeThriftAddress = "127.0.0.1:9020"

regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -947,4 +947,35 @@ class Syncer {
947947
)
948948
"""
949949
}
950-
}
950+
951+
void createMinoIoRepository(String name, boolean readOnly = false) {
952+
String ak = "xuYlrTbIGdz8oA8HeHSa"
953+
String sk = "6n7S3PFkDOBDSJiwQLURNywVwT20ptqOAY3wrOSF"
954+
String endpoint = "http://127.0.0.1:9000"
955+
String region = suite.getS3Region()
956+
String bucket = "test"
957+
958+
959+
suite.try_sql "DROP REPOSITORY `${name}`"
960+
suite.sql """
961+
CREATE ${readOnly ? "READ ONLY" : ""} REPOSITORY `${name}`
962+
WITH S3
963+
ON LOCATION "s3://${bucket}/${name}"
964+
PROPERTIES
965+
(
966+
"s3.endpoint" = "http://127.0.0.1:9000",
967+
"s3.access_key" = "${ak}",
968+
"s3.secret_key" = "${sk}",
969+
"s3.root"= "${name}",
970+
"s3.use_path_style" = "true",
971+
"s3.region" = "${region}"
972+
)
973+
"""
974+
}
975+
976+
void disableDbBinlog() {
977+
suite.sql """
978+
ALTER DATABASE ${context.dbName} SET properties ("binlog.enable" = "false")
979+
"""
980+
}
981+
}

0 commit comments

Comments
 (0)