diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java index 86aeac885dab0e..57b980152dba12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java @@ -17,6 +17,8 @@ package org.apache.doris.catalog; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -26,6 +28,9 @@ public class LocalReplica extends Replica { private static final Logger LOG = LogManager.getLogger(LocalReplica.class); + @SerializedName(value = "bid", alternate = {"backendId"}) + private long backendId; + @SerializedName(value = "rds", alternate = {"remoteDataSize"}) private volatile long remoteDataSize = 0; @SerializedName(value = "ris", alternate = {"remoteInvertedIndexSize"}) @@ -33,6 +38,70 @@ public class LocalReplica extends Replica { @SerializedName(value = "rss", alternate = {"remoteSegmentSize"}) private Long remoteSegmentSize = 0L; + // the last load failed version + @SerializedName(value = "lfv", alternate = {"lastFailedVersion"}) + private long lastFailedVersion = -1L; + // not serialized, not very important + private long lastFailedTimestamp = 0; + // the last load successful version + @SerializedName(value = "lsv", alternate = {"lastSuccessVersion"}) + private long lastSuccessVersion = -1L; + + private volatile long totalVersionCount = -1; + private volatile long visibleVersionCount = -1; + + private long pathHash = -1; + + // bad means this Replica is unrecoverable, and we will delete it + private boolean bad = false; + + // A replica version should increase monotonically, + // but backend may missing some versions due to disk failure or bugs. + // FE should found these and mark the replica as missing versions. + // If backend's report version < fe version, record the backend's report version as `regressiveVersion`, + // and if time exceed 5min, fe should mark this replica as missing versions. + private long regressiveVersion = -1; + private long regressiveVersionTimestamp = 0; + + /* + * This can happen when this replica is created by a balance clone task, and + * when task finished, the version of this replica is behind the partition's visible version. + * So this replica need a further repair. + * If we do not do this, this replica will be treated as version stale, and will be removed, + * so that the balance task is failed, which is unexpected. + * + * furtherRepairSetTime and leftFurtherRepairCount are set alone with needFurtherRepair. + * This is an insurance, in case that further repair task always fail. If 20 min passed + * since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false. + */ + private long furtherRepairSetTime = -1; + private int leftFurtherRepairCount = 0; + + // During full clone, the replica's state is CLONE, it will not load the data. + // After full clone finished, even if the replica's version = partition's visible version, + // + // notice: furtherRepairWatermarkTxnTd is used to clone a replica, protected it from be removed. + // + private long furtherRepairWatermarkTxnTd = -1; + + /* Decommission a backend B, steps are as follow: + * 1. wait peer backends catchup with B; + * 2. B change state to DECOMMISSION, set preWatermarkTxnId. B can load data now. + * 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now. + * 4. wait txn before postWatermarkTxnId finished, delete B. + * + * notice: preWatermarkTxnId and postWatermarkTxnId are used to delete this replica. + * + */ + private long preWatermarkTxnId = -1; + private long postWatermarkTxnId = -1; + + private long userDropTime = -1; + + private long scaleInDropTime = -1; + + private long lastReportVersion = 0; + private TUniqueId cooldownMetaId; private long cooldownTerm = -1; @@ -41,27 +110,92 @@ public LocalReplica() { } public LocalReplica(ReplicaContext context) { - super(context); + this(context.replicaId, context.backendId, context.state, context.version, context.schemaHash); } // for rollup // the new replica's version is -1 and last failed version is -1 public LocalReplica(long replicaId, long backendId, int schemaHash, ReplicaState state) { - super(replicaId, backendId, schemaHash, state); + this(replicaId, backendId, -1, schemaHash, 0L, 0L, 0L, state, -1, -1); } // for create tablet and restore public LocalReplica(long replicaId, long backendId, ReplicaState state, long version, int schemaHash) { - super(replicaId, backendId, state, version, schemaHash); + this(replicaId, backendId, version, schemaHash, 0L, 0L, 0L, state, -1L, version); } public LocalReplica(long replicaId, long backendId, long version, int schemaHash, long dataSize, long remoteDataSize, long rowCount, ReplicaState state, long lastFailedVersion, long lastSuccessVersion) { super(replicaId, backendId, version, schemaHash, dataSize, remoteDataSize, rowCount, state, lastFailedVersion, lastSuccessVersion); + this.backendId = backendId; + this.lastFailedVersion = lastFailedVersion; + if (this.lastFailedVersion > 0) { + this.lastFailedTimestamp = System.currentTimeMillis(); + } + if (lastSuccessVersion < this.version) { + this.lastSuccessVersion = this.version; + } else { + this.lastSuccessVersion = lastSuccessVersion; + } this.remoteDataSize = remoteDataSize; } + @Override + public long getBackendId() { + return this.backendId; + } + + @Override + protected long getBackendIdValue() { + return this.backendId; + } + + // just for ut + @Override + public void setBackendId(long backendId) { + this.backendId = backendId; + } + + @Override + public long getLastFailedVersion() { + return lastFailedVersion; + } + + @Override + public long getLastFailedTimestamp() { + return lastFailedTimestamp; + } + + @Override + public long getLastSuccessVersion() { + return lastSuccessVersion; + } + + @Override + public long getPathHash() { + return pathHash; + } + + @Override + public void setPathHash(long pathHash) { + this.pathHash = pathHash; + } + + @Override + public boolean isBad() { + return bad; + } + + @Override + public boolean setBad(boolean bad) { + if (this.bad == bad) { + return false; + } + this.bad = bad; + return true; + } + @Override public long getRemoteDataSize() { return remoteDataSize; @@ -111,4 +245,322 @@ public long getCooldownTerm() { public void setCooldownTerm(long cooldownTerm) { this.cooldownTerm = cooldownTerm; } + + @Override + public boolean needFurtherRepair() { + return leftFurtherRepairCount > 0 + && System.currentTimeMillis() < furtherRepairSetTime + + Config.tablet_further_repair_timeout_second * 1000; + } + + @Override + public void setNeedFurtherRepair(boolean needFurtherRepair) { + if (needFurtherRepair) { + furtherRepairSetTime = System.currentTimeMillis(); + leftFurtherRepairCount = Config.tablet_further_repair_max_times; + } else { + leftFurtherRepairCount = 0; + furtherRepairSetTime = -1; + } + } + + @Override + public void incrFurtherRepairCount() { + leftFurtherRepairCount--; + } + + @Override + public int getLeftFurtherRepairCount() { + return leftFurtherRepairCount; + } + + @Override + public long getFurtherRepairWatermarkTxnTd() { + return furtherRepairWatermarkTxnTd; + } + + @Override + public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { + this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; + } + + @Override + public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, + long updateTime) { + long oldLastFailedVersion = this.lastFailedVersion; + if (version != null) { + this.version = version; + } + if (lastSuccessVersion != null) { + this.lastSuccessVersion = lastSuccessVersion; + } + if (lastFailedVersion != null) { + if (this.lastFailedVersion < lastFailedVersion) { + this.lastFailedTimestamp = updateTime; + } + this.lastFailedVersion = lastFailedVersion; + } + if (this.lastFailedVersion < this.version) { + this.lastFailedVersion = -1; + this.lastFailedTimestamp = -1; + } + if (this.lastFailedVersion > 0 + && this.lastSuccessVersion > this.lastFailedVersion) { + this.lastSuccessVersion = this.version; + } + if (this.lastSuccessVersion < this.version) { + this.lastSuccessVersion = this.version; + } + if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) { + LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) { + LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } + } + + /* last failed version: LFV + * last success version: LSV + * version: V + * + * Case 1: + * If LFV > LSV, set LSV back to V, which indicates that version between LSV and LFV is invalid. + * Clone task will clone the version between LSV and LFV + * + * Case 2: + * LFV changed, set LSV back to V. This is just same as Case 1. Cause LFV must large than LSV. + * + * Case 3: + * LFV remains unchanged, just update LSV, and then check if it falls into Case 1. + * + * Case 4: + * V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may + * happen when a clone task finished and report version V, but the LSV is already larger than V, + * And we know that version between V and LSV is valid, so move V forward to LSV. + * + * Case 5: + * This is a bug case, I don't know why, may be some previous version introduce it. It looks like + * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. + * We just reset the LFV(hash) to recovery this replica. + */ + @Override + protected void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { + if (LOG.isDebugEnabled()) { + LOG.debug("before update: {}", this.toString()); + } + + if (newVersion < this.version) { + // This case means that replica meta version has been updated by ReportHandler before + // For example, the publish version daemon has already sent some publish version tasks + // to one be to publish version 2, 3, 4, 5, 6, and the be finish all publish version tasks, + // the be's replica version is 6 now, but publish version daemon need to wait + // for other be to finish most of publish version tasks to update replica version in fe. + // At the moment, the replica version in fe is 4, when ReportHandler sync tablet, + // it find reported replica version in be is 6 and then set version to 6 for replica in fe. + // And then publish version daemon try to finish txn, and use visible version(5) + // to update replica. Finally, it find the newer version(5) is lower than replica version(6) in fe. + if (LOG.isDebugEnabled()) { + LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," + + "not to continue to update replica", getId(), getBackendIdValue(), newVersion, this.version); + } + return; + } + + this.version = newVersion; + + long oldLastFailedVersion = this.lastFailedVersion; + + // just check it + if (lastSuccessVersion <= this.version) { + lastSuccessVersion = this.version; + } + + // case 1: + if (this.lastSuccessVersion <= this.lastFailedVersion) { + this.lastSuccessVersion = this.version; + } + + // TODO: this case is unknown, add log to observe + if (this.version > lastFailedVersion && lastFailedVersion > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("current version {} is larger than last failed version {}, " + + "maybe a fatal error or be report version, print a stack here ", + this.version, lastFailedVersion, new Exception()); + } + } + + if (lastFailedVersion != this.lastFailedVersion) { + // Case 2: + if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) { + this.lastFailedVersion = lastFailedVersion; + this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L; + } + + this.lastSuccessVersion = this.version; + } else { + // Case 3: + if (lastSuccessVersion >= this.lastSuccessVersion) { + this.lastSuccessVersion = lastSuccessVersion; + } + if (lastFailedVersion >= this.lastSuccessVersion) { + this.lastSuccessVersion = this.version; + } + } + + // Case 4: + if (this.version >= this.lastFailedVersion) { + this.lastFailedVersion = -1; + this.lastFailedTimestamp = -1; + if (this.version < this.lastSuccessVersion) { + this.version = this.lastSuccessVersion; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("after update {}", this.toString()); + } + + if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) { + LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) { + LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}", + this, oldLastFailedVersion); + } + + } + + /* + * If a replica is overwritten by a restore job, we need to reset version and lastSuccessVersion to + * the restored replica version + */ + @Override + public void updateVersionForRestore(long version) { + this.version = version; + this.lastSuccessVersion = version; + } + + @Override + public boolean tooBigVersionCount() { + return visibleVersionCount >= Config.min_version_count_indicate_replica_compaction_too_slow; + } + + @Override + public long getTotalVersionCount() { + return totalVersionCount; + } + + @Override + public void setTotalVersionCount(long totalVersionCount) { + this.totalVersionCount = totalVersionCount; + } + + @Override + public long getVisibleVersionCount() { + return visibleVersionCount; + } + + @Override + public void setVisibleVersionCount(long visibleVersionCount) { + this.visibleVersionCount = visibleVersionCount; + } + + @Override + public boolean checkVersionRegressive(long newVersion) { + if (newVersion >= version) { + regressiveVersion = -1; + regressiveVersionTimestamp = -1; + return false; + } + + if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) { + return true; + } + + if (newVersion != regressiveVersion) { + regressiveVersion = newVersion; + regressiveVersionTimestamp = System.currentTimeMillis(); + } + + return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L; + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof LocalReplica)) { + return false; + } + LocalReplica replica = (LocalReplica) obj; + return (backendId == replica.backendId) + && (lastFailedVersion == replica.lastFailedVersion) + && (lastSuccessVersion == replica.lastSuccessVersion); + } + + @Override + public void setPreWatermarkTxnId(long preWatermarkTxnId) { + this.preWatermarkTxnId = preWatermarkTxnId; + } + + @Override + public long getPreWatermarkTxnId() { + return preWatermarkTxnId; + } + + @Override + public void setPostWatermarkTxnId(long postWatermarkTxnId) { + this.postWatermarkTxnId = postWatermarkTxnId; + } + + @Override + public long getPostWatermarkTxnId() { + return postWatermarkTxnId; + } + + @Override + public void setUserDropTime(long userDropTime) { + this.userDropTime = userDropTime; + } + + @Override + public boolean isUserDrop() { + if (userDropTime > 0) { + if (System.currentTimeMillis() - userDropTime < Config.manual_drop_replica_valid_second * 1000L) { + return true; + } + userDropTime = -1; + } + + return false; + } + + @Override + public void setScaleInDropTimeStamp(long scaleInDropTime) { + this.scaleInDropTime = scaleInDropTime; + } + + @Override + public boolean isScaleInDrop() { + if (this.scaleInDropTime > 0) { + if (System.currentTimeMillis() - this.scaleInDropTime + < Config.manual_drop_replica_valid_second * 1000L) { + return true; + } + this.scaleInDropTime = -1; + } + return false; + } + + @Override + public void setLastReportVersion(long version) { + this.lastReportVersion = version; + } + + @Override + public long getLastReportVersion() { + return lastReportVersion; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 6dfb15b633c95e..0531d499c56209 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -35,11 +35,33 @@ public class LocalTablet extends Tablet { private long cooldownTerm = -1; private final Object cooldownConfLock = new Object(); + @SerializedName(value = "cv", alternate = {"checkedVersion"}) + private long checkedVersion; + @SerializedName(value = "ic", alternate = {"isConsistent"}) + private boolean isConsistent; + + // last time that the tablet checker checks this tablet. + // no need to persist + private long lastStatusCheckTime = -1; + + // last time for load data fail + private long lastLoadFailedTime = -1; + + // if tablet want to add a new replica, but cann't found any backend to locate the new replica. + // then mark this tablet. For later repair, even try and try to repair this tablet, sched will always fail. + // For example, 1 tablet contains 3 replicas, if 1 backend is dead, then tablet's healthy status + // is REPLICA_MISSING. But since no other backend can held the new replica, then sched always fail. + // So don't increase this tablet's sched priority if it has no path for new replica. + private long lastTimeNoPathForNewReplica = -1; + public LocalTablet() { + this(0); } public LocalTablet(long tabletId) { super(tabletId); + checkedVersion = -1L; + isConsistent = true; } @Override @@ -65,7 +87,6 @@ public Pair getCooldownConf() { @Override public long getRemoteDataSize() { // if CooldownReplicaId is not init - // [fix](fe) move some variables from Tablet to LocalTablet which are not used in CloudTablet if (cooldownReplicaId <= 0) { return 0; } @@ -77,4 +98,54 @@ public long getRemoteDataSize() { // return replica with max remoteDataSize return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); } + + @Override + public long getCheckedVersion() { + return this.checkedVersion; + } + + @Override + public void setCheckedVersion(long checkedVersion) { + this.checkedVersion = checkedVersion; + } + + @Override + public void setIsConsistent(boolean good) { + this.isConsistent = good; + } + + @Override + public boolean isConsistent() { + return isConsistent; + } + + @Override + protected long getLastStatusCheckTime() { + return lastStatusCheckTime; + } + + @Override + public void setLastStatusCheckTime(long lastStatusCheckTime) { + this.lastStatusCheckTime = lastStatusCheckTime; + } + + @Override + public long getLastLoadFailedTime() { + return lastLoadFailedTime; + } + + @Override + public void setLastLoadFailedTime(long lastLoadFailedTime) { + this.lastLoadFailedTime = lastLoadFailedTime; + } + + @Override + protected long getLastTimeNoPathForNewReplica() { + return lastTimeNoPathForNewReplica; + } + + @Override + public void setLastTimeNoPathForNewReplica(long lastTimeNoPathForNewReplica) { + this.lastTimeNoPathForNewReplica = lastTimeNoPathForNewReplica; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 58ebdf4f5f3be1..e1aa322b2832ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -17,9 +17,7 @@ package org.apache.doris.catalog; -import org.apache.doris.common.Config; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TUniqueId; @@ -37,7 +35,6 @@ */ public class Replica { private static final Logger LOG = LogManager.getLogger(Replica.class); - public static final VersionComparator VERSION_DESC_COMPARATOR = new VersionComparator(); public static final LastSuccessVersionComparator LAST_SUCCESS_VERSION_COMPARATOR = new LastSuccessVersionComparator(); public static final IdComparator ID_COMPARATOR = new IdComparator(); @@ -88,11 +85,9 @@ public static class ReplicaContext { @SerializedName(value = "id") private long id; - @SerializedName(value = "bid", alternate = {"backendId"}) - private long backendId; // the version could be queried @SerializedName(value = "v", alternate = {"version"}) - private volatile long version; + protected volatile long version; private int schemaHash = -1; @SerializedName(value = "ds", alternate = {"dataSize"}) private volatile long dataSize = 0; @@ -101,15 +96,6 @@ public static class ReplicaContext { @SerializedName(value = "st", alternate = {"state"}) private volatile ReplicaState state; - // the last load failed version - @SerializedName(value = "lfv", alternate = {"lastFailedVersion"}) - private long lastFailedVersion = -1L; - // not serialized, not very important - private long lastFailedTimestamp = 0; - // the last load successful version - @SerializedName(value = "lsv", alternate = {"lastSuccessVersion"}) - private long lastSuccessVersion = -1L; - @Setter @Getter @SerializedName(value = "lis", alternate = {"localInvertedIndexSize"}) @@ -119,63 +105,6 @@ public static class ReplicaContext { @SerializedName(value = "lss", alternate = {"localSegmentSize"}) private Long localSegmentSize = 0L; - private volatile long totalVersionCount = -1; - private volatile long visibleVersionCount = -1; - - private long pathHash = -1; - - // bad means this Replica is unrecoverable, and we will delete it - private boolean bad = false; - - // A replica version should increase monotonically, - // but backend may missing some versions due to disk failure or bugs. - // FE should found these and mark the replica as missing versions. - // If backend's report version < fe version, record the backend's report version as `regressiveVersion`, - // and if time exceed 5min, fe should mark this replica as missing versions. - private long regressiveVersion = -1; - private long regressiveVersionTimestamp = 0; - - /* - * This can happen when this replica is created by a balance clone task, and - * when task finished, the version of this replica is behind the partition's visible version. - * So this replica need a further repair. - * If we do not do this, this replica will be treated as version stale, and will be removed, - * so that the balance task is failed, which is unexpected. - * - * furtherRepairSetTime and leftFurtherRepairCount are set alone with needFurtherRepair. - * This is an insurance, in case that further repair task always fail. If 20 min passed - * since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false. - */ - private long furtherRepairSetTime = -1; - private int leftFurtherRepairCount = 0; - - // During full clone, the replica's state is CLONE, it will not load the data. - // After full clone finished, even if the replica's version = partition's visible version, - // - // notice: furtherRepairWatermarkTxnTd is used to clone a replica, protected it from be removed. - // - private long furtherRepairWatermarkTxnTd = -1; - - /* Decommission a backend B, steps are as follow: - * 1. wait peer backends catchup with B; - * 2. B change state to DECOMMISSION, set preWatermarkTxnId. B can load data now. - * 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now. - * 4. wait txn before postWatermarkTxnId finished, delete B. - * - * notice: preWatermarkTxnId and postWatermarkTxnId are used to delete this replica. - * - */ - private long preWatermarkTxnId = -1; - private long postWatermarkTxnId = -1; - private long segmentCount = 0L; - private long rowsetCount = 0L; - - private long userDropTime = -1; - - private long scaleInDropTime = -1; - - private long lastReportVersion = 0; - public Replica() { } @@ -199,7 +128,6 @@ public Replica(long replicaId, long backendId, long version, int schemaHash, long lastFailedVersion, long lastSuccessVersion) { this.id = replicaId; - this.backendId = backendId; this.version = version; this.schemaHash = schemaHash; @@ -209,15 +137,6 @@ public Replica(long replicaId, long backendId, long version, int schemaHash, if (this.state == null) { this.state = ReplicaState.NORMAL; } - this.lastFailedVersion = lastFailedVersion; - if (this.lastFailedVersion > 0) { - this.lastFailedTimestamp = System.currentTimeMillis(); - } - if (lastSuccessVersion < this.version) { - this.lastSuccessVersion = this.version; - } else { - this.lastSuccessVersion = lastSuccessVersion; - } } public long getVersion() { @@ -249,12 +168,17 @@ public long getBackendIdWithoutException() { } public long getBackendId() throws UserException { - return this.backendId; + return -1L; + } + + protected long getBackendIdValue() { + return -1L; } - // just for ut public void setBackendId(long backendId) { - this.backendId = backendId; + if (backendId != -1) { + throw new UnsupportedOperationException("setBackendId is not supported in Replica"); + } } public long getDataSize() { @@ -304,51 +228,56 @@ public void setRowCount(long rowCount) { } public long getSegmentCount() { - return segmentCount; + return 0; } public void setSegmentCount(long segmentCount) { - this.segmentCount = segmentCount; + if (segmentCount > 0) { + throw new UnsupportedOperationException("setSegmentCount is not supported in Replica"); + } } public long getRowsetCount() { - return rowsetCount; + return 0; } public void setRowsetCount(long rowsetCount) { - this.rowsetCount = rowsetCount; + if (rowsetCount > 0) { + throw new UnsupportedOperationException("setRowsetCount is not supported in Replica"); + } } public long getLastFailedVersion() { - return lastFailedVersion; + return -1; } public long getLastFailedTimestamp() { - return lastFailedTimestamp; + return 0; } public long getLastSuccessVersion() { - return lastSuccessVersion; + return 1; } public long getPathHash() { - return pathHash; + return -1; } public void setPathHash(long pathHash) { - this.pathHash = pathHash; + if (pathHash != -1) { + throw new UnsupportedOperationException("setPathHash is not supported in Replica"); + } } public boolean isBad() { - return bad; + return false; } public boolean setBad(boolean bad) { - if (this.bad == bad) { - return false; + if (bad) { + throw new UnsupportedOperationException("setBad is not supported in Replica"); } - this.bad = bad; - return true; + return false; } public TUniqueId getCooldownMetaId() { @@ -368,35 +297,31 @@ public void setCooldownTerm(long cooldownTerm) { } public boolean needFurtherRepair() { - return leftFurtherRepairCount > 0 - && System.currentTimeMillis() < furtherRepairSetTime - + Config.tablet_further_repair_timeout_second * 1000; + return false; } public void setNeedFurtherRepair(boolean needFurtherRepair) { if (needFurtherRepair) { - furtherRepairSetTime = System.currentTimeMillis(); - leftFurtherRepairCount = Config.tablet_further_repair_max_times; - } else { - leftFurtherRepairCount = 0; - furtherRepairSetTime = -1; + throw new UnsupportedOperationException("setNeedFurtherRepair is not supported in Replica"); } } public void incrFurtherRepairCount() { - leftFurtherRepairCount--; + throw new UnsupportedOperationException("incrFurtherRepairCount is not supported in Replica"); } public int getLeftFurtherRepairCount() { - return leftFurtherRepairCount; + return 0; } public long getFurtherRepairWatermarkTxnTd() { - return furtherRepairWatermarkTxnTd; + return -1; } public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { - this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; + if (furtherRepairWatermarkTxnTd != -1) { + throw new UnsupportedOperationException("setFurtherRepairWatermarkTxnTd is not supported in Replica"); + } } public void updateWithReport(TTabletInfo backendReplica) { @@ -411,7 +336,7 @@ public void updateWithReport(TTabletInfo backendReplica) { } public synchronized void updateVersion(long newVersion) { - updateReplicaVersion(newVersion, this.lastFailedVersion, this.lastSuccessVersion); + updateReplicaVersion(newVersion, getLastFailedVersion(), getLastSuccessVersion()); } public synchronized void updateVersionWithFailed( @@ -421,159 +346,32 @@ public synchronized void updateVersionWithFailed( public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, long updateTime) { - long oldLastFailedVersion = this.lastFailedVersion; - if (version != null) { - this.version = version; - } - if (lastSuccessVersion != null) { - this.lastSuccessVersion = lastSuccessVersion; - } - if (lastFailedVersion != null) { - if (this.lastFailedVersion < lastFailedVersion) { - this.lastFailedTimestamp = updateTime; - } - this.lastFailedVersion = lastFailedVersion; - } - if (this.lastFailedVersion < this.version) { - this.lastFailedVersion = -1; - this.lastFailedTimestamp = -1; - } - if (this.lastFailedVersion > 0 - && this.lastSuccessVersion > this.lastFailedVersion) { - this.lastSuccessVersion = this.version; - } - if (this.lastSuccessVersion < this.version) { - this.lastSuccessVersion = this.version; - } - if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) { - LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}", - this, oldLastFailedVersion); - } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) { - LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}", - this, oldLastFailedVersion); - } - } - - /* last failed version: LFV - * last success version: LSV - * version: V - * - * Case 1: - * If LFV > LSV, set LSV back to V, which indicates that version between LSV and LFV is invalid. - * Clone task will clone the version between LSV and LFV - * - * Case 2: - * LFV changed, set LSV back to V. This is just same as Case 1. Cause LFV must large than LSV. - * - * Case 3: - * LFV remains unchanged, just update LSV, and then check if it falls into Case 1. - * - * Case 4: - * V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may - * happen when a clone task finished and report version V, but the LSV is already larger than V, - * And we know that version between V and LSV is valid, so move V forward to LSV. - * - * Case 5: - * This is a bug case, I don't know why, may be some previous version introduce it. It looks like - * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. - * We just reset the LFV(hash) to recovery this replica. - */ - private void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { + throw new UnsupportedOperationException("adminUpdateVersionInfo is not supported in Replica"); + } + + protected void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { if (LOG.isDebugEnabled()) { - LOG.debug("before update: {}", this.toString()); + LOG.debug("before update: {}, newVersion: {}, lastFailedVersion: {}, lastSuccessVersion: {}", + this.toString(), newVersion, lastFailedVersion, lastSuccessVersion); } if (newVersion < this.version) { - // This case means that replica meta version has been updated by ReportHandler before - // For example, the publish version daemon has already sent some publish version tasks - // to one be to publish version 2, 3, 4, 5, 6, and the be finish all publish version tasks, - // the be's replica version is 6 now, but publish version daemon need to wait - // for other be to finish most of publish version tasks to update replica version in fe. - // At the moment, the replica version in fe is 4, when ReportHandler sync tablet, - // it find reported replica version in be is 6 and then set version to 6 for replica in fe. - // And then publish version daemon try to finish txn, and use visible version(5) - // to update replica. Finally, it find the newer version(5) is lower than replica version(6) in fe. if (LOG.isDebugEnabled()) { LOG.debug("replica {} on backend {}'s new version {} is lower than meta version {}," - + "not to continue to update replica", id, backendId, newVersion, this.version); + + "not to continue to update replica", getId(), getBackendIdValue(), newVersion, this.version); } return; } - long oldLastFailedVersion = this.lastFailedVersion; - this.version = newVersion; - - // just check it - if (lastSuccessVersion <= this.version) { - lastSuccessVersion = this.version; - } - - // case 1: - if (this.lastSuccessVersion <= this.lastFailedVersion) { - this.lastSuccessVersion = this.version; - } - - // TODO: this case is unknown, add log to observe - if (this.version > lastFailedVersion && lastFailedVersion > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("current version {} is larger than last failed version {}, " - + "maybe a fatal error or be report version, print a stack here ", - this.version, lastFailedVersion, new Exception()); - } - } - - if (lastFailedVersion != this.lastFailedVersion) { - // Case 2: - if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) { - this.lastFailedVersion = lastFailedVersion; - this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L; - } - - this.lastSuccessVersion = this.version; - } else { - // Case 3: - if (lastSuccessVersion >= this.lastSuccessVersion) { - this.lastSuccessVersion = lastSuccessVersion; - } - if (lastFailedVersion >= this.lastSuccessVersion) { - this.lastSuccessVersion = this.version; - } - } - - // Case 4: - if (this.version >= this.lastFailedVersion) { - this.lastFailedVersion = -1; - this.lastFailedTimestamp = -1; - if (this.version < this.lastSuccessVersion) { - this.version = this.lastSuccessVersion; - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("after update {}", this.toString()); - } - - if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) { - LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}", - this, oldLastFailedVersion); - } else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) { - LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}", - this, oldLastFailedVersion); - } } public synchronized void updateLastFailedVersion(long lastFailedVersion) { - updateReplicaVersion(this.version, lastFailedVersion, this.lastSuccessVersion); + updateReplicaVersion(this.version, lastFailedVersion, getLastSuccessVersion()); } - /* - * If a replica is overwritten by a restore job, we need to reset version and lastSuccessVersion to - * the restored replica version - */ public void updateVersionForRestore(long version) { - this.version = version; - this.lastSuccessVersion = version; + throw new UnsupportedOperationException("updateVersionForRestore is not supported in Replica"); } /* @@ -619,7 +417,7 @@ public boolean tooSlow() { } public boolean tooBigVersionCount() { - return visibleVersionCount >= Config.min_version_count_indicate_replica_compaction_too_slow; + return false; } public boolean isNormal() { @@ -627,38 +425,27 @@ public boolean isNormal() { } public long getTotalVersionCount() { - return totalVersionCount; + return -1; } public void setTotalVersionCount(long totalVersionCount) { - this.totalVersionCount = totalVersionCount; + if (totalVersionCount > 0) { + throw new UnsupportedOperationException("setTotalVersionCount is not supported in Replica"); + } } public long getVisibleVersionCount() { - return visibleVersionCount; + return -1; } public void setVisibleVersionCount(long visibleVersionCount) { - this.visibleVersionCount = visibleVersionCount; + if (visibleVersionCount > 0) { + throw new UnsupportedOperationException("setVisibleVersionCount is not supported in Replica"); + } } public boolean checkVersionRegressive(long newVersion) { - if (newVersion >= version) { - regressiveVersion = -1; - regressiveVersionTimestamp = -1; - return false; - } - - if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) { - return true; - } - - if (newVersion != regressiveVersion) { - regressiveVersion = newVersion; - regressiveVersionTimestamp = System.currentTimeMillis(); - } - - return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L; + throw new UnsupportedOperationException("checkVersionRegressive is not supported in Replica"); } @Override @@ -666,7 +453,7 @@ public String toString() { StringBuilder strBuffer = new StringBuilder("[replicaId="); strBuffer.append(id); strBuffer.append(", BackendId="); - strBuffer.append(backendId); + strBuffer.append(getBackendIdValue()); strBuffer.append(", version="); strBuffer.append(version); strBuffer.append(", dataSize="); @@ -674,11 +461,11 @@ public String toString() { strBuffer.append(", rowCount="); strBuffer.append(rowCount); strBuffer.append(", lastFailedVersion="); - strBuffer.append(lastFailedVersion); + strBuffer.append(getLastFailedVersion()); strBuffer.append(", lastSuccessVersion="); - strBuffer.append(lastSuccessVersion); + strBuffer.append(getLastSuccessVersion()); strBuffer.append(", lastFailedTimestamp="); - strBuffer.append(lastFailedTimestamp); + strBuffer.append(getLastFailedTimestamp()); strBuffer.append(", schemaHash="); strBuffer.append(schemaHash); strBuffer.append(", state="); @@ -693,9 +480,9 @@ public String toStringSimple(boolean checkBeAlive) { StringBuilder strBuffer = new StringBuilder("[replicaId="); strBuffer.append(id); strBuffer.append(", backendId="); - strBuffer.append(backendId); + strBuffer.append(getBackendIdValue()); if (checkBeAlive) { - Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + Backend backend = Env.getCurrentSystemInfo().getBackend(getBackendIdValue()); if (backend == null) { strBuffer.append(", backend=null"); } else { @@ -708,23 +495,23 @@ public String toStringSimple(boolean checkBeAlive) { } strBuffer.append(", version="); strBuffer.append(version); - if (lastFailedVersion > 0) { + if (getLastFailedVersion() > 0) { strBuffer.append(", lastFailedVersion="); - strBuffer.append(lastFailedVersion); + strBuffer.append(getLastFailedVersion()); strBuffer.append(", lastSuccessVersion="); - strBuffer.append(lastSuccessVersion); + strBuffer.append(getLastSuccessVersion()); strBuffer.append(", lastFailedTimestamp="); - strBuffer.append(lastFailedTimestamp); + strBuffer.append(getLastFailedTimestamp()); } if (isBad()) { strBuffer.append(", isBad=true"); - Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); - if (backend != null && pathHash != -1) { + Backend backend = Env.getCurrentSystemInfo().getBackend(getBackendIdValue()); + if (backend != null && getPathHash() != -1) { DiskInfo diskInfo = backend.getDisks().values().stream() - .filter(disk -> disk.getPathHash() == pathHash) + .filter(disk -> disk.getPathHash() == getPathHash()) .findFirst().orElse(null); if (diskInfo == null) { - strBuffer.append(", disk with path hash " + pathHash + " not exists"); + strBuffer.append(", disk with path hash " + getPathHash() + " not exists"); } else if (diskInfo.getState() == DiskInfo.DiskState.OFFLINE) { strBuffer.append(", disk " + diskInfo.getRootPath() + " is bad"); } @@ -748,29 +535,10 @@ public boolean equals(Object obj) { Replica replica = (Replica) obj; return (id == replica.id) - && (backendId == replica.backendId) && (version == replica.version) && (dataSize == replica.dataSize) && (rowCount == replica.rowCount) - && (state.equals(replica.state)) - && (lastFailedVersion == replica.lastFailedVersion) - && (lastSuccessVersion == replica.lastSuccessVersion); - } - - private static class VersionComparator implements Comparator { - public VersionComparator() { - } - - @Override - public int compare(T replica1, T replica2) { - if (replica1.getVersion() < replica2.getVersion()) { - return 1; - } else if (replica1.getVersion() == replica2.getVersion()) { - return 0; - } else { - return -1; - } - } + && (state.equals(replica.state)); } private static class LastSuccessVersionComparator implements Comparator { @@ -806,52 +574,45 @@ public int compare(T replica1, T replica2) { } public void setPreWatermarkTxnId(long preWatermarkTxnId) { - this.preWatermarkTxnId = preWatermarkTxnId; + if (preWatermarkTxnId != -1) { + throw new UnsupportedOperationException("setPreWatermarkTxnId is not supported in Replica"); + } } public long getPreWatermarkTxnId() { - return preWatermarkTxnId; + return -1; } public void setPostWatermarkTxnId(long postWatermarkTxnId) { - this.postWatermarkTxnId = postWatermarkTxnId; + if (postWatermarkTxnId != -1) { + throw new UnsupportedOperationException("setPostWatermarkTxnId is not supported in Replica"); + } } public long getPostWatermarkTxnId() { - return postWatermarkTxnId; + return -1; } public void setUserDropTime(long userDropTime) { - this.userDropTime = userDropTime; - } - - public boolean isUserDrop() { if (userDropTime > 0) { - if (System.currentTimeMillis() - userDropTime < Config.manual_drop_replica_valid_second * 1000L) { - return true; - } - userDropTime = -1; + throw new UnsupportedOperationException("setUserDropTime is not supported in Replica"); } + } + public boolean isUserDrop() { return false; } public void setScaleInDropTimeStamp(long scaleInDropTime) { - this.scaleInDropTime = scaleInDropTime; + if (scaleInDropTime > 0) { + throw new UnsupportedOperationException("setScaleInDropTimeStamp is not supported in Replica"); + } } public boolean isScaleInDrop() { - if (this.scaleInDropTime > 0) { - if (System.currentTimeMillis() - this.scaleInDropTime - < Config.manual_drop_replica_valid_second * 1000L) { - return true; - } - this.scaleInDropTime = -1; - } return false; } - public boolean isAlive() { return getState() != ReplicaState.CLONE && getState() != ReplicaState.DECOMMISSION @@ -859,15 +620,17 @@ && getState() != ReplicaState.DECOMMISSION } public boolean isScheduleAvailable() { - return Env.getCurrentSystemInfo().checkBackendScheduleAvailable(backendId) - && !isUserDrop(); + return Env.getCurrentSystemInfo().checkBackendScheduleAvailable(getBackendIdValue()) + && !isUserDrop(); } public void setLastReportVersion(long version) { - this.lastReportVersion = version; + if (version > 0) { + throw new UnsupportedOperationException("setLastReportVersion is not supported in Replica"); + } } public long getLastReportVersion() { - return lastReportVersion; + return 0; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index e9e61f87997bc7..7386cae8a27097 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -108,24 +108,6 @@ public TabletHealth() { protected long id; @SerializedName(value = "rs", alternate = {"replicas"}) protected List replicas; - @SerializedName(value = "cv", alternate = {"checkedVersion"}) - private long checkedVersion; - @SerializedName(value = "ic", alternate = {"isConsistent"}) - private boolean isConsistent; - - // last time that the tablet checker checks this tablet. - // no need to persist - private long lastStatusCheckTime = -1; - - // last time for load data fail - private long lastLoadFailedTime = -1; - - // if tablet want to add a new replica, but cann't found any backend to locate the new replica. - // then mark this tablet. For later repair, even try and try to repair this tablet, sched will always fail. - // For example, 1 tablet contains 3 replicas, if 1 backend is dead, then tablet's healthy status - // is REPLICA_MISSING. But since no other backend can held the new replica, then sched always fail. - // So don't increase this tablet's sched priority if it has no path for new replica. - private long lastTimeNoPathForNewReplica = -1; public Tablet() { this(0L, new ArrayList<>()); @@ -141,10 +123,6 @@ private Tablet(long tabletId, List replicas) { if (this.replicas == null) { this.replicas = new ArrayList<>(); } - - checkedVersion = -1L; - - isConsistent = true; } public long getId() { @@ -152,23 +130,27 @@ public long getId() { } public long getCheckedVersion() { - return this.checkedVersion; + return -1; } public void setCheckedVersion(long checkedVersion) { - this.checkedVersion = checkedVersion; + if (checkedVersion != -1) { + throw new UnsupportedOperationException("setCheckedVersion is not supported in Tablet"); + } } public void setIsConsistent(boolean good) { - this.isConsistent = good; + if (!good) { + throw new UnsupportedOperationException("setIsConsistent is not supported in Tablet"); + } } public boolean isConsistent() { - return isConsistent; + return true; } public void setCooldownConf(long cooldownReplicaId, long cooldownTerm) { - throw new UnsupportedOperationException("not support setCooldownConf in Tablet"); + throw new UnsupportedOperationException("setCooldownConf is not supported in Tablet"); } public long getCooldownReplicaId() { @@ -685,8 +667,8 @@ public TabletHealth getHealth(SystemInfoService systemInfoService, private void initTabletHealth(TabletHealth tabletHealth) { long endTime = System.currentTimeMillis() - Config.tablet_recent_load_failed_second * 1000L; - tabletHealth.hasRecentLoadFailed = lastLoadFailedTime > endTime; - tabletHealth.noPathForNewReplica = lastTimeNoPathForNewReplica > endTime; + tabletHealth.hasRecentLoadFailed = getLastLoadFailedTime() > endTime; + tabletHealth.noPathForNewReplica = getLastTimeNoPathForNewReplica() > endTime; } private boolean isReplicaAndBackendAlive(Replica replica, Backend backend, Set hosts) { @@ -855,21 +837,21 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P long currentTime = System.currentTimeMillis(); // first check, wait for next round - if (lastStatusCheckTime == -1) { - lastStatusCheckTime = currentTime; + if (getLastStatusCheckTime() == -1) { + setLastStatusCheckTime(currentTime); return false; } boolean ready = false; switch (priority) { case HIGH: - ready = currentTime - lastStatusCheckTime > Config.tablet_repair_delay_factor_second * 1000 * 1; + ready = currentTime - getLastStatusCheckTime() > Config.tablet_repair_delay_factor_second * 1000 * 1; break; case NORMAL: - ready = currentTime - lastStatusCheckTime > Config.tablet_repair_delay_factor_second * 1000 * 2; + ready = currentTime - getLastStatusCheckTime() > Config.tablet_repair_delay_factor_second * 1000 * 2; break; case LOW: - ready = currentTime - lastStatusCheckTime > Config.tablet_repair_delay_factor_second * 1000 * 3; + ready = currentTime - getLastStatusCheckTime() > Config.tablet_repair_delay_factor_second * 1000 * 3; break; default: break; @@ -878,19 +860,33 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P return ready; } + protected long getLastStatusCheckTime() { + return -1; + } + public void setLastStatusCheckTime(long lastStatusCheckTime) { - this.lastStatusCheckTime = lastStatusCheckTime; + if (lastStatusCheckTime != -1) { + throw new UnsupportedOperationException("setLastStatusCheckTime is not supported in Tablet"); + } } public long getLastLoadFailedTime() { - return lastLoadFailedTime; + return -1; } public void setLastLoadFailedTime(long lastLoadFailedTime) { - this.lastLoadFailedTime = lastLoadFailedTime; + if (lastLoadFailedTime != -1) { + throw new UnsupportedOperationException("setLastLoadFailedTime is not supported in Tablet"); + } + } + + protected long getLastTimeNoPathForNewReplica() { + return -1; } public void setLastTimeNoPathForNewReplica(long lastTimeNoPathForNewReplica) { - this.lastTimeNoPathForNewReplica = lastTimeNoPathForNewReplica; + if (lastTimeNoPathForNewReplica != -1) { + throw new UnsupportedOperationException("setLastTimeNoPathForNewReplica is not supported in Tablet"); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java index 2834551b2dac0f..c703d4af62a15c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java @@ -67,6 +67,9 @@ public class CloudReplica extends Replica { @SerializedName(value = "idx") private long idx = -1; + private long segmentCount = 0L; + private long rowsetCount = 0L; + private static final Random rand = new Random(); private Map> memClusterToBackends = new ConcurrentHashMap>(); @@ -626,4 +629,24 @@ public List getAllPrimaryBes() { }); return result; } + + @Override + public long getSegmentCount() { + return segmentCount; + } + + @Override + public void setSegmentCount(long segmentCount) { + this.segmentCount = segmentCount; + } + + @Override + public long getRowsetCount() { + return rowsetCount; + } + + @Override + public void setRowsetCount(long rowsetCount) { + this.rowsetCount = rowsetCount; + } }