1515 */
1616package io .serverlessworkflow .impl ;
1717
18+ import static io .serverlessworkflow .impl .WorkflowUtils .*;
1819import static io .serverlessworkflow .impl .json .JsonUtils .*;
1920
2021import com .fasterxml .jackson .databind .JsonNode ;
22+ import io .serverlessworkflow .api .types .Input ;
23+ import io .serverlessworkflow .api .types .Output ;
2124import io .serverlessworkflow .api .types .TaskBase ;
2225import io .serverlessworkflow .api .types .TaskItem ;
2326import io .serverlessworkflow .api .types .Workflow ;
2427import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
2528import io .serverlessworkflow .impl .executors .TaskExecutor ;
2629import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
30+ import io .serverlessworkflow .impl .expressions .ExpressionFactory ;
31+ import io .serverlessworkflow .impl .expressions .JQExpressionFactory ;
2732import io .serverlessworkflow .impl .json .JsonUtils ;
33+ import io .serverlessworkflow .impl .jsonschema .DefaultSchemaValidatorFactory ;
34+ import io .serverlessworkflow .impl .jsonschema .SchemaValidator ;
35+ import io .serverlessworkflow .impl .jsonschema .SchemaValidatorFactory ;
36+ import io .serverlessworkflow .resources .DefaultResourceLoader ;
37+ import io .serverlessworkflow .resources .ResourceLoader ;
2838import java .util .Collection ;
2939import java .util .Collections ;
3040import java .util .HashSet ;
3141import java .util .List ;
3242import java .util .Map ;
43+ import java .util .Optional ;
3344import java .util .concurrent .ConcurrentHashMap ;
3445
3546public class WorkflowDefinition {
3647
3748 private WorkflowDefinition (
3849 Workflow workflow ,
39- TaskExecutorFactory taskFactory ,
40- Collection < WorkflowExecutionListener > listeners ) {
50+ Collection < WorkflowExecutionListener > listeners ,
51+ WorkflowFactories factories ) {
4152 this .workflow = workflow ;
42- this .taskFactory = taskFactory ;
4353 this .listeners = listeners ;
54+ this .factories = factories ;
55+ if (workflow .getInput () != null ) {
56+ Input input = workflow .getInput ();
57+ this .inputSchemaValidator =
58+ getSchemaValidator (
59+ factories .getValidatorFactory (), schemaToNode (factories , input .getSchema ()));
60+ this .inputFilter = buildWorkflowFilter (factories .getExpressionFactory (), input .getFrom ());
61+ }
62+ if (workflow .getOutput () != null ) {
63+ Output output = workflow .getOutput ();
64+ this .outputSchemaValidator =
65+ getSchemaValidator (
66+ factories .getValidatorFactory (), schemaToNode (factories , output .getSchema ()));
67+ this .outputFilter = buildWorkflowFilter (factories .getExpressionFactory (), output .getAs ());
68+ }
4469 }
4570
4671 private final Workflow workflow ;
4772 private final Collection <WorkflowExecutionListener > listeners ;
48- private final TaskExecutorFactory taskFactory ;
73+ private final WorkflowFactories factories ;
74+ private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
75+ private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
76+ private Optional <WorkflowFilter > inputFilter = Optional .empty ();
77+ private Optional <WorkflowFilter > outputFilter = Optional .empty ();
78+
4979 private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
5080 new ConcurrentHashMap <>();
5181
5282 public static class Builder {
5383 private final Workflow workflow ;
5484 private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory .get ();
85+ private ExpressionFactory exprFactory = JQExpressionFactory .get ();
5586 private Collection <WorkflowExecutionListener > listeners ;
87+ private ResourceLoader resourceLoader = DefaultResourceLoader .get ();
88+ private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory .get ();
5689
5790 private Builder (Workflow workflow ) {
5891 this .workflow = workflow ;
@@ -71,13 +104,28 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
71104 return this ;
72105 }
73106
107+ public Builder withExpressionFactory (ExpressionFactory factory ) {
108+ this .exprFactory = factory ;
109+ return this ;
110+ }
111+
112+ public Builder withResourceLoader (ResourceLoader resourceLoader ) {
113+ this .resourceLoader = resourceLoader ;
114+ return this ;
115+ }
116+
117+ public Builder withSchemaValidatorFactory (SchemaValidatorFactory factory ) {
118+ this .schemaValidatorFactory = factory ;
119+ return this ;
120+ }
121+
74122 public WorkflowDefinition build () {
75123 return new WorkflowDefinition (
76124 workflow ,
77- taskFactory ,
78125 listeners == null
79126 ? Collections .emptySet ()
80- : Collections .unmodifiableCollection (listeners ));
127+ : Collections .unmodifiableCollection (listeners ),
128+ new WorkflowFactories (taskFactory , resourceLoader , exprFactory , schemaValidatorFactory ));
81129 }
82130 }
83131
@@ -86,7 +134,7 @@ public static Builder builder(Workflow workflow) {
86134 }
87135
88136 public WorkflowInstance execute (Object input ) {
89- return new WorkflowInstance (taskFactory , JsonUtils .fromValue (input ));
137+ return new WorkflowInstance (JsonUtils .fromValue (input ));
90138 }
91139
92140 enum State {
@@ -101,11 +149,15 @@ public class WorkflowInstance {
101149 private State state ;
102150 private WorkflowContext context ;
103151
104- private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
152+ private WorkflowInstance (JsonNode input ) {
105153 this .output = input ;
106- this . state = State . STARTED ;
154+ inputSchemaValidator . ifPresent ( v -> v . validate ( input )) ;
107155 this .context = WorkflowContext .builder (input ).build ();
156+ inputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
157+ this .state = State .STARTED ;
108158 processDo (workflow .getDo ());
159+ outputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
160+ outputSchemaValidator .ifPresent (v -> v .validate (output ));
109161 }
110162
111163 private void processDo (List <TaskItem > tasks ) {
@@ -118,7 +170,7 @@ private void processDo(List<TaskItem> tasks) {
118170 taskExecutors
119171 .computeIfAbsent (
120172 context .position ().jsonPointer (),
121- k -> taskFactory . getTaskExecutor (task .getTask ()))
173+ k -> factories . getTaskFactory (). getTaskExecutor (task .getTask (), factories ))
122174 .apply (context , output );
123175 listeners .forEach (l -> l .onTaskEnded (context .position (), task .getTask ()));
124176 context .position ().back ().back ();
0 commit comments