Skip to content

Commit 371d31e

Browse files
committed
fix time serde and add data deserialization support
Signed-off-by: Ning Sun <[email protected]>
1 parent b0f17b8 commit 371d31e

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

formats/avro/src/main/java/io/cloudevents/avro/AvroDeserializer.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@
1919

2020
import java.util.Map;
2121
import java.net.URI;
22-
import java.time.Instant;
2322
import java.time.OffsetDateTime;
24-
import java.time.ZoneOffset;
2523

26-
import io.cloudevents.CloudEvent;
2724
import io.cloudevents.CloudEventData;
2825
import io.cloudevents.SpecVersion;
2926
import io.cloudevents.AvroCloudEvent;
30-
import io.cloudevents.AvroCloudEventData;
31-
import io.cloudevents.core.builder.CloudEventBuilder;
27+
import io.cloudevents.core.data.BytesCloudEventData;
3228
import io.cloudevents.core.v1.CloudEventV1;
3329
import io.cloudevents.rw.CloudEventRWException;
3430
import io.cloudevents.rw.CloudEventReader;
@@ -57,9 +53,7 @@ public <W extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<W, R> w
5753

5854
if (key.equals(CloudEventV1.TIME)) {
5955
// OffsetDateTime
60-
Long timeAsLong = (Long) entry.getValue();
61-
Instant timeAsInstant = Instant.ofEpochMilli(timeAsLong);
62-
OffsetDateTime value = OffsetDateTime.ofInstant(timeAsInstant, ZoneOffset.UTC);
56+
OffsetDateTime value = OffsetDateTime.parse((String) entry.getValue());
6357
writer.withContextAttribute(key, value);
6458

6559
} else if (key.equals(CloudEventV1.DATASCHEMA)) {
@@ -72,8 +66,13 @@ public <W extends CloudEventWriter<R>, R> R read(CloudEventWriterFactory<W, R> w
7266
}
7367
}
7468

75-
// TOOD: data
76-
return writer.end();
69+
byte[] data = (byte[]) this.avroCloudEvent.getData();
70+
71+
if (data != null) {
72+
return writer.end(mapper.map(BytesCloudEventData.wrap(data)));
73+
} else {
74+
return writer.end();
75+
}
7776
}
7877

7978
}

formats/avro/src/main/java/io/cloudevents/avro/AvroFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.cloudevents.CloudEvent;
2525
import io.cloudevents.CloudEventData;
2626
import io.cloudevents.AvroCloudEvent;
27+
import io.cloudevents.core.builder.CloudEventBuilder;
2728
import io.cloudevents.core.format.EventDeserializationException;
2829
import io.cloudevents.core.format.EventFormat;
2930
import io.cloudevents.core.format.EventSerializationException;
@@ -55,7 +56,7 @@ public CloudEvent deserialize(byte[] bytes, CloudEventDataMapper<? extends Cloud
5556
try {
5657
AvroCloudEvent avroCloudEvent = AvroCloudEvent.getDecoder().decode(input);
5758

58-
return AvroDeserializer.fromAvro(avroCloudEvent);
59+
return new AvroDeserializer(avroCloudEvent).read(CloudEventBuilder::fromSpecVersion, mapper);
5960
} catch (IOException e) {
6061
throw new EventDeserializationException(e);
6162
}

formats/avro/src/main/java/io/cloudevents/avro/AvroSerializer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public static final AvroCloudEvent toAvro(CloudEvent e) {
3737
attrs.put(CloudEventV1.SPECVERSION, e.getSpecVersion().toString());
3838
attrs.put(CloudEventV1.ID, e.getId());
3939
attrs.put(CloudEventV1.SOURCE, e.getSource());
40-
// convert to long
41-
attrs.put(CloudEventV1.TIME, e.getTime().toInstant().toEpochMilli());
40+
// convert to string
41+
attrs.put(CloudEventV1.TIME, e.getTime().toString());
4242
// convert
4343
attrs.put(CloudEventV1.DATASCHEMA, e.getDataSchema().toString());
4444
attrs.put(CloudEventV1.SUBJECT, e.getSubject());

0 commit comments

Comments
 (0)