Skip to content

Commit 64ff0bb

Browse files
authored
Merge pull request #11 from leancloud/fix/fetcher-self-close
do not triger wake when fetch exit by itself
2 parents 8b8cc4b + c6983b6 commit 64ff0bb

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

src/main/java/cn/leancloud/kafka/consumer/Fetcher.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,15 @@ public void run() {
144144
}
145145
} catch (ExecutionException ex) {
146146
unsubscribedStatus = UnsubscribedStatus.ERROR;
147-
close();
147+
markClosed();
148148
break;
149149
} catch (Exception ex) {
150150
if (ex instanceof InterruptedException) {
151151
Thread.currentThread().interrupt();
152152
}
153153

154154
unsubscribedStatus = UnsubscribedStatus.ERROR;
155-
close();
155+
markClosed();
156156
logger.error("Fetcher quit with unexpected exception. Will rebalance after poll timeout.", ex);
157157
break;
158158
}
@@ -164,7 +164,7 @@ public void run() {
164164

165165
@Override
166166
public void close() {
167-
closed = true;
167+
markClosed();
168168
consumer.wakeup();
169169
}
170170

@@ -177,6 +177,10 @@ Map<ConsumerRecord<K, V>, Future<ConsumerRecord<K, V>>> pendingFutures() {
177177
return pendingFutures;
178178
}
179179

180+
private void markClosed() {
181+
closed = true;
182+
}
183+
180184
private boolean closed() {
181185
return closed;
182186
}

src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,15 +203,15 @@ private void ensureInInit() {
203203

204204
private String fetcherThreadName(Collection<String> topics) {
205205
final String firstTopic = topics.iterator().next();
206-
String postfix = firstTopic.substring(0, min(10, firstTopic.length()));
207-
postfix += topics.size() > 1 || firstTopic.length() > 10 ? "..." : "";
206+
String postfix = firstTopic.substring(0, min(50, firstTopic.length()));
207+
postfix += topics.size() > 1 || firstTopic.length() > 50 ? "..." : "";
208208
return "kafka-fetcher-for-" + postfix;
209209
}
210210

211211
private String fetcherThreadName(Pattern pattern) {
212212
final String patternInString = pattern.toString();
213-
String postfix = patternInString.substring(0, min(10, patternInString.length()));
214-
postfix += patternInString.length() > 10 ? "..." : "";
213+
String postfix = patternInString.substring(0, min(50, patternInString.length()));
214+
postfix += patternInString.length() > 50 ? "..." : "";
215215
return "kafka-fetcher-for-" + postfix;
216216
}
217217
}

0 commit comments

Comments
 (0)