Skip to content

Commit ae4560a

Browse files
committed
Adding changes for CompositeExecutionEngine
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent 6562711 commit ae4560a

File tree

14 files changed

+1444
-0
lines changed

14 files changed

+1444
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
.claude
22
CLAUDE.md
33
.cursor*
4+
.kiro*
45

56
# intellij files
67
.idea/
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Shared concurrent queue utilities for the composite indexing engine.
11+
* No external dependencies — pure Java concurrency primitives.
12+
*/
13+
14+
// Intentionally empty: this library declares no dependencies of its own.
dependencies {
15+
}
16+
17+
// Disable the testingConventions check for this project.
testingConventions.enabled = false
18+
19+
// Configure forbiddenApisMain with the 'jdk-signatures' signature set only
// (NOTE(review): presumably because the library uses nothing beyond the JDK — confirm).
tasks.named('forbiddenApisMain').configure {
20+
replaceSignatureFiles 'jdk-signatures'
21+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite.queue;
10+
11+
import java.util.Iterator;
12+
import java.util.Queue;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReentrantLock;
15+
import java.util.function.Predicate;
16+
import java.util.function.Supplier;
17+
18+
/**
19+
* A striped concurrent queue that distributes entries across multiple internal
20+
* queues using thread-affinity-based hashing. This reduces contention by allowing
21+
* concurrent threads to operate on different stripes without blocking each other.
22+
*
23+
* @param <T> the type of elements held in this queue
24+
* @opensearch.experimental
25+
*/
26+
public final class ConcurrentQueue<T> {
27+
28+
static final int MIN_CONCURRENCY = 1;
29+
static final int MAX_CONCURRENCY = 256;
30+
31+
private final int concurrency;
32+
private final Lock[] locks;
33+
private final Queue<T>[] queues;
34+
private final Supplier<Queue<T>> queueSupplier;
35+
36+
ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
37+
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
38+
throw new IllegalArgumentException(
39+
"concurrency must be in [" + MIN_CONCURRENCY + ", " + MAX_CONCURRENCY + "], got " + concurrency
40+
);
41+
}
42+
this.concurrency = concurrency;
43+
this.queueSupplier = queueSupplier;
44+
locks = new Lock[concurrency];
45+
@SuppressWarnings({ "rawtypes", "unchecked" })
46+
Queue<T>[] queues = new Queue[concurrency];
47+
this.queues = queues;
48+
for (int i = 0; i < concurrency; ++i) {
49+
locks[i] = new ReentrantLock();
50+
queues[i] = queueSupplier.get();
51+
}
52+
}
53+
54+
void add(T entry) {
55+
// Seed the order in which to look at entries based on the current thread. This helps distribute
56+
// entries across queues and gives a bit of thread affinity between entries and threads, which
57+
// can't hurt.
58+
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
59+
for (int i = 0; i < concurrency; ++i) {
60+
final int index = (threadHash + i) % concurrency;
61+
final Lock lock = locks[index];
62+
final Queue<T> queue = queues[index];
63+
if (lock.tryLock()) {
64+
try {
65+
queue.add(entry);
66+
return;
67+
} finally {
68+
lock.unlock();
69+
}
70+
}
71+
}
72+
final int index = threadHash % concurrency;
73+
final Lock lock = locks[index];
74+
final Queue<T> queue = queues[index];
75+
lock.lock();
76+
try {
77+
queue.add(entry);
78+
} finally {
79+
lock.unlock();
80+
}
81+
}
82+
83+
T poll(Predicate<T> predicate) {
84+
final int threadHash = Thread.currentThread().hashCode() & 0xFFFF;
85+
for (int i = 0; i < concurrency; ++i) {
86+
final int index = (threadHash + i) % concurrency;
87+
final Lock lock = locks[index];
88+
final Queue<T> queue = queues[index];
89+
if (lock.tryLock()) {
90+
try {
91+
Iterator<T> it = queue.iterator();
92+
while (it.hasNext()) {
93+
T entry = it.next();
94+
if (predicate.test(entry)) {
95+
it.remove();
96+
return entry;
97+
}
98+
}
99+
} finally {
100+
lock.unlock();
101+
}
102+
}
103+
}
104+
for (int i = 0; i < concurrency; ++i) {
105+
final int index = (threadHash + i) % concurrency;
106+
final Lock lock = locks[index];
107+
final Queue<T> queue = queues[index];
108+
lock.lock();
109+
try {
110+
Iterator<T> it = queue.iterator();
111+
while (it.hasNext()) {
112+
T entry = it.next();
113+
if (predicate.test(entry)) {
114+
it.remove();
115+
return entry;
116+
}
117+
}
118+
} finally {
119+
lock.unlock();
120+
}
121+
}
122+
return null;
123+
}
124+
125+
boolean remove(T entry) {
126+
for (int i = 0; i < concurrency; ++i) {
127+
final Lock lock = locks[i];
128+
final Queue<T> queue = queues[i];
129+
lock.lock();
130+
try {
131+
if (queue.remove(entry)) {
132+
return true;
133+
}
134+
} finally {
135+
lock.unlock();
136+
}
137+
}
138+
return false;
139+
}
140+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite.queue;
10+
11+
import java.util.Queue;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.function.Supplier;
15+
16+
/**
17+
* A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics
18+
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that
19+
* they can be atomically locked when polled and unlocked when returned.
20+
* <p>
21+
* This is used by the composite writer pool to ensure that a writer is locked
22+
* before it is handed out and unlocked when it is returned.
23+
*
24+
* @param <T> the type of lockable elements held in this queue
25+
* @opensearch.experimental
26+
*/
27+
public final class LockableConcurrentQueue<T extends Lock> {
28+
29+
private final ConcurrentQueue<T> queue;
30+
private final AtomicInteger addAndUnlockCounter = new AtomicInteger();
31+
32+
public LockableConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
33+
this.queue = new ConcurrentQueue<>(queueSupplier, concurrency);
34+
}
35+
36+
/**
37+
* Lock an entry, and poll it from the queue, in that order. If no entry can be found and locked,
38+
* {@code null} is returned.
39+
*/
40+
public T lockAndPoll() {
41+
int addAndUnlockCount;
42+
do {
43+
addAndUnlockCount = addAndUnlockCounter.get();
44+
T entry = queue.poll(Lock::tryLock);
45+
if (entry != null) {
46+
return entry;
47+
}
48+
// If an entry has been added to the queue in the meantime, try again.
49+
} while (addAndUnlockCount != addAndUnlockCounter.get());
50+
51+
return null;
52+
}
53+
54+
/** Remove an entry from the queue. */
55+
public boolean remove(T entry) {
56+
return queue.remove(entry);
57+
}
58+
59+
/** Add an entry to the queue and unlock it, in that order. */
60+
public void addAndUnlock(T entry) {
61+
queue.add(entry);
62+
entry.unlock();
63+
addAndUnlockCounter.incrementAndGet();
64+
}
65+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Concurrent queue utilities for the composite indexing engine.
11+
*
12+
* @opensearch.experimental
13+
*/
14+
package org.opensearch.composite.queue;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
// Plugin descriptor: human-readable description and the plugin's entry-point class.
opensearchplugin {
10+
description = 'Composite indexing engine plugin that orchestrates multi-format indexing across multiple data format engines.'
11+
classname = 'org.opensearch.composite.CompositeEnginePlugin'
12+
}
13+
14+
// The composite-engine-lib project is exposed as an 'api' dependency; the server
// project is compile-only, and the test framework is used for tests only.
dependencies {
15+
api project(':sandbox:libs:composite-engine-lib')
16+
compileOnly project(':server')
17+
testImplementation project(':test:framework')
18+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.index.engine.dataformat.DataFormat;
13+
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
14+
15+
import java.util.List;
16+
import java.util.Objects;
17+
import java.util.Set;
18+
19+
/**
20+
* A composite {@link DataFormat} that wraps multiple per-format {@link DataFormat} instances.
21+
* Each constituent format retains its own {@link FieldTypeCapabilities} — field routing is
22+
* handled per-format by {@link CompositeDocumentInput}, not by this class.
23+
*
24+
* @opensearch.experimental
25+
*/
26+
@ExperimentalApi
27+
public class CompositeDataFormat implements DataFormat {
28+
29+
private final List<DataFormat> dataFormats;
30+
31+
/**
32+
* Constructs a CompositeDataFormat from the given list of data formats.
33+
*
34+
* @param dataFormats the constituent data formats
35+
*/
36+
public CompositeDataFormat(List<DataFormat> dataFormats) {
37+
this.dataFormats = List.copyOf(Objects.requireNonNull(dataFormats, "dataFormats must not be null"));
38+
}
39+
40+
public CompositeDataFormat() {
41+
this.dataFormats = List.of();
42+
}
43+
44+
/**
45+
* Returns the list of constituent data formats.
46+
*
47+
* @return the data formats
48+
*/
49+
public List<DataFormat> getDataFormats() {
50+
return dataFormats;
51+
}
52+
53+
@Override
54+
public String name() {
55+
return "composite";
56+
}
57+
58+
@Override
59+
public long priority() {
60+
return Long.MAX_VALUE;
61+
}
62+
63+
@Override
64+
public Set<FieldTypeCapabilities> supportedFields() {
65+
// Union of all constituent formats' supported fields
66+
if (dataFormats.isEmpty()) {
67+
return Set.of();
68+
}
69+
return dataFormats.get(0).supportedFields();
70+
}
71+
72+
@Override
73+
public String toString() {
74+
return "CompositeDataFormat{dataFormats=" + dataFormats + '}';
75+
}
76+
}

0 commit comments

Comments
 (0)