Skip to content

Commit bd11010

Browse files
Kafka javadocs (#321)
* Kafka javadocs Signed-off-by: Francesco Guardiani <[email protected]> * Nit Signed-off-by: Francesco Guardiani <[email protected]>
1 parent 3a22557 commit bd11010

File tree

7 files changed

+34
-12
lines changed

7 files changed

+34
-12
lines changed

kafka/src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import java.util.Map;
2828

2929
/**
30-
* Deserializer for {@link CloudEvent}.
30+
* Kafka {@link Deserializer} for {@link CloudEvent}.
3131
* <p>
32-
* To configure the {@link CloudEventDataMapper} to use, you can provide the instance through the configuration key
33-
* {@link CloudEventDeserializer#MAPPER_CONFIG}.
32+
* To configure a {@link CloudEventDataMapper}, you can provide the instance through the configuration key {@link CloudEventDeserializer#MAPPER_CONFIG}.
3433
*/
3534
public class CloudEventDeserializer implements Deserializer<CloudEvent> {
3635

36+
/**
37+
* The configuration key for the {@link CloudEventDataMapper}.
38+
*/
3739
public final static String MAPPER_CONFIG = "cloudevents.datamapper";
3840

3941
private CloudEventDataMapper<? extends CloudEventData> mapper = null;

kafka/src/main/java/io/cloudevents/kafka/CloudEventMessageDeserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.kafka.common.serialization.Deserializer;
2323

2424
/**
25-
* Deserializer for {@link MessageReader}
25+
* Kafka {@link Deserializer} for {@link MessageReader}.
2626
*/
2727
public class CloudEventMessageDeserializer implements Deserializer<MessageReader> {
2828

kafka/src/main/java/io/cloudevents/kafka/CloudEventMessageSerializer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import java.util.Map;
2626

2727
/**
28-
* Serializer for {@link MessageReader}. This {@link Serializer} can't be used as a key serializer.
28+
* Kafka {@link Serializer} for {@link MessageReader}.
29+
* <p>
30+
* This {@link Serializer} can't be used as a key serializer.
2931
*/
3032
public class CloudEventMessageSerializer implements Serializer<MessageReader> {
3133

kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,23 @@
2828
import java.util.Map;
2929

3030
/**
31-
* Serializer for {@link CloudEvent}.
31+
* Kafka {@link Serializer} for {@link CloudEvent}.
3232
* <p>
33-
* To configure the encoding to serialize the event, you can use the {@link CloudEventSerializer#ENCODING_CONFIG} configuration key,
33+
* To configure the encoding to use when serializing the event, you can use the {@link CloudEventSerializer#ENCODING_CONFIG} configuration key,
3434
* which accepts both a {@link String} or a variant of the enum {@link Encoding}. If you configure the Encoding as {@link Encoding#STRUCTURED},
3535
* you MUST configure the {@link EventFormat} to use with {@link CloudEventSerializer#EVENT_FORMAT_CONFIG}, specifying a {@link String}
3636
* corresponding to the content type of the event format or specifying an instance of {@link EventFormat}.
3737
*/
3838
public class CloudEventSerializer implements Serializer<CloudEvent> {
3939

40+
/**
41+
* The configuration key for the {@link Encoding} to use when serializing the event.
42+
*/
4043
public final static String ENCODING_CONFIG = "cloudevents.serializer.encoding";
44+
45+
/**
46+
* The configuration key for the {@link EventFormat} to use when serializing the event in structured mode.
47+
*/
4148
public final static String EVENT_FORMAT_CONFIG = "cloudevents.serializer.event_format";
4249

4350
private Encoding encoding = Encoding.BINARY;

kafka/src/main/java/io/cloudevents/kafka/KafkaMessageFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package io.cloudevents.kafka;
1919

20+
import io.cloudevents.SpecVersion;
2021
import io.cloudevents.core.message.MessageReader;
2122
import io.cloudevents.core.message.MessageWriter;
2223
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
2324
import io.cloudevents.core.message.impl.MessageUtils;
2425
import io.cloudevents.kafka.impl.KafkaBinaryMessageReaderImpl;
2526
import io.cloudevents.kafka.impl.KafkaHeaders;
2627
import io.cloudevents.kafka.impl.KafkaProducerMessageWriterImpl;
28+
import io.cloudevents.rw.CloudEventRWException;
2729
import io.cloudevents.rw.CloudEventWriter;
2830
import org.apache.kafka.clients.consumer.ConsumerRecord;
2931
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -34,6 +36,9 @@
3436
/**
3537
* This class provides a collection of methods to create {@link io.cloudevents.core.message.MessageReader}
3638
* and {@link io.cloudevents.core.message.MessageWriter} for Kafka Producer and Consumer.
39+
* <p>
40+
* These can be used as an alternative to {@link CloudEventDeserializer} and {@link CloudEventSerializer} to
41+
* manually serialize/deserialize {@link io.cloudevents.CloudEvent} messages.
3742
*/
3843
@ParametersAreNonnullByDefault
3944
public final class KafkaMessageFactory {
@@ -42,20 +47,21 @@ private KafkaMessageFactory() {
4247
}
4348

4449
/**
45-
* Create a {@link io.cloudevents.core.message.MessageReader} to read {@link ConsumerRecord}
50+
* Create a {@link io.cloudevents.core.message.MessageReader} to read {@link ConsumerRecord}.
4651
*
4752
* @param record the record to convert to {@link io.cloudevents.core.message.MessageReader}
4853
* @param <K> the type of the record key
4954
* @return the new {@link io.cloudevents.core.message.MessageReader}
55+
* @throws CloudEventRWException if something goes wrong while resolving the {@link SpecVersion} or if the message has unknown encoding
5056
*/
51-
public static <K> MessageReader createReader(ConsumerRecord<K, byte[]> record) throws IllegalArgumentException {
57+
public static <K> MessageReader createReader(ConsumerRecord<K, byte[]> record) throws CloudEventRWException {
5258
return createReader(record.headers(), record.value());
5359
}
5460

5561
/**
5662
* @see #createReader(ConsumerRecord)
5763
*/
58-
public static MessageReader createReader(Headers headers, byte[] payload) throws IllegalArgumentException {
64+
public static MessageReader createReader(Headers headers, byte[] payload) throws CloudEventRWException {
5965
return MessageUtils.parseStructuredOrBinaryMessage(
6066
() -> KafkaHeaders.getParsedKafkaHeader(headers, KafkaHeaders.CONTENT_TYPE),
6167
format -> new GenericStructuredMessageReader(format, payload),
@@ -65,7 +71,7 @@ public static MessageReader createReader(Headers headers, byte[] payload) throws
6571
}
6672

6773
/**
68-
* Create a {@link io.cloudevents.core.message.MessageWriter} to write a {@link org.apache.kafka.clients.producer.ProducerRecord}
74+
* Create a {@link io.cloudevents.core.message.MessageWriter} to write a {@link org.apache.kafka.clients.producer.ProducerRecord}.
6975
*
7076
* @param topic the topic where to write the record
7177
* @param partition the partition where to write the record

kafka/src/main/java/io/cloudevents/kafka/PartitionKeyExtensionInterceptor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111
* This {@link ProducerInterceptor} implements the partitioning extension,
1212
* as described in the <a href="https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#31-key-mapping">CloudEvents Kafka specification</a>.
1313
* <p>
14-
* When using in your producer, it will pick the {@code partitionkey} extension from the event and will set it as record key.
14+
* When using in your {@link org.apache.kafka.clients.producer.KafkaProducer},
15+
* it will pick the {@code partitionkey} extension from the event and will set it as record key.
1516
* If the extension is missing, It won't replace the key from the original record.
1617
*/
1718
public class PartitionKeyExtensionInterceptor implements ProducerInterceptor<Object, CloudEvent> {
1819

20+
/**
21+
* The extension key of partition key extension.
22+
*/
1923
public static final String PARTITION_KEY_EXTENSION = "partitionkey";
2024

2125
@Override

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@
162162
<link>https://docs.spring.io/spring-framework/docs/current/javadoc-api/</link>
163163
<link>https://vertx.io/docs/apidocs/</link>
164164
<link>https://jakarta.ee/specifications/platform/8/apidocs/</link>
165+
<link>https://kafka.apache.org/25/javadoc/</link>
165166
<link>https://qpid.apache.org/releases/qpid-proton-j-0.33.7/api/</link>
166167
</links>
167168
</configuration>

0 commit comments

Comments
 (0)