Skip to content

Commit efb7060

Browse files
pm-oscporunov
authored andcommitted
#4871 support recurring transaction recovery and extend recovery statistics with failure count
Signed-off-by: pm-osc <[email protected]>
1 parent 21c6cf9 commit efb7060

File tree

8 files changed

+525
-10
lines changed

8 files changed

+525
-10
lines changed

docs/operations/recovery.md

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,58 @@ identify partially failed transaction and repair any inconsistencies
4545
caused. It is suggested to run the transaction repair process on a
4646
separate machine connected to the cluster to isolate failures. Configure
4747
a separately controlled process to run the following where the start
48-
time specifies the time since epoch where the recovery process should
49-
start reading from the write-ahead log.
48+
time (Java Instant that specifies the time since epoch) where the recovery
49+
process should start reading from the write-ahead log.
5050
```groovy
51-
recovery = JanusGraphFactory.startTransactionRecovery(graph, startTime, TimeUnit.MILLISECONDS);
51+
recovery = JanusGraphFactory.startTransactionRecovery(graph, startTime);
5252
```
5353

54+
Once the recovery process is started, the process never ends and stops
55+
only if:
56+
57+
1. it is manually stopped by calling `recovery.shutdown()`
58+
2. the process encounters errors and fails due to exception
59+
3. the graph gets closed
60+
61+
While the recovery process runs, `recovery.getStatistics()` call provides
62+
information about the progress of recovery process by returning three numbers:
63+
64+
1. the first number shows how many secondary persistence transaction succeeded
65+
2. the second number shows how many secondary persistence transaction failed
66+
and attempted to be recovered
67+
3. the third number shows how many failed secondary persistence transaction
68+
could not be recovered
69+
70+
Depending on the used `startTime` value and configured `log.tx.read-interval`
71+
configuration option, the recovery process might need to run for hours in
72+
order to process all relevant entries from the write-ahead log.
73+
`startTime` defines the point in time from which the write-ahead
74+
log should be read. The log is read in every `log.tx.read-interval`
75+
millisecond and an approximately 100 seconds long chunk is processed in
76+
one iteration.
77+
78+
For example, if `startTime` is configured to look for log entries from
79+
the last twenty hours (72 000 seconds) and `log.tx.read-interval` is set to
80+
5 000 ms (5 seconds), it might take approximately at least one hour
81+
(72 000 / 100 * 5 = 3 600 seconds = 1 hour) while all log entries are processed.
82+
Note: in case there are many failed secondary persistence transactions, the recovery
83+
process might take much longer as fixing those transactions takes time.
84+
85+
When a write-ahead log entry is found that should be repaired,
86+
the following INFO level log message appears in the JanusGraph's
87+
logging system where the transaction ID appears between the
88+
squared brackets:
89+
90+
```
91+
Attempting to repair partially failed transaction [...]
92+
```
93+
94+
Even if the recovery process is stopped by calling `recovery.shutdown()`,
95+
when it started again for the same graph `Provided read marker is not compatible
96+
with existing read marker for previously registered readers` error is shown.
97+
To avoid the error, the graph needs to be closed by `graph.close()` before the
98+
process is started again.
99+
54100
Enabling the transaction write-ahead log causes an additional write
55101
operation for mutating transactions which increases the latency. Also
56102
note, that additional space is required to store the log. The
@@ -59,6 +105,27 @@ which means that log entries expire after that time to keep the storage
59105
overhead small. Refer to [Configuration Reference](../configs/configuration-reference.md) for a complete list of all
60106
log related configuration options to fine tune logging behavior.
61107

