Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.claude
CLAUDE.md
.cursor*
.kiro*

# intellij files
.idea/
Expand Down
1 change: 1 addition & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies {
// us to invoke the JMH uberjar as usual.
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
}
api project(':libs:opensearch-concurrent-queue')
api "org.openjdk.jmh:jmh-core:$versions.jmh"
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh"
// Dependencies of JMH
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.benchmark.queue;

import org.opensearch.common.queue.Lockable;
import org.opensearch.common.queue.LockableConcurrentQueue;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * JMH benchmark for {@link LockableConcurrentQueue}, measuring the throughput
 * of lock-and-poll / add-and-unlock cycles at several concurrency levels.
 * <p>
 * Two benchmark groups are included:
 * <ul>
 * <li>{@code pollAndReturn} — minimal overhead: poll an entry and immediately return it.</li>
 * <li>{@code writerWorkload} — simulates a writer pool: poll an entry, perform simulated
 * document writes (CPU work), then return the entry. Models the composite writer
 * checkout-write-return cycle.</li>
 * </ul>
 */
@Fork(3)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@SuppressWarnings("unused")
public class LockableConcurrentQueueBenchmark {

    @Param({ "1", "4", "8" })
    int concurrency;

    @Param({ "16", "64" })
    int poolSize;

    private LockableConcurrentQueue<LockableEntry> queue;

    /** Rebuilds the queue each iteration and seeds it with {@code poolSize} unlocked entries. */
    @Setup(Level.Iteration)
    public void setup() {
        queue = new LockableConcurrentQueue<>(LinkedList::new, concurrency);
        int remaining = poolSize;
        while (remaining-- > 0) {
            final LockableEntry seeded = new LockableEntry();
            seeded.lock();
            queue.addAndUnlock(seeded);
        }
    }

    // ---- pollAndReturn: minimal overhead benchmarks ----

    @Benchmark
    @Threads(1)
    public LockableEntry pollAndReturn_1thread() {
        return cycleOnce();
    }

    @Benchmark
    @Threads(4)
    public LockableEntry pollAndReturn_4threads() {
        return cycleOnce();
    }

    @Benchmark
    @Threads(8)
    public LockableEntry pollAndReturn_8threads() {
        return cycleOnce();
    }

    /** One checkout/return cycle with no work in between; returns null if the pool was empty. */
    private LockableEntry cycleOnce() {
        final LockableEntry checkedOut = queue.lockAndPoll();
        if (checkedOut == null) {
            return null;
        }
        queue.addAndUnlock(checkedOut);
        return checkedOut;
    }

    // ---- writerWorkload: simulated writer pool benchmarks ----

    @Benchmark
    @Threads(4)
    public void writerWorkload_4threads(Blackhole bh) {
        writerCycle(bh);
    }

    @Benchmark
    @Threads(8)
    public void writerWorkload_8threads(Blackhole bh) {
        writerCycle(bh);
    }

    @Benchmark
    @Threads(16)
    public void writerWorkload_16threads(Blackhole bh) {
        writerCycle(bh);
    }

    /**
     * Simulates a writer pool cycle: checkout an entry, perform CPU work
     * representing document indexing across multiple formats, then return it.
     */
    private void writerCycle(Blackhole bh) {
        final LockableEntry checkedOut = queue.lockAndPoll();
        if (checkedOut == null) {
            return;
        }
        // Simulate document write work (field additions across formats)
        bh.consume(simulateDocumentWrite(checkedOut));
        queue.addAndUnlock(checkedOut);
    }

    /**
     * Simulates the CPU cost of writing a document to multiple data formats.
     * Performs arithmetic work to prevent JIT elimination while keeping
     * the hold time realistic relative to a real addDoc call.
     */
    private static long simulateDocumentWrite(LockableEntry entry) {
        long state = entry.hashCode();
        int rounds = 20; // ~10 field additions across 2 formats
        while (rounds-- > 0) {
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
        }
        return state;
    }

    /** Minimal {@link Lockable} backed by a {@link ReentrantLock}, standing in for a real writer. */
    static final class LockableEntry implements Lockable {
        private final ReentrantLock mutex = new ReentrantLock();

        @Override
        public void lock() {
            mutex.lock();
        }

        @Override
        public boolean tryLock() {
            return mutex.tryLock();
        }

        @Override
        public void unlock() {
            mutex.unlock();
        }
    }
}
33 changes: 33 additions & 0 deletions libs/concurrent-queue/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Shared concurrent queue utilities for the composite indexing engine.
* No external dependencies — pure Java concurrency primitives.
*/

// Test-only dependency graph; the library itself is pure JDK.
dependencies {
    /*******
     * !!!! NO RUNTIME DEPENDENCIES !!!!
     *******/

    testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
    testImplementation "junit:junit:${versions.junit}"
    testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"

    // NOTE(review): the exclusion presumably breaks a dependency cycle — the test framework
    // would otherwise pull this module back in transitively. Confirm the module name matches
    // the published artifact id of this project.
    testImplementation(project(":test:framework")) {
        exclude group: 'org.opensearch', module: 'opensearch-concurrent-queue'
    }
}

testingConventions.enabled = true

