1515 */
1616package io .serverlessworkflow .impl ;
1717
18+ import static io .serverlessworkflow .impl .WorkflowUtils .*;
1819import static io .serverlessworkflow .impl .json .JsonUtils .*;
1920
2021import com .fasterxml .jackson .databind .JsonNode ;
22+ import io .serverlessworkflow .api .types .Input ;
23+ import io .serverlessworkflow .api .types .Output ;
2124import io .serverlessworkflow .api .types .TaskBase ;
2225import io .serverlessworkflow .api .types .TaskItem ;
2326import io .serverlessworkflow .api .types .Workflow ;
2427import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
2528import io .serverlessworkflow .impl .executors .TaskExecutor ;
2629import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
30+ import io .serverlessworkflow .impl .expressions .ExpressionFactory ;
31+ import io .serverlessworkflow .impl .expressions .JQExpressionFactory ;
2732import io .serverlessworkflow .impl .json .JsonUtils ;
33+ import io .serverlessworkflow .impl .jsonschema .DefaultSchemaValidatorFactory ;
34+ import io .serverlessworkflow .impl .jsonschema .SchemaValidator ;
35+ import io .serverlessworkflow .impl .jsonschema .SchemaValidatorFactory ;
36+ import io .serverlessworkflow .resources .DefaultResourceLoader ;
37+ import io .serverlessworkflow .resources .ResourceLoader ;
2838import java .util .Collection ;
2939import java .util .Collections ;
3040import java .util .HashSet ;
3141import java .util .List ;
3242import java .util .Map ;
43+ import java .util .Optional ;
3344import java .util .concurrent .ConcurrentHashMap ;
3445
3546public class WorkflowDefinition {
3647
3748 private WorkflowDefinition (
3849 Workflow workflow ,
3950 TaskExecutorFactory taskFactory ,
40- Collection <WorkflowExecutionListener > listeners ) {
51+ Collection <WorkflowExecutionListener > listeners ,
52+ WorkflowFactories factories ) {
4153 this .workflow = workflow ;
4254 this .taskFactory = taskFactory ;
4355 this .listeners = listeners ;
56+ this .factories = factories ;
57+ if (workflow .getInput () != null ) {
58+ Input input = workflow .getInput ();
59+ this .inputSchemaValidator =
60+ getSchemaValidator (
61+ factories .getValidatorFactory (), schemaToNode (factories , input .getSchema ()));
62+ this .inputFilter = buildWorkflowFilter (factories .getExpressionFactory (), input .getFrom ());
63+ }
64+ if (workflow .getOutput () != null ) {
65+ Output output = workflow .getOutput ();
66+ this .outputSchemaValidator =
67+ getSchemaValidator (
68+ factories .getValidatorFactory (), schemaToNode (factories , output .getSchema ()));
69+ this .outputFilter = buildWorkflowFilter (factories .getExpressionFactory (), output .getAs ());
70+ }
4471 }
4572
4673 private final Workflow workflow ;
4774 private final Collection <WorkflowExecutionListener > listeners ;
4875 private final TaskExecutorFactory taskFactory ;
76+ private final WorkflowFactories factories ;
77+ private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
78+ private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
79+ private Optional <WorkflowFilter > inputFilter = Optional .empty ();
80+ private Optional <WorkflowFilter > outputFilter = Optional .empty ();
81+
4982 private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
5083 new ConcurrentHashMap <>();
5184
5285 public static class Builder {
5386 private final Workflow workflow ;
5487 private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory .get ();
88+ private ExpressionFactory exprFactory = JQExpressionFactory .get ();
5589 private Collection <WorkflowExecutionListener > listeners ;
90+ private ResourceLoader resourceLoader = DefaultResourceLoader .get ();
91+ private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory .get ();
5692
5793 private Builder (Workflow workflow ) {
5894 this .workflow = workflow ;
@@ -71,13 +107,29 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
71107 return this ;
72108 }
73109
110+ public Builder withExpressionFactory (ExpressionFactory factory ) {
111+ this .exprFactory = factory ;
112+ return this ;
113+ }
114+
115+ public Builder withResourceLoader (ResourceLoader resourceLoader ) {
116+ this .resourceLoader = resourceLoader ;
117+ return this ;
118+ }
119+
120+ public Builder withSchemaValidatorFactory (SchemaValidatorFactory factory ) {
121+ this .schemaValidatorFactory = factory ;
122+ return this ;
123+ }
124+
74125 public WorkflowDefinition build () {
75126 return new WorkflowDefinition (
76127 workflow ,
77128 taskFactory ,
78129 listeners == null
79130 ? Collections .emptySet ()
80- : Collections .unmodifiableCollection (listeners ));
131+ : Collections .unmodifiableCollection (listeners ),
132+ new WorkflowFactories (resourceLoader , exprFactory , schemaValidatorFactory ));
81133 }
82134 }
83135
@@ -103,9 +155,13 @@ public class WorkflowInstance {
103155
104156 private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
105157 this .output = input ;
106- this . state = State . STARTED ;
158+ inputSchemaValidator . ifPresent ( v -> v . validate ( input )) ;
107159 this .context = WorkflowContext .builder (input ).build ();
160+ inputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
161+ this .state = State .STARTED ;
108162 processDo (workflow .getDo ());
163+ outputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
164+ outputSchemaValidator .ifPresent (v -> v .validate (output ));
109165 }
110166
111167 private void processDo (List <TaskItem > tasks ) {
@@ -118,7 +174,7 @@ private void processDo(List<TaskItem> tasks) {
118174 taskExecutors
119175 .computeIfAbsent (
120176 context .position ().jsonPointer (),
121- k -> taskFactory .getTaskExecutor (task .getTask ()))
177+ k -> taskFactory .getTaskExecutor (task .getTask (), factories ))
122178 .apply (context , output );
123179 listeners .forEach (l -> l .onTaskEnded (context .position (), task .getTask ()));
124180 context .position ().back ().back ();
0 commit comments