Skip to content

Commit fff0f6a

Browse files
authored
Add timestamp for nrt points (#915)
1 parent fd07611 commit fff0f6a

File tree

13 files changed

+1799
-734
lines changed

13 files changed

+1799
-734
lines changed

clientlib/src/main/proto/yelp/nrtsearch/luceneserver.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,6 +1197,8 @@ message CopyState {
11971197
repeated string completedMergeFiles = 7;
11981198
// Primary generation
11991199
int64 primaryGen = 8;
1200+
// Timestamp for this index version (epoch seconds), may be 0 for no timestamp
1201+
int64 timestampSec = 9;
12001202
}
12011203

12021204
// Metadata for multiple index files

grpc-gateway/luceneserver.pb.go

Lines changed: 722 additions & 711 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

grpc-gateway/luceneserver.swagger.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2821,6 +2821,11 @@
28212821
"type": "string",
28222822
"format": "int64",
28232823
"title": "Primary generation"
2824+
},
2825+
"timestampSec": {
2826+
"type": "string",
2827+
"format": "int64",
2828+
"title": "Timestamp for this index version (epoch seconds), may be 0 for no timestamp"
28242829
}
28252830
},
28262831
"title": "Holds incRef'd file level details for one point-in-time segment infos on the primary node"

src/main/java/com/yelp/nrtsearch/server/handler/RecvCopyStateHandler.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode;
2727
import com.yelp.nrtsearch.server.state.GlobalState;
2828
import java.io.IOException;
29+
import java.time.Instant;
2930
import java.util.Map;
3031
import org.apache.lucene.replicator.nrt.FileMetaData;
3132
import org.slf4j.Logger;
@@ -65,28 +66,29 @@ private CopyState handle(IndexState indexState, CopyStateRequest copyStateReques
6566
throw new RuntimeException("RecvCopyStateHandler invoked with Invalid Magic Number");
6667
}
6768
NRTPrimaryNode primaryNode = shardState.nrtPrimaryNode;
68-
org.apache.lucene.replicator.nrt.CopyState copyState = null;
69+
NRTPrimaryNode.CopyStateAndTimestamp copyStateAndTimestamp = null;
6970
try {
7071
// Caller does not have CopyState; we pull the latest NRT point:
71-
copyState = primaryNode.getCopyState();
72-
return RecvCopyStateHandler.writeCopyState(copyState);
72+
copyStateAndTimestamp = primaryNode.getCopyStateAndTimestamp();
73+
return RecvCopyStateHandler.writeCopyState(copyStateAndTimestamp);
7374
} catch (IOException e) {
7475
primaryNode.message("top: exception during fetch: " + e.getMessage());
7576
throw new RuntimeException(e);
7677
} finally {
77-
if (copyState != null) {
78+
if (copyStateAndTimestamp != null) {
7879
primaryNode.message("top: fetch: now release CopyState");
7980
try {
80-
primaryNode.releaseCopyState(copyState);
81+
primaryNode.releaseCopyState(copyStateAndTimestamp.copyState());
8182
} catch (IOException e) {
8283
throw new RuntimeException(e);
8384
}
8485
}
8586
}
8687
}
8788

