Skip to content

Commit 1fa515f

Browse files
committed
Adding backpressure pattern #3233
1 parent b8f51f0 commit 1fa515f

File tree

10 files changed

+380
-199
lines changed

10 files changed

+380
-199
lines changed

backpressure/README.md

Lines changed: 105 additions & 187 deletions
Large diffs are not rendered by default.

backpressure/pom.xml

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,27 @@
1111

1212
<artifactId>backpressure</artifactId>
1313

14-
<properties>
15-
<maven.compiler.source>21</maven.compiler.source>
16-
<maven.compiler.target>21</maven.compiler.target>
17-
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
18-
</properties>
14+
<dependencies>
15+
<dependency>
16+
<groupId>io.projectreactor</groupId>
17+
<artifactId>reactor-core</artifactId>
18+
<version>3.8.0-M1</version>
19+
</dependency>
20+
<dependency>
21+
<groupId>ch.qos.logback</groupId>
22+
<artifactId>logback-classic</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.junit.jupiter</groupId>
26+
<artifactId>junit-jupiter-engine</artifactId>
27+
<scope>test</scope>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.projectreactor</groupId>
31+
<artifactId>reactor-test</artifactId>
32+
<version>3.8.0-M1</version>
33+
<scope>test</scope>
34+
</dependency>
35+
</dependencies>
1936

2037
</project>

backpressure/src/main/java/com/iluwatar/App.java

