1515 */
1616package io .serverlessworkflow .impl ;
1717
18+ import static io .serverlessworkflow .impl .WorkflowUtils .*;
1819import static io .serverlessworkflow .impl .json .JsonUtils .*;
1920
2021import com .fasterxml .jackson .databind .JsonNode ;
22+ import io .serverlessworkflow .api .types .Input ;
23+ import io .serverlessworkflow .api .types .Output ;
2124import io .serverlessworkflow .api .types .TaskBase ;
2225import io .serverlessworkflow .api .types .TaskItem ;
2326import io .serverlessworkflow .api .types .Workflow ;
2427import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
2528import io .serverlessworkflow .impl .executors .TaskExecutor ;
2629import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
30+ import io .serverlessworkflow .impl .expressions .ExpressionFactory ;
31+ import io .serverlessworkflow .impl .expressions .JQExpressionFactory ;
2732import io .serverlessworkflow .impl .json .JsonUtils ;
33+ import io .serverlessworkflow .impl .jsonschema .DefaultSchemaValidatorFactory ;
34+ import io .serverlessworkflow .impl .jsonschema .SchemaValidator ;
35+ import io .serverlessworkflow .impl .jsonschema .SchemaValidatorFactory ;
36+ import io .serverlessworkflow .resources .DefaultResourceLoaderFactory ;
37+ import io .serverlessworkflow .resources .ResourceLoaderFactory ;
38+ import java .nio .file .Path ;
2839import java .util .Collection ;
2940import java .util .Collections ;
3041import java .util .HashSet ;
3142import java .util .List ;
3243import java .util .Map ;
44+ import java .util .Optional ;
3345import java .util .concurrent .ConcurrentHashMap ;
3446
3547public class WorkflowDefinition {
3648
3749 private WorkflowDefinition (
3850 Workflow workflow ,
39- TaskExecutorFactory taskFactory ,
40- Collection < WorkflowExecutionListener > listeners ) {
51+ Collection < WorkflowExecutionListener > listeners ,
52+ WorkflowFactories factories ) {
4153 this .workflow = workflow ;
42- this .taskFactory = taskFactory ;
4354 this .listeners = listeners ;
55+ this .factories = factories ;
56+ if (workflow .getInput () != null ) {
57+ Input input = workflow .getInput ();
58+ this .inputSchemaValidator =
59+ getSchemaValidator (
60+ factories .getValidatorFactory (), schemaToNode (factories , input .getSchema ()));
61+ this .inputFilter = buildWorkflowFilter (factories .getExpressionFactory (), input .getFrom ());
62+ }
63+ if (workflow .getOutput () != null ) {
64+ Output output = workflow .getOutput ();
65+ this .outputSchemaValidator =
66+ getSchemaValidator (
67+ factories .getValidatorFactory (), schemaToNode (factories , output .getSchema ()));
68+ this .outputFilter = buildWorkflowFilter (factories .getExpressionFactory (), output .getAs ());
69+ }
4470 }
4571
4672 private final Workflow workflow ;
4773 private final Collection <WorkflowExecutionListener > listeners ;
48- private final TaskExecutorFactory taskFactory ;
74+ private final WorkflowFactories factories ;
75+ private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
76+ private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
77+ private Optional <WorkflowFilter > inputFilter = Optional .empty ();
78+ private Optional <WorkflowFilter > outputFilter = Optional .empty ();
79+
4980 private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
5081 new ConcurrentHashMap <>();
5182
5283 public static class Builder {
5384 private final Workflow workflow ;
5485 private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory .get ();
86+ private ExpressionFactory exprFactory = JQExpressionFactory .get ();
5587 private Collection <WorkflowExecutionListener > listeners ;
88+ private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory .get ();
89+ private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory .get ();
90+ private Path path ;
5691
5792 private Builder (Workflow workflow ) {
5893 this .workflow = workflow ;
@@ -71,13 +106,39 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
71106 return this ;
72107 }
73108
109+ public Builder withExpressionFactory (ExpressionFactory factory ) {
110+ this .exprFactory = factory ;
111+ return this ;
112+ }
113+
114+ public Builder withPath (Path path ) {
115+ this .path = path ;
116+ return this ;
117+ }
118+
119+ public Builder withResourceLoaderFactory (ResourceLoaderFactory resourceLoader ) {
120+ this .resourceLoaderFactory = resourceLoader ;
121+ return this ;
122+ }
123+
124+ public Builder withSchemaValidatorFactory (SchemaValidatorFactory factory ) {
125+ this .schemaValidatorFactory = factory ;
126+ return this ;
127+ }
128+
74129 public WorkflowDefinition build () {
75- return new WorkflowDefinition (
76- workflow ,
77- taskFactory ,
78- listeners == null
79- ? Collections .emptySet ()
80- : Collections .unmodifiableCollection (listeners ));
130+ WorkflowDefinition def =
131+ new WorkflowDefinition (
132+ workflow ,
133+ listeners == null
134+ ? Collections .emptySet ()
135+ : Collections .unmodifiableCollection (listeners ),
136+ new WorkflowFactories (
137+ taskFactory ,
138+ resourceLoaderFactory .getResourceLoader (path ),
139+ exprFactory ,
140+ schemaValidatorFactory ));
141+ return def ;
81142 }
82143 }
83144
@@ -86,7 +147,7 @@ public static Builder builder(Workflow workflow) {
86147 }
87148
88149 public WorkflowInstance execute (Object input ) {
89- return new WorkflowInstance (taskFactory , JsonUtils .fromValue (input ));
150+ return new WorkflowInstance (JsonUtils .fromValue (input ));
90151 }
91152
92153 enum State {
@@ -101,11 +162,15 @@ public class WorkflowInstance {
101162 private State state ;
102163 private WorkflowContext context ;
103164
104- private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
165+ private WorkflowInstance (JsonNode input ) {
105166 this .output = input ;
106- this . state = State . STARTED ;
167+ inputSchemaValidator . ifPresent ( v -> v . validate ( input )) ;
107168 this .context = WorkflowContext .builder (input ).build ();
169+ inputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
170+ this .state = State .STARTED ;
108171 processDo (workflow .getDo ());
172+ outputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
173+ outputSchemaValidator .ifPresent (v -> v .validate (output ));
109174 }
110175
111176 private void processDo (List <TaskItem > tasks ) {
@@ -118,7 +183,7 @@ private void processDo(List<TaskItem> tasks) {
118183 taskExecutors
119184 .computeIfAbsent (
120185 context .position ().jsonPointer (),
121- k -> taskFactory . getTaskExecutor (task .getTask ()))
186+ k -> factories . getTaskFactory (). getTaskExecutor (task .getTask (), factories ))
122187 .apply (context , output );
123188 listeners .forEach (l -> l .onTaskEnded (context .position (), task .getTask ()));
124189 context .position ().back ().back ();
0 commit comments