|
17 | 17 |
|
18 | 18 | package io.cloudevents.avro;
|
19 | 19 |
|
| 20 | +import java.util.Map; |
| 21 | +import java.net.URI; |
| 22 | +import java.time.Instant; |
| 23 | +import java.time.OffsetDateTime; |
| 24 | +import java.time.ZoneOffset; |
| 25 | + |
20 | 26 | import io.cloudevents.CloudEvent;
|
| 27 | +import io.cloudevents.CloudEventData; |
21 | 28 | import io.cloudevents.SpecVersion;
|
22 | 29 | import io.cloudevents.AvroCloudEvent;
|
23 | 30 | import io.cloudevents.AvroCloudEventData;
|
24 | 31 | import io.cloudevents.core.builder.CloudEventBuilder;
|
| 32 | +import io.cloudevents.core.v1.CloudEventV1; |
| 33 | +import io.cloudevents.rw.CloudEventRWException; |
| 34 | +import io.cloudevents.rw.CloudEventReader; |
| 35 | +import io.cloudevents.rw.CloudEventDataMapper; |
| 36 | +import io.cloudevents.rw.CloudEventWriter; |
| 37 | +import io.cloudevents.rw.CloudEventWriterFactory; |
| 38 | + |
| 39 | +public class AvroDeserializer implements CloudEventReader { |
| 40 | + |
| 41 | + private final AvroCloudEvent avroCloudEvent; |
| 42 | + |
| 43 | + public AvroDeserializer(AvroCloudEvent avroCloudEvent) { |
| 44 | + this.avroCloudEvent = avroCloudEvent; |
| 45 | + } |
| 46 | + |
| 47 | + @Override |
| 48 | + public <W extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<W, R> writerFactory, |
| 49 | + CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException { |
| 50 | + |
| 51 | + Map<CharSequence, Object> avroCloudEventAttrs = this.avroCloudEvent.getAttribute(); |
| 52 | + SpecVersion specVersion = SpecVersion.parse((String)avroCloudEventAttrs.get(CloudEventV1.SPECVERSION)); |
| 53 | + final CloudEventWriter<R> writer = writerFactory.create(specVersion); |
| 54 | + |
| 55 | + for (Map.Entry<CharSequence, Object> entry: avroCloudEventAttrs.entrySet()) { |
| 56 | + String key = entry.getKey().toString(); |
25 | 57 |
|
26 |
| -public class AvroDeserializer { |
| 58 | + if (key.equals(CloudEventV1.TIME)) { |
| 59 | + // OffsetDateTime |
| 60 | + Long timeAsLong = (Long) entry.getValue(); |
| 61 | + Instant timeAsInstant = Instant.ofEpochMilli(timeAsLong); |
| 62 | + OffsetDateTime value = OffsetDateTime.ofInstant(timeAsInstant, ZoneOffset.UTC); |
| 63 | + writer.withContextAttribute(key, value); |
27 | 64 |
|
28 |
| - public static CloudEvent fromAvro(AvroCloudEvent avroCloudEvent) { |
| 65 | + } else if (key.equals(CloudEventV1.DATASCHEMA)) { |
| 66 | + // URI |
| 67 | + URI value = URI.create((String) entry.getValue()); |
| 68 | + writer.withContextAttribute(key, value); |
| 69 | + } else { |
| 70 | + // String |
| 71 | + writer.withContextAttribute(key, (String) entry.getValue()); |
| 72 | + } |
| 73 | + } |
29 | 74 |
|
| 75 | + // TOOD: data |
| 76 | + return writer.end(); |
30 | 77 | }
|
31 | 78 |
|
32 | 79 | }
|
0 commit comments