108+
### Recurring Transaction Recovery
109+
110+
In case of daily data ingestion, transaction recovery needs to run recurring to ensure that
111+
both primary and secondary persistence (e.g. indexing data by the mixed index backend)
112+
of the data succeeds each day.
113+
114+
Since `JanusGraphFactory.startTransactionRecovery()` is not meant to be executed on
115+
recurring way, JanusGraph provides a dedicated way to run transaction recovery multiple
116+
times on the same graph:
117+
118+
```groovy
119+
recovery = JanusGraphFactory.startRecurringTransactionRecovery(graph, startTime);
120+
```
121+
122+
Similarly to the normal transaction recovery process, the recurring transaction recovery
123+
process has the same `graph` and `startTime` parameters and provides the same `getStatistics()`
124+
and `shutdown()` methods.
125+
126+
Once the process is stopped, `startRecurringTransactionRecovery()` can be used
127+
to start the process again from the same or from another start time.
128+
62129
## JanusGraph Instance Failure
63130

64131
JanusGraph is robust against individual instance failure in that other

janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java

Lines changed: 320 additions & 0 deletions
Large diffs are not rendered by default.

janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/TestMockLog.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public synchronized boolean unregisterReader(MessageReader reader) {
131131
return readers.remove(reader);
132132
}
133133

