Skip to content

Commit f6cd532

Browse files
committed
init commit
0 parents  commit f6cd532

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+3934
-0
lines changed

.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/target
2+
/classes
3+
/checkouts
4+
*.jar
5+
*.class
6+
.hgignore
7+
.hg/
8+
*.iml
9+
.idea/

pom.xml

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>cn.leancloud</groupId>
8+
<artifactId>kafka-java-consumer</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<name>${project.groupId}:${project.artifactId}</name>
12+
<description>A kafka consumer client for Java.</description>
13+
<url>https://github.com/leancloud/kafka-java-consumer</url>
14+
15+
<licenses>
16+
<license>
17+
<name>MIT License</name>
18+
<url>http://www.opensource.org/licenses/mit-license.php</url>
19+
</license>
20+
</licenses>
21+
22+
<organization>
23+
<name>LeanCloud</name>
24+
<url>https://leancloud.cn/</url>
25+
</organization>
26+
27+
<developers>
28+
<developer>
29+
<name>Rui Guo</name>
30+
<email>[email protected]</email>
31+
<organization>LeanCloud</organization>
32+
<organizationUrl>https://leancloud.cn/</organizationUrl>
33+
</developer>
34+
</developers>
35+
36+
<scm>
37+
<connection>scm:git:git://github.com/leancloud/kafka-java-consumer.git</connection>
38+
<developerConnection>scm:git:https://github.com/leancloud/kafka-java-consumer.git</developerConnection>
39+
<url>http://github.com/leancloud/kafka-java-consumer/tree/master</url>
40+
<tag>HEAD</tag>
41+
</scm>
42+
43+
<distributionManagement>
44+
<repository>
45+
<id>ossrh</id>
46+
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
47+
</repository>
48+
<snapshotRepository>
49+
<id>ossrh</id>
50+
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
51+
</snapshotRepository>
52+
</distributionManagement>
53+
54+
<dependencies>
55+
<dependency>
56+
<groupId>org.apache.kafka</groupId>
57+
<artifactId>kafka-clients</artifactId>
58+
<version>1.1.1</version>
59+
</dependency>
60+
<dependency>
61+
<groupId>com.google.code.findbugs</groupId>
62+
<artifactId>jsr305</artifactId>
63+
<version>3.0.2</version>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.slf4j</groupId>
67+
<artifactId>slf4j-api</artifactId>
68+
<version>1.7.29</version>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.apache.logging.log4j</groupId>
72+
<artifactId>log4j-slf4j-impl</artifactId>
73+
<version>2.12.1</version>
74+
<scope>test</scope>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.apache.logging.log4j</groupId>
78+
<artifactId>log4j-api</artifactId>
79+
<version>2.12.1</version>
80+
<scope>test</scope>
81+
</dependency>
82+
<dependency>
83+
<groupId>org.apache.logging.log4j</groupId>
84+
<artifactId>log4j-core</artifactId>
85+
<version>2.12.1</version>
86+
<scope>test</scope>
87+
</dependency>
88+
<dependency>
89+
<groupId>junit</groupId>
90+
<artifactId>junit</artifactId>
91+
<version>4.12</version>
92+
<scope>test</scope>
93+
</dependency>
94+
<dependency>
95+
<groupId>org.assertj</groupId>
96+
<artifactId>assertj-core</artifactId>
97+
<version>3.13.2</version>
98+
<scope>test</scope>
99+
</dependency>
100+
<dependency>
101+
<groupId>org.awaitility</groupId>
102+
<artifactId>awaitility</artifactId>
103+
<version>4.0.1</version>
104+
<scope>test</scope>
105+
</dependency>
106+
<dependency>
107+
<groupId>org.mockito</groupId>
108+
<artifactId>mockito-core</artifactId>
109+
<version>3.0.0</version>
110+
<scope>test</scope>
111+
</dependency>
112+
</dependencies>
113+
114+
<build>
115+
116+
<plugins>
117+
<plugin>
118+
<groupId>com.github.spotbugs</groupId>
119+
<artifactId>spotbugs-maven-plugin</artifactId>
120+
<version>3.1.12.2</version>
121+
<dependencies>
122+
<dependency>
123+
<groupId>com.github.spotbugs</groupId>
124+
<artifactId>spotbugs</artifactId>
125+
<version>4.0.0-beta4</version>
126+
</dependency>
127+
</dependencies>
128+
</plugin>
129+
<plugin>
130+
<groupId>org.codehaus.mojo</groupId>
131+
<artifactId>versions-maven-plugin</artifactId>
132+
<version>2.7</version>
133+
<configuration>
134+
<generateBackupPoms>false</generateBackupPoms>
135+
</configuration>
136+
</plugin>
137+
<plugin>
138+
<artifactId>maven-compiler-plugin</artifactId>
139+
<version>3.8.1</version>
140+
<configuration>
141+
<source>1.8</source>
142+
<target>1.8</target>
143+
<useIncrementalCompilation>false</useIncrementalCompilation>
144+
<compilerArgs>
145+
<arg>-parameters</arg>
146+
</compilerArgs>
147+
</configuration>
148+
</plugin>
149+
<plugin>
150+
<groupId>org.apache.maven.plugins</groupId>
151+
<artifactId>maven-gpg-plugin</artifactId>
152+
<version>1.6</version>
153+
<executions>
154+
<execution>
155+
<id>sign-artifacts</id>
156+
<phase>verify</phase>
157+
<goals>
158+
<goal>sign</goal>
159+
</goals>
160+
</execution>
161+
</executions>
162+
</plugin>
163+
<plugin>
164+
<groupId>org.apache.maven.plugins</groupId>
165+
<artifactId>maven-jar-plugin</artifactId>
166+
<version>3.1.2</version>
167+
<configuration>
168+
<excludes>
169+
<exclude>**/.gitignore</exclude>
170+
</excludes>
171+
</configuration>
172+
</plugin>
173+
<plugin>
174+
<groupId>org.apache.maven.plugins</groupId>
175+
<artifactId>maven-javadoc-plugin</artifactId>
176+
<version>3.0.1</version>
177+
<configuration>
178+
<source>8</source>
179+
</configuration>
180+
<executions>
181+
<execution>
182+
<id>attach-javadocs</id>
183+
<goals>
184+
<goal>jar</goal>
185+
</goals>
186+
</execution>
187+
</executions>
188+
</plugin>
189+
<plugin>
190+
<groupId>org.apache.maven.plugins</groupId>
191+
<artifactId>maven-shade-plugin</artifactId>
192+
<version>3.2.1</version>
193+
</plugin>
194+
<plugin>
195+
<groupId>org.apache.maven.plugins</groupId>
196+
<artifactId>maven-source-plugin</artifactId>
197+
<version>3.0.1</version>
198+
<executions>
199+
<execution>
200+
<id>attach-sources</id>
201+
<goals>
202+
<goal>jar</goal>
203+
</goals>
204+
</execution>
205+
</executions>
206+
</plugin>
207+
<plugin>
208+
<groupId>org.sonatype.plugins</groupId>
209+
<artifactId>nexus-staging-maven-plugin</artifactId>
210+
<version>1.6.7</version>
211+
<extensions>true</extensions>
212+
<configuration>
213+
<serverId>ossrh</serverId>
214+
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
215+
<autoReleaseAfterClose>false</autoReleaseAfterClose>
216+
<skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
217+
</configuration>
218+
</plugin>
219+
<plugin>
220+
<groupId>org.apache.maven.plugins</groupId>
221+
<artifactId>maven-assembly-plugin</artifactId>
222+
<version>3.1.1</version>
223+
<configuration>
224+
<descriptorRefs>
225+
<descriptorRef>jar-with-dependencies</descriptorRef>
226+
</descriptorRefs>
227+
</configuration>
228+
<executions>
229+
<execution>
230+
<id>assemble-all</id>
231+
<phase>package</phase>
232+
<goals>
233+
<goal>single</goal>
234+
</goals>
235+
</execution>
236+
</executions>
237+
</plugin>
238+
</plugins>
239+
</build>
240+
241+
<!-- profile definitions -->
242+
<profiles>
243+
<!-- GPG Signature on release -->
244+
<profile>
245+
<id>release-sign-artifacts</id>
246+
<activation>
247+
<property>
248+
<name>performRelease</name>
249+
<value>true</value>
250+
</property>
251+
</activation>
252+
</profile>
253+
</profiles>
254+
</project>
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
import org.apache.kafka.clients.consumer.Consumer;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
6+
import org.apache.kafka.common.TopicPartition;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.Set;
11+
12+
import static java.util.Comparator.comparing;
13+
import static java.util.function.BinaryOperator.maxBy;
14+
import static java.util.stream.Collectors.toSet;
15+
16+
abstract class AbstractCommitPolicy<K,V> implements CommitPolicy<K,V> {
17+
protected final Consumer<K, V> consumer;
18+
final Map<TopicPartition, Long> topicOffsetHighWaterMark;
19+
final Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets;
20+
21+
AbstractCommitPolicy(Consumer<K, V> consumer) {
22+
this.consumer = consumer;
23+
this.topicOffsetHighWaterMark = new HashMap<>();
24+
this.completedTopicOffsets = new HashMap<>();
25+
}
26+
27+
@Override
28+
public void addPendingRecord(ConsumerRecord<K, V> record) {
29+
topicOffsetHighWaterMark.merge(
30+
new TopicPartition(record.topic(), record.partition()),
31+
record.offset() + 1,
32+
Math::max);
33+
}
34+
35+
@Override
36+
public void completeRecord(ConsumerRecord<K, V> record) {
37+
completedTopicOffsets.merge(
38+
new TopicPartition(record.topic(), record.partition()),
39+
new OffsetAndMetadata(record.offset() + 1L),
40+
maxBy(comparing(OffsetAndMetadata::offset)));
41+
}
42+
43+
@Override
44+
public Set<TopicPartition> partialCommit() {
45+
consumer.commitSync(completedTopicOffsets);
46+
final Set<TopicPartition> partitions = checkCompletedPartitions();
47+
completedTopicOffsets.clear();
48+
for (TopicPartition p : partitions) {
49+
topicOffsetHighWaterMark.remove(p);
50+
}
51+
return partitions;
52+
}
53+
54+
Set<TopicPartition> checkCompletedPartitions() {
55+
return completedTopicOffsets
56+
.entrySet()
57+
.stream()
58+
.filter(entry -> topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))
59+
.map(Map.Entry::getKey)
60+
.collect(toSet());
61+
}
62+
63+
Map<TopicPartition, Long> topicOffsetHighWaterMark() {
64+
return topicOffsetHighWaterMark;
65+
}
66+
67+
Map<TopicPartition, OffsetAndMetadata> completedTopicOffsets() {
68+
return completedTopicOffsets;
69+
}
70+
71+
private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
72+
final Long offsetHighWaterMark = topicOffsetHighWaterMark.get(topicPartition);
73+
if (offsetHighWaterMark != null) {
74+
return offset.offset() >= offsetHighWaterMark;
75+
}
76+
// maybe this partition revoked before a msg of this partition was processed
77+
return true;
78+
}
79+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package cn.leancloud.kafka.consumer;
2+
3+
import org.apache.kafka.clients.consumer.Consumer;
4+
import org.apache.kafka.common.TopicPartition;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.Collections;
9+
import java.util.HashSet;
10+
import java.util.Set;
11+
12+
final class AsyncCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
13+
private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);
14+
15+
private final int maxPendingAsyncCommits;
16+
private int pendingAsyncCommitCounter;
17+
private boolean forceSync;
18+
19+
AsyncCommitPolicy(Consumer<K, V> consumer, int maxPendingAsyncCommits) {
20+
super(consumer);
21+
this.maxPendingAsyncCommits = maxPendingAsyncCommits;
22+
}
23+
24+
@Override
25+
public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
26+
if (!noPendingRecords || completedTopicOffsets.isEmpty()) {
27+
return Collections.emptySet();
28+
}
29+
30+
if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
31+
consumer.commitSync();
32+
pendingAsyncCommitCounter = 0;
33+
forceSync = false;
34+
} else {
35+
++pendingAsyncCommitCounter;
36+
consumer.commitAsync((offsets, exception) -> {
37+
--pendingAsyncCommitCounter;
38+
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
39+
if (exception != null) {
40+
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
41+
forceSync = true;
42+
43+
}
44+
});
45+
}
46+
47+
final Set<TopicPartition> partitions = new HashSet<>(completedTopicOffsets.keySet());
48+
completedTopicOffsets.clear();
49+
topicOffsetHighWaterMark.clear();
50+
return partitions;
51+
}
52+
}

0 commit comments

Comments
 (0)