Skip to content

Commit 2823348

Browse files
authored
Merge pull request #199 from Bukhtawar/ciee-refactor
Simplify Lockable interface, adds safety for data formats
2 parents e68f39f + 9a8d89d commit 2823348

File tree

13 files changed

+138
-103
lines changed

13 files changed

+138
-103
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.composite.queue;
10+
11+
import jdk.jfr.Experimental;
12+
13+
/**
14+
* A minimal locking contract for objects managed by a {@link LockableConcurrentQueue}.
15+
*
16+
* @opensearch.experimental
17+
*/
18+
@Experimental
19+
public interface Lockable {
20+
21+
/**
22+
* Acquires the lock.
23+
*/
24+
void lock();
25+
26+
/**
27+
* Attempts to acquire the lock without blocking.
28+
*
29+
* @return {@code true} if the lock was acquired
30+
*/
31+
boolean tryLock();
32+
33+
/**
34+
* Releases the lock.
35+
*/
36+
void unlock();
37+
}

sandbox/libs/composite-engine-lib/src/main/java/org/opensearch/composite/queue/LockableConcurrentQueue.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,11 @@
1010

1111
import java.util.Queue;
1212
import java.util.concurrent.atomic.AtomicInteger;
13-
import java.util.concurrent.locks.Lock;
1413
import java.util.function.Supplier;
1514

