Skip to content

Commit c518b9c

Browse files
authored
[Fix #1022] Implementing default catalog (#1079)
Signed-off-by: fjtirado <[email protected]>
1 parent fca5896 commit c518b9c

File tree

4 files changed

+67
-25
lines changed

4 files changed

+67
-25
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
4242
import io.serverlessworkflow.impl.schema.SchemaValidator;
4343
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
44+
import java.net.URI;
4445
import java.util.ArrayList;
4546
import java.util.Collection;
4647
import java.util.Collections;
@@ -78,6 +79,7 @@ public class WorkflowApplication implements AutoCloseable {
7879
private final SchedulerListener schedulerListener;
7980
private final Optional<URITemplateResolver> templateResolver;
8081
private final Optional<FunctionReader> functionReader;
82+
private final URI defaultCatalogURI;
8183

8284
private WorkflowApplication(Builder builder) {
8385
this.taskFactory = builder.taskFactory;
@@ -102,6 +104,7 @@ private WorkflowApplication(Builder builder) {
102104
this.secretManager = builder.secretManager;
103105
this.templateResolver = builder.templateResolver;
104106
this.functionReader = builder.functionReader;
107+
this.defaultCatalogURI = builder.defaultCatalogURI;
105108
}
106109

107110
public TaskExecutorFactory taskFactory() {
@@ -184,6 +187,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
184187
private SchedulerListener schedulerListener;
185188
private Optional<URITemplateResolver> templateResolver;
186189
private Optional<FunctionReader> functionReader;
190+
private URI defaultCatalogURI;
187191

188192
private Builder() {
189193
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
@@ -281,6 +285,15 @@ public Builder withContextFactory(WorkflowModelFactory contextFactory) {
281285
return this;
282286
}
283287

288+
public Builder withDefaultCatalogURI(String defaultCatalogURI) {
289+
return withDefaultCatalogURI(URI.create(defaultCatalogURI));
290+
}
291+
292+
public Builder withDefaultCatalogURI(URI defaultCatalogURI) {
293+
this.defaultCatalogURI = defaultCatalogURI;
294+
return this;
295+
}
296+
284297
public WorkflowApplication build() {
285298
if (modelFactory == null) {
286299
modelFactory =
@@ -344,6 +357,9 @@ public WorkflowApplication build() {
344357
}
345358
templateResolver = ServiceLoader.load(URITemplateResolver.class).findFirst();
346359
functionReader = ServiceLoader.load(FunctionReader.class).findFirst();
360+
if (defaultCatalogURI == null) {
361+
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
362+
}
347363
return new WorkflowApplication(this);
348364
}
349365
}
@@ -429,6 +445,10 @@ public Optional<FunctionReader> functionReader() {
429445
return functionReader;
430446
}
431447

448+
public URI defaultCatalogURI() {
449+
return defaultCatalogURI;
450+
}
451+
432452
public <T> Optional<T> additionalObject(
433453
String name, WorkflowContext workflowContext, TaskContext taskContext) {
434454
return Optional.ofNullable(additionalObjects.get(name))

impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallFunctionExecutorBuilder.java

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import io.serverlessworkflow.api.types.Task;
2222
import io.serverlessworkflow.api.types.TaskBase;
2323
import io.serverlessworkflow.api.types.Use;
24-
import io.serverlessworkflow.api.types.UseCatalogs;
25-
import io.serverlessworkflow.api.types.UseFunctions;
2624
import io.serverlessworkflow.impl.WorkflowDefinition;
2725
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2826
import io.serverlessworkflow.impl.WorkflowUtils;
@@ -43,33 +41,43 @@ public void init(
4341
CallFunction task, WorkflowDefinition definition, WorkflowMutablePosition position) {
4442
String functionName = task.getCall();
4543
Use use = definition.workflow().getUse();
44+
int indexOf = functionName.indexOf('@');
4645
Task function = null;
47-
if (use != null) {
48-
UseFunctions functions = use.getFunctions();
49-
if (functions != null) {
50-
function = functions.getAdditionalProperties().get(functionName);
51-
}
52-
if (function == null) {
53-
int indexOf = functionName.indexOf('@');
54-
if (indexOf > 0) {
55-
String catalogName = functionName.substring(indexOf + 1);
56-
UseCatalogs catalogs = use.getCatalogs();
57-
if (catalogs != null) {
58-
Catalog catalog = catalogs.getAdditionalProperties().get(catalogName);
59-
ResourceLoader loader = definition.resourceLoader();
60-
function =
61-
definition
62-
.resourceLoader()
63-
.loadURI(
64-
WorkflowUtils.concatURI(
65-
loader.uri(catalog.getEndpoint()),
66-
pathFromFunctionName(functionName.substring(0, indexOf))),
67-
h -> from(definition, h));
68-
}
46+
if (indexOf > 0) {
47+
// Catalog function
48+
URI catalogEndpoint;
49+
String catalogName = functionName.substring(indexOf + 1);
50+
ResourceLoader loader = definition.resourceLoader();
51+
if (catalogName.equalsIgnoreCase("default")) {
52+
catalogEndpoint = definition.application().defaultCatalogURI();
53+
} else {
54+
if (use == null || use.getCatalogs() == null) {
55+
throw new IllegalStateException(
56+
"Using catalog " + catalogName + ", but there is not catalog definition");
57+
}
58+
Catalog catalog = use.getCatalogs().getAdditionalProperties().get(catalogName);
59+
if (catalog == null) {
60+
throw new IllegalStateException(
61+
"Catalog "
62+
+ catalogName
63+
+ " is not included in Catalog dictionary: "
64+
+ use.getCatalogs().getAdditionalProperties());
6965
}
66+
catalogEndpoint = loader.uri(catalog.getEndpoint());
7067
}
68+
function =
69+
definition
70+
.resourceLoader()
71+
.loadURI(
72+
WorkflowUtils.concatURI(
73+
catalogEndpoint, pathFromFunctionName(functionName.substring(0, indexOf))),
74+
h -> from(definition, h));
75+
} else if (use != null && use.getFunctions() != null) {
76+
// search for inline function definition
77+
function = use.getFunctions().getAdditionalProperties().get(functionName);
7178
}
7279
if (function == null) {
80+
// try to load function if function name is an uri
7381
function =
7482
definition.resourceLoader().loadURI(URI.create(functionName), h -> from(definition, h));
7583
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/CustomFunctionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ void testCustomFunction() {
6262
@ValueSource(
6363
strings = {
6464
"workflows-samples/call-custom-function-cataloged.yaml",
65-
"workflows-samples/call-custom-function-cataloged-global.yaml"
65+
"workflows-samples/call-custom-function-cataloged-global.yaml",
66+
"workflows-samples/call-custom-function-cataloged-default.yaml"
6667
})
6768
void testCustomCatalogFunction(String fileName) throws IOException {
6869
assertThat(
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
document:
2+
dsl: '1.0.2'
3+
namespace: test
4+
name: call-custom-function-cataloged-default
5+
version: '0.1.0'
6+
do:
7+
- log:
8+
call: log:1.0.0@default
9+
with:
10+
message: Hello, world!
11+
level: information
12+
timestamp: true
13+
format: '{TIMESTAMP} [{LEVEL}] ({CONTEXT}): {MESSAGE}'

0 commit comments

Comments
 (0)