Skip to content

Commit 3de0e77

Browse files
committed
add LocalReplica
1 parent 1798b22 commit 3de0e77

File tree

20 files changed

+229
-92
lines changed

20 files changed

+229
-92
lines changed

fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.doris.catalog.EnvFactory;
3131
import org.apache.doris.catalog.FsBroker;
3232
import org.apache.doris.catalog.Index;
33+
import org.apache.doris.catalog.LocalReplica;
3334
import org.apache.doris.catalog.MaterializedIndex;
3435
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
3536
import org.apache.doris.catalog.MaterializedIndexMeta;
@@ -1537,8 +1538,8 @@ protected Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTb
15371538
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
15381539
for (Long beId : entry.getValue()) {
15391540
long newReplicaId = env.getNextId();
1540-
Replica newReplica = new Replica(newReplicaId, beId, ReplicaState.NORMAL, visibleVersion,
1541-
schemaHash);
1541+
Replica newReplica = new LocalReplica(newReplicaId, beId, ReplicaState.NORMAL,
1542+
visibleVersion, schemaHash);
15421543
newTablet.addReplica(newReplica, true /* is restore */);
15431544
}
15441545
}

fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,11 @@ public Tablet createTablet(long tabletId) {
107107
}
108108

109109
public Replica createReplica() {
110-
return new Replica();
110+
return new LocalReplica();
111111
}
112112

113113
public Replica createReplica(Replica.ReplicaContext context) {
114-
return new Replica(context);
114+
return new LocalReplica(context);
115115
}
116116