88-
private static CopyState writeCopyState(org.apache.lucene.replicator.nrt.CopyState state)
89-
throws IOException {
89+
private static CopyState writeCopyState(
90+
NRTPrimaryNode.CopyStateAndTimestamp copyStateAndTimestamp) throws IOException {
91+
org.apache.lucene.replicator.nrt.CopyState state = copyStateAndTimestamp.copyState();
9092
CopyState.Builder builder = CopyState.newBuilder();
9193
builder.setInfoBytesLength(state.infosBytes().length);
9294
builder.setInfoBytes(ByteString.copyFrom(state.infosBytes(), 0, state.infosBytes().length));
@@ -105,6 +107,11 @@ private static CopyState writeCopyState(org.apache.lucene.replicator.nrt.CopySta
105107

106108
builder.setPrimaryGen(state.primaryGen());
107109

110+
// If this index version has a timestamp, add it to the response
111+
Instant timestamp = copyStateAndTimestamp.timestamp();
112+
long timestampSeconds = timestamp != null ? timestamp.getEpochSecond() : 0;
113+
builder.setTimestampSec(timestampSeconds);
114+
108115
return builder.build();
109116
}
110117

src/main/java/com/yelp/nrtsearch/server/nrt/NRTPrimaryNode.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.yelp.nrtsearch.server.nrt;
1717

18+
import com.google.common.annotations.VisibleForTesting;
1819
import com.yelp.nrtsearch.server.grpc.FilesMetadata;
1920
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
2021
import com.yelp.nrtsearch.server.grpc.TransferStatus;
@@ -27,6 +28,7 @@
2728
import io.grpc.StatusRuntimeException;
2829
import java.io.IOException;
2930
import java.io.PrintStream;
31+
import java.time.Instant;
3032
import java.util.ArrayList;
3133
import java.util.Collection;
3234
import java.util.Collections;
@@ -64,6 +66,18 @@ public class NRTPrimaryNode extends PrimaryNode {
6466
final List<MergePreCopy> warmingSegments = Collections.synchronizedList(new ArrayList<>());
6567
final Queue<ReplicaDetails> replicasInfos = new ConcurrentLinkedQueue<>();
6668

69+
private long preRefreshVersion = -1;
70+
private Instant refreshTimestamp;
71+
private Instant indexVersionTimestamp;
72+
73+
/**
74+
* Class to hold a copy state and its timestamp
75+
*
76+
* @param copyState lucene copy state
77+
* @param timestamp timestamp
78+
*/
79+
public record CopyStateAndTimestamp(CopyState copyState, Instant timestamp) {}
80+
6781
public NRTPrimaryNode(
6882
IndexStateManager indexStateManager,
6983
HostPort hostPort,
@@ -80,6 +94,8 @@ public NRTPrimaryNode(
8094
this.indexName = indexStateManager.getCurrent().getName();
8195
this.indexStateManager = indexStateManager;
8296
this.nrtDataManager = nrtDataManager;
97+
// initialize timestamp to that of index data loaded from the backend
98+
setIndexVersionTimestamp(nrtDataManager.getLastPointTimestamp());
8399
}
84100

85101
/**
@@ -584,4 +600,80 @@ public void close() throws IOException {
584600
public FileMetaData readLocalFileMetaData(String fileName) throws IOException {
585601
return NrtUtils.readOnceLocalFileMetaData(fileName, lastFileMetaData, this);
586602
}
603+
604+
@VisibleForTesting
605+
synchronized void setPreRefreshVersion(long preRefreshVersion) {
606+
this.preRefreshVersion = preRefreshVersion;
607+
}
608+
609+
@VisibleForTesting
610+
synchronized long getPreRefreshVersion() {
611+
return preRefreshVersion;
612+
}
613+
614+
@VisibleForTesting
615+
synchronized void setRefreshTimestamp(Instant refreshTimestamp) {
616+
this.refreshTimestamp = refreshTimestamp;
617+
}
618+
619+
@VisibleForTesting
620+
synchronized Instant getRefreshTimestamp() {
621+
return refreshTimestamp;
622+
}
623+
624+
@VisibleForTesting
625+
synchronized void setIndexVersionTimestamp(Instant indexVersionTimestamp) {
626+
this.indexVersionTimestamp = indexVersionTimestamp;
627+
}
628+
629+
@VisibleForTesting
630+
synchronized Instant getIndexVersionTimestamp() {
631+
return indexVersionTimestamp;
632+
}
633+
634+
/**
635+
* Function that wraps the call to the super method and updates the index version timestamp.
636+
*
637+
* @return true if the index version changed
638+
* @throws IOException
639+
*/
640+
@Override
641+
public boolean flushAndRefresh() throws IOException {
642+
synchronized (this) {
643+
// get the loaded index version before the refresh
644+
setPreRefreshVersion(getCopyStateVersion());
645+
// create a new timestamp to use for any index version created by this refresh
646+
setRefreshTimestamp(Instant.now());
647+
}
648+
649+
boolean refreshed = super.flushAndRefresh();
650+
651+
synchronized (this) {
652+
// if the index version was updated, also update the main timestamp
653+
if (refreshed) {
654+
setIndexVersionTimestamp(getRefreshTimestamp());
655+
}
656+
// reset version to indicate no refresh in progress
657+
setPreRefreshVersion(-1);
658+
}
659+
return refreshed;
660+
}
661+
662+
/**
663+
* Get the copy state representing the currently loaded index version, and its timestamp.
664+
*
665+
* @return copy state and timestamp
666+
* @throws IOException
667+
*/
668+
public synchronized CopyStateAndTimestamp getCopyStateAndTimestamp() throws IOException {
669+
CopyState copyState = getCopyState();
670+
Instant timestamp;
671+
if (getPreRefreshVersion() != -1 && copyState.version() > getPreRefreshVersion()) {
672+
// the index version has changed, but the timestamp is not updated yet, use the refresh value
673+
timestamp = getRefreshTimestamp();
674+
} else {
675+
timestamp = getIndexVersionTimestamp();
676+
}
677+
return new CopyStateAndTimestamp(copyState, timestamp);
678+
}
587679
}

src/main/java/com/yelp/nrtsearch/server/nrt/NRTReplicaNode.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import com.yelp.nrtsearch.server.nrt.jobs.CopyJobManager;
2525
import com.yelp.nrtsearch.server.nrt.jobs.GrpcCopyJobManager;
2626
import com.yelp.nrtsearch.server.nrt.jobs.RemoteCopyJobManager;
27+
import com.yelp.nrtsearch.server.nrt.jobs.SimpleCopyJob;
2728
import com.yelp.nrtsearch.server.utils.HostPort;
2829
import java.io.IOException;
2930
import java.io.PrintStream;
31+
import java.time.Instant;
3032
import java.util.Map;
3133
import java.util.concurrent.atomic.AtomicBoolean;
3234
import org.apache.lucene.index.SegmentInfos;
@@ -221,6 +223,14 @@ protected void finishNRTCopy(CopyJob job, long startNS) throws IOException {
221223
.observe((System.nanoTime() - startNS) / 1000000.0);
222224
NrtMetrics.nrtPointSize.labelValues(indexName).observe(job.getTotalBytesCopied());
223225
NrtMetrics.searcherVersion.labelValues(indexName).set(job.getCopyState().version());
226+
227+
// if the job is a simple copy job, read out the index data timestamp and update the metric
228+
if (job instanceof SimpleCopyJob simpleCopyJob) {
229+
Instant timestamp = simpleCopyJob.getTimestamp();
230+
if (timestamp != null) {
231+
NrtMetrics.indexTimestampSec.labelValues(indexName).set(timestamp.getEpochSecond());
232+
}
233+
}
224234
}
225235
}
226236

src/main/java/com/yelp/nrtsearch/server/nrt/NrtDataManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ NrtPointState getLastPointState() {
129129
return lastPointState;
130130
}
131131

132-
@VisibleForTesting
133-
Instant getLastPointTimestamp() {
132+
/** Get the timestamp associated with the last loaded index version */
133+
public Instant getLastPointTimestamp() {
134134
return lastPointTimestamp;
135135
}
136136

src/main/java/com/yelp/nrtsearch/server/nrt/jobs/GrpcCopyJobManager.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import static com.yelp.nrtsearch.server.nrt.NrtUtils.readFilesMetaData;
1919

2020
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
21+
import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode;
2122
import com.yelp.nrtsearch.server.nrt.NRTReplicaNode;
2223
import java.io.IOException;
2324
import java.nio.ByteBuffer;
25+
import java.time.Instant;
2426
import java.util.HashSet;
2527
import java.util.Map;
2628
import java.util.Set;
@@ -72,26 +74,26 @@ public CopyJob newCopyJob(
7274
"Cannot create new copy job, primary connection not available");
7375
}
7476

75-
CopyState copyState;
77+
NRTPrimaryNode.CopyStateAndTimestamp copyStateAndTimestamp;
7678

7779
// sendMeFiles(?) (we dont need this, just send Index,replica, and request for copy State)
7880
if (files == null) {
7981
// No incoming CopyState: ask primary for latest one now
8082
try {
8183
// Exceptions in here mean something went wrong talking over the socket, which are fine
8284
// (e.g. primary node crashed):
83-
copyState = getCopyStateFromPrimary();
85+
copyStateAndTimestamp = getCopyStateFromPrimary();
8486
} catch (Throwable t) {
8587
throw new NodeCommunicationException("exc while reading files to copy", t);
8688
}
87-
files = copyState.files();
89+
files = copyStateAndTimestamp.copyState().files();
8890
} else {
89-
copyState = null;
91+
copyStateAndTimestamp = null;
9092
}
9193
return new SimpleCopyJob(
9294
reason,
9395
primaryAddress,
94-
copyState,
96+
copyStateAndTimestamp,
9597
replicaNode,
9698
files,
9799
highPriority,
@@ -104,15 +106,15 @@ public CopyJob newCopyJob(
104106
@Override
105107
public void finishNRTCopy(CopyJob copyJob) throws IOException {}
106108

107-
private CopyState getCopyStateFromPrimary() throws IOException {
109+
private NRTPrimaryNode.CopyStateAndTimestamp getCopyStateFromPrimary() throws IOException {
108110
com.yelp.nrtsearch.server.grpc.CopyState copyState =
109111
primaryAddress.recvCopyState(indexName, indexId, replicaId);
110112
return readCopyState(copyState);
111113
}
112114

113115
/** Pulls CopyState off the wire */
114-
private static CopyState readCopyState(com.yelp.nrtsearch.server.grpc.CopyState copyState)
115-
throws IOException {
116+
private static NRTPrimaryNode.CopyStateAndTimestamp readCopyState(
117+
com.yelp.nrtsearch.server.grpc.CopyState copyState) throws IOException {
116118

117119
// Decode a new CopyState
118120
byte[] infosBytes = new byte[copyState.getInfoBytesLength()];
@@ -125,7 +127,11 @@ private static CopyState readCopyState(com.yelp.nrtsearch.server.grpc.CopyState
125127
Set<String> completedMergeFiles = new HashSet<>(copyState.getCompletedMergeFilesList());
126128
long primaryGen = copyState.getPrimaryGen();
127129

128-
return new CopyState(files, version, gen, infosBytes, completedMergeFiles, primaryGen, null);
130+
CopyState luceneCopyState =
131+
new CopyState(files, version, gen, infosBytes, completedMergeFiles, primaryGen, null);
132+
Instant timestamp =
133+
copyState.getTimestampSec() > 0 ? Instant.ofEpochMilli(copyState.getTimestampSec()) : null;
134+
return new NRTPrimaryNode.CopyStateAndTimestamp(luceneCopyState, timestamp);
129135
}
130136

131137
@Override

src/main/java/com/yelp/nrtsearch/server/nrt/jobs/SimpleCopyJob.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import com.yelp.nrtsearch.server.grpc.RawFileChunk;
2020
import com.yelp.nrtsearch.server.grpc.ReplicationServerClient;
2121
import com.yelp.nrtsearch.server.monitoring.NrtMetrics;
22+
import com.yelp.nrtsearch.server.nrt.NRTPrimaryNode;
2223
import io.grpc.stub.StreamObserver;
2324
import java.io.IOException;
25+
import java.time.Instant;
2426
import java.util.HashSet;
2527
import java.util.Iterator;
2628
import java.util.Locale;
@@ -42,7 +44,7 @@
4244
public class SimpleCopyJob extends VisitableCopyJob {
4345
private static final Logger logger = LoggerFactory.getLogger(SimpleCopyJob.class);
4446

45-
private final CopyState copyState;
47+
private final NRTPrimaryNode.CopyStateAndTimestamp copyStateAndTimestamp;
4648
private final ReplicationServerClient primaryAddres;
4749
private final String indexName;
4850
private final String indexId;
@@ -52,7 +54,7 @@ public class SimpleCopyJob extends VisitableCopyJob {
5254
public SimpleCopyJob(
5355
String reason,
5456
ReplicationServerClient primaryAddress,
55-
CopyState copyState,
57+
NRTPrimaryNode.CopyStateAndTimestamp copyStateAndTimestamp,
5658
ReplicaNode dest,
5759
Map<String, FileMetaData> files,
5860
boolean highPriority,
@@ -62,7 +64,7 @@ public SimpleCopyJob(
6264
boolean ackedCopy)
6365
throws IOException {
6466
super(reason, files, dest, highPriority, onceDone);
65-
this.copyState = copyState;
67+
this.copyStateAndTimestamp = copyStateAndTimestamp;
6668
this.primaryAddres = primaryAddress;
6769
this.indexName = indexName;
6870
this.indexId = indexId;
@@ -189,7 +191,12 @@ public Set<String> getFileNames() {
189191

190192
@Override
191193
public CopyState getCopyState() {
192-
return copyState;
194+
return copyStateAndTimestamp != null ? copyStateAndTimestamp.copyState() : null;
195+
}
196+
197+
/** Get the index version timestamp, or null if no timestamp is available. */
198+
public Instant getTimestamp() {
199+
return copyStateAndTimestamp != null ? copyStateAndTimestamp.timestamp() : null;
193200
}
194201

195202
@Override

0 commit comments

Comments
 (0)