1615
/**
1716
* A concurrent queue wrapper that adds lock-and-poll / add-and-unlock semantics
18-
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lock} so that
17+
* on top of {@link ConcurrentQueue}. Entries must implement {@link Lockable} so that
1918
* they can be atomically locked when polled and unlocked when returned.
2019
* <p>
2120
* This is used by the composite writer pool to ensure that a writer is locked
@@ -24,7 +23,7 @@
2423
* @param <T> the type of lockable elements held in this queue
2524
* @opensearch.experimental
2625
*/
27-
public final class LockableConcurrentQueue<T extends Lock> {
26+
public final class LockableConcurrentQueue<T extends Lockable> {
2827

2928
private final ConcurrentQueue<T> queue;
3029
private final AtomicInteger addAndUnlockCounter = new AtomicInteger();
@@ -47,7 +46,7 @@ public T lockAndPoll() {
4746
int addAndUnlockCount;
4847
do {
4948
addAndUnlockCount = addAndUnlockCounter.get();
50-
T entry = queue.poll(Lock::tryLock);
49+
T entry = queue.poll(Lockable::tryLock);
5150
if (entry != null) {
5251
return entry;
5352
}

sandbox/libs/composite-engine-lib/src/test/java/org/opensearch/composite/queue/LockableConcurrentQueueTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,37 @@ public class LockableConcurrentQueueTests extends OpenSearchTestCase {
2424
/**
2525
* A simple lockable entry for testing.
2626
*/
27-
static class LockableEntry extends ReentrantLock {
27+
static class LockableEntry implements Lockable {
2828
final String id;
29+
private final ReentrantLock delegate = new ReentrantLock();
2930

3031
LockableEntry(String id) {
3132
this.id = id;
3233
}
3334

35+
@Override
36+
public void lock() {
37+
delegate.lock();
38+
}
39+
40+
@Override
41+
public boolean tryLock() {
42+
return delegate.tryLock();
43+
}
44+
45+
@Override
46+
public void unlock() {
47+
delegate.unlock();
48+
}
49+
50+
boolean isHeldByCurrentThread() {
51+
return delegate.isHeldByCurrentThread();
52+
}
53+
54+
boolean isLocked() {
55+
return delegate.isLocked();
56+
}
57+
3458
@Override
3559
public String toString() {
3660
return "LockableEntry{" + id + "}";

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* @opensearch.experimental
2525
*/
2626
@ExperimentalApi
27-
public class CompositeDataFormat implements DataFormat {
27+
public class CompositeDataFormat extends DataFormat {
2828

2929
private final List<DataFormat> dataFormats;
3030

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeEnginePlugin.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.opensearch.index.IndexSettings;
1818
import org.opensearch.index.engine.dataformat.DataFormat;
1919
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
20-
import org.opensearch.index.engine.dataformat.DocumentInput;
2120
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
2221
import org.opensearch.index.mapper.MapperService;
2322
import org.opensearch.index.shard.ShardPath;
@@ -129,6 +128,10 @@ public void loadExtensions(ExtensionLoader loader) {
129128
continue;
130129
}
131130
String name = format.name();
131+
if (name == null || name.isBlank()) {
132+
logger.warn("DataFormatPlugin [{}] returned a DataFormat with null/blank name, skipping", plugin.getClass().getName());
133+
continue;
134+
}
132135
DataFormatPlugin existing = registry.get(name);
133136
if (existing != null) {
134137
long existingPriority = existing.getDataFormat().priority();
@@ -180,13 +183,12 @@ public DataFormat getDataFormat() {
180183
}
181184

182185
@Override
183-
@SuppressWarnings("unchecked")
184-
public <T extends DataFormat, P extends DocumentInput<?>> IndexingExecutionEngine<T, P> indexingEngine(
186+
public IndexingExecutionEngine<?, ?> indexingEngine(
185187
MapperService mapperService,
186188
ShardPath shardPath,
187189
IndexSettings indexSettings
188190
) {
189-
return (IndexingExecutionEngine<T, P>) new CompositeIndexingExecutionEngine(
191+
return new CompositeIndexingExecutionEngine(
190192
dataFormatPlugins,
191193
indexSettings,
192194
mapperService,

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,6 @@ public CompositeIndexingExecutionEngine(
105105

106106
List<IndexingExecutionEngine<?, ?>> secondaries = new ArrayList<>();
107107
for (String secondaryName : secondaryFormatNames) {
108-
if (secondaryName.equals(primaryFormatName)) {
109-
logger.warn("Secondary data format [{}] is the same as primary, skipping duplicate", secondaryName);
110-
continue;
111-
}
112108
DataFormatPlugin secondaryPlugin = dataFormatPlugins.get(secondaryName);
113109
secondaries.add(secondaryPlugin.indexingEngine(mapperService, shardPath, indexSettings));
114110
allFormats.add(secondaryPlugin.getDataFormat());

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121

2222
import java.io.IOException;
2323
import java.util.AbstractMap;
24-
import java.util.Collections;
2524
import java.util.LinkedHashMap;
2625
import java.util.Map;
2726
import java.util.Optional;
28-
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.locks.Condition;
30-
import java.util.concurrent.locks.Lock;
3127
import java.util.concurrent.locks.ReentrantLock;
3228

29+
import org.opensearch.composite.queue.Lockable;
30+
3331
/**
3432
* A composite {@link Writer} that wraps one {@link Writer} per registered data format
3533
* and delegates write operations to each per-format writer.
@@ -42,12 +40,12 @@
4240
* @opensearch.experimental
4341
*/
4442
@ExperimentalApi
45-
public class CompositeWriter implements Writer<CompositeDocumentInput>, Lock {
43+
public class CompositeWriter implements Writer<CompositeDocumentInput>, Lockable {
4644

4745
private static final Logger logger = LogManager.getLogger(CompositeWriter.class);
4846

4947
private final Map.Entry<DataFormat, Writer<DocumentInput<?>>> primaryWriter;
50-
private final Map<DataFormat, Writer<DocumentInput<?>>> secondaryWriters;
48+
private final Map<DataFormat, Writer<DocumentInput<?>>> secondaryWritersByFormat;
5149
private final ReentrantLock lock;
5250
private final long writerGeneration;
5351
private final RowIdGenerator rowIdGenerator;
@@ -79,9 +77,12 @@ public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGener
7977

8078
Map<DataFormat, Writer<DocumentInput<?>>> secondaries = new LinkedHashMap<>();
8179
for (IndexingExecutionEngine<?, ?> delegate : engine.getSecondaryDelegates()) {
82-
secondaries.put(delegate.getDataFormat(), (Writer<DocumentInput<?>>) delegate.createWriter(writerGeneration));
80+
secondaries.put(
81+
delegate.getDataFormat(),
82+
(Writer<DocumentInput<?>>) delegate.createWriter(writerGeneration)
83+
);
8384
}
84-
this.secondaryWriters = Collections.unmodifiableMap(secondaries);
85+
this.secondaryWritersByFormat = Map.copyOf(secondaries);
8586
this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName());
8687
}
8788

@@ -97,19 +98,20 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
9798
}
9899
}
99100

100-
// Then write to each secondary by matching format keys
101-
Map<DataFormat, DocumentInput<?>> secondaryInputMap = doc.getSecondaryInputs();
102-
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters.entrySet()) {
103-
DocumentInput<?> input = secondaryInputMap.get(entry.getKey());
104-
if (input == null) {
105-
logger.warn("No secondary input found for format [{}], skipping", entry.getKey().name());
101+
// Then write to each secondary — keyed lookup by DataFormat (equals/hashCode based on name)
102+
Map<DataFormat, DocumentInput<?>> secondaryInputs = doc.getSecondaryInputs();
103+
for (Map.Entry<DataFormat, DocumentInput<?>> inputEntry : secondaryInputs.entrySet()) {
104+
DataFormat format = inputEntry.getKey();
105+
Writer<DocumentInput<?>> writer = secondaryWritersByFormat.get(format);
106+
if (writer == null) {
107+
logger.warn("No writer found for secondary format [{}], skipping", format.name());
106108
continue;
107109
}
108-
WriteResult result = entry.getValue().addDoc(input);
110+
WriteResult result = writer.addDoc(inputEntry.getValue());
109111
switch (result) {
110-
case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", entry.getKey().name());
112+
case WriteResult.Success s -> logger.trace("Successfully added document in secondary format [{}]", format.name());
111113
case WriteResult.Failure f -> {
112-
logger.debug("Failed to add document in secondary format [{}]", entry.getKey().name());
114+
logger.debug("Failed to add document in secondary format [{}]", format.name());
113115
return result;
114116
}
115117
}
@@ -125,25 +127,28 @@ public FileInfos flush() throws IOException {
125127
Optional<WriterFileSet> primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey());
126128
primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet));
127129
// Flush secondaries
128-
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters.entrySet()) {
129-
Optional<WriterFileSet> wfs = entry.getValue().flush().getWriterFileSet(entry.getKey());
130-
wfs.ifPresent(writerFileSet -> builder.putWriterFileSet(entry.getKey(), writerFileSet));
130+
for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
131+
FileInfos fileInfos = writer.flush();
132+
// Iterate all format entries in the returned FileInfos
133+
for (Map.Entry<DataFormat, WriterFileSet> fileEntry : fileInfos.writerFilesMap().entrySet()) {
134+
builder.putWriterFileSet(fileEntry.getKey(), fileEntry.getValue());
135+
}
131136
}
132137
return builder.build();
133138
}
134139

135140
@Override
136141
public void sync() throws IOException {
137142
primaryWriter.getValue().sync();
138-
for (Writer<DocumentInput<?>> writer : secondaryWriters.values()) {
143+
for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
139144
writer.sync();
140145
}
141146
}
142147

143148
@Override
144149
public void close() throws IOException {
145150
primaryWriter.getValue().close();
146-
for (Writer<DocumentInput<?>> writer : secondaryWriters.values()) {
151+
for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
147152
writer.close();
148153
}
149154
}
@@ -194,28 +199,13 @@ public void lock() {
194199
lock.lock();
195200
}
196201

197-
@Override
198-
public void lockInterruptibly() throws InterruptedException {
199-
lock.lockInterruptibly();
200-
}
201-
202202
@Override
203203
public boolean tryLock() {
204204
return lock.tryLock();
205205
}
206206

207-
@Override
208-
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
209-
return lock.tryLock(time, unit);
210-
}
211-
212207
@Override
213208
public void unlock() {
214209
lock.unlock();
215210
}
216-
217-
@Override
218-
public Condition newCondition() {
219-
throw new UnsupportedOperationException();
220-
}
221211
}

sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeEnginePluginTests.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,11 @@ public DataFormat getDataFormat() {
156156
}
157157

158158
@Override
159-
public <
160-
T extends DataFormat,
161-
P extends org.opensearch.index.engine.dataformat.DocumentInput<?>>
162-
org.opensearch.index.engine.dataformat.IndexingExecutionEngine<T, P>
163-
indexingEngine(
164-
org.opensearch.index.mapper.MapperService mapperService,
165-
org.opensearch.index.shard.ShardPath shardPath,
166-
IndexSettings indexSettings
167-
) {
159+
public org.opensearch.index.engine.dataformat.IndexingExecutionEngine<?, ?> indexingEngine(
160+
org.opensearch.index.mapper.MapperService mapperService,
161+
org.opensearch.index.shard.ShardPath shardPath,
162+
IndexSettings indexSettings
163+
) {
168164
return null;
169165
}
170166
};

sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeTestHelper.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,12 @@ public DataFormat getDataFormat() {
7676
}
7777

7878
@Override
79-
@SuppressWarnings("unchecked")
80-
public <T extends DataFormat, P extends DocumentInput<?>> IndexingExecutionEngine<T, P> indexingEngine(
79+
public IndexingExecutionEngine<?, ?> indexingEngine(
8180
MapperService mapperService,
8281
ShardPath shardPath,
8382
IndexSettings indexSettings
8483
) {
85-
return (IndexingExecutionEngine<T, P>) new StubIndexingExecutionEngine(format);
84+
return new StubIndexingExecutionEngine(format);
8685
}
8786
};
8887
}
@@ -96,13 +95,12 @@ public DataFormat getDataFormat() {
9695
}
9796

9897
@Override
99-
@SuppressWarnings("unchecked")
100-
public <T extends DataFormat, P extends DocumentInput<?>> IndexingExecutionEngine<T, P> indexingEngine(
98+
public IndexingExecutionEngine<?, ?> indexingEngine(
10199
MapperService mapperService,
102100
ShardPath shardPath,
103101
IndexSettings indexSettings
104102
) {
105-
return (IndexingExecutionEngine<T, P>) new StubIndexingExecutionEngine(format);
103+
return new StubIndexingExecutionEngine(format);
106104
}
107105
};
108106
}

sandbox/plugins/composite-engine/src/test/java/org/opensearch/composite/CompositeWriterTests.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.test.OpenSearchTestCase;
1313

1414
import java.io.IOException;
15-
import java.util.concurrent.TimeUnit;
1615

1716
/**
1817
* Tests for {@link CompositeWriter}.
@@ -76,19 +75,6 @@ public void testTryLockSucceedsWhenUnlocked() throws IOException {
7675
writer.close();
7776
}
7877

79-
public void testTryLockWithTimeoutSucceeds() throws Exception {
80-
CompositeWriter writer = new CompositeWriter(engine, 0);
81-
assertTrue(writer.tryLock(100, TimeUnit.MILLISECONDS));
82-
writer.unlock();
83-
writer.close();
84-
}
85-
86-
public void testNewConditionThrowsUnsupported() throws IOException {
87-
CompositeWriter writer = new CompositeWriter(engine, 0);
88-
expectThrows(UnsupportedOperationException.class, writer::newCondition);
89-
writer.close();
90-
}
91-
9278
public void testFlushReturnsFileInfos() throws IOException {
9379
CompositeWriter writer = new CompositeWriter(engine, 0);
9480
FileInfos fileInfos = writer.flush();
@@ -109,10 +95,4 @@ public void testCloseDoesNotThrow() throws IOException {
10995
writer.close();
11096
}
11197

112-
public void testLockInterruptiblySucceeds() throws Exception {
113-
CompositeWriter writer = new CompositeWriter(engine, 0);
114-
writer.lockInterruptibly();
115-
writer.unlock();
116-
writer.close();
117-
}
11898
}

0 commit comments

Comments
 (0)