Skip to content

Commit 01278bc

Browse files
gogo9thapavlo
andauthored
Optimize WorkloadState.addToQueue for large txn rates (#191)
Co-authored-by: Andy Pavlo <[email protected]>
1 parent d9a6a7b commit 01278bc

File tree

1 file changed

+10
-13
lines changed

1 file changed

+10
-13
lines changed

src/main/java/com/oltpbenchmark/WorkloadState.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,29 @@ public WorkloadState(BenchmarkState benchmarkState, List<Phase> works, int num_t
5959
* Add a request to do work.
6060
*/
6161
public void addToQueue(int amount, boolean resetQueues) {
62+
int workAdded = 0;
63+
6264
synchronized (this) {
6365
if (resetQueues) {
6466
workQueue.clear();
6567
}
6668

67-
6869
// Only use the work queue if the phase is enabled and rate limited.
6970
if (currentPhase == null || currentPhase.isDisabled()
7071
|| !currentPhase.isRateLimited() || currentPhase.isSerial()) {
7172
return;
72-
} else {
73-
// Add the specified number of procedures to the end of the queue.
74-
for (int i = 0; i < amount; ++i) {
75-
workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction()));
76-
}
7773
}
78-
79-
// Can't keep up with current rate? Remove the oldest transactions
80-
// (from the front of the queue).
81-
while (workQueue.size() > RATE_QUEUE_LIMIT) {
82-
workQueue.remove();
74+
75+
// Add the specified number of procedures to the end of the queue.
76+
// If we can't keep up with current rate, truncate transactions
77+
for (int i = 0; i < amount && workQueue.size() <= RATE_QUEUE_LIMIT; ++i) {
78+
workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction()));
79+
workAdded++;
8380
}
8481

8582
// Wake up sleeping workers to deal with the new work.
86-
int numToWake = Math.min(amount, workersWaiting);
87-
for (int i = 0; i < numToWake; ++i) {
83+
int numToWake = Math.min(workAdded, workersWaiting);
84+
while (numToWake-- > 0) {
8885
this.notify();
8986
}
9087
}

0 commit comments

Comments
 (0)