Lines changed: 0 additions & 7 deletions
This file was deleted.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.iluwatar.backpressure;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import lombok.extern.slf4j.Slf4j;
5+
6+
/**
7+
* The Backpressure pattern is a flow control mechanism. It allows a consumer to signal to a
8+
* producer to slow down or stop sending data when it's overwhelmed.
9+
* <li>Prevents memory overflow, CPU thrashing, and resource exhaustion.
10+
* <li>Ensures fair usage of resources in distributed systems.
11+
* <li>Avoids buffer bloat and latency spikes.
12+
* Key concepts of this design paradigm involves
13+
* <li>Publisher/Producer: Generates data.
14+
* <li>Subscriber/Consumer: Receives and processes data.
15+
*
16+
* <p>In this example we will create a {@link Publisher} and a {@link Subscriber}. Publisher will
17+
* emit a stream of integer values with a predefined delay. Subscriber takes 500 ms to process one
18+
* integer. Since the subscriber can't process the items fast enough we apply backpressure to the
19+
* publisher so that it will request 10 items first, process 5 items and request for the next 5
20+
* again. After processing 5 items subscriber will keep requesting for another 5 until the stream
21+
* ends.
22+
*/
23+
@Slf4j
24+
public class App {
25+
26+
protected static CountDownLatch latch;
27+
28+
/**
29+
* Program entry point.
30+
*
31+
* @param args command line args
32+
*/
33+
public static void main(String[] args) throws InterruptedException {
34+
35+
/*
36+
* This custom subscriber applies backpressure:
37+
* - Has a processing delay of 0.5 milliseconds
38+
* - Requests 10 items initially
39+
* - Process 5 items and request for the next 5 items
40+
*/
41+
Subscriber sub = new Subscriber();
42+
// slow publisher emit 15 numbers with a delay of 200 milliseconds
43+
Publisher.publish(1, 17, 200).subscribe(sub);
44+
45+
latch = new CountDownLatch(1);
46+
latch.await();
47+
48+
sub = new Subscriber();
49+
// fast publisher emit 15 numbers with a delay of 1 millisecond
50+
Publisher.publish(1, 17, 1).subscribe(sub);
51+
52+
latch = new CountDownLatch(1);
53+
latch.await();
54+
}
55+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.iluwatar.backpressure;
2+
3+
import java.time.Duration;
4+
import reactor.core.publisher.Flux;
5+
6+
/**
7+
* This class is the publisher that generates the data stream.
8+
*/
9+
public class Publisher {
10+
11+
/**
12+
* On message method will trigger when the subscribed event is published.
13+
*
14+
* @param start starting integer
15+
* @param count how many integers to emit
16+
* @param delay delay between each item in milliseconds
17+
* @return a flux stream of integers
18+
*/
19+
public static Flux<Integer> publish(int start, int count, int delay) {
20+
return Flux.range(start, count).delayElements(Duration.ofMillis(delay)).log();
21+
}
22+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.iluwatar.backpressure;
2+
3+
import lombok.NonNull;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.reactivestreams.Subscription;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import reactor.core.publisher.BaseSubscriber;
9+
10+
/**
11+
* This class is the custom subscriber that subscribes to the data stream.
12+
*/
13+
@Slf4j
14+
public class Subscriber extends BaseSubscriber<Integer> {
15+
16+
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class);
17+
18+
@Override
19+
protected void hookOnSubscribe(@NonNull Subscription subscription) {
20+
logger.info("subscribe()");
21+
request(10); //request 10 items initially
22+
}
23+
24+
@Override
25+
protected void hookOnNext(@NonNull Integer value) {
26+
processItem();
27+
logger.info("process({})", value);
28+
if (value % 5 == 0) {
29+
// request for the next 5 items after processing first 5
30+
request(5);
31+
}
32+
}
33+
34+
@Override
35+
protected void hookOnComplete() {
36+
App.latch.countDown();
37+
}
38+
39+
private void processItem() {
40+
try {
41+
Thread.sleep(500); // simulate slow processing
42+
} catch (InterruptedException e) {
43+
logger.error(e.getMessage(), e);
44+
}
45+
}
46+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.iluwatar.backpressure;
2+
3+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4+
5+
import org.junit.jupiter.api.Test;
6+
7+
public class AppTest {
8+
9+
@Test
10+
void shouldExecuteApplicationWithoutException() {
11+
assertDoesNotThrow(() -> App.main(new String[] {}));
12+
}
13+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3+
*
4+
* The MIT License
5+
* Copyright © 2014-2022 Ilkka Seppälä
6+
*
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy
8+
* of this software and associated documentation files (the "Software"), to deal
9+
* in the Software without restriction, including without limitation the rights
10+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the Software is
12+
* furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in
15+
* all copies or substantial portions of the Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
* THE SOFTWARE.
24+
*/
25+
package com.iluwatar.backpressure;
26+
27+
import ch.qos.logback.classic.Logger;
28+
import ch.qos.logback.classic.spi.ILoggingEvent;
29+
import ch.qos.logback.core.read.ListAppender;
30+
import java.util.List;
31+
import java.util.stream.Collectors;
32+
import org.junit.jupiter.api.extension.AfterEachCallback;
33+
import org.junit.jupiter.api.extension.BeforeEachCallback;
34+
import org.junit.jupiter.api.extension.ExtensionContext;
35+
import org.slf4j.LoggerFactory;
36+
37+
public class LoggerExtension implements BeforeEachCallback, AfterEachCallback {
38+
39+
private final ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
40+
private final Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
41+
42+
@Override
43+
public void afterEach(ExtensionContext extensionContext) throws Exception {
44+
listAppender.stop();
45+
listAppender.list.clear();
46+
logger.detachAppender(listAppender);
47+
}
48+
49+
@Override
50+
public void beforeEach(ExtensionContext extensionContext) throws Exception {
51+
logger.addAppender(listAppender);
52+
listAppender.start();
53+
}
54+
55+
public List<String> getMessages() {
56+
return listAppender.list.stream().map(e -> e.getMessage()).collect(Collectors.toList());
57+
}
58+
59+
public List<String> getFormattedMessages() {
60+
return listAppender.list.stream()
61+
.map(e -> e.getFormattedMessage())
62+
.collect(Collectors.toList());
63+
}
64+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.iluwatar.backpressure;
2+
3+
import static com.iluwatar.backpressure.Publisher.publish;
4+
5+
import java.time.Duration;
6+
import org.junit.jupiter.api.Test;
7+
import reactor.core.publisher.Flux;
8+
import reactor.test.StepVerifier;
9+
10+
public class PublisherTest {
11+
12+
@Test
13+
public void testPublish() {
14+
15+
Flux<Integer> flux = publish(1, 3, 200);
16+
17+
StepVerifier.withVirtualTime(() -> flux)
18+
.expectSubscription()
19+
.expectNoEvent(Duration.ofMillis(200))
20+
.expectNext(1)
21+
.expectNoEvent(Duration.ofSeconds(200))
22+
.expectNext(2)
23+
.expectNoEvent(Duration.ofSeconds(200))
24+
.expectNext(3)
25+
.verifyComplete();
26+
}
27+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.iluwatar.backpressure;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import java.util.concurrent.CountDownLatch;
6+
import org.junit.jupiter.api.Test;
7+
import org.junit.jupiter.api.extension.RegisterExtension;
8+
9+
public class SubscriberTest {
10+
11+
@RegisterExtension
12+
public LoggerExtension loggerExtension = new LoggerExtension();
13+
14+
@Test
15+
public void testSubscribe() throws InterruptedException {
16+
App.latch = new CountDownLatch(1);
17+
Subscriber sub = new Subscriber();
18+
Publisher.publish(1, 8, 100).subscribe(sub);
19+
20+
App.latch.await();
21+
assertEquals(22, loggerExtension.getFormattedMessages().size());
22+
assertEquals("subscribe()", loggerExtension.getFormattedMessages().get(2));
23+
assertEquals("request(10)", loggerExtension.getFormattedMessages().get(3));
24+
assertEquals("request(5)", loggerExtension.getFormattedMessages().get(14));
25+
}
26+
}

0 commit comments

Comments
 (0)