Skip to content

Commit caab8d3

Browse files
committed
Merge branch '3.5.x'
2 parents 1458b59 + 4b320a7 commit caab8d3

File tree

1 file changed

+52
-91
lines changed
  • framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain

1 file changed

+52
-91
lines changed

framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java

Lines changed: 52 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,10 @@ void test_map_with_preserved_order() {
149149
List<Integer> result = new ArrayList<>();
150150
AtomicReference<String> sessionId = new AtomicReference<>();
151151
//应该乱序
152-
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks).id("start")
153-
.map(i -> {
154-
SleepUtil.sleep((6 - i) * 20L);
155-
return i * 10;
156-
})
157-
.map(i -> i + 10)
158-
.close(r -> result.add(r.get().getData()))
159-
.onComplete(id -> sessionId.set(id));
152+
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks).id("start").map(i -> {
153+
SleepUtil.sleep((6 - i) * 20L);
154+
return i * 10;
155+
}).map(i -> i + 10).close(r -> result.add(r.get().getData())).onComplete(sessionId::set);
160156
FlowSession session = new FlowSession(true);
161157
Window window = session.begin();
162158
for (int i = 1; i < 6; i++) {
@@ -166,7 +162,7 @@ void test_map_with_preserved_order() {
166162
SleepUtil.sleep(100);
167163
window.complete();
168164

169-
FlowsTestUtil.waitUntil(() -> result.size() == 5, 10000);
165+
FlowsTestUtil.waitUntil(() -> sessionId.get() != null, 10000);
170166
assertEquals(session.getId(), sessionId.get());
171167
assertEquals(20, result.get(0));
172168
assertEquals(30, result.get(1));
@@ -367,7 +363,7 @@ void test_reduce_with_window_and_keyBy_complete_event() {
367363
.reduce(() -> ObjectUtils.<Tuple<String, Integer>>cast(Tuple.from("", 0)),
368364
(acc, data) -> Tuple.from(data.first(), acc.second() + data.second().second()))
369365
.just(data -> {
370-
reduced.add((Tuple<String, Integer>)data);
366+
reduced.add((Tuple<String, Integer>) data);
371367
})
372368
.close();
373369
FlowSession session = new FlowSession();
@@ -448,34 +444,23 @@ void test_nested_reduce() {
448444
@Test
449445
void test_buffer() {
450446
AtomicInteger counter = new AtomicInteger();
451-
Flows.<Integer>create(repo, messenger, locks)
452-
.window(3)
453-
.buffer()
454-
.flatMap(i -> {
455-
return Flows.flux(i.toArray(new Integer[0]));
456-
})
457-
.just(value -> counter.set(counter.get() + 1))
458-
.close()
459-
.offer(new Integer[] {1, 2, 3, 4, 5, 6});
447+
Flows.<Integer>create(repo, messenger, locks).window(3).buffer().flatMap(i -> {
448+
return Flows.flux(i.toArray(new Integer[0]));
449+
}).just(value -> counter.set(counter.get() + 1)).close().offer(new Integer[] {1, 2, 3, 4, 5, 6});
460450
FlowsTestUtil.waitUntil(() -> counter.get() == 6);
461451
}
462452

463453
@Test
464454
void should_get_one_result_when_reduce_given_multi_flatmap_data() {
465455
AtomicInteger counter = new AtomicInteger();
466-
Flows.<Integer>create(repo, messenger, locks)
467-
.flatMap(i -> {
468-
return Flows.flux(i * 10, i * 10 + 1);
469-
})
470-
.reduce(() -> 0, (acc, value) -> {
471-
System.out.println("reduce value=" + value + ", acc=" + (acc + value));
472-
return acc + value;
473-
})
474-
.just(value -> {
475-
System.out.println("value=" + value);
476-
})
477-
.close(r -> counter.set(r.get().getData()))
478-
.offer(new Integer[] {1, 2});
456+
Flows.<Integer>create(repo, messenger, locks).flatMap(i -> {
457+
return Flows.flux(i * 10, i * 10 + 1);
458+
}).reduce(() -> 0, (acc, value) -> {
459+
System.out.println("reduce value=" + value + ", acc=" + (acc + value));
460+
return acc + value;
461+
}).just(value -> {
462+
System.out.println("value=" + value);
463+
}).close(r -> counter.set(r.get().getData())).offer(new Integer[] {1, 2});
479464
FlowsTestUtil.waitUntil(() -> counter.get() != 0);
480465

481466
Assertions.assertEquals(62, counter.get());
@@ -971,21 +956,14 @@ void test_mermaid() {
971956
.fork(p -> p.map(i -> i * 2))
972957
.join(() -> "", (acc, i) -> acc + i.toString())
973958
.close();
974-
assertEquals("start((Start))"
975-
+ System.lineSeparator() + "start-->node0(map)"
976-
+ System.lineSeparator() + "node9-->node3"
977-
+ System.lineSeparator() + "node8-->node6"
978-
+ System.lineSeparator() + "node6-->end7((End))"
979-
+ System.lineSeparator() + "node5-->node6([+])"
980-
+ System.lineSeparator() + "node4-->node8(map)"
981-
+ System.lineSeparator() + "node4-->node5(map)"
982-
+ System.lineSeparator() + "node3-->node4{{=}}"
983-
+ System.lineSeparator() + "node2-->node3([+])"
984-
+ System.lineSeparator() + "node10-->node3"
985-
+ System.lineSeparator() + "node1-->node9(map)"
986-
+ System.lineSeparator() + "node1-->node2(map)"
987-
+ System.lineSeparator() + "node1-->node10(map)"
988-
+ System.lineSeparator() + "node0-->node1{?}", new Mermaid(flow).get());
959+
assertEquals("start((Start))" + System.lineSeparator() + "start-->node0(map)" + System.lineSeparator()
960+
+ "node9-->node3" + System.lineSeparator() + "node8-->node6" + System.lineSeparator()
961+
+ "node6-->end7((End))" + System.lineSeparator() + "node5-->node6([+])" + System.lineSeparator()
962+
+ "node4-->node8(map)" + System.lineSeparator() + "node4-->node5(map)" + System.lineSeparator()
963+
+ "node3-->node4{{=}}" + System.lineSeparator() + "node2-->node3([+])" + System.lineSeparator()
964+
+ "node10-->node3" + System.lineSeparator() + "node1-->node9(map)" + System.lineSeparator()
965+
+ "node1-->node2(map)" + System.lineSeparator() + "node1-->node10(map)" + System.lineSeparator()
966+
+ "node0-->node1{?}", new Mermaid(flow).get());
989967
}
990968

991969
@Test
@@ -1002,8 +980,8 @@ void test_flow_flat_map() {
1002980
assertEquals(4, result.size());
1003981
}
1004982

1005-
private <T> Supplier<List<FlowContext<T>>> contextSupplier(FlowContextRepo repo, String traceId,
1006-
String metaId, FlowNodeStatus status) {
983+
private <T> Supplier<List<FlowContext<T>>> contextSupplier(FlowContextRepo repo, String traceId, String metaId,
984+
FlowNodeStatus status) {
1007985
return () -> {
1008986
List<FlowContext<T>> all = repo.getContextsByTrace(traceId);
1009987
return all.stream()
@@ -1018,15 +996,13 @@ private <T> Supplier<List<FlowContext<T>>> contextSupplier(FlowContextRepo repo,
1018996
void testCleanErrorContext() {
1019997
long[] data = {1};
1020998
FlowContextMemoRepo testRepo = new FlowContextMemoRepo(false);
1021-
ProcessFlow<Integer> flowTest = Flows.<Integer>create(testRepo, messenger, locks)
1022-
.id("flow test start node")
1023-
.map(i -> {
999+
ProcessFlow<Integer> flowTest =
1000+
Flows.<Integer>create(testRepo, messenger, locks).id("flow test start node").map(i -> {
10241001
if (i == 1) {
10251002
throw new FlowTestException();
10261003
}
10271004
return i;
1028-
})
1029-
.close(r -> data[0] = r.get().getData());
1005+
}).close(r -> data[0] = r.get().getData());
10301006
flowTest.setId("flow test");
10311007
String traceId = flowTest.offer(1);
10321008

@@ -1038,18 +1014,13 @@ void testCleanErrorContext() {
10381014
@Test
10391015
void test_flow_flat_map_with_reduce_under_cold_stream() {
10401016
List<Integer> result = new ArrayList<>();
1041-
ProcessFlow<Integer> flow = Flows.<Integer>create()
1042-
.flatMap(num -> {
1043-
Integer[] maps = new Integer[num];
1044-
for (int i = 0; i < num; i++) {
1045-
maps[i] = i * 10;
1046-
}
1047-
return Flows.flux(maps);
1048-
})
1049-
.map(i -> i * 10)
1050-
.reduce(() -> 0, Integer::sum)
1051-
.just(i -> result.add(i))
1052-
.close();
1017+
ProcessFlow<Integer> flow = Flows.<Integer>create().flatMap(num -> {
1018+
Integer[] maps = new Integer[num];
1019+
for (int i = 0; i < num; i++) {
1020+
maps[i] = i * 10;
1021+
}
1022+
return Flows.flux(maps);
1023+
}).map(i -> i * 10).reduce(() -> 0, Integer::sum).just(i -> result.add(i)).close();
10531024
flow.offer(new Integer[] {2, 3});
10541025
FlowsTestUtil.waitUntil(() -> result.size() == 1);
10551026
assertEquals(1, result.size());
@@ -1059,18 +1030,13 @@ void test_flow_flat_map_with_reduce_under_cold_stream() {
10591030
@Test
10601031
void test_flow_flat_map_with_reduce_under_hot_stream() throws InterruptedException {
10611032
List<Integer> result = new ArrayList<>();
1062-
ProcessFlow<Integer> flow = Flows.<Integer>create()
1063-
.flatMap(num -> {
1064-
Integer[] maps = new Integer[num];
1065-
for (int i = 0; i < num; i++) {
1066-
maps[i] = i * 10;
1067-
}
1068-
return Flows.flux(maps);
1069-
})
1070-
.map(i -> i * 10)
1071-
.reduce(() -> 0, Integer::sum)
1072-
.just(i -> result.add(i))
1073-
.close();
1033+
ProcessFlow<Integer> flow = Flows.<Integer>create().flatMap(num -> {
1034+
Integer[] maps = new Integer[num];
1035+
for (int i = 0; i < num; i++) {
1036+
maps[i] = i * 10;
1037+
}
1038+
return Flows.flux(maps);
1039+
}).map(i -> i * 10).reduce(() -> 0, Integer::sum).just(i -> result.add(i)).close();
10741040
FlowSession session = new FlowSession();
10751041
Window window = session.begin();
10761042
flow.offer(2, session);
@@ -1089,19 +1055,14 @@ void test_flatMap_under_preserved_order() throws InterruptedException {
10891055
StringBuilder result = new StringBuilder();
10901056
final int count = 4;
10911057
final int flatmapSize = 2;
1092-
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks)
1093-
.flatMap(num -> {
1094-
SleepUtil.sleep((count - num) * 20L);
1095-
String[] maps = new String[flatmapSize];
1096-
for (int i = 0; i < flatmapSize; i++) {
1097-
maps[i] = num + "-" + i;
1098-
}
1099-
return Flows.flux(maps);
1100-
})
1101-
.map(s -> "|" + s)
1102-
.reduce((acc, i) -> acc + i)
1103-
.map(i -> result.append(i.substring(1)))
1104-
.close();
1058+
ProcessFlow<Integer> flow = Flows.<Integer>create(repo, messenger, locks).flatMap(num -> {
1059+
SleepUtil.sleep((count - num) * 20L);
1060+
String[] maps = new String[flatmapSize];
1061+
for (int i = 0; i < flatmapSize; i++) {
1062+
maps[i] = num + "-" + i;
1063+
}
1064+
return Flows.flux(maps);
1065+
}).map(s -> "|" + s).reduce((acc, i) -> acc + i).map(i -> result.append(i.substring(1))).close();
11051066
FlowSession session = new FlowSession(true);
11061067
Window window = session.begin();
11071068
for (int i = 0; i < count; i++) {

0 commit comments

Comments
 (0)