Skip to content

Commit 4b7db6e

Browse files
committed
sse test
1 parent ddce764 commit 4b7db6e

File tree

4 files changed

+225
-1
lines changed

4 files changed

+225
-1
lines changed

concurrency/src/test/java/thread/schdule/SchedulerPoolTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,23 @@ public void testSchedulerGetResult() throws Exception {
232232
Thread.currentThread().join(10000);
233233
}
234234

235+
/**
236+
* scheduleWithFixedDelay 和 scheduleAtFixedRate 如果任务抛出了未处理的异常,会中断后续的调度计划
237+
*/
238+
@Test
239+
public void testBrokeFixedTask() throws Exception {
240+
AtomicInteger cnt = new AtomicInteger();
241+
customScheduler.scheduleWithFixedDelay(() -> {
242+
int index = cnt.incrementAndGet();
243+
log.info("index={}", index);
244+
if (index > 4) {
245+
throw new RuntimeException("Broke");
246+
}
247+
248+
}, 2, 1, TimeUnit.SECONDS);
249+
Thread.sleep(1000000);
250+
}
251+
235252
@Test
236253
public void testScheduleDrop() throws Exception {
237254
for (int i = 0; i < 10; i++) {

network/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,24 @@
2828
</exclusion>
2929
</exclusions>
3030
</dependency>
31+
3132
<dependency>
3233
<groupId>com.squareup.okhttp3</groupId>
3334
<artifactId>okhttp</artifactId>
3435
</dependency>
36+
<dependency>
37+
<groupId>com.squareup.okhttp3</groupId>
38+
<artifactId>okhttp-sse</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>com.squareup.okhttp3</groupId>
42+
<artifactId>logging-interceptor</artifactId>
43+
</dependency>
44+
<dependency>
45+
<groupId>com.squareup.okhttp3</groupId>
46+
<artifactId>mockwebserver</artifactId>
47+
</dependency>
48+
3549
<dependency>
3650
<groupId>org.apache.commons</groupId>
3751
<artifactId>commons-lang3</artifactId>
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package com.github.kuangcp.http.sse;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import lombok.extern.slf4j.Slf4j;
5+
import okhttp3.OkHttpClient;
6+
import okhttp3.Request;
7+
import okhttp3.RequestBody;
8+
import okhttp3.Response;
9+
import okhttp3.mockwebserver.MockResponse;
10+
import okhttp3.mockwebserver.MockWebServer;
11+
import okhttp3.sse.EventSource;
12+
import okhttp3.sse.EventSourceListener;
13+
import okhttp3.sse.EventSources;
14+
import org.jetbrains.annotations.NotNull;
15+
import org.jetbrains.annotations.Nullable;
16+
import org.junit.After;
17+
import org.junit.Before;
18+
import org.junit.Test;
19+
20+
import java.io.IOException;
21+
import java.net.SocketTimeoutException;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import static org.junit.Assert.assertTrue;
25+
import static org.junit.Assert.fail;
26+
27+
/**
28+
* @author Kuangcp
29+
* 2025-02-07 10:21
30+
*/
31+
@Slf4j
32+
public class ReadTest {
33+
private MockWebServer server;
34+
private OkHttpClient client;
35+
36+
ObjectMapper mapper = new ObjectMapper();
37+
38+
@Before
39+
public void setUp() {
40+
server = new MockWebServer();
41+
try {
42+
server.start();
43+
} catch (IOException e) {
44+
throw new RuntimeException(e);
45+
}
46+
47+
// 设置 OkHttpClient 超时参数
48+
client = new OkHttpClient.Builder()
49+
.connectTimeout(5, TimeUnit.SECONDS) // 连接超时5秒
50+
.writeTimeout(5, TimeUnit.SECONDS) // 写入超时5秒
51+
.readTimeout(5, TimeUnit.SECONDS) // 读取超时5秒
52+
.build();
53+
}
54+
55+
@After
56+
public void tearDown() throws IOException {
57+
server.shutdown();
58+
}
59+
60+
@Test
61+
public void testConnectTimeout() throws IOException {
62+
// 模拟服务器延迟响应
63+
server.enqueue(new MockResponse().setBodyDelay(6, TimeUnit.SECONDS));
64+
65+
Request request = new Request.Builder()
66+
.url(server.url("/"))
67+
.build();
68+
69+
try {
70+
client.newCall(request).execute();
71+
fail("Expected a SocketTimeoutException to be thrown for connect timeout");
72+
} catch (SocketTimeoutException e) {
73+
// 连接超时,测试通过
74+
assertTrue(true);
75+
}
76+
}
77+
78+
@Test
79+
public void testWriteTimeout() throws IOException {
80+
// 这个测试依赖于服务器的配置,确保服务器不会立即发送响应头
81+
server.enqueue(new MockResponse().setBodyDelay(6, TimeUnit.SECONDS));
82+
83+
Request request = new Request.Builder()
84+
.url(server.url("/"))
85+
.post(RequestBody.create(null, "This is the request body"))
86+
.build();
87+
88+
try {
89+
client.newCall(request).execute();
90+
fail("Expected a SocketTimeoutException to be thrown for write timeout");
91+
} catch (SocketTimeoutException e) {
92+
// 写入超时,测试通过
93+
assertTrue(true);
94+
}
95+
}
96+
97+
@Test
98+
public void testReadTimeout() throws IOException {
99+
// 模拟服务器立即发送响应头,但延迟发送响应体
100+
server.enqueue(new MockResponse()
101+
.setBody("Response body")
102+
// .setHeadersDelay(10,TimeUnit.SECONDS)
103+
.setBodyDelay(10, TimeUnit.SECONDS));
104+
105+
Request request = new Request.Builder()
106+
.url(server.url("/"))
107+
.build();
108+
try {
109+
Response response = client.newCall(request).execute();
110+
String responseBody = response.body().string();
111+
// System.out.println(responseBody);
112+
// 读取超时,测试通过
113+
fail("Expected a SocketTimeoutException to be thrown for read timeout");
114+
} catch (SocketTimeoutException e) {
115+
// 写入超时,测试通过
116+
assertTrue(true);
117+
}
118+
}
119+
120+
@Test
121+
public void testReadTimeoutWithEventSource() throws IOException, InterruptedException {
122+
// 模拟服务器立即发送响应头,但延迟发送响应体
123+
server.enqueue(new MockResponse()
124+
.setBody(": keep-alive") // SSE需要发送特定的响应头
125+
.addHeader("Content-Type", "text/event-stream") // 设置正确的MIME类型
126+
.setBodyDelay(5, TimeUnit.SECONDS)
127+
); // 延迟发送事件
128+
129+
Request request = new Request.Builder()
130+
.url(server.url("/"))
131+
.build();
132+
133+
EventSource.Factory factory = EventSources.createFactory(client);
134+
135+
EventSourceListener eventSourceListener = new EventSourceListener() {
136+
@Override
137+
public void onClosed(@NotNull EventSource eventSource) {
138+
log.info("closed {}", eventSource);
139+
super.onClosed(eventSource);
140+
}
141+
142+
@Override
143+
public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
144+
log.info("event");
145+
super.onEvent(eventSource, id, type, data);
146+
}
147+
148+
@Override
149+
public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
150+
assert t != null;
151+
log.error("返回异常信息:{}", t.getMessage());
152+
153+
try {
154+
log.error("response:{}", mapper.writeValueAsString(response));
155+
} catch (Exception e) {
156+
log.error("", e);
157+
}
158+
// super.onFailure(eventSource, t, response);
159+
}
160+
161+
@Override
162+
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
163+
log.info("open {}", response);
164+
super.onOpen(eventSource, response);
165+
}
166+
};
167+
168+
log.info("request");
169+
final EventSource eventSource = factory.newEventSource(request, eventSourceListener);
170+
171+
// 等待足够的时间以触发超时
172+
Thread.sleep(15000); // 等待时间应大于服务器延迟时间
173+
174+
// 测试结束,关闭EventSource
175+
}
176+
}

pom.xml

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,28 @@
173173
<version>6.14.3</version>
174174
<scope>test</scope>
175175
</dependency>
176+
176177
<dependency>
177178
<groupId>com.squareup.okhttp3</groupId>
178179
<artifactId>okhttp</artifactId>
179-
<version>4.11.0</version>
180+
<version>4.12.0</version>
181+
</dependency>
182+
<dependency>
183+
<groupId>com.squareup.okhttp3</groupId>
184+
<artifactId>okhttp-sse</artifactId>
185+
<version>4.12.0</version>
180186
</dependency>
187+
<dependency>
188+
<groupId>com.squareup.okhttp3</groupId>
189+
<artifactId>logging-interceptor</artifactId>
190+
<version>4.12.0</version>
191+
</dependency>
192+
<dependency>
193+
<groupId>com.squareup.okhttp3</groupId>
194+
<artifactId>mockwebserver</artifactId>
195+
<version>4.12.0</version>
196+
</dependency>
197+
181198
<dependency>
182199
<groupId>io.netty</groupId>
183200
<artifactId>netty-all</artifactId>

0 commit comments

Comments
 (0)