Skip to content

Commit aab0a3d

Browse files
authored
Wire IngestService from Node to IngestionEngine for pull-based ingestion (opensearch-project#20757)
* Wire IngestService from Node to IngestionEngine for pull-based ingestion Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
1 parent 8f8f7b5 commit aab0a3d

File tree

6 files changed

+119
-4
lines changed

6 files changed

+119
-4
lines changed

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.indices.pollingingest.IngestionSettings;
4141
import org.opensearch.indices.pollingingest.PollingIngestStats;
4242
import org.opensearch.indices.pollingingest.StreamPoller;
43+
import org.opensearch.ingest.IngestService;
4344

4445
import java.io.IOException;
4546
import java.util.HashMap;
@@ -60,11 +61,17 @@ public class IngestionEngine extends InternalEngine {
6061
private StreamPoller streamPoller;
6162
private final IngestionConsumerFactory ingestionConsumerFactory;
6263
private final DocumentMapperForType documentMapperForType;
64+
private final IngestService ingestService;
6365
private volatile IngestionShardPointer lastCommittedBatchStartPointer;
6466

6567
/**
 * Creates an ingestion engine without an {@link IngestService}.
 *
 * Delegates to the three-argument constructor, passing {@code null} as the ingest service.
 *
 * @param engineConfig engine configuration, forwarded unchanged
 * @param ingestionConsumerFactory consumer factory, forwarded unchanged
 */
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) {
68+
this(engineConfig, ingestionConsumerFactory, null);
69+
}
70+
71+
/**
 * Creates an ingestion engine that additionally holds a reference to an {@link IngestService}.
 *
 * @param engineConfig engine configuration; passed to the superclass constructor and used to obtain the
 *                     document mapper via its supplier
 * @param ingestionConsumerFactory consumer factory; must not be {@code null}
 * @param ingestService ingest service stored on the engine; it is not null-checked here, and the
 *                      two-argument constructor passes {@code null}
 * @throws NullPointerException if {@code ingestionConsumerFactory} is {@code null}
 */
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory, IngestService ingestService) {
6672
super(engineConfig);
6773
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
74+
// Stored as-is: unlike the consumer factory, a null ingest service is accepted.
this.ingestService = ingestService;
6875
this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
6976
// Runs last, after every field assigned by this constructor has been set.
registerDynamicIndexSettingsHandlers();
7077
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@
164164
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher;
165165
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
166166
import org.opensearch.indices.replication.common.ReplicationType;
167+
import org.opensearch.ingest.IngestService;
167168
import org.opensearch.node.Node;
168169
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
169170
import org.opensearch.plugins.IndexStorePlugin;
@@ -395,6 +396,7 @@ public class IndicesService extends AbstractLifecycleComponent
395396
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
396397
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
397398
private final Map<String, IngestionConsumerFactory> ingestionConsumerFactories;
399+
private final Supplier<IngestService> ingestServiceSupplier;
398400
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
399401
private final Map<String, IndexStorePlugin.StoreFactory> storeFactories;
400402
final AbstractRefCounted indicesRefCount; // pkg-private for testing
@@ -458,6 +460,7 @@ public IndicesService(
458460
SearchRequestStats searchRequestStats,
459461
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
460462
Map<String, IngestionConsumerFactory> ingestionConsumerFactories,
463+
Supplier<IngestService> ingestServiceSupplier,
461464
RecoverySettings recoverySettings,
462465
CacheService cacheService,
463466
RemoteStoreSettings remoteStoreSettings,
@@ -519,6 +522,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
519522
this.recoveryStateFactories = recoveryStateFactories;
520523
this.storeFactories = storeFactories;
521524
this.ingestionConsumerFactories = ingestionConsumerFactories;
525+
this.ingestServiceSupplier = ingestServiceSupplier;
522526
// doClose() is called when shutting down a node, yet there might still be ongoing requests
523527
// that we need to wait for before closing some resources such as the caches. In order to
524528
// avoid closing these resources while ongoing requests are still being processed, we use a
@@ -666,6 +670,7 @@ public IndicesService(
666670
searchRequestStats,
667671
remoteStoreStatsTrackerFactory,
668672
ingestionConsumerFactories,
673+
() -> null,
669674
recoverySettings,
670675
cacheService,
671676
remoteStoreSettings,
@@ -1164,7 +1169,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
11641169
// streaming ingestion
11651170
if (indexMetadata != null && indexMetadata.useIngestionSource()) {
11661171
IngestionConsumerFactory ingestionConsumerFactory = getIngestionConsumerFactory(idxSettings);
1167-
return new IngestionEngineFactory(ingestionConsumerFactory);
1172+
return new IngestionEngineFactory(ingestionConsumerFactory, ingestServiceSupplier);
11681173
}
11691174

11701175
final List<Optional<EngineFactory>> engineFactories = engineFactoryProviders.stream()

server/src/main/java/org/opensearch/indices/pollingingest/IngestionEngineFactory.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,30 @@
99
package org.opensearch.indices.pollingingest;
1010

1111
import org.opensearch.cluster.metadata.IngestionSource;
12+
import org.opensearch.common.Nullable;
1213
import org.opensearch.index.IngestionConsumerFactory;
1314
import org.opensearch.index.engine.Engine;
1415
import org.opensearch.index.engine.EngineConfig;
1516
import org.opensearch.index.engine.EngineFactory;
1617
import org.opensearch.index.engine.IngestionEngine;
1718
import org.opensearch.index.engine.NRTReplicationEngine;
19+
import org.opensearch.ingest.IngestService;
1820

1921
import java.util.Objects;
22+
import java.util.function.Supplier;
2023

2124
/**
2225
* Engine Factory implementation used with streaming ingestion.
2326
*/
2427
public class IngestionEngineFactory implements EngineFactory {
2528

2629
private final IngestionConsumerFactory ingestionConsumerFactory;
30+
@Nullable
31+
private final Supplier<IngestService> ingestServiceSupplier;
2732

28-
public IngestionEngineFactory(IngestionConsumerFactory ingestionConsumerFactory) {
33+
/**
 * Creates a factory for the engines used with streaming ingestion.
 *
 * @param ingestionConsumerFactory consumer factory handed to every {@link IngestionEngine} this factory
 *                                 creates; must not be {@code null}
 * @param ingestServiceSupplier supplier of the {@link IngestService} passed to created engines; may be
 *                              {@code null}, in which case engines are created with a {@code null}
 *                              ingest service
 * @throws NullPointerException if {@code ingestionConsumerFactory} is {@code null}
 */
public IngestionEngineFactory(
    IngestionConsumerFactory ingestionConsumerFactory,
    @Nullable Supplier<IngestService> ingestServiceSupplier
) {
    this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
    // Annotated @Nullable to match the field: a null supplier is an accepted input.
    this.ingestServiceSupplier = ingestServiceSupplier;
}
3137

3238
/**
@@ -39,9 +45,13 @@ public Engine newReadWriteEngine(EngineConfig config) {
3945
IngestionSource ingestionSource = config.getIndexSettings().getIndexMetadata().getIngestionSource();
4046
boolean isAllActiveIngestion = ingestionSource != null && ingestionSource.isAllActiveIngestionEnabled();
4147

48+
IngestService ingestService = ingestServiceSupplier != null ? ingestServiceSupplier.get() : null;
49+
assert ingestService != null || ingestServiceSupplier == null
50+
: "IngestService supplier returned null. This indicates a initialization ordering issue.";
51+
4252
if (isAllActiveIngestion) {
4353
// use ingestion engine on both primary and replica in all-active mode
44-
IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory);
54+
IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory, ingestService);
4555
ingestionEngine.start();
4656
return ingestionEngine;
4757
}
@@ -52,7 +62,7 @@ public Engine newReadWriteEngine(EngineConfig config) {
5262
return new NRTReplicationEngine(config);
5363
}
5464

55-
IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory);
65+
IngestionEngine ingestionEngine = new IngestionEngine(config, ingestionConsumerFactory, ingestService);
5666
ingestionEngine.start();
5767
return ingestionEngine;
5868
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
626626
}
627627

628628
final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
629+
final SetOnce<IngestService> ingestServiceReference = new SetOnce<>();
629630
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(repositoriesServiceReference::get, threadPool);
630631
localNodeFactory = new RemoteStoreVerifyingLocalNodeFactory(settings, nodeEnvironment.nodeId(), remoteStoreNodeService);
631632
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
@@ -1001,6 +1002,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
10011002
searchRequestStats,
10021003
remoteStoreStatsTrackerFactory,
10031004
ingestionConsumerFactories,
1005+
ingestServiceReference::get,
10041006
recoverySettings,
10051007
cacheService,
10061008
remoteStoreSettings,
@@ -1022,6 +1024,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
10221024
xContentRegistry,
10231025
new SystemIngestPipelineCache()
10241026
);
1027+
ingestServiceReference.set(ingestService);
10251028

10261029
final FsServiceProvider fsServiceProvider = new FsServiceProvider(
10271030
settings,

server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.indices.pollingingest.PollingIngestStats;
2626
import org.opensearch.indices.pollingingest.StreamPoller;
2727
import org.opensearch.indices.replication.common.ReplicationType;
28+
import org.opensearch.ingest.IngestService;
2829
import org.opensearch.test.IndexSettingsModule;
2930
import org.junit.After;
3031
import org.junit.Assert;
@@ -265,4 +266,59 @@ private boolean resultsFound(Engine engine, int numDocs) {
265266
return searcher.getIndexReader().numDocs() == numDocs;
266267
}
267268
}
269+
270+
public void testConstructorWithNullIngestService() throws IOException {
271+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
272+
Store testStore = createStore(indexSettings, newDirectory());
273+
FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);
274+
275+
EngineConfig config = config(indexSettings, testStore, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
276+
String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";
277+
MapperService mapperService = createMapperService(mapping);
278+
config = config(config, () -> new DocumentMapperForType(mapperService.documentMapper(), null), clusterApplierService);
279+
280+
testStore.createEmpty(config.getIndexSettings().getIndexVersionCreated().luceneVersion);
281+
final String translogUuid = Translog.createEmptyTranslog(
282+
config.getTranslogConfig().getTranslogPath(),
283+
SequenceNumbers.NO_OPS_PERFORMED,
284+
shardId,
285+
primaryTerm.get()
286+
);
287+
testStore.associateIndexWithNewTranslog(translogUuid);
288+
289+
// null IngestService — engine should start without pipeline support
290+
IngestionEngine engine = new IngestionEngine(config, consumerFactory, null);
291+
engine.start();
292+
waitForResults(engine, 2);
293+
engine.close();
294+
testStore.close();
295+
}
296+
297+
public void testConstructorWithNonNullIngestService() throws IOException {
298+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
299+
Store testStore = createStore(indexSettings, newDirectory());
300+
FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);
301+
302+
EngineConfig config = config(indexSettings, testStore, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
303+
String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";
304+
MapperService mapperService = createMapperService(mapping);
305+
config = config(config, () -> new DocumentMapperForType(mapperService.documentMapper(), null), clusterApplierService);
306+
307+
testStore.createEmpty(config.getIndexSettings().getIndexVersionCreated().luceneVersion);
308+
final String translogUuid = Translog.createEmptyTranslog(
309+
config.getTranslogConfig().getTranslogPath(),
310+
SequenceNumbers.NO_OPS_PERFORMED,
311+
shardId,
312+
primaryTerm.get()
313+
);
314+
testStore.associateIndexWithNewTranslog(translogUuid);
315+
316+
// non-null IngestService — engine should start with pipeline support available
317+
IngestService ingestService = mock(IngestService.class);
318+
IngestionEngine engine = new IngestionEngine(config, consumerFactory, ingestService);
319+
engine.start();
320+
waitForResults(engine, 2);
321+
engine.close();
322+
testStore.close();
323+
}
268324
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.opensearch.index.IngestionConsumerFactory;
12+
import org.opensearch.ingest.IngestService;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
15+
import static org.mockito.Mockito.mock;
16+
17+
public class IngestionEngineFactoryTests extends OpenSearchTestCase {
18+
19+
public void testConstructorWithIngestServiceSupplier() {
20+
IngestionConsumerFactory consumerFactory = mock(IngestionConsumerFactory.class);
21+
IngestService ingestService = mock(IngestService.class);
22+
23+
IngestionEngineFactory factory = new IngestionEngineFactory(consumerFactory, () -> ingestService);
24+
assertNotNull(factory);
25+
}
26+
27+
public void testConstructorWithNullIngestServiceSupplier() {
28+
IngestionConsumerFactory consumerFactory = mock(IngestionConsumerFactory.class);
29+
30+
// Null supplier should be accepted
31+
IngestionEngineFactory factory = new IngestionEngineFactory(consumerFactory, null);
32+
assertNotNull(factory);
33+
}
34+
}

0 commit comments

Comments
 (0)