@@ -26,11 +26,11 @@ This experiment evaluates the frameworks' capability to handle complex AI-genera
 
 **Performance Metrics:**
 
-| Metric | AIFlow | LCEL |
-| --- | --- | --- |
-| TTFT | 210ms ± 10ms | 280ms ± 5ms |
-| Total Latency | 10.9s ± 0.1s | 21.8s ± 0.1s |
-| Log Delay | 5.4s ± 0.1s | 10.9s ± 0.1s |
+| Metric | AIFlow | LCEL | RX+LLM |
+| --- | --- | --- | --- |
+| TTFT | 210ms ± 10ms | 280ms ± 5ms | 300ms ± 10ms |
+| Total Latency | 10.9s ± 0.1s | 21.8s ± 0.1s | 21.8s ± 0.1s |
+| Log Delay | 5.4s ± 0.1s | 10.9s ± 0.1s | 11.0s ± 0.1s |
 
 **Code Implementation:**
 
@@ -207,6 +207,125 @@ asyncio.run(main())
 print(f"take time: {(time.time() - start) * 1000:.3f}(ms)")
 ```
 
+**RX+LLM:**
+
+```java
+package modelengine.fel.engine;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class DesensitizeCase {
+    public static void main(String[] args) throws InterruptedException {
+        String userInput = "hi";
+        long startTime = System.currentTimeMillis();
+        System.out.printf("start at: %.3f(ms)%n", (double) startTime);
+
+        // Pipeline: stream model -> tag think/answer chunks -> log answer chunks -> two desensitize passes.
+        Flux<String> result = Mono.just(Map.of("user_input", userInput))
+                .flatMapMany(DesensitizeCase::mockStreamModel)
+                .transform(DesensitizeCase::classic)
+                .map(DesensitizeCase::log)
+                .map(DesensitizeCase::mockDesensitize1)
+                .map(DesensitizeCase::mockDesensitize2);
+
+        CountDownLatch waiter = new CountDownLatch(1);
+        result.subscribe(text -> System.out.printf("%.3f output: %s%n", (double) System.currentTimeMillis(), text),
+                error -> System.err.println("Error: " + error),
+                () -> {
+                    long endTime = System.currentTimeMillis();
+                    System.out.printf("take time: %.3f(ms)%n", (double) (endTime - startTime));
+                    waiter.countDown();
+                });
+
+        waiter.await();
+    }
+
+    // Emits 100 tokens at 100 ms intervals: a <think> block (tokens 0-47) followed by answer tokens (100-149).
+    private static Flux<String> mockStreamModel(Map<String, String> input) {
+        List<String> dataList = new ArrayList<>();
+        dataList.add("<think>");
+        for (int i = 0; i < 48; i++) {
+            dataList.add(String.valueOf(i));
+        }
+        dataList.add("</think>");
+        for (int i = 0; i < 50; i++) {
+            dataList.add(String.valueOf(i + 100));
+        }
+
+        long startTime = System.currentTimeMillis();
+        return Flux.generate(() -> 0, (state, sink) -> {
+            long diff = System.currentTimeMillis() - startTime;
+            if (diff < state * 100) {
+                sleep(state * 100 - diff);
+            }
+            sink.next(dataList.get(state));
+            if (state == 99) {
+                sink.complete();
+            }
+            return state + 1;
+        });
+    }
+
+    // Tags every token from <think> to </think> (inclusive) as think content.
+    private static Flux<Chunk> classic(Flux<String> stream) {
+        AtomicReference<Boolean> isThinking = new AtomicReference<>(false);
+        return stream.map(message -> {
+            if (message.trim().equals("<think>")) {
+                isThinking.set(true);
+                return new Chunk(true, message);
+            }
+            if (message.trim().equals("</think>")) {
+                isThinking.set(false);
+                return new Chunk(true, message);
+            }
+            if (Boolean.TRUE.equals(isThinking.get())) {
+                return new Chunk(true, message);
+            }
+            return new Chunk(false, message);
+        });
+    }
+
+    private static String mockDesensitize1(Chunk input) {
+        sleep(100);
+        return input.content.replace("3", "*");
+    }
+
+    private static String mockDesensitize2(String input) {
+        sleep(100);
+        return input.replace("4", "*");
+    }
+
+    // Only answer chunks are logged; think content is skipped.
+    private static Chunk log(Chunk chunk) {
+        if (!chunk.isThinkContent) {
+            System.out.println("log content:" + chunk.content);
+        }
+        return chunk;
+    }
+
+    private static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private static class Chunk {
+        private final boolean isThinkContent;
+        private final String content;
+
+        private Chunk(boolean isThinkContent, String content) {
+            this.isThinkContent = isThinkContent;
+            this.content = content;
+        }
+    }
+}
+```
+
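+The RX+LLM examples assume only `reactor-core` (and its `reactive-streams` dependency) on the classpath. One subtlety worth noting: `transform` applies `classic` at assembly time, so the `AtomicReference` think-flag is shared by every subscription to `result`. A minimal alternative sketch (illustrative only, reusing the `Chunk` type above) keeps the flag inside the stream with `scan`, which makes the operator safe to resubscribe:
+
+```java
+// Sketch: derive the think/answer flag from the previous chunk instead of external mutable state.
+private static Flux<Chunk> tagThink(Flux<String> stream) {
+    return stream
+            .scan(new Chunk(false, ""), (prev, msg) -> {
+                // We are inside the think block if the previous chunk was think content
+                // and was not the closing </think> tag.
+                boolean thinking = prev.isThinkContent && !prev.content.trim().equals("</think>");
+                if (msg.trim().equals("<think>") || msg.trim().equals("</think>")) {
+                    return new Chunk(true, msg);
+                }
+                return new Chunk(thinking, msg);
+            })
+            .skip(1); // drop the seed element emitted by scan
+}
+```
+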
 ---
 
 ### Experiment 2: Backpressure Handling Under Single-Thread Constraints
@@ -233,10 +352,10 @@ This test examines how each framework manages processing bottlenecks when downst
 
 **Performance Metrics:**
 
-| Metric | AIFlow | LCEL |
-| --- | --- | --- |
-| TTFT | 210ms ± 10ms | 280ms ± 10ms |
-| Total Latency | 12.5s ± 0.1s | 21.9s ± 0.1s |
+| Metric | AIFlow | LCEL | RX+LLM |
+| --- | --- | --- | --- |
+| TTFT | 210ms ± 10ms | 280ms ± 10ms | 300ms ± 10ms |
+| Total Latency | 12.5s ± 0.1s | 21.9s ± 0.1s | 21.8s ± 0.1s |
 
 **Code Implementation:**
 
@@ -338,6 +457,75 @@ asyncio.run(main())
 print(f"take time: {(time.time() - start) * 1000:.3f}(ms)")
 ```
 
+**RX+LLM:**
+
+```java
+package modelengine.fel.engine;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class BackPressureCase {
+    public static void main(String[] args) throws InterruptedException {
+        String userInput = "hi";
+        long startTime = System.currentTimeMillis();
+        System.out.printf("start at: %.3f(ms)%n", (double) startTime);
+
+        // Pipeline: stream model (one token every 50 ms) -> desensitize (100 ms) -> TTS (100 ms),
+        // with all stages running on the producer thread.
+        Flux<String> result = Mono.just(Map.of("user_input", userInput))
+                .flatMapMany(BackPressureCase::mockStreamModel)
+                .map(BackPressureCase::mockDesensitize)
+                .map(BackPressureCase::mockTTS);
+
+        CountDownLatch waiter = new CountDownLatch(1);
+        result.subscribe(text -> System.out.printf("%.3f output: %s%n", (double) System.currentTimeMillis(), text),
+                error -> System.err.println("Error: " + error),
+                () -> {
+                    long endTime = System.currentTimeMillis();
+                    System.out.printf("take time: %.3f(ms)%n", (double) (endTime - startTime));
+                    waiter.countDown();
+                });
+
+        waiter.await();
+    }
+
+    // Emits tokens "0".."99", one every 50 ms.
+    private static Flux<String> mockStreamModel(Map<String, String> input) {
+        long startTime = System.currentTimeMillis();
+        return Flux.generate(() -> 0, (state, sink) -> {
+            long diff = System.currentTimeMillis() - startTime;
+            if (diff < state * 50) {
+                sleep(state * 50 - diff);
+            }
+            sink.next(String.valueOf(state));
+            if (state == 99) {
+                sink.complete();
+            }
+            return state + 1;
+        });
+    }
+
+    private static String mockDesensitize(String input) {
+        sleep(100);
+        return input.replace("3", "*");
+    }
+
+    private static String mockTTS(String input) {
+        sleep(100);
+        return input;
+    }
+
+    private static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
+```
+
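+Because every `map` here runs on the thread that drives `Flux.generate`, the producer can only emit a new token after the previous one has finished its roughly 200 ms of downstream work. As a small illustrative sketch (not part of the measured benchmark), that demand can be made explicit by replacing the lambda-based `subscribe` above with a `BaseSubscriber` that requests one element at a time:
+
+```java
+// Sketch: explicit one-at-a-time demand; generate() only produces when the subscriber asks.
+// Requires: import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber;
+result.subscribe(new BaseSubscriber<String>() {
+    @Override
+    protected void hookOnSubscribe(Subscription subscription) {
+        request(1); // initial demand
+    }
+
+    @Override
+    protected void hookOnNext(String value) {
+        System.out.printf("%.3f output: %s%n", (double) System.currentTimeMillis(), value);
+        request(1); // pull the next token only after this one has been handled
+    }
+});
+```
+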
 ---
 
 ### Experiment 3: Concurrent Processing Performance
@@ -363,10 +551,10 @@ This experiment measures throughput optimization under constrained parallel proc
 
 **Performance Metrics:**
 
-| Metric | AIFlow | LCEL |
-| --- | --- | --- |
-| TTFT | 310ms ± 10ms | 380ms ± 10ms |
-| Total Latency | 11.2s ± 0.1s | 31.6s ± 0.1s |
+| Metric | AIFlow | LCEL | RX+LLM |
+| --- | --- | --- | --- |
+| TTFT | 310ms ± 10ms | 380ms ± 10ms | 400ms ± 10ms |
+| Total Latency | 11.2s ± 0.1s | 31.6s ± 0.1s | 12.1s ± 0.1s |
 
 **Code Implementation:**
 
@@ -455,3 +643,63 @@ print(f"start at: {start * 1000:.3f}(ms)")
 asyncio.run(main())
 print(f"take time: {(time.time() - start) * 1000:.3f}(ms)")
 ```
+
+**RX+LLM:**
+
+```java
+package modelengine.fel.engine;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class ConcurrencyCase {
+    public static void main(String[] args) throws InterruptedException {
+        String userInput = "hi";
+        long startTime = System.currentTimeMillis();
+        System.out.printf("start at: %.3f(ms)%n", (double) startTime);
+
+        // Fan the 300 ms desensitization out over three parallel rails, then merge back.
+        Flux<String> result = Mono.just(Map.of("user_input", userInput))
+                .flatMapMany(ConcurrencyCase::mockStreamModel)
+                .parallel(3)
+                .runOn(Schedulers.parallel())
+                .map(ConcurrencyCase::mockDesensitize)
+                .sequential();
+
+        CountDownLatch waiter = new CountDownLatch(1);
+        result.subscribe(text -> System.out.printf("%.3f output: %s%n", (double) System.currentTimeMillis(), text),
+                error -> System.err.println("Error: " + error),
+                () -> {
+                    long endTime = System.currentTimeMillis();
+                    System.out.printf("take time: %.3f(ms)%n", (double) (endTime - startTime));
+                    waiter.countDown();
+                });
+
+        waiter.await();
+    }
+
+    // Emits tokens "0".."99": the first immediately, the rest at 100 ms intervals.
+    private static Flux<String> mockStreamModel(Map<String, String> input) {
+        Flux<Integer> firstElement = Flux.just(0);
+        Flux<Integer> restElements = Flux.range(1, 99).delayElements(Duration.ofMillis(100));
+
+        return Flux.concat(firstElement, restElements).map(String::valueOf);
+    }
+
+    private static String mockDesensitize(String input) {
+        sleep(300);
+        return input.replace("3", "*");
+    }
+
+    private static void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
+```
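+
+The `parallel(3)` / `runOn(...)` / `sequential()` combination trades ordering for throughput: `sequential()` merges results in whatever order the rails produce them, which is not guaranteed to match the emission order. If order must be preserved, one possible variant (a sketch using the same mock helpers, not part of the measured benchmark) is `flatMapSequential` with a concurrency cap:
+
+```java
+// Sketch: up to three desensitize calls in flight at once, results re-emitted in input order.
+Flux<String> ordered = Mono.just(Map.of("user_input", userInput))
+        .flatMapMany(ConcurrencyCase::mockStreamModel)
+        .flatMapSequential(
+                token -> Mono.fromCallable(() -> mockDesensitize(token))
+                        .subscribeOn(Schedulers.boundedElastic()),
+                3); // maxConcurrency = 3
+```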