134+
@Override
135+
public synchronized boolean unregisterReaderAndStopReadingProcess(MessageReader reader) {
136+
return readers.remove(reader);
137+
}
138+
134139
@Override
135140
public String getName() {
136141
return name;

janusgraph-core/src/main/java/org/janusgraph/core/JanusGraphFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,18 @@ public static TransactionRecovery startTransactionRecovery(JanusGraph graph, Ins
303303
return new StandardTransactionLogProcessor((StandardJanusGraph)graph, start);
304304
}
305305

306+
/**
307+
* Returns a {@link TransactionRecovery} process for recovering partially failed transactions. The recovery process
308+
* will start processing the write-ahead transaction log at the specified transaction time.
309+
* Once stopped, this method can be used to start the recovery process again from a specified transaction time.
310+
* @param graph
311+
* @param start
312+
* @return
313+
*/
314+
public static TransactionRecovery startRecurringTransactionRecovery(JanusGraph graph, Instant start) {
315+
return new StandardTransactionLogProcessor((StandardJanusGraph) graph, start, true);
316+
}
317+
306318
//###################################
307319
// HELPER METHODS
308320
//###################################

janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/Log.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,17 @@ public interface Log {
8282
*/
8383
boolean unregisterReader(MessageReader reader);
8484

85+
/**
86+
* Removes the given reader from the list of registered readers, terminates the reader thread pool and returns
87+
* whether this reader was registered in the first place.
88+
* Note, that removing the reader by this method also stops the reading process.
89+
*
90+
* @param reader
91+
* @return true if this MessageReader was registered before and was successfully
92+
* unregistered, else false
93+
*/
94+
boolean unregisterReaderAndStopReadingProcess(MessageReader reader);
95+
8596
/**
8697
* Returns the name of this log
8798
* @return

janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/ReadMarker.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@ public class ReadMarker {
2626

2727
private final String identifier;
2828
private Instant startTime;
29+
private boolean isRecurring;
2930

31+
private ReadMarker(String identifier, Instant startTime) {
32+
this(identifier, startTime, false);
33+
}
3034

3135

32-
private ReadMarker(String identifier, Instant startTime) {
36+
private ReadMarker(String identifier, Instant startTime, boolean isRecurring) {
3337
this.identifier = identifier;
3438
this.startTime = startTime;
39+
this.isRecurring = isRecurring;
3540
}
3641

3742
/**
@@ -55,6 +60,10 @@ public boolean hasStartTime() {
5560
return startTime!=null;
5661
}
5762

63+
public boolean isRecurring() {
64+
return isRecurring;
65+
}
66+
5867
/**
5968
* Returns the start time of this marker if such has been defined or the current time if not
6069
* @return
@@ -72,6 +81,12 @@ public synchronized Instant getStartTime(TimestampProvider times) {
7281
* @return
7382
*/
7483
public boolean isCompatible(ReadMarker newMarker) {
84+
if (isRecurring() && newMarker.isRecurring()) {
85+
// recurring read marker is not expected to have identifier
86+
// See this comment for more information:
87+
// https://github.com/JanusGraph/janusgraph/pull/4872#discussion_r2490578432
88+
return !hasIdentifier() && !newMarker.hasIdentifier();
89+
}
7590
if (newMarker.hasIdentifier()) {
7691
return hasIdentifier() && identifier.equals(newMarker.identifier);
7792
}
@@ -96,6 +111,17 @@ public static ReadMarker fromTime(Instant timestamp) {
96111
return new ReadMarker(null, timestamp);
97112
}
98113

114+
/**
115+
* Starts reading the log from the given timestamp onward. The specified timestamp is included.
116+
* The read marker is set to allow recurring mode which means that reading the log is supported
117+
* several times from the same or from different timestamp.
118+
* @param timestamp
119+
* @return
120+
*/
121+
public static ReadMarker fromTimeRecurring(Instant timestamp) {
122+
return new ReadMarker(null, timestamp, true);
123+
}
124+
99125
/**
100126
* Starts reading the log from the last recorded point in the log for the given id.
101127
* If the log has a record of such an id, it will use it as the starting point.

janusgraph-core/src/main/java/org/janusgraph/diskstorage/log/kcvs/KCVSLog.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,12 @@ public synchronized void registerReaders(ReadMarker readMarker, Iterable<Message
669669
Preconditions.checkArgument(readMarker!=null,"Read marker cannot be null");
670670
Preconditions.checkArgument(this.readMarker==null || this.readMarker.isCompatible(readMarker),
671671
"Provided read marker is not compatible with existing read marker for previously registered readers");
672-
if (this.readMarker==null) this.readMarker=readMarker;
672+
if (this.readMarker==null || this.readMarker.isRecurring())
673+
// for recurring read marker the marker needs to be updated
674+
// so also the start time of the marker will be updated and the log
675+
// is read from the right start time
676+
this.readMarker = readMarker;
677+
673678
boolean firstRegistration = this.readers.isEmpty();
674679
for (MessageReader reader : readers) {
675680
Preconditions.checkNotNull(reader);
@@ -707,6 +712,30 @@ public synchronized boolean unregisterReader(MessageReader reader) {
707712
return this.readers.remove(reader);
708713
}
709714

715+
@Override
716+
public synchronized boolean unregisterReaderAndStopReadingProcess(MessageReader reader) {
717+
ResourceUnavailableException.verifyOpen(isOpen, "Log", name);
718+
719+
if (readExecutor!=null) {
720+
readExecutor.shutdown();
721+
try {
722+
readExecutor.awaitTermination(1,TimeUnit.SECONDS);
723+
} catch (InterruptedException e) {
724+
log.error("Could not terminate reader thread pool for KCVSLog {} due to interruption", name, e);
725+
}
726+
if (!readExecutor.isTerminated()) {
727+
readExecutor.shutdownNow();
728+
log.error("Reader thread pool for KCVSLog {} did not shut down in time - could not clean up or set read markers", name);
729+
} else {
730+
for (MessagePuller puller : msgPullers) {
731+
puller.close();
732+
}
733+
}
734+
}
735+
736+
return this.readers.remove(reader);
737+
}
738+
710739
private class MessageReaderStateUpdater implements Runnable {
711740
@Override
712741
public void run() {

janusgraph-core/src/main/java/org/janusgraph/graphdb/log/StandardTransactionLogProcessor.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,29 @@ public class StandardTransactionLogProcessor implements TransactionRecovery {
9393
private final BackgroundCleaner cleaner;
9494
private final boolean verboseLogging;
9595

96+
// flag to support starting the log processor multiple times
97+
// for a given graph
98+
private final boolean recurringMode;
99+
96100
private final AtomicLong successTxCounter = new AtomicLong(0);
97101
private final AtomicLong failureTxCounter = new AtomicLong(0);
98102

103+
// counter for storing how many transactions from failureTxCounter
104+
// could not be repaired due to any exception
105+
private final AtomicLong failureTxRepairExceptionCounter = new AtomicLong(0);
106+
107+
private final Log txLog;
108+
private final TxLogMessageReader reader;
109+
99110
private final Cache<StandardTransactionId,TxEntry> txCache;
100111

101112
public StandardTransactionLogProcessor(StandardJanusGraph graph,
102113
Instant startTime) {
114+
this(graph, startTime, false);
115+
}
116+
117+
public StandardTransactionLogProcessor(StandardJanusGraph graph,
118+
Instant startTime, boolean recurringMode) {
103119
Preconditions.checkArgument(graph != null && graph.isOpen());
104120
Preconditions.checkArgument(startTime!=null);
105121
Preconditions.checkArgument(graph.getConfiguration().hasLogTransactions(),
@@ -109,9 +125,10 @@ public StandardTransactionLogProcessor(StandardJanusGraph graph,
109125
Preconditions.checkArgument(maxTxLength != null && !maxTxLength.isZero(),
110126
"Max transaction time cannot be 0");
111127
this.graph = graph;
128+
this.recurringMode = recurringMode;
112129
this.serializer = graph.getDataSerializer();
113130
this.times = graph.getConfiguration().getTimestampProvider();
114-
final Log txLog = graph.getBackend().getSystemTxLog();
131+
this.txLog = graph.getBackend().getSystemTxLog();
115132
this.persistenceTime = graph.getConfiguration().getMaxWriteTime();
116133
this.verboseLogging = graph.getConfiguration().getConfiguration()
117134
.get(GraphDatabaseConfiguration.VERBOSE_TX_RECOVERY);
@@ -123,26 +140,54 @@ public StandardTransactionLogProcessor(StandardJanusGraph graph,
123140
"Unexpected removal cause [%s] for transaction [%s]", cause, key);
124141
if (entry.status == LogTxStatus.SECONDARY_FAILURE || entry.status == LogTxStatus.PRIMARY_SUCCESS) {
125142
failureTxCounter.incrementAndGet();
126-
fixSecondaryFailure(key, entry);
143+
try {
144+
fixSecondaryFailure(key, entry);
145+
} catch (Exception e) {
146+
failureTxRepairExceptionCounter.incrementAndGet();
147+
// pass exception up - here the exception is caught only for
148+
// incrementing the failureTxRepairExceptionCounter counter
149+
throw e;
150+
}
127151
} else {
128152
successTxCounter.incrementAndGet();
129153
}
130154
})
131155
.build();
132156

133-
ReadMarker start = ReadMarker.fromTime(startTime);
134-
txLog.registerReader(start,new TxLogMessageReader());
157+
ReadMarker start;
158+
159+
if (this.recurringMode) {
160+
// create the read marker with recurring mode so it can be registered multiple times
161+
start = ReadMarker.fromTimeRecurring(startTime);
162+
} else {
163+
start = ReadMarker.fromTime(startTime);
164+
}
165+
166+
this.reader = new TxLogMessageReader();
167+
txLog.registerReader(start,this.reader);
135168

136169
cleaner = new BackgroundCleaner();
137170
cleaner.start();
138171
}
139172

140173
public long[] getStatistics() {
141-
return new long[]{successTxCounter.get(),failureTxCounter.get()};
174+
return new long[]{successTxCounter.get(),failureTxCounter.get(),failureTxRepairExceptionCounter.get()};
142175
}
143176

144177
public synchronized void shutdown() throws JanusGraphException {
145178
cleaner.close(CLEAN_SLEEP_TIME);
179+
180+
// in recurring mode, the reader needs to be unregistered and the reading process
181+
// needs to be stopped so resources are not occupied while the log processor is not
182+
// actively running
183+
if (this.recurringMode) {
184+
try {
185+
Thread.sleep(CLEAN_SLEEP_TIME.toMillis());
186+
txLog.unregisterReaderAndStopReadingProcess(this.reader);
187+
} catch (Exception e) {
188+
logger.error("Interrupted while waiting for background cleaner to stop. Reader is not unregistered.", e);
189+
}
190+
}
146191
}
147192

148193
private void logRecoveryMsg(String message, Object... args) {

0 commit comments

Comments
 (0)