Skip to content

Commit e8e349f

Browse files
committed
provide ProcessorBridge.Factory.AbstractExternal
1 parent 99948ba commit e8e349f

File tree

2 files changed

+66
-12
lines changed

2 files changed

+66
-12
lines changed

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/PipelineBridge.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*/
99
package org.elasticsearch.logstashbridge.ingest;
1010

11-
import org.elasticsearch.core.FixForMultiProject;
11+
import org.elasticsearch.cluster.metadata.ProjectId;
1212
import org.elasticsearch.ingest.Pipeline;
1313
import org.elasticsearch.logstashbridge.StableBridgeAPI;
1414
import org.elasticsearch.logstashbridge.script.ScriptServiceBridge;
@@ -24,7 +24,6 @@ public static PipelineBridge fromInternal(final Pipeline pipeline) {
2424
return new PipelineBridge(pipeline);
2525
}
2626

27-
@FixForMultiProject(description = "should we pass a non-null project ID here?")
2827
public static PipelineBridge create(
2928
String id,
3029
Map<String, Object> config,
@@ -37,7 +36,7 @@ public static PipelineBridge create(
3736
config,
3837
StableBridgeAPI.toInternal(processorFactories),
3938
StableBridgeAPI.toInternalNullable(scriptServiceBridge),
40-
null
39+
ProjectId.DEFAULT
4140
)
4241
);
4342
}

libs/logstash-bridge/src/main/java/org/elasticsearch/logstashbridge/ingest/ProcessorBridge.java

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,28 @@ public Processor.Parameters toInternal() {
178178
* An external bridge for {@link Processor.Factory}
179179
*/
180180
interface Factory extends StableBridgeAPI<Processor.Factory> {
181-
ProcessorBridge create(
181+
@Deprecated // supply ProjectId
182+
default ProcessorBridge create(
182183
Map<String, ProcessorBridge.Factory> registry,
183184
String processorTag,
184185
String description,
185186
Map<String, Object> config
187+
) throws Exception {
188+
return this.create(registry, processorTag, description, config, ProjectId.DEFAULT);
189+
}
190+
191+
ProcessorBridge create(
192+
Map<String, ProcessorBridge.Factory> registry,
193+
String processorTag,
194+
String description,
195+
Map<String, Object> config,
196+
ProjectId projectId
186197
) throws Exception;
187198

188199
static Factory fromInternal(final Processor.Factory delegate) {
200+
if (delegate instanceof AbstractExternal.InternalProxy internalProxy) {
201+
return internalProxy.toExternal();
202+
}
189203
return new ProxyInternal(delegate);
190204
}
191205

@@ -203,17 +217,12 @@ public ProcessorBridge create(
203217
final Map<String, Factory> registry,
204218
final String processorTag,
205219
final String description,
206-
final Map<String, Object> config
220+
final Map<String, Object> config,
221+
final ProjectId projectId
207222
) throws Exception {
208223
final Map<String, Processor.Factory> internalRegistry = StableBridgeAPI.toInternal(registry);
209224
final Processor.Factory internalFactory = toInternal();
210-
final Processor internalProcessor = internalFactory.create(
211-
internalRegistry,
212-
processorTag,
213-
description,
214-
config,
215-
ProjectId.DEFAULT
216-
);
225+
final Processor internalProcessor = internalFactory.create(internalRegistry, processorTag, description, config, projectId);
217226
return ProcessorBridge.fromInternal(internalProcessor);
218227
}
219228

@@ -222,6 +231,52 @@ public Processor.Factory toInternal() {
222231
return this.internalDelegate;
223232
}
224233
}
234+
235+
/**
236+
* The {@code ProcessorBridge.Factory.AbstractExternal} is an abstract base class for implementing
237+
* the {@link ProcessorBridge.Factory} externally to the Elasticsearch code-base. It takes care of
238+
* the details of maintaining a singular internal-form implementation of {@link Processor.Factory}
239+
* that proxies calls to the external implementation.
240+
*/
241+
abstract class AbstractExternal implements Factory {
242+
InternalProxy internalDelegate;
243+
244+
@Override
245+
public Processor.Factory toInternal() {
246+
if (this.internalDelegate == null) {
247+
internalDelegate = new InternalProxy();
248+
}
249+
return this.internalDelegate;
250+
}
251+
252+
private class InternalProxy implements Processor.Factory {
253+
@Override
254+
public Processor create(
255+
final Map<String, Processor.Factory> processorFactories,
256+
final String tag,
257+
final String description,
258+
final Map<String, Object> config,
259+
final ProjectId projectId
260+
) throws Exception {
261+
final Map<String, ProcessorBridge.Factory> bridgedProcessorFactories = StableBridgeAPI.fromInternal(
262+
processorFactories,
263+
ProcessorBridge.Factory.ProxyInternal::new
264+
);
265+
final ProcessorBridge bridgedProcessor = AbstractExternal.this.create(
266+
bridgedProcessorFactories,
267+
tag,
268+
description,
269+
config,
270+
projectId
271+
);
272+
return bridgedProcessor.toInternal();
273+
}
274+
275+
ProcessorBridge.Factory toExternal() {
276+
return AbstractExternal.this;
277+
}
278+
}
279+
}
225280
}
226281

227282
}

0 commit comments

Comments
 (0)