Skip to content
This repository was archived by the owner on Jul 30, 2023. It is now read-only.

Commit 9ce1878

Browse files
authored
Merge pull request #58 from hero-matsumoto/stream_update
Streaming bug fix.(data missing , I/O exception occurred when shutdown.)
2 parents 57d6fc8 + 88e4de1 commit 9ce1878

File tree

2 files changed

+150
-85
lines changed

2 files changed

+150
-85
lines changed

mastodon4j/src/main/java/com/sys1yagi/mastodon4j/api/Dispatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class Dispatcher {
1414
})
1515

1616
val lock = ReentrantLock()
17-
val shutdownTime = 5000L
17+
val shutdownTime = 1000L
1818

1919
fun invokeLater(task: Runnable) = executorService.execute(task)
2020

mastodon4j/src/main/java/com/sys1yagi/mastodon4j/api/method/Streaming.kt

Lines changed: 149 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,35 @@ class Streaming(private val client: MastodonClient) {
2020
val dispatcher = Dispatcher()
2121
dispatcher.invokeLater(Runnable {
2222
while (true) {
23-
val line = reader.readLine()
24-
if (line == null || line.isEmpty()) {
25-
continue
26-
}
27-
val payload = reader.readLine()
28-
val event = line.split(":")[1].trim()
29-
if (event == "update") {
30-
val start = payload.indexOf(":") + 1
31-
val json = payload.substring(start).trim()
32-
val status = client.getSerializer().fromJson(
33-
json,
34-
Status::class.java
35-
)
36-
handler.onStatus(status)
23+
try{
24+
val line = reader.readLine()
25+
if (line == null || line.isEmpty()) {
26+
continue
27+
}
28+
val type = line.split(":")[0].trim()
29+
if(type != "event"){
30+
continue
31+
}
32+
val event = line.split(":")[1].trim()
33+
val payload = reader.readLine()
34+
val payloadType = payload.split(":")[0].trim()
35+
if(payloadType != "data"){
36+
continue
37+
}
38+
if (event == "update") {
39+
val start = payload.indexOf(":") + 1
40+
val json = payload.substring(start).trim()
41+
val status = client.getSerializer().fromJson(
42+
json,
43+
Status::class.java
44+
)
45+
handler.onStatus(status)
46+
}
47+
}catch (e:java.io.InterruptedIOException){
48+
break
3749
}
3850
}
51+
reader.close()
3952
})
4053
return Shutdownable(dispatcher)
4154
} else {
@@ -51,22 +64,35 @@ class Streaming(private val client: MastodonClient) {
5164
val dispatcher = Dispatcher()
5265
dispatcher.invokeLater(Runnable {
5366
while (true) {
54-
val line = reader.readLine()
55-
if (line == null || line.isEmpty()) {
56-
continue
57-
}
58-
val payload = reader.readLine()
59-
val event = line.split(":")[1].trim()
60-
if (event == "update") {
61-
val start = payload.indexOf(":") + 1
62-
val json = payload.substring(start).trim()
63-
val status = client.getSerializer().fromJson(
64-
json,
65-
Status::class.java
66-
)
67-
handler.onStatus(status)
67+
try{
68+
val line = reader.readLine()
69+
if (line == null || line.isEmpty()) {
70+
continue
71+
}
72+
val type = line.split(":")[0].trim()
73+
if(type != "event"){
74+
continue
75+
}
76+
val event = line.split(":")[1].trim()
77+
val payload = reader.readLine()
78+
val payloadType = payload.split(":")[0].trim()
79+
if(payloadType != "data"){
80+
continue
81+
}
82+
if (event == "update") {
83+
val start = payload.indexOf(":") + 1
84+
val json = payload.substring(start).trim()
85+
val status = client.getSerializer().fromJson(
86+
json,
87+
Status::class.java
88+
)
89+
handler.onStatus(status)
90+
}
91+
}catch (e:java.io.InterruptedIOException){
92+
break
6893
}
6994
}
95+
reader.close()
7096
})
7197
return Shutdownable(dispatcher)
7298
} else {
@@ -85,22 +111,35 @@ class Streaming(private val client: MastodonClient) {
85111
val dispatcher = Dispatcher()
86112
dispatcher.invokeLater(Runnable {
87113
while (true) {
88-
val line = reader.readLine()
89-
if (line == null || line.isEmpty()) {
90-
continue
91-
}
92-
val payload = reader.readLine()
93-
val event = line.split(":")[1].trim()
94-
if (event == "update") {
95-
val start = payload.indexOf(":") + 1
96-
val json = payload.substring(start).trim()
97-
val status = client.getSerializer().fromJson(
98-
json,
99-
Status::class.java
100-
)
101-
handler.onStatus(status)
114+
try{
115+
val line = reader.readLine()
116+
if (line == null || line.isEmpty()) {
117+
continue
118+
}
119+
val type = line.split(":")[0].trim()
120+
if(type != "event"){
121+
continue
122+
}
123+
val event = line.split(":")[1].trim()
124+
val payload = reader.readLine()
125+
val payloadType = payload.split(":")[0].trim()
126+
if(payloadType != "data"){
127+
continue
128+
}
129+
if (event == "update") {
130+
val start = payload.indexOf(":") + 1
131+
val json = payload.substring(start).trim()
132+
val status = client.getSerializer().fromJson(
133+
json,
134+
Status::class.java
135+
)
136+
handler.onStatus(status)
137+
}
138+
}catch (e:java.io.InterruptedIOException){
139+
break
102140
}
103141
}
142+
reader.close()
104143
})
105144
return Shutdownable(dispatcher)
106145
} else {
@@ -119,22 +158,35 @@ class Streaming(private val client: MastodonClient) {
119158
val dispatcher = Dispatcher()
120159
dispatcher.invokeLater(Runnable {
121160
while (true) {
122-
val line = reader.readLine()
123-
if (line == null || line.isEmpty()) {
124-
continue
125-
}
126-
val payload = reader.readLine()
127-
val event = line.split(":")[1].trim()
128-
if (event == "update") {
129-
val start = payload.indexOf(":") + 1
130-
val json = payload.substring(start).trim()
131-
val status = client.getSerializer().fromJson(
132-
json,
133-
Status::class.java
134-
)
135-
handler.onStatus(status)
161+
try{
162+
val line = reader.readLine()
163+
if (line == null || line.isEmpty()) {
164+
continue
165+
}
166+
val type = line.split(":")[0].trim()
167+
if(type != "event"){
168+
continue
169+
}
170+
val event = line.split(":")[1].trim()
171+
val payload = reader.readLine()
172+
val payloadType = payload.split(":")[0].trim()
173+
if(payloadType != "data"){
174+
continue
175+
}
176+
if (event == "update") {
177+
val start = payload.indexOf(":") + 1
178+
val json = payload.substring(start).trim()
179+
val status = client.getSerializer().fromJson(
180+
json,
181+
Status::class.java
182+
)
183+
handler.onStatus(status)
184+
}
185+
}catch (e:java.io.InterruptedIOException){
186+
break
136187
}
137188
}
189+
reader.close()
138190
})
139191
return Shutdownable(dispatcher)
140192
} else {
@@ -152,37 +204,50 @@ class Streaming(private val client: MastodonClient) {
152204
val dispatcher = Dispatcher()
153205
dispatcher.invokeLater(Runnable {
154206
while (true) {
155-
val line = reader.readLine()
156-
if (line == null || line.isEmpty()) {
157-
continue
158-
}
159-
val event = line.split(":")[1].trim()
207+
try{
208+
val line = reader.readLine()
209+
if (line == null || line.isEmpty()) {
210+
continue
211+
}
212+
val type = line.split(":")[0].trim()
213+
if(type != "event"){
214+
continue
215+
}
216+
val event = line.split(":")[1].trim()
217+
val payload = reader.readLine()
218+
val payloadType = payload.split(":")[0].trim()
219+
if(payloadType != "data"){
220+
continue
221+
}
160222

161-
val payload = reader.readLine()
162-
val start = payload.indexOf(":") + 1
163-
val json = payload.substring(start).trim()
164-
if (event == "update") {
165-
val status = client.getSerializer().fromJson(
166-
json,
167-
Status::class.java
168-
)
169-
handler.onStatus(status)
170-
}
171-
if (event == "notification") {
172-
val notification = client.getSerializer().fromJson(
173-
json,
174-
Notification::class.java
175-
)
176-
handler.onNotification(notification)
177-
}
178-
if (event == "delete") {
179-
val id = client.getSerializer().fromJson(
180-
json,
181-
Long::class.java
182-
)
183-
handler.onDelete(id)
223+
val start = payload.indexOf(":") + 1
224+
val json = payload.substring(start).trim()
225+
if (event == "update") {
226+
val status = client.getSerializer().fromJson(
227+
json,
228+
Status::class.java
229+
)
230+
handler.onStatus(status)
231+
}
232+
if (event == "notification") {
233+
val notification = client.getSerializer().fromJson(
234+
json,
235+
Notification::class.java
236+
)
237+
handler.onNotification(notification)
238+
}
239+
if (event == "delete") {
240+
val id = client.getSerializer().fromJson(
241+
json,
242+
Long::class.java
243+
)
244+
handler.onDelete(id)
245+
}
246+
}catch (e:java.io.InterruptedIOException){
247+
break
184248
}
185249
}
250+
reader.close()
186251
})
187252
return Shutdownable(dispatcher)
188253
} else {

0 commit comments

Comments
 (0)