From edcda6d82583f6d9a2c6b0ad210c173193ff6886 Mon Sep 17 00:00:00 2001
From: Andrey
Date: Thu, 15 Dec 2022 16:27:32 +0100
Subject: [PATCH] publishing pojo as avro (#1)

* with this commit it is easier than ever to use Avro:

1. update the library dependencies

2. in the Nakadi client builder, provide the Avro serializer:

   final NakadiClient client = NakadiClient.newBuilder()
     ...
     .serializationSupport(AvroSerializationSupport.newInstance())
     .build();

POJO business / data change events will then be serialized to Avro.
---
 gradle.properties | 3 +-
 gradle/libs.gradle | 8 +-
 nakadi-java-avro/build.gradle | 88 ++++++++++
 .../avro/AvroPublishingBatchSerializer.java | 85 ++++++++++
 .../nakadi/avro/AvroSerializationSupport.java | 71 ++++++++
 .../avro/InvalidEventTypeException.java | 7 +
 .../nakadi/avro/InvalidSchemaException.java | 7 +
 .../batch.publishing.avsc | 16 ++
 .../nakadi-envelope-schema/envelope.avsc | 129 ++++++++++++++
 .../AvroPublishingBatchSerializerTest.java | 158 ++++++++++++++++++
 .../main/java/nakadi/EventResourceReal.java | 56 +++----
 .../java/nakadi/EventTypeResourceReal.java | 1 +
 .../src/main/java/nakadi/EventTypeSchema.java | 8 +-
 .../nakadi/JsonPublishingBatchSerializer.java | 22 +++
 .../java/nakadi/JsonSerializationSupport.java | 57 +++++++
 .../src/main/java/nakadi/NakadiClient.java | 18 +-
 .../nakadi/PublishingBatchSerializer.java | 9 +
 .../java/nakadi/SerializationContext.java | 9 +
 .../java/nakadi/SerializationSupport.java | 11 ++
 .../nakadi/metrics/dropwizard/EmptyTest.java | 13 --
 20 files changed, 725 insertions(+), 51 deletions(-)
 create mode 100644 nakadi-java-avro/build.gradle
 create mode 100644 nakadi-java-avro/src/main/java/nakadi/avro/AvroPublishingBatchSerializer.java
 create mode 100644 nakadi-java-avro/src/main/java/nakadi/avro/AvroSerializationSupport.java
 create mode 100644 nakadi-java-avro/src/main/java/nakadi/avro/InvalidEventTypeException.java
 create mode 100644 nakadi-java-avro/src/main/java/nakadi/avro/InvalidSchemaException.java
 create mode 100644 nakadi-java-avro/src/main/resources/nakadi-envelope-schema/batch.publishing.avsc
 create mode 100644 nakadi-java-avro/src/main/resources/nakadi-envelope-schema/envelope.avsc
 create mode 100644 nakadi-java-avro/src/test/java/nakadi/avro/AvroPublishingBatchSerializerTest.java
 create mode 100644 nakadi-java-client/src/main/java/nakadi/JsonPublishingBatchSerializer.java
 create mode 100644 nakadi-java-client/src/main/java/nakadi/JsonSerializationSupport.java
 create mode 100644 nakadi-java-client/src/main/java/nakadi/PublishingBatchSerializer.java
 create mode 100644 nakadi-java-client/src/main/java/nakadi/SerializationContext.java
 create mode 100644 nakadi-java-client/src/main/java/nakadi/SerializationSupport.java
 delete mode 100644 nakadi-java-metrics/src/test/java/nakadi/metrics/dropwizard/EmptyTest.java

diff --git a/gradle.properties b/gradle.properties
index 22f128b9..a999d861 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -4,9 +4,8 @@ version=0.18.0
 group=net.dehora.nakadi
 # one per line to keep diffs clean
 modules=\
+  nakadi-java-avro,\
   nakadi-java-client,\
   nakadi-java-gson,\
   nakadi-java-metrics,\
   nakadi-java-zign
-
-
diff --git a/gradle/libs.gradle b/gradle/libs.gradle
index 6453cb2b..c50385ee 100644
--- a/gradle/libs.gradle
+++ b/gradle/libs.gradle
@@ -13,7 +13,9 @@ versions += [
   rxjava2: "2.0.9",
   slf4j: "1.8.0-beta2",
   logback: "1.3.0-alpha5",
-  mockito: "1.+"
+  mockito: "1.+",
+  avro: "1.11.1",
+  jacksonavro: "2.14.1"
 ]
 
 libs += [
@@ -29,7 +31,9 @@
   slf4jsimple: "org.slf4j:slf4j-simple:$versions.slf4j",
   logback_core: "ch.qos.logback:logback-core:$versions.logback",
   logback_classic: "ch.qos.logback:logback-classic:$versions.logback",
-  mockito_core: "org.mockito:mockito-core:$versions.mockito"
+  mockito_core: "org.mockito:mockito-core:$versions.mockito",
+  avro: "org.apache.avro:avro:$versions.avro",
+  jacksonavro: "com.fasterxml.jackson.dataformat:jackson-dataformat-avro:$versions.jacksonavro"
 ]
 
 ext {
diff --git a/nakadi-java-avro/build.gradle b/nakadi-java-avro/build.gradle
new file mode 100644
index 00000000..a7c3843d
--- /dev/null
+++ b/nakadi-java-avro/build.gradle
@@ -0,0 +1,88 @@
+plugins {
+  id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
+}
+
+apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"
+
+dependencies {
+  implementation project(path: ':nakadi-java-client', configuration: 'shadow')
+  // avro is required by the gradle avro plugin https://github.com/davidmc24/gradle-avro-plugin
+  implementation project.libs.avro
+  implementation project.libs.jacksonavro
+
+  testImplementation project.libs.junit
+}
+
+sourceSets {
+  main {
+    java {
+      srcDirs = ["src/main/java", "build/generated/sources"]
+    }
+  }
+}
+
+publishing {
+  publications {
+    mavenJava(MavenPublication) {
+
+      artifact(jar) {
+      }
+
+      artifact(sourceJar) {
+      }
+
+      artifact(javadocJar) {
+      }
+
+      pom.withXml {
+
+        def _name = project.name.toString()
+
+        asNode().with {
+          appendNode('url', 'https://github.com/dehora/nakadi-java')
+          appendNode('name', _name)
+          appendNode('description', 'Client driver support')
+          appendNode('scm').with {
+            appendNode('url', 'git@github.com:dehora/nakadi-java.git')
+          }
+          appendNode('licenses').with {
+            appendNode('license').with {
+              appendNode('name', 'MIT License')
+              appendNode('url', 'https://mit-license.org/')
+            }
+          }
+        }
+
+        def dependenciesNode = asNode().appendNode('dependencies')
+        configurations.implementation.allDependencies.each {
+          def dependencyNode = dependenciesNode.appendNode('dependency')
+          dependencyNode.appendNode('groupId', it.group)
+          dependencyNode.appendNode('artifactId', it.name)
+          dependencyNode.appendNode('version', it.version)
+        }
+
+        def developersNode = asNode().appendNode('developers')
+        def developerNode = developersNode.appendNode('developer')
+        developerNode.appendNode('id', developerId)
+        developerNode.appendNode('email', developerEmail)
+        developerNode.appendNode('name', developerName)
+      }
+    }
+  }
+}
+
+signing {
+  sign publishing.publications.mavenJava
+}
+
+
+import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask
+
+def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
+  source("src/main/resources/nakadi-envelope-schema", "src/test/resources/avro-schemas")
+  outputDir = file("build/generated/sources")
+}
+
+tasks.named("compileJava").configure {
+  source(generateAvro)
+}
diff --git a/nakadi-java-avro/src/main/java/nakadi/avro/AvroPublishingBatchSerializer.java b/nakadi-java-avro/src/main/java/nakadi/avro/AvroPublishingBatchSerializer.java
new file mode 100644
index 00000000..1e548f82
--- /dev/null
+++ b/nakadi-java-avro/src/main/java/nakadi/avro/AvroPublishingBatchSerializer.java
@@ -0,0 +1,85 @@
+package nakadi.avro;
+
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.dataformat.avro.AvroMapper;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import nakadi.BusinessEventMapped;
+import nakadi.DataChangeEvent;
+import nakadi.EventMetadata;
+import nakadi.PublishingBatchSerializer;
+import nakadi.SerializationContext;
+import org.apache.avro.Schema;
+import org.zalando.nakadi.generated.avro.Envelope;
+import org.zalando.nakadi.generated.avro.Metadata;
+import org.zalando.nakadi.generated.avro.PublishingBatch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * Serializes business and data change POJO events to Avro using the Jackson Avro extension.
+ */
+public class AvroPublishingBatchSerializer implements PublishingBatchSerializer {
+
+  private final AvroMapper avroMapper;
+  private final Map<String, ObjectWriter> objectWriterCache;
+
+  public AvroPublishingBatchSerializer(AvroMapper avroMapper) {
+    this.avroMapper = avroMapper;
+    this.objectWriterCache = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public <T> byte[] toBytes(SerializationContext context, Collection<T> events) {
+    try {
+      List<Envelope> envelopes = events.stream()
+          .map(event -> toEnvelope(context, event))
+          .collect(Collectors.toList());
+      return PublishingBatch.newBuilder().setEvents(envelopes)
+          .build().toByteBuffer().array();
+    } catch (IOException io) {
+      throw new RuntimeException(io);
+    }
+  }
+
+  private <T> Envelope toEnvelope(SerializationContext context, T event) {
+    try {
+      EventMetadata metadata;
+      Object data;
+      if (event instanceof BusinessEventMapped) {
+        metadata = ((BusinessEventMapped) event).metadata();
+        data = ((BusinessEventMapped) event).data();
+      } else if (event instanceof DataChangeEvent) {
+        metadata = ((DataChangeEvent) event).metadata();
+        data = ((DataChangeEvent) event).data();
+      } else {
+        throw new InvalidEventTypeException("Unexpected event category `"
+            + event.getClass() + "` provided during avro serialization");
+      }
+
+      byte[] eventBytes = objectWriterCache.computeIfAbsent(context.name(),
+          (et) -> avroMapper.writer(new AvroSchema(new Schema.Parser().parse(context.schema()))))
+          .writeValueAsBytes(data);
+
+      return Envelope.newBuilder()
+          .setMetadata(Metadata.newBuilder()
+              .setEventType(context.name()) // metadata.eventType ?
+              .setVersion(context.version())
+              .setOccurredAt(metadata.occurredAt().toInstant())
+              .setEid(metadata.eid())
+              .setPartition(metadata.partition())
+              .setPartitionCompactionKey(metadata.partitionCompactionKey())
+              .build())
+          .setPayload(ByteBuffer.wrap(eventBytes))
+          .build();
+    } catch (IOException io) {
+      throw new RuntimeException(io);
+    }
+  }
+
+}
diff --git a/nakadi-java-avro/src/main/java/nakadi/avro/AvroSerializationSupport.java b/nakadi-java-avro/src/main/java/nakadi/avro/AvroSerializationSupport.java
new file mode 100644
index 00000000..5797bc60
--- /dev/null
+++ b/nakadi-java-avro/src/main/java/nakadi/avro/AvroSerializationSupport.java
@@ -0,0 +1,71 @@
+package nakadi.avro;
+
+import com.fasterxml.jackson.dataformat.avro.AvroMapper;
+import nakadi.EventType;
+import nakadi.EventTypeSchema;
+import nakadi.NakadiClient;
+import nakadi.SerializationContext;
+import nakadi.SerializationSupport;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AvroSerializationSupport implements SerializationSupport {
+
+  private final AvroPublishingBatchSerializer payloadSerializer;
+  private final Map<String, SerializationContext> contextCache;
+
+  public AvroSerializationSupport(AvroPublishingBatchSerializer payloadSerializer) {
+    this.payloadSerializer = payloadSerializer;
+    this.contextCache = new ConcurrentHashMap<>();
+  }
+
+  public static SerializationSupport newInstance() {
+    return new AvroSerializationSupport(new AvroPublishingBatchSerializer(new AvroMapper()));
+  }
+
+  @Override
+  public <T> byte[] serializePayload(NakadiClient client, String eventTypeName, Collection<T> events) {
+    SerializationContext context = contextCache.computeIfAbsent(
+        eventTypeName, (et) -> new AvroSerializationContext(
+            client.resources().eventTypes().findByName(et)));
+    return payloadSerializer.toBytes(context, events);
+  }
+
+  @Override
+  public String contentType() {
+    return "application/avro-binary";
+  }
+
+  private static class AvroSerializationContext implements SerializationContext {
+
+    private final EventType eventType;
+
+    private AvroSerializationContext(EventType eventType) {
+      if (eventType.schema().type() != EventTypeSchema.Type.avro_schema) {
+        throw new InvalidSchemaException(String.format(
+            "Event type `%s` schema is `%s`, but expected Avro",
+            eventType.name(), eventType.schema().type()));
+      }
+
+      this.eventType = eventType;
+    }
+
+    @Override
+    public String name() {
+      return eventType.name();
+    }
+
+    @Override
+    public String schema() {
+      return eventType.schema().schema();
+    }
+
+    @Override
+    public String version() {
+      return eventType.schema().version();
+    }
+
+  }
+}
diff --git a/nakadi-java-avro/src/main/java/nakadi/avro/InvalidEventTypeException.java b/nakadi-java-avro/src/main/java/nakadi/avro/InvalidEventTypeException.java
new file mode 100644
index 00000000..52c7e2f1
--- /dev/null
+++ b/nakadi-java-avro/src/main/java/nakadi/avro/InvalidEventTypeException.java
@@ -0,0 +1,7 @@
+package nakadi.avro;
+
+public class InvalidEventTypeException extends RuntimeException {
+  public InvalidEventTypeException(String msg) {
+    super(msg);
+  }
+}
diff --git a/nakadi-java-avro/src/main/java/nakadi/avro/InvalidSchemaException.java b/nakadi-java-avro/src/main/java/nakadi/avro/InvalidSchemaException.java
new file mode 100644
index 00000000..65a51649
--- /dev/null
+++ b/nakadi-java-avro/src/main/java/nakadi/avro/InvalidSchemaException.java
@@ -0,0 +1,7 @@
+package nakadi.avro;
+
+public class InvalidSchemaException extends RuntimeException {
+  public InvalidSchemaException(String msg) {
+    super(msg);
+  }
+}
diff --git a/nakadi-java-avro/src/main/resources/nakadi-envelope-schema/batch.publishing.avsc b/nakadi-java-avro/src/main/resources/nakadi-envelope-schema/batch.publishing.avsc
new file mode 100644
index 00000000..966800fe
--- /dev/null
+++ b/nakadi-java-avro/src/main/resources/nakadi-envelope-schema/batch.publishing.avsc
@@ -0,0 +1,16 @@
+{
+  "name": "PublishingBatch",
+  "namespace": "org.zalando.nakadi.generated.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "events",
+      "type": {
+        "type": "array",
+        "items": {
+          "type": "Envelope"
+        }
+      }
+    }
+  ]
+}
diff --git a/nakadi-java-avro/src/main/resources/nakadi-envelope-schema/envelope.avsc b/nakadi-java-avro/src/main/resources/nakadi-envelope-schema/envelope.avsc
new file mode 100644
index 00000000..eacf729e
--- /dev/null
+++ b/nakadi-java-avro/src/main/resources/nakadi-envelope-schema/envelope.avsc
@@ -0,0 +1,129 @@
+{
+  "name": "Envelope",
+  "namespace": "org.zalando.nakadi.generated.avro",
+  "type": "record",
+  "fields": [
+    {
+      "name": "metadata",
+      "type": {
+        "name": "Metadata",
+        "type": "record",
+        "doc": "Event metadata defines data about the payload and additional information for Nakadi operations",
+        "fields": [
+          {
+            "name": "occurred_at",
+            "type": {
+              "type": "long",
+              "logicalType": "timestamp-millis"
+            }
+          },
+          {
+            "name": "eid",
+            "type": {
+              "type": "string",
+              "logicalType": "uuid"
+            }
+          },
+          {
+            "name": "flow_id",
+            "type": [
+              "null",
+              "string"
+            ],
+            "default": null
+          },
+          {
+            "name": "received_at",
+            "type": [
+              "null",
+              {
+                "type": "long",
+                "logicalType": "timestamp-millis"
+              }
+            ],
+            "default": null
+          },
+          {
+            "name": "version",
+            "type": "string"
+          },
+          {
+            "name": "published_by",
+            "type": [
+              "null",
+              "string"
+            ],
+            "default": null
+          },
+          {
+            "name": "event_type",
+            "type": "string"
+          },
+          {
+            "name": "partition",
+            "type": [
+              "null",
+              "string"
+            ],
+            "default": null
+          },
+          {
+            "name": "partition_keys",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": "string"
+              }
+            ],
+            "default": null
+          },
+          {
+            "name": "partition_compaction_key",
+            "type": [
+              "null",
+              "string"
+            ],
+            "default": null
+          },
+          {
+            "name": "parent_eids",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "string",
+                  "logicalType": "uuid"
+                }
+              }
+            ],
+            "default": null
+          },
+          {
+            "name": "span_ctx",
+            "type": [
+              "null",
+              "string"
+            ],
+            "default": null
+          },
+          {
+            "name": "event_owner",
+            "type": [
+              "null",
+              "string"
+            ],
+            "default": null
+          }
+        ]
+      }
+    },
+    {
+      "name": "payload",
+      "type": {
+        "type": "bytes"
+      }
+    }
+  ]
+}
diff --git a/nakadi-java-avro/src/test/java/nakadi/avro/AvroPublishingBatchSerializerTest.java b/nakadi-java-avro/src/test/java/nakadi/avro/AvroPublishingBatchSerializerTest.java
new file mode 100644
index 00000000..748115c0
--- /dev/null
+++ b/nakadi-java-avro/src/test/java/nakadi/avro/AvroPublishingBatchSerializerTest.java
@@ -0,0 +1,158 @@
+package nakadi.avro;
+
+import com.fasterxml.jackson.dataformat.avro.AvroMapper;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import nakadi.BusinessEventMapped;
+import nakadi.EventMetadata;
+import nakadi.SerializationContext;
+import org.apache.avro.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+import org.zalando.nakadi.generated.avro.PublishingBatch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Objects;
+
+public class AvroPublishingBatchSerializerTest {
+
+  private final String schema =
+      "{\"type\":\"record\",\"name\":\"BusinessPayload\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"string\"]},{\"name\":\"b\",\"type\":[\"null\",\"string\"]},{\"name\":\"id\",\"type\":[\"null\",\"string\"]}]}";
+
+  @Test
+  public void testToBytes() throws IOException {
+    BusinessPayload bp = new BusinessPayload("22", "A", "B");
+    BusinessEventMapped<BusinessPayload> event =
+        new BusinessEventMapped<BusinessPayload>()
+            .metadata(EventMetadata.newPreparedEventMetadata())
+            .data(bp);
+
+    AvroPublishingBatchSerializer avroPublishingBatchSerializer = new AvroPublishingBatchSerializer(new AvroMapper());
+    byte[] bytesBatch = avroPublishingBatchSerializer.toBytes(
+        new TestSerializationContext("ad-2022-12-13", schema, "1.0.0"),
+        Collections.singletonList(event)
+    );
+
+    PublishingBatch publishingBatch = PublishingBatch.fromByteBuffer(ByteBuffer.wrap(bytesBatch));
+    byte[] eventBytes = publishingBatch.getEvents().get(0).getPayload().array();
+    BusinessPayload actual = new AvroMapper().reader(
+        new AvroSchema(new Schema.Parser().parse(schema)))
+        .readValue(eventBytes, BusinessPayload.class);
+
+    Assert.assertEquals(bp, actual);
+  }
+
+  static class BusinessPayload {
+    String id;
+    String a;
+    String b;
+
+    public BusinessPayload() {
+    }
+
+    public BusinessPayload(String id, String a, String b) {
+      this.id = id;
+      this.a = a;
+      this.b = b;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getA() {
+      return a;
+    }
+
+    public void setA(String a) {
+      this.a = a;
+    }
+
+    public String getB() {
+      return b;
+    }
+
+    public void setB(String b) {
+      this.b = b;
+    }
+
+    @Override
+    public String toString() {
+      return "BusinessPayload{" + "id='" + id + '\''
+          + ", a='" + a + '\''
+          + ", b='" + b + '\''
+          + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      BusinessPayload that = (BusinessPayload) o;
+      return Objects.equals(id, that.id) &&
+          Objects.equals(a, that.a) &&
+          Objects.equals(b, that.b);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(id, a, b);
+    }
+  }
+
+  private class TestSerializationContext implements SerializationContext {
+
+    private String name;
+    private String schema;
+    private String version;
+
+    public TestSerializationContext(String name, String schema, String version) {
+      this.name = name;
+      this.schema = schema;
+      this.version = version;
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public String schema() {
+      return schema;
+    }
+
+    @Override
+    public String version() {
+      return version;
+    }
+  }
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/EventResourceReal.java b/nakadi-java-client/src/main/java/nakadi/EventResourceReal.java
index 36e279ca..a723b04b 100644
--- a/nakadi-java-client/src/main/java/nakadi/EventResourceReal.java
+++ b/nakadi-java-client/src/main/java/nakadi/EventResourceReal.java
@@ -13,7 +13,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,19 +37,24 @@ public class EventResourceReal implements EventResource {
   private boolean enablePublishingCompression;
   private final CompressionSupport compressionSupport;
+  private final SerializationSupport serializationSupport;
 
   public EventResourceReal(NakadiClient client) {
-    this(client, client.jsonSupport(), client.compressionSupport());
+    this(client, client.jsonSupport(), client.compressionSupport(), client.getSerializationSupport());
   }
 
   @VisibleForTesting
-  EventResourceReal(NakadiClient client, JsonSupport jsonSupport, CompressionSupport compressionSupport) {
+  EventResourceReal(NakadiClient client,
+                    JsonSupport jsonSupport,
+                    CompressionSupport compressionSupport,
+                    SerializationSupport serializationSupport) {
     this.client = client;
     this.jsonSupport = jsonSupport;
     this.compressionSupport = compressionSupport;
     if(client != null && client.enablePublishingCompression()) {
       this.enablePublishingCompression = true;
     }
+    this.serializationSupport = serializationSupport;
   }
 
   private static Response timed(Supplier<Response> sender, NakadiClient client, int eventCount) {
@@ -109,18 +114,15 @@ public final <T> Response send(String eventTypeName, Collection<T> events) {
     NakadiException.throwNonNull(events, "Please provide one or more events");
     NakadiException.throwNonNull(headers, "Please provide some headers");
 
-    if (events.size() == 0) {
+    if (events.isEmpty()) {
       throw new NakadiException(Problem.localProblem("event send called with zero events", ""));
     }
 
-    List<EventRecord<T>> collect =
-        events.stream().map(e -> new EventRecord<>(eventTypeName, e)).collect(Collectors.toList());
-
-    if (collect.get(0).event() instanceof String) {
+    if (new ArrayList<>(events).get(0) instanceof String) {
       return sendUsingSupplier(eventTypeName,
           () -> ("[" + Joiner.on(",").join(events) + "]").getBytes(Charsets.UTF_8), headers);
     } else {
-      return sendBatchOfEvents(collect, headers);
+      return sendBatchOfEvents(eventTypeName, events, headers);
     }
   }
@@ -183,37 +185,27 @@ private Response sendUsingSupplier(String eventTypeName, ContentSupplier supplier,
         1);
   }
 
-  private <T> Response sendBatchOfEvents(List<EventRecord<T>> events, Map<String, Object> headers) {
+  private <T> Response sendBatchOfEvents(String eventTypeName, Collection<T> events, Map<String, Object> headers) {
     NakadiException.throwNonNull(events, "Please provide one or more event records");
 
-    String topic = events.get(0).eventType();
-    List<Object> eventList =
-        events.stream().map(this::mapEventRecordToSerdes).collect(Collectors.toList());
-
-    ContentSupplier supplier;
+    final ContentSupplier supplier;
     if(enablePublishingCompression) {
-      supplier = supplyObjectAsCompressedAndSetHeaders(eventList, headers);
+      supplier = supplyObjectAsCompressedAndSetHeaders(eventTypeName, events, headers);
     } else {
-      supplier = () -> jsonSupport.toJsonBytesCompressed(eventList);
+      supplier = () -> serializationSupport.serializePayload(client, eventTypeName, events);
     }
 
-    final ContentSupplier finalSupplier = supplier;
-
     // todo: close
     return timed(() -> client.resourceProvider()
             .newResource()
             .retryPolicy(retryPolicy)
             .postEventsThrowing(
-                collectionUri(topic).buildString(),
+                collectionUri(eventTypeName).buildString(),
                 options(headers),
-                finalSupplier),
+                supplier),
         client,
-        eventList.size());
-  }
-
-  @VisibleForTesting <T> Object mapEventRecordToSerdes(EventRecord<T> er) {
-    return jsonSupport.transformEventRecord(er);
+        events.size());
   }
 
   private ResourceOptions options(Map<String, Object> headers) {
@@ -223,7 +215,7 @@ private ResourceOptions options(Map<String, Object> headers) {
       options.flowId(flowId);
     }
     options.headers(headers);
-    ResourceSupport.optionsWithJsonContent(options);
+    options.header(ResourceOptions.HEADER_CONTENT_TYPE, serializationSupport.contentType());
     return options;
   }
@@ -234,9 +226,9 @@ private UriBuilder collectionUri(String topic) {
         .path(PATH_COLLECTION);
   }
 
-  private <T> ContentSupplier supplyObjectAsCompressedAndSetHeaders(T sending, Map<String, Object> headers) {
-    final byte[] json = jsonSupport.toJsonBytesCompressed(sending);
-    return supplyBytesAsCompressedAndSetHeaders(json, headers);
+  private <T> ContentSupplier supplyObjectAsCompressedAndSetHeaders(String eventTypeName, Collection<T> sending, Map<String, Object> headers) {
+    final byte[] batchBytes = serializationSupport.serializePayload(client, eventTypeName, sending);
+    return supplyBytesAsCompressedAndSetHeaders(batchBytes, headers);
   }
 
   private ContentSupplier supplyStringAsCompressedAndSetHeaders(String sending, Map<String, Object> headers) {
@@ -249,10 +241,10 @@ private ContentSupplier supplyStringAsCompressedAndSetHeaders(String sending,
   }
 
   private ContentSupplier supplyBytesAsCompressedAndSetHeaders(
-      byte[] json, Map<String, Object> headers) {
+      byte[] batchBytes, Map<String, Object> headers) {
 
     // force the compression outside the lambda to access the length
-    final byte[] compressed = compressionSupport.compress(json);
+    final byte[] compressed = compressionSupport.compress(batchBytes);
     ContentSupplier supplier = () -> compressed;
     headers.put("Content-Length", compressed.length);
     headers.put("Content-Encoding", compressionSupport.name());
diff --git a/nakadi-java-client/src/main/java/nakadi/EventTypeResourceReal.java b/nakadi-java-client/src/main/java/nakadi/EventTypeResourceReal.java
index cfa49c77..666e5b36 100644
--- a/nakadi-java-client/src/main/java/nakadi/EventTypeResourceReal.java
+++ b/nakadi-java-client/src/main/java/nakadi/EventTypeResourceReal.java
@@ -1,6 +1,7 @@
 package nakadi;
 
 import com.google.gson.reflect.TypeToken;
+
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collections;
diff --git a/nakadi-java-client/src/main/java/nakadi/EventTypeSchema.java b/nakadi-java-client/src/main/java/nakadi/EventTypeSchema.java
index 93ca0b44..3f97cf8f 100644
--- a/nakadi-java-client/src/main/java/nakadi/EventTypeSchema.java
+++ b/nakadi-java-client/src/main/java/nakadi/EventTypeSchema.java
@@ -40,6 +40,11 @@ public String version() {
     return version;
   }
 
+  public EventTypeSchema version(String version) {
+    this.version = version;
+    return this;
+  }
+
   /**
    * @return the time the event type was created.
    */
@@ -71,6 +76,7 @@ public OffsetDateTime createdAt() {
   }
 
   public enum Type {
-    json_schema
+    json_schema,
+    avro_schema
   }
 }
diff --git a/nakadi-java-client/src/main/java/nakadi/JsonPublishingBatchSerializer.java b/nakadi-java-client/src/main/java/nakadi/JsonPublishingBatchSerializer.java
new file mode 100644
index 00000000..5a9e6e0f
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/JsonPublishingBatchSerializer.java
@@ -0,0 +1,22 @@
+package nakadi;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+public class JsonPublishingBatchSerializer implements PublishingBatchSerializer {
+
+  private final JsonSupport jsonSupport;
+
+  public JsonPublishingBatchSerializer(final JsonSupport jsonSupport) {
+    this.jsonSupport = jsonSupport;
+  }
+
+  @Override
+  public <T> byte[] toBytes(SerializationContext context, Collection<T> events) {
+    return jsonSupport.toJsonBytesCompressed(events.stream()
+        .map(e -> new EventRecord<>(context.name(), e))
+        .map(jsonSupport::transformEventRecord)
+        .collect(Collectors.toList()));
+  }
+
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/JsonSerializationSupport.java b/nakadi-java-client/src/main/java/nakadi/JsonSerializationSupport.java
new file mode 100644
index 00000000..a59b90aa
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/JsonSerializationSupport.java
@@ -0,0 +1,57 @@
+package nakadi;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class JsonSerializationSupport implements SerializationSupport {
+
+  private final JsonPublishingBatchSerializer payloadSerializer;
+  private final Map<String, SerializationContext> contextCache;
+
+  public JsonSerializationSupport(JsonPublishingBatchSerializer payloadSerializer) {
+    this.payloadSerializer = payloadSerializer;
+    this.contextCache = new ConcurrentHashMap<>();
+  }
+
+  public static SerializationSupport newInstance(JsonSupport jsonSupport) {
+    return new JsonSerializationSupport(new JsonPublishingBatchSerializer(jsonSupport));
+  }
+
+  @Override
+  public <T> byte[] serializePayload(NakadiClient client, String eventTypeName, Collection<T> events) {
+    SerializationContext context = contextCache.computeIfAbsent(eventTypeName, JsonSerializationContext::new);
+    return payloadSerializer.toBytes(context, events);
+  }
+
+  @Override
+  public String contentType() {
+    return ResourceSupport.APPLICATION_JSON_CHARSET_UTF_8;
+  }
+
+  private static class JsonSerializationContext implements SerializationContext {
+
+    private final String eventTypeName;
+
+    public JsonSerializationContext(String eventTypeName) {
+      this.eventTypeName = eventTypeName;
+    }
+
+    @Override
+    public String name() {
+      return eventTypeName;
+    }
+
+    @Override
+    public String schema() {
+      throw new UnsupportedOperationException("Serialization Context does not use JSON schema");
+    }
+
+    @Override
+    public String version() {
+      throw new UnsupportedOperationException("Serialization Context does not use schema version");
+    }
+  }
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/NakadiClient.java b/nakadi-java-client/src/main/java/nakadi/NakadiClient.java
index 5f143058..74da1a28 100644
--- a/nakadi-java-client/src/main/java/nakadi/NakadiClient.java
+++ b/nakadi-java-client/src/main/java/nakadi/NakadiClient.java
@@ -27,6 +27,7 @@ public class NakadiClient {
   private final String certificatePath;
   private final boolean enablePublishingCompression;
   private final CompressionSupport compressionSupport;
+  private final SerializationSupport serializationSupport;
 
   private NakadiClient(Builder builder) {
     NakadiException.throwNonNull(builder.baseURI, "Please provide a base URI.");
@@ -39,6 +40,7 @@ private NakadiClient(Builder builder) {
     this.certificatePath = builder.certificatePath;
     this.enablePublishingCompression = builder.enablePublishingCompression;
     this.compressionSupport = builder.compressionSupport;
+    this.serializationSupport = builder.serializationSupport;
   }
 
   /**
@@ -108,7 +110,11 @@ public Resources resources() {
     return resources;
   }
 
-  @SuppressWarnings("WeakerAccess")
+  public SerializationSupport getSerializationSupport() {
+    return serializationSupport;
+  }
+
+  @SuppressWarnings("WeakerAccess")
   public static class Builder {
 
     private URI baseURI;
@@ -123,6 +129,7 @@ public static class Builder {
     private boolean enablePublishingCompression;
     private CompressionSupport compressionSupport;
     private String certificatePath;
+    private SerializationSupport serializationSupport;
 
     Builder() {
       connectTimeout = 20_000;
@@ -145,6 +152,10 @@ public NakadiClient build() {
         jsonSupport = new GsonSupport();
       }
 
+      if (serializationSupport == null) {
+        serializationSupport = JsonSerializationSupport.newInstance(jsonSupport);
+      }
+
       if (compressionSupport == null) {
         compressionSupport = new CompressionSupportGzip();
       }
@@ -311,5 +322,10 @@ public Builder jsonSupport(JsonSupport jsonSupport) {
       this.jsonSupport = jsonSupport;
       return this;
     }
+
+    public Builder serializationSupport(SerializationSupport serializationSupport) {
+      this.serializationSupport = serializationSupport;
+      return this;
+    }
   }
 }
diff --git a/nakadi-java-client/src/main/java/nakadi/PublishingBatchSerializer.java b/nakadi-java-client/src/main/java/nakadi/PublishingBatchSerializer.java
new file mode 100644
index 00000000..af3e26d9
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/PublishingBatchSerializer.java
@@ -0,0 +1,9 @@
+package nakadi;
+
+import java.util.Collection;
+
+public interface PublishingBatchSerializer {
+
+  <T> byte[] toBytes(SerializationContext context, Collection<T> events);
+
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/SerializationContext.java b/nakadi-java-client/src/main/java/nakadi/SerializationContext.java
new file mode 100644
index 00000000..720e8e08
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/SerializationContext.java
@@ -0,0 +1,9 @@
+package nakadi;
+
+public interface SerializationContext {
+
+  String name();
+
+  String schema();
+
+  String version();
+}
diff --git a/nakadi-java-client/src/main/java/nakadi/SerializationSupport.java b/nakadi-java-client/src/main/java/nakadi/SerializationSupport.java
new file mode 100644
index 00000000..6db817fe
--- /dev/null
+++ b/nakadi-java-client/src/main/java/nakadi/SerializationSupport.java
@@ -0,0 +1,11 @@
+package nakadi;
+
+import java.util.Collection;
+
+public interface SerializationSupport {
+
+  <T> byte[] serializePayload(NakadiClient client, String eventTypeName, Collection<T> events);
+
+  String contentType();
+
+}
diff --git a/nakadi-java-metrics/src/test/java/nakadi/metrics/dropwizard/EmptyTest.java b/nakadi-java-metrics/src/test/java/nakadi/metrics/dropwizard/EmptyTest.java
deleted file mode 100644
index 89747f8a..00000000
--- a/nakadi-java-metrics/src/test/java/nakadi/metrics/dropwizard/EmptyTest.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package nakadi.metrics.dropwizard;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class EmptyTest {
-
-  @Test
-  public void test() {
-    assertEquals("a", "a");
-  }
-}
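
A minimal end-to-end sketch of publishing a POJO as Avro with the API this
patch adds, assuming an event type "ad-2022-12-13" is already registered in
Nakadi with an avro_schema. The event type name and BusinessPayload class are
illustrative, borrowed from the test above, and the base URI is hypothetical:

    // build a client that serializes published batches as Avro envelopes
    final NakadiClient client = NakadiClient.newBuilder()
        .baseURI("https://nakadi.example.com") // hypothetical endpoint
        .serializationSupport(AvroSerializationSupport.newInstance())
        .build();

    // wrap a plain POJO payload in a mapped business event
    BusinessEventMapped<BusinessPayload> event = new BusinessEventMapped<BusinessPayload>()
        .metadata(EventMetadata.newPreparedEventMetadata())
        .data(new BusinessPayload("22", "A", "B"));

    // AvroSerializationSupport fetches the event type's Avro schema once via
    // resources().eventTypes().findByName and caches the context per event type
    Response response = client.resources().events()
        .send("ad-2022-12-13", Collections.singletonList(event));

Batches are serialized into the Envelope/PublishingBatch records defined under
nakadi-envelope-schema and posted with Content-Type application/avro-binary.
Because the builder defaults serializationSupport to JsonSerializationSupport,
publishers that do not opt in keep the existing JSON behaviour.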