117117
public ReplicaAllocation createDefReplicaAllocation() {
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.catalog;
19+
20+
import org.apache.doris.thrift.TUniqueId;
21+
22+
import com.google.gson.annotations.SerializedName;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
25+
26+
public class LocalReplica extends Replica {
27+
private static final Logger LOG = LogManager.getLogger(LocalReplica.class);
28+
29+
@SerializedName(value = "rds", alternate = {"remoteDataSize"})
30+
private volatile long remoteDataSize = 0;
31+
@SerializedName(value = "ris", alternate = {"remoteInvertedIndexSize"})
32+
private Long remoteInvertedIndexSize = 0L;
33+
@SerializedName(value = "rss", alternate = {"remoteSegmentSize"})
34+
private Long remoteSegmentSize = 0L;
35+
36+
private TUniqueId cooldownMetaId;
37+
private long cooldownTerm = -1;
38+
39+
public LocalReplica() {
40+
super();
41+
}
42+
43+
public LocalReplica(ReplicaContext context) {
44+
super(context);
45+
}
46+
47+
// for rollup
48+
// the new replica's version is -1 and last failed version is -1
49+
public LocalReplica(long replicaId, long backendId, int schemaHash, ReplicaState state) {
50+
super(replicaId, backendId, schemaHash, state);
51+
}
52+
53+
// for create tablet and restore
54+
public LocalReplica(long replicaId, long backendId, ReplicaState state, long version, int schemaHash) {
55+
super(replicaId, backendId, state, version, schemaHash);
56+
}
57+
58+
public LocalReplica(long replicaId, long backendId, long version, int schemaHash, long dataSize,
59+
long remoteDataSize, long rowCount, ReplicaState state, long lastFailedVersion, long lastSuccessVersion) {
60+
super(replicaId, backendId, version, schemaHash, dataSize, remoteDataSize, rowCount, state, lastFailedVersion,
61+
lastSuccessVersion);
62+
this.remoteDataSize = remoteDataSize;
63+
}
64+
65+
@Override
66+
public long getRemoteDataSize() {
67+
return remoteDataSize;
68+
}
69+
70+
@Override
71+
public void setRemoteDataSize(long remoteDataSize) {
72+
this.remoteDataSize = remoteDataSize;
73+
}
74+
75+
@Override
76+
public Long getRemoteInvertedIndexSize() {
77+
return remoteInvertedIndexSize;
78+
}
79+
80+
@Override
81+
public void setRemoteInvertedIndexSize(long remoteInvertedIndexSize) {
82+
this.remoteInvertedIndexSize = remoteInvertedIndexSize;
83+
}
84+
85+
@Override
86+
public Long getRemoteSegmentSize() {
87+
return remoteSegmentSize;
88+
}
89+
90+
@Override
91+
public void setRemoteSegmentSize(long remoteSegmentSize) {
92+
this.remoteSegmentSize = remoteSegmentSize;
93+
}
94+
95+
@Override
96+
public TUniqueId getCooldownMetaId() {
97+
return cooldownMetaId;
98+
}
99+
100+
@Override
101+
public void setCooldownMetaId(TUniqueId cooldownMetaId) {
102+
this.cooldownMetaId = cooldownMetaId;
103+
}
104+
105+
@Override
106+
public long getCooldownTerm() {
107+
return cooldownTerm;
108+
}
109+
110+
@Override
111+
public void setCooldownTerm(long cooldownTerm) {
112+
this.cooldownTerm = cooldownTerm;
113+
}
114+
}

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets()
969969
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
970970
for (Long beId : entry3.getValue()) {
971971
long newReplicaId = env.getNextId();
972-
Replica replica = new Replica(newReplicaId, beId, ReplicaState.NORMAL,
972+
Replica replica = new LocalReplica(newReplicaId, beId, ReplicaState.NORMAL,
973973
visibleVersion, schemaHash);
974974
newTablet.addReplica(replica, true /* is restore */);
975975
}

fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,6 @@ public static class ReplicaContext {
9696
private int schemaHash = -1;
9797
@SerializedName(value = "ds", alternate = {"dataSize"})
9898
private volatile long dataSize = 0;
99-
@SerializedName(value = "rds", alternate = {"remoteDataSize"})
100-
private volatile long remoteDataSize = 0;
10199
@SerializedName(value = "rc", alternate = {"rowCount"})
102100
private volatile long rowCount = 0;
103101
@SerializedName(value = "st", alternate = {"state"})
@@ -120,14 +118,6 @@ public static class ReplicaContext {
120118
@Getter
121119
@SerializedName(value = "lss", alternate = {"localSegmentSize"})
122120
private Long localSegmentSize = 0L;
123-
@Setter
124-
@Getter
125-
@SerializedName(value = "ris", alternate = {"remoteInvertedIndexSize"})
126-
private Long remoteInvertedIndexSize = 0L;
127-
@Setter
128-
@Getter
129-
@SerializedName(value = "rss", alternate = {"remoteSegmentSize"})
130-
private Long remoteSegmentSize = 0L;
131121

132122
private volatile long totalVersionCount = -1;
133123
private volatile long visibleVersionCount = -1;
@@ -137,9 +127,6 @@ public static class ReplicaContext {
137127
// bad means this Replica is unrecoverable, and we will delete it
138128
private boolean bad = false;
139129

140-
private TUniqueId cooldownMetaId;
141-
private long cooldownTerm = -1;
142-
143130
// A replica version should increase monotonically,
144131
// but backend may missing some versions due to disk failure or bugs.
145132
// FE should found these and mark the replica as missing versions.
@@ -217,7 +204,6 @@ public Replica(long replicaId, long backendId, long version, int schemaHash,
217204
this.schemaHash = schemaHash;
218205

219206
this.dataSize = dataSize;
220-
this.remoteDataSize = remoteDataSize;
221207
this.rowCount = rowCount;
222208
this.state = state;
223209
if (this.state == null) {
@@ -280,11 +266,33 @@ public void setDataSize(long dataSize) {
280266
}
281267

282268
public long getRemoteDataSize() {
283-
return remoteDataSize;
269+
return 0;
284270
}
285271

286272
public void setRemoteDataSize(long remoteDataSize) {
287-
this.remoteDataSize = remoteDataSize;
273+
if (remoteDataSize > 0) {
274+
throw new UnsupportedOperationException("setRemoteDataSize is not supported in Replica");
275+
}
276+
}
277+
278+
public Long getRemoteInvertedIndexSize() {
279+
return 0L;
280+
}
281+
282+
public void setRemoteInvertedIndexSize(long remoteInvertedIndexSize) {
283+
if (remoteInvertedIndexSize > 0) {
284+
throw new UnsupportedOperationException("setRemoteInvertedIndexSize is not supported in Replica");
285+
}
286+
}
287+
288+
public Long getRemoteSegmentSize() {
289+
return 0L;
290+
}
291+
292+
public void setRemoteSegmentSize(long remoteSegmentSize) {
293+
if (remoteSegmentSize > 0) {
294+
throw new UnsupportedOperationException("setRemoteSegmentSize is not supported in Replica");
295+
}
288296
}
289297

290298
public long getRowCount() {
@@ -344,19 +352,19 @@ public boolean setBad(boolean bad) {
344352
}
345353

346354
public TUniqueId getCooldownMetaId() {
347-
return cooldownMetaId;
355+
return null;
348356
}
349357

350358
public void setCooldownMetaId(TUniqueId cooldownMetaId) {
351-
this.cooldownMetaId = cooldownMetaId;
359+
throw new UnsupportedOperationException("setCooldownMetaId is not supported in Replica");
352360
}
353361

354362
public long getCooldownTerm() {
355-
return cooldownTerm;
363+
return -1;
356364
}
357365

358366
public void setCooldownTerm(long cooldownTerm) {
359-
this.cooldownTerm = cooldownTerm;
367+
throw new UnsupportedOperationException("setCooldownTerm is not supported in Replica");
360368
}
361369

362370
public boolean needFurtherRepair() {

fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.catalog.Database;
2121
import org.apache.doris.catalog.Env;
22+
import org.apache.doris.catalog.LocalReplica;
2223
import org.apache.doris.catalog.MaterializedIndex;
2324
import org.apache.doris.catalog.OlapTable;
2425
import org.apache.doris.catalog.Partition;
@@ -1017,7 +1018,7 @@ public CloneTask createCloneReplicaAndTask() throws SchedException {
10171018
|| tabletHealth.status == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
10181019
|| tabletHealth.status == TabletStatus.COLOCATE_MISMATCH
10191020
|| tabletHealth.status == TabletStatus.REPLICA_MISSING_FOR_TAG) {
1020-
replica = new Replica(
1021+
replica = new LocalReplica(
10211022
Env.getCurrentEnv().getNextId(), destBackendId,
10221023
-1 /* version */, schemaHash,
10231024
-1 /* data size */, -1, -1 /* row count */,

fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.doris.catalog.JdbcTable;
5050
import org.apache.doris.catalog.KeysType;
5151
import org.apache.doris.catalog.ListPartitionItem;
52+
import org.apache.doris.catalog.LocalReplica;
5253
import org.apache.doris.catalog.MTMV;
5354
import org.apache.doris.catalog.MaterializedIndex;
5455
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -1112,7 +1113,7 @@ private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) {
11121113
schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
11131114
}
11141115

1115-
Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash,
1116+
Replica replica = new LocalReplica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash,
11161117
info.getDataSize(),
11171118
info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
11181119
info.getLastSuccessVersion());
@@ -3350,7 +3351,7 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic
33503351
for (List<Long> backendIds : chosenBackendIds.values()) {
33513352
for (long backendId : backendIds) {
33523353
long replicaId = idGeneratorBuffer.getNextId();
3353-
Replica replica = new Replica(replicaId, backendId, replicaState, version,
3354+
Replica replica = new LocalReplica(replicaId, backendId, replicaState, version,
33543355
tabletMeta.getOldSchemaHash());
33553356
tablet.addReplica(replica);
33563357
totalReplicaNum++;

fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.doris.catalog.DiskInfo;
2626
import org.apache.doris.catalog.Env;
2727
import org.apache.doris.catalog.Index;
28+
import org.apache.doris.catalog.LocalReplica;
2829
import org.apache.doris.catalog.MaterializedIndex;
2930
import org.apache.doris.catalog.MaterializedIndex.IndexState;
3031
import org.apache.doris.catalog.MaterializedIndexMeta;
@@ -1573,7 +1574,7 @@ private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletI
15731574
}
15741575

15751576
// use replicaId reported by BE to maintain replica meta consistent between FE and BE
1576-
Replica replica = new Replica(replicaId, backendId, version, schemaHash,
1577+
Replica replica = new LocalReplica(replicaId, backendId, version, schemaHash,
15771578
dataSize, remoteDataSize, rowCount, ReplicaState.NORMAL,
15781579
lastFailedVersion, version);
15791580
tablet.addReplica(replica);

fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.apache.doris.catalog.JdbcTable;
8686
import org.apache.doris.catalog.ListPartitionInfo;
8787
import org.apache.doris.catalog.ListPartitionItem;
88+
import org.apache.doris.catalog.LocalReplica;
8889
import org.apache.doris.catalog.LocalTablet;
8990
import org.apache.doris.catalog.MTMV;
9091
import org.apache.doris.catalog.MapType;
@@ -522,8 +523,7 @@ public class GsonUtils {
522523
// runtime adapter for class "CloudReplica".
523524
private static RuntimeTypeAdapterFactory<Replica> replicaTypeAdapterFactory = RuntimeTypeAdapterFactory
524525
.of(Replica.class, "clazz")
525-
.registerDefaultSubtype(Replica.class)
526-
.registerSubtype(Replica.class, Replica.class.getSimpleName())
526+
.registerSubtype(LocalReplica.class, LocalReplica.class.getSimpleName())
527527
.registerSubtype(CloudReplica.class, CloudReplica.class.getSimpleName());
528528

529529
private static RuntimeTypeAdapterFactory<Tablet> tabletTypeAdapterFactory;
@@ -536,9 +536,12 @@ public class GsonUtils {
536536
if (Config.isNotCloudMode()) {
537537
tabletTypeAdapterFactory.registerDefaultSubtype(LocalTablet.class);
538538
tabletTypeAdapterFactory.registerCompatibleSubtype(LocalTablet.class, Tablet.class.getSimpleName());
539+
replicaTypeAdapterFactory.registerDefaultSubtype(LocalReplica.class);
540+
replicaTypeAdapterFactory.registerCompatibleSubtype(LocalReplica.class, Replica.class.getSimpleName());
539541
} else {
540542
// compatible with old cloud code.
541543
tabletTypeAdapterFactory.registerDefaultSubtype(CloudTablet.class);
544+
replicaTypeAdapterFactory.registerDefaultSubtype(CloudReplica.class);
542545
}
543546
}
544547

0 commit comments

Comments
 (0)