tasks.named('forbiddenApisMain').configure {
    // :libs:opensearch-concurrent-queue does not depend on server
    replaceSignatureFiles 'jdk-signatures'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.queue;

import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * A striped concurrent queue that distributes entries across multiple internal
 * queues using thread-affinity-based hashing. This reduces contention by allowing
 * concurrent threads to operate on different stripes without blocking each other.
 *
 * @param <T> the type of elements held in this queue
 * @opensearch.experimental
 */
public final class ConcurrentQueue<T> {

    static final int MIN_CONCURRENCY = 1;
    static final int MAX_CONCURRENCY = 256;

    private final int concurrency;
    private final Lock[] locks;
    private final Queue<T>[] queues;
    private final Supplier<Queue<T>> queueSupplier;
    /** Maps each entry to its queue index so that {@link #remove} can go directly to the right queue. */
    private final ConcurrentHashMap<T, Integer> queueIndex;

    /**
     * Creates a striped queue with {@code concurrency} independent stripes.
     *
     * @param queueSupplier factory producing the backing queue of each stripe
     * @param concurrency number of stripes, in [{@value #MIN_CONCURRENCY}, {@value #MAX_CONCURRENCY}]
     * @throws IllegalArgumentException if {@code concurrency} is out of range
     */
    ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
        if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
            throw new IllegalArgumentException(
                "concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency
            );
        }
        this.concurrency = concurrency;
        this.queueSupplier = queueSupplier;
        this.queueIndex = new ConcurrentHashMap<>();
        locks = new Lock[concurrency];
        @SuppressWarnings({ "rawtypes", "unchecked" })
        Queue<T>[] queues = new Queue[concurrency];
        this.queues = queues;
        for (int i = 0; i < concurrency; ++i) {
            locks[i] = new ReentrantLock();
            queues[i] = queueSupplier.get();
        }
    }

    /**
     * Adds {@code entry} to the first uncontended stripe, preferring the stripe
     * associated with the calling thread; blocks on the preferred stripe only when
     * every stripe is contended.
     */
    void add(T entry) {
        // Seed the order in which to look at stripes based on the current thread. This helps distribute
        // entries across queues and gives a bit of thread affinity between entries and threads, which
        // can't hurt.
        final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
        for (int i = 0; i < concurrency; ++i) {
            final int index = (threadHash + i) % concurrency;
            if (locks[index].tryLock()) {
                try {
                    enqueue(index, entry);
                    return;
                } finally {
                    locks[index].unlock();
                }
            }
        }
        // Every stripe was contended; fall back to blocking on the thread's preferred stripe.
        final int index = threadHash % concurrency;
        locks[index].lock();
        try {
            enqueue(index, entry);
        } finally {
            locks[index].unlock();
        }
    }

    /**
     * Removes and returns the first entry matching {@code predicate}, or {@code null}
     * if no stripe holds a matching entry. A first opportunistic pass only visits
     * stripes whose lock is free; a second pass blocks on every stripe so that a
     * matching entry cannot be missed merely because of lock contention.
     */
    T poll(Predicate<T> predicate) {
        final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
        // Opportunistic pass: skip stripes whose lock is currently held.
        for (int i = 0; i < concurrency; ++i) {
            final int index = (threadHash + i) % concurrency;
            if (locks[index].tryLock()) {
                try {
                    final T entry = pollStripe(index, predicate);
                    if (entry != null) {
                        return entry;
                    }
                } finally {
                    locks[index].unlock();
                }
            }
        }
        // Exhaustive pass: block on each stripe in turn.
        for (int i = 0; i < concurrency; ++i) {
            final int index = (threadHash + i) % concurrency;
            locks[index].lock();
            try {
                final T entry = pollStripe(index, predicate);
                if (entry != null) {
                    return entry;
                }
            } finally {
                locks[index].unlock();
            }
        }
        return null;
    }

    /**
     * Removes {@code entry} from the queue.
     *
     * @return true if the entry was found and removed, false otherwise
     */
    boolean remove(T entry) {
        // Fast path: the index map tells us which stripe the entry was last added to.
        final Integer queueIdx = queueIndex.get(entry);
        if (queueIdx != null) {
            locks[queueIdx].lock();
            try {
                if (removeFromStripe(queueIdx, entry)) {
                    return true;
                }
            } finally {
                locks[queueIdx].unlock();
            }
        }
        // Fallback: entry may have been re-added to a different queue between the index lookup
        // and the lock acquisition, so scan all stripes.
        for (int i = 0; i < concurrency; ++i) {
            locks[i].lock();
            try {
                if (removeFromStripe(i, entry)) {
                    return true;
                }
            } finally {
                locks[i].unlock();
            }
        }
        return false;
    }

    /** Appends {@code entry} to stripe {@code index} and records its location; caller must hold that stripe's lock. */
    private void enqueue(int index, T entry) {
        queues[index].add(entry);
        queueIndex.put(entry, index);
    }

    /**
     * Removes and returns the first entry of stripe {@code index} matching {@code predicate},
     * or {@code null} if none matches; caller must hold that stripe's lock.
     */
    private T pollStripe(int index, Predicate<T> predicate) {
        final Iterator<T> it = queues[index].iterator();
        while (it.hasNext()) {
            final T entry = it.next();
            if (predicate.test(entry)) {
                it.remove();
                queueIndex.remove(entry);
                return entry;
            }
        }
        return null;
    }

    /** Removes {@code entry} from stripe {@code index} if present; caller must hold that stripe's lock. */
    private boolean removeFromStripe(int index, T entry) {
        if (queues[index].remove(entry)) {
            queueIndex.remove(entry);
            return true;
        }
        return false;
    }
}
Loading
Loading