Skip to content

Commit 238416a

Browse files
committed
Add sandbox plugin for composite engine
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent 6562711 commit 238416a

25 files changed

+3222
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
.claude
22
CLAUDE.md
33
.cursor*
4+
.kiro*
45

56
# intellij files
67
.idea/
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Shared concurrent queue utilities for the composite indexing engine.
11+
* No external dependencies — pure Java concurrency primitives.
12+
*/
13+
14+
dependencies {
15+
testImplementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
16+
testImplementation "junit:junit:${versions.junit}"
17+
testImplementation "org.hamcrest:hamcrest:${versions.hamcrest}"
18+
testImplementation(project(":test:framework")) {
19+
exclude group: 'org.opensearch', module: 'opensearch-composite-engine-lib'
20+
}
21+
}
22+
23+
testingConventions.enabled = true
24+
25+
tasks.named('forbiddenApisMain').configure {
26+
replaceSignatureFiles 'jdk-signatures'
27+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite.queue;
10+
11+
import java.util.Iterator;
12+
import java.util.Queue;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReentrantLock;
15+
import java.util.function.Predicate;
16+
import java.util.function.Supplier;
17+
18+
/**
19+
* A striped concurrent queue that distributes entries across multiple internal
20+
* queues using thread-affinity-based hashing. This reduces contention by allowing
21+
* concurrent threads to operate on different stripes without blocking each other.
22+
*
23+
* @param <T> the type of elements held in this queue
24+
* @opensearch.experimental
25+
*/
26+
public final class ConcurrentQueue<T> {
27+
28+
static final int MIN_CONCURRENCY = 1;
29+
static final int MAX_CONCURRENCY = 256;
30+
31+
private final int concurrency;
32+
private final Lock[] locks;
33+
private final Queue<T>[] queues;
34+
private final Supplier<Queue<T>> queueSupplier;
35+
36+
ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
37+
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
38+
throw new IllegalArgumentException(
39+
"concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency
40+
);
41+
}
42+
this.concurrency = concurrency;
43+
this.queueSupplier = queueSupplier;
44+
locks = new Lock[concurrency];
45+
@SuppressWarnings({ "rawtypes", "unchecked" })
46+
Queue<T>[] queues = new Queue[concurrency];
47+
this.queues = queues;
48+
for (int i = 0; i < concurrency; ++i) {
49+
locks[i] = new ReentrantLock();
50+
queues[i] = queueSupplier.get();
51+
}
52+
}
53+
54+
void add(T entry) {
55+
// Seed the order in which to look at entries based on the current thread. This helps distribute
56+
// entries across queues and gives a bit of thread affinity between entries and threads, which
57+
// can't hurt.
58+
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
59+
for (int i = 0; i < concurrency; ++i) {
60+
final int index = (threadHash + i) % concurrency;
61+
final Lock lock = locks[index];
62+
final Queue<T> queue = queues[index];
63+
if (lock.tryLock()) {
64+
try {
65+
queue.add(entry);
66+
return;
67+
} finally {
68+
lock.unlock();
69+
}
70+
}
71+
}
72+
final int index = threadHash % concurrency;
73+
final Lock lock = locks[index];
74+
final Queue<T> queue = queues[index];
75+
lock.lock();
76+
try {
77+
queue.add(entry);
78+
} finally {
79+
lock.unlock();
80+
}
81+
}
82+
83+
T poll(Predicate<T> predicate) {
84+
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
85+
for (int i = 0; i < concurrency; ++i) {
86+
final int index = (threadHash + i) % concurrency;
87+
final Lock lock = locks[index];
88+
final Queue<T> queue = queues[index];
89+
if (lock.tryLock()) {
90+
try {
91+
Iterator<T> it = queue.iterator();
92+
while (it.hasNext()) {
93+
T entry = it.next();
94+
if (predicate.test(entry)) {
95+
it.remove();
96+
return entry;
97+
}
98+
}
99+
} finally {
100+
lock.unlock();
101+
}
102+
}
103+
}
104+
for (int i = 0; i < concurrency; ++i) {
105+
final int index = (threadHash + i) % concurrency;
106+
final Lock lock = locks[index];
107+
final Queue<T> queue = queues[index];
108+
lock.lock();
109+
try {
110+
Iterator<T> it = queue.iterator();
111+
while (it.hasNext()) {
112+
T entry = it.next();
113+
if (predicate.test(entry)) {
114+
it.remove();
115+
return entry;
116+
}
117+
}
118+
} finally {
119+
lock.unlock();
120+
}
121+
}
122+
return null;
123+
}
124+
125+
boolean remove(T entry) {
126+
for (int i = 0; i < concurrency; ++i) {
127+
final Lock lock = locks[i];
128+
final Queue<T> queue = queues[i];
129+
lock.lock();
130+
try {
131+
if (queue.remove(entry)) {
132+
return true;
133+
}
134+
} finally {
135+
lock.unlock();
136+
}
137+
}
138+
return false;
139+
}
140+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite.queue;
10+
11+
import java.util.Queue;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.function.Supplier;
15+
16+
/**
17+
* A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics
18+
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that
19+
* they can be atomically locked when polled and unlocked when returned.
20+
* <p>
21+
* This is used by the composite writer pool to ensure that a writer is locked
22+
* before it is handed out and unlocked when it is returned.
23+
*
24+
* @param <T> the type of lockable elements held in this queue
25+
* @opensearch.experimental
26+
*/
27+
public final class LockableConcurrentQueue<T extends Lock> {
28+
29+
private final ConcurrentQueue<T> queue;
30+
private final AtomicInteger addAndUnlockCounter = new AtomicInteger();
31+
32+
/**
33+
* Creates a new lockable concurrent queue.
34+
*
35+
* @param queueSupplier supplier for the underlying queue instances
36+
* @param concurrency the concurrency level (number of stripes)
37+
*/
38+
public LockableConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
39+
this.queue = new ConcurrentQueue<>(queueSupplier, concurrency);
40+
}
41+
42+
/**
43+
* Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked,
44+
* {@code null} is returned.
45+
*/
46+
public T lockAndPoll() {
47+
int addAndUnlockCount;
48+
do {
49+
addAndUnlockCount = addAndUnlockCounter.get();
50+
T entry = queue.poll(Lock::tryLock);
51+
if (entry != null) {
52+
return entry;
53+
}
54+
// If an entry has been added to the queue in the meantime, try again.
55+
} while (addAndUnlockCount != addAndUnlockCounter.get());
56+
57+
return null;
58+
}
59+
60+
/**
61+
* Remove an entry from the queue.
62+
*
63+
* @param entry the entry to remove
64+
* @return {@code true} if the entry was removed
65+
*/
66+
public boolean remove(T entry) {
67+
return queue.remove(entry);
68+
}
69+
70+
/**
71+
* Add an entry to the queue and unlock it, in that order.
72+
*
73+
* @param entry the entry to add and unlock
74+
*/
75+
public void addAndUnlock(T entry) {
76+
queue.add(entry);
77+
entry.unlock();
78+
addAndUnlockCounter.incrementAndGet();
79+
}
80+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Concurrent queue utilities for the composite indexing engine.
11+
*
12+
* @opensearch.experimental
13+
*/
14+
package org.opensearch.composite.queue;

0 commit comments

Comments
 (0)