1818import io .cloudevents .CloudEventData ;
1919import io .serverlessworkflow .api .types .FlowDirectiveEnum ;
2020import io .serverlessworkflow .fluent .func .FuncCallTaskBuilder ;
21- import io .serverlessworkflow .fluent .func .FuncDoTaskBuilder ;
2221import io .serverlessworkflow .fluent .func .FuncEmitTaskBuilder ;
2322import io .serverlessworkflow .fluent .func .FuncSwitchTaskBuilder ;
2423import io .serverlessworkflow .fluent .func .FuncTaskItemListBuilder ;
2827import io .serverlessworkflow .fluent .func .dsl .internal .CommonFuncOps ;
2928import java .util .Collection ;
3029import java .util .List ;
30+ import java .util .Map ;
3131import java .util .Objects ;
3232import java .util .function .Consumer ;
3333import java .util .function .Function ;
@@ -99,50 +99,90 @@ public static <T> Consumer<FuncEmitTaskBuilder> event(
9999 return OPS .event (type , function , clazz );
100100 }
101101
102+ /** Emit a JSON CloudEvent (PojoCloudEventData) from a POJO payload. */
103+ public static <T > Consumer <FuncEmitTaskBuilder > eventJson (String type , Class <T > clazz ) {
104+ return b -> new FuncEmitSpec ().type (type ).jsonData (clazz ).accept (b );
105+ }
106+
107+ public static <T > Consumer <FuncEmitTaskBuilder > eventBytes (
108+ String type , Function <T , byte []> serializer , Class <T > clazz ) {
109+ return b -> new FuncEmitSpec ().type (type ).bytesData (serializer , clazz ).accept (b );
110+ }
111+
112+ public static Consumer <FuncEmitTaskBuilder > eventBytesUtf8 (String type ) {
113+ return b -> new FuncEmitSpec ().type (type ).bytesDataUtf8 ().accept (b );
114+ }
115+
102116 public static FuncPredicateEventConfigurer event (String type ) {
103117 return OPS .event (type );
104118 }
105119
106- public static <T , R > FuncTaskConfigurer function (Function <T , R > fn ) {
120+ public static <T , R > FuncCallStep <T , R > function (Function <T , R > fn , Class <T > clazz ) {
121+ return new FuncCallStep <>(fn , clazz );
122+ }
123+
124+ public static <T , R > FuncCallStep <T , R > function (Function <T , R > fn ) {
107125 Class <T > clazz = ReflectionUtils .inferInputType (fn );
108- return list -> list . callFn ( f -> f . function ( fn , clazz ) );
126+ return new FuncCallStep <>( fn , clazz );
109127 }
110128
111- public static <T , R > FuncTaskConfigurer function (Function <T , R > fn , Class <T > clazz ) {
112- return list -> list .callFn (f -> f .function (fn , clazz ));
129+ public static <T , R > FuncCallStep <T , R > function (String name , Function <T , R > fn ) {
130+ Class <T > clazz = ReflectionUtils .inferInputType (fn );
131+ return new FuncCallStep <>(name , fn , clazz );
113132 }
114133
115- // ------------------ tasks ---------------- //
116- public static Consumer <FuncDoTaskBuilder > doTasks (FuncTaskConfigurer ... steps ) {
117- final Consumer <FuncTaskItemListBuilder > tasks = tasks (steps );
118- return d -> d .tasks (tasks );
134+ public static <T , R > FuncCallStep <T , R > function (String name , Function <T , R > fn , Class <T > clazz ) {
135+ return new FuncCallStep <>(name , fn , clazz );
119136 }
120137
138+ // ------------------ tasks ---------------- //
139+
121140 public static Consumer <FuncTaskItemListBuilder > tasks (FuncTaskConfigurer ... steps ) {
122141 Objects .requireNonNull (steps , "Steps in a tasks are required" );
123142 final List <FuncTaskConfigurer > snapshot = List .of (steps .clone ());
124143 return list -> snapshot .forEach (s -> s .accept (list ));
125144 }
126145
127- public static FuncTaskConfigurer emit (Consumer <FuncEmitTaskBuilder > emitTask ) {
128- return list -> list .emit (emitTask );
146+ public static EmitStep emit (Consumer <FuncEmitTaskBuilder > cfg ) {
147+ return new EmitStep (null , cfg );
148+ }
149+
150+ public static EmitStep emit (String name , Consumer <FuncEmitTaskBuilder > cfg ) {
151+ return new EmitStep (name , cfg );
152+ }
153+
154+ public static <T > EmitStep emit (String type , Function <T , CloudEventData > fn ) {
155+ // `event(type, fn)` is your Consumer<FuncEmitTaskBuilder> for EMIT
156+ return new EmitStep (null , event (type , fn ));
157+ }
158+
159+ public static <T > EmitStep emit (String name , String type , Function <T , CloudEventData > fn ) {
160+ return new EmitStep (name , event (type , fn ));
161+ }
162+
163+ public static <T > EmitStep emit (
164+ String name , String type , Function <T , byte []> serializer , Class <T > clazz ) {
165+ return new EmitStep (name , eventBytes (type , serializer , clazz ));
166+ }
167+
168+ public static <T > EmitStep emit (String type , Function <T , byte []> serializer , Class <T > clazz ) {
169+ return new EmitStep (null , eventBytes (type , serializer , clazz ));
129170 }
130171
131- public static <T > FuncTaskConfigurer emit (String type , Function < T , CloudEventData > fn ) {
132- return list -> list . emit ( event ( type , fn ));
172+ public static <T > EmitStep emitJson (String type , Class < T > clazz ) {
173+ return new EmitStep ( null , eventJson ( type , clazz ));
133174 }
134175
135- public static <T > FuncTaskConfigurer emit (
136- String name , String type , Function <T , CloudEventData > fn ) {
137- return list -> list .emit (name , event (type , fn ));
176+ public static <T > EmitStep emitJson (String name , String type , Class <T > clazz ) {
177+ return new EmitStep (name , eventJson (type , clazz ));
138178 }
139179
140- public static FuncTaskConfigurer listen (FuncListenSpec listen ) {
141- return list -> list . listen ( listen );
180+ public static ListenStep listen (FuncListenSpec spec ) {
181+ return new ListenStep ( null , spec );
142182 }
143183
144- public static FuncTaskConfigurer listen (String name , FuncListenSpec listen ) {
145- return list -> list . listen (name , listen );
184+ public static ListenStep listen (String name , FuncListenSpec spec ) {
185+ return new ListenStep (name , spec );
146186 }
147187
148188 public static FuncTaskConfigurer switchCase (
@@ -176,6 +216,12 @@ public static <T> FuncTaskConfigurer switchWhenOrElse(
176216 list .switchCase (FuncDSL .cases (caseOf (pred ).then (thenTask ), caseDefault (otherwise )));
177217 }
178218
219+ public static <T > FuncTaskConfigurer switchWhenOrElse (
220+ Predicate <T > pred , String thenTask , String otherwiseTask ) {
221+ return list ->
222+ list .switchCase (FuncDSL .cases (caseOf (pred ).then (thenTask ), caseDefault (otherwiseTask )));
223+ }
224+
179225 public static <T > FuncTaskConfigurer forEach (
180226 Function <T , Collection <?>> collection , Consumer <FuncTaskItemListBuilder > body ) {
181227 return list -> list .forEach (j -> j .collection (collection ).tasks (body ));
@@ -192,4 +238,12 @@ public static <T> FuncTaskConfigurer forEach(
192238 List <T > collection , Consumer <FuncTaskItemListBuilder > body ) {
193239 return list -> list .forEach (j -> j .collection (ctx -> collection ).tasks (body ));
194240 }
241+
242+ public static FuncTaskConfigurer set (String expr ) {
243+ return list -> list .set (expr );
244+ }
245+
246+ public static FuncTaskConfigurer set (Map <String , Object > map ) {
247+ return list -> list .set (s -> s .expr (map ));
248+ }
195249}
0 commit comments