Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ version=0.18.0
group=net.dehora.nakadi
# one per line to keep diffs clean
modules=\
nakadi-java-avro,\
nakadi-java-client,\
nakadi-java-gson,\
nakadi-java-metrics,\
nakadi-java-zign


8 changes: 6 additions & 2 deletions gradle/libs.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ versions += [
rxjava2: "2.0.9",
slf4j: "1.8.0-beta2",
logback: "1.3.0-alpha5",
mockito: "1.+"
mockito: "1.+",
avro: "1.11.1",
jacksonavro: "2.14.1"
]

libs += [
Expand All @@ -29,7 +31,9 @@ libs += [
slf4jsimple: "org.slf4j:slf4j-simple:$versions.slf4j",
logback_core: "ch.qos.logback:logback-core:$versions.logback",
logback_classic: "ch.qos.logback:logback-classic:$versions.logback",
mockito_core: "org.mockito:mockito-core:$versions.mockito"
mockito_core: "org.mockito:mockito-core:$versions.mockito",
avro: "org.apache.avro:avro:$versions.avro",
jacksonavro: "com.fasterxml.jackson.dataformat:jackson-dataformat-avro:$versions.jacksonavro"
]

ext {
Expand Down
88 changes: 88 additions & 0 deletions nakadi-java-avro/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Build script for the nakadi-java-avro module: generates Java sources from the
// Avro envelope schemas and publishes the module with sources/javadoc jars.
plugins {
  // davidmc24 plugin provides Avro code generation tasks (GenerateAvroJavaTask below)
  id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

// apply only the -base variant so no default generation tasks are wired up;
// the generateAvro task below is registered explicitly instead
apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"

dependencies {
  // depend on the shadowed client artifact rather than the plain one
  implementation project(path: ':nakadi-java-client', configuration: 'shadow')
  // avro is required by gradle avro plugin https://github.com/davidmc24/gradle-avro-plugin
  implementation project.libs.avro
  implementation project.libs.jacksonavro

  testImplementation project.libs.junit
}

sourceSets {
  main {
    java {
      // include the Avro-generated sources alongside the handwritten ones
      srcDirs = ["src/main/java", "build/generated/sources"]
    }
  }
}

publishing {
  publications {
    mavenJava(MavenPublication) {

      artifact(jar) {
      }

      artifact(sourceJar) {
      }

      artifact(javadocJar) {
      }

      // the POM is assembled by hand because artifacts are attached directly
      // (no components.java), so dependencies/developers must be added manually
      pom.withXml {

        def _name = project.name.toString()

        asNode().with {
          appendNode('url', 'https://github.com/dehora/nakadi-java')
          appendNode('name', _name)
          appendNode('description', 'Client driver support')
          appendNode('scm').with {
            appendNode('url', '[email protected]:dehora/nakadi-java.git')
          }
          appendNode('licenses').with {
            appendNode('license').with {
              appendNode('name', 'MIT License')
              appendNode('url', 'https://mit-license.org/')
            }
          }
        }

        // copy the implementation dependencies into the POM's <dependencies>
        def dependenciesNode = asNode().appendNode('dependencies')
        configurations.implementation.allDependencies.each {
          def dependencyNode = dependenciesNode.appendNode('dependency')
          dependencyNode.appendNode('groupId', it.group)
          dependencyNode.appendNode('artifactId', it.name)
          dependencyNode.appendNode('version', it.version)
        }

        // developerId/Email/Name are expected as project/gradle properties
        def developersNode = asNode().appendNode('developers')
        def developerNode = developersNode.appendNode('developer')
        developerNode.appendNode('id', developerId)
        developerNode.appendNode('email', developerEmail)
        developerNode.appendNode('name', developerName)
      }
    }
  }
}

signing {
  sign publishing.publications.mavenJava
}


import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

// generate Java classes from the envelope schema and the test schemas into
// build/generated/sources (added to the main source set above)
def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
  source("src/main/resources/nakadi-envelope-schema", "src/test/resources/avro-schemas")
  outputDir = file("build/generated/sources")
}

// make compilation consume the generated sources and depend on generation
tasks.named("compileJava").configure {
  source(generateAvro)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package nakadi.avro;

import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import nakadi.BusinessEventMapped;
import nakadi.DataChangeEvent;
import nakadi.EventMetadata;
import nakadi.PublishingBatchSerializer;
import nakadi.SerializationContext;
import org.apache.avro.Schema;
import org.zalando.nakadi.generated.avro.Envelope;
import org.zalando.nakadi.generated.avro.Metadata;
import org.zalando.nakadi.generated.avro.PublishingBatch;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* The serializer uses jackson extension to serialize business pojos to avro events.
*/
/**
 * Serializes a batch of events into the Nakadi Avro {@code PublishingBatch}
 * wire format, using the Jackson Avro dataformat extension to encode business
 * pojos against the event type's Avro schema.
 *
 * <p>Supported event categories are {@link BusinessEventMapped} and
 * {@link DataChangeEvent}; anything else raises {@link InvalidEventTypeException}.
 */
public class AvroPublishingBatchSerializer implements PublishingBatchSerializer {

  // single shared mapper; fields are never reassigned, so keep them final
  private final AvroMapper avroMapper;
  /*
   * Writers are cached per event type name. NOTE(review): the key does not
   * include the schema version, so this assumes the schema for a given event
   * type name does not change during the lifetime of this serializer — confirm
   * this holds for long-lived clients.
   */
  private final Map<String, ObjectWriter> objectWriterCache;

  public AvroPublishingBatchSerializer(AvroMapper avroMapper) {
    this.avroMapper = avroMapper;
    this.objectWriterCache = new ConcurrentHashMap<>();
  }

  /**
   * Encodes the supplied events as a single Avro binary {@code PublishingBatch}.
   *
   * @param context supplies the event type name, schema and version to encode with
   * @param events the events to serialize; each must be a {@link BusinessEventMapped}
   *     or a {@link DataChangeEvent}
   * @param <T> the event type
   * @return the Avro binary representation of the batch
   * @throws RuntimeException wrapping any {@link IOException} raised while encoding
   * @throws InvalidEventTypeException if an event is of an unsupported category
   */
  @Override
  public <T> byte[] toBytes(SerializationContext context, Collection<T> events) {
    try {
      List<Envelope> envelopes = events.stream()
          .map(event -> toEnvelope(context, event))
          .collect(Collectors.toList());
      return PublishingBatch.newBuilder().setEvents(envelopes)
          .build().toByteBuffer().array();
    } catch (IOException io) {
      throw new RuntimeException(io);
    }
  }

  /**
   * Wraps a single event in an {@code Envelope}: the payload is the Avro-encoded
   * data part of the event, the metadata is copied from the event's own metadata
   * plus the event type name and schema version taken from the context.
   */
  private <T> Envelope toEnvelope(SerializationContext context, T event) {
    try {
      EventMetadata metadata;
      Object data;
      // only the two enveloped categories carry a (metadata, data) pair
      if (event instanceof BusinessEventMapped) {
        metadata = ((BusinessEventMapped) event).metadata();
        data = ((BusinessEventMapped) event).data();
      } else if (event instanceof DataChangeEvent) {
        metadata = ((DataChangeEvent) event).metadata();
        data = ((DataChangeEvent) event).data();
      } else {
        throw new InvalidEventTypeException("Unexpected event category `" +
            event.getClass() + "` provided during avro serialization");
      }

      // parse the writer schema once per event type name, then reuse the writer
      byte[] eventBytes = objectWriterCache.computeIfAbsent(context.name(),
          (et) -> avroMapper.writer(new AvroSchema(new Schema.Parser().parse(context.schema()))))
          .writeValueAsBytes(data);

      return Envelope.newBuilder()
          .setMetadata(Metadata.newBuilder()
              // NOTE(review): uses the context's name, not metadata.eventType —
              // confirm these always agree for enveloped events
              .setEventType(context.name())
              .setVersion(context.version())
              .setOccurredAt(metadata.occurredAt().toInstant())
              .setEid(metadata.eid())
              .setPartition(metadata.partition())
              .setPartitionCompactionKey(metadata.partitionCompactionKey())
              .build())
          .setPayload(ByteBuffer.wrap(eventBytes))
          .build();
    } catch (IOException io) {
      throw new RuntimeException(io);
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package nakadi.avro;

import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import nakadi.EventType;
import nakadi.EventTypeSchema;
import nakadi.NakadiClient;
import nakadi.SerializationContext;
import nakadi.SerializationSupport;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Avro-backed {@link SerializationSupport}: resolves an event type's Avro schema
 * via the client's event type resource, caches the resulting serialization
 * context per event type name, and delegates batch encoding to an
 * {@link AvroPublishingBatchSerializer}.
 */
public class AvroSerializationSupport implements SerializationSupport {

  private final AvroPublishingBatchSerializer payloadSerializer;
  private final Map<String, SerializationContext> contextCache;

  public AvroSerializationSupport(AvroPublishingBatchSerializer payloadSerializer) {
    this.payloadSerializer = payloadSerializer;
    this.contextCache = new ConcurrentHashMap<>();
  }

  /**
   * Creates a support instance wired with a default {@link AvroMapper} backed
   * batch serializer.
   */
  public static SerializationSupport newInstance() {
    AvroMapper mapper = new AvroMapper();
    AvroPublishingBatchSerializer serializer = new AvroPublishingBatchSerializer(mapper);
    return new AvroSerializationSupport(serializer);
  }

  @Override
  public <T> byte[] serializePayload(NakadiClient client, String eventTypeName, Collection<T> events) {
    // resolve (and cache) the serialization context for this event type, then encode
    SerializationContext resolved = contextCache.computeIfAbsent(eventTypeName,
        name -> new AvroSerializationContext(client.resources().eventTypes().findByName(name)));
    return payloadSerializer.toBytes(resolved, events);
  }

  @Override
  public String contentType() {
    return "application/avro-binary";
  }

  /**
   * Adapts an {@link EventType} to the {@link SerializationContext} contract,
   * rejecting event types whose schema is not Avro.
   */
  private static class AvroSerializationContext implements SerializationContext {

    private final EventType eventType;

    private AvroSerializationContext(EventType eventType) {
      requireAvroSchema(eventType);
      this.eventType = eventType;
    }

    // guard: only event types declared with an avro schema can be serialized here
    private static void requireAvroSchema(EventType eventType) {
      if (eventType.schema().type() != EventTypeSchema.Type.avro_schema) {
        throw new InvalidSchemaException(String.format(
            "Event type `%s` schema is `%s`, but expected Avro",
            eventType.name(), eventType.schema().type()));
      }
    }

    @Override
    public String name() {
      return eventType.name();
    }

    @Override
    public String schema() {
      return eventType.schema().schema();
    }

    @Override
    public String version() {
      return eventType.schema().version();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nakadi.avro;

/**
 * Thrown when an event supplied for Avro serialization is not one of the
 * supported enveloped categories.
 */
public class InvalidEventTypeException extends RuntimeException {

  /**
   * @param msg the detail message describing the rejected event
   */
  public InvalidEventTypeException(String msg) {
    super(msg);
  }

  /**
   * @param msg the detail message describing the rejected event
   * @param cause the underlying cause, preserved for diagnostics
   */
  public InvalidEventTypeException(String msg, Throwable cause) {
    super(msg, cause);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package nakadi.avro;

/**
 * Thrown when an event type's declared schema is not an Avro schema and so
 * cannot be used for Avro serialization.
 */
public class InvalidSchemaException extends RuntimeException {

  /**
   * @param msg the detail message describing the schema mismatch
   */
  public InvalidSchemaException(String msg) {
    super(msg);
  }

  /**
   * @param msg the detail message describing the schema mismatch
   * @param cause the underlying cause, preserved for diagnostics
   */
  public InvalidSchemaException(String msg, Throwable cause) {
    super(msg, cause);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "PublishingBatch",
"namespace": "org.zalando.nakadi.generated.avro",
"type": "record",
"fields": [
{
"name": "events",
"type": {
"type": "array",
"items": {
"type": "Envelope"
}
}
}
]
}
Loading