23
23
import java .io .ByteArrayOutputStream ;
24
24
import java .io .IOException ;
25
25
import java .io .UncheckedIOException ;
26
+ import java .nio .ByteBuffer ;
26
27
import java .nio .charset .StandardCharsets ;
28
+ import java .util .Collections ;
29
+ import java .util .HashMap ;
30
+ import java .util .Map ;
27
31
import java .util .Objects ;
28
32
29
33
/**
30
- * Base Codec class for Avro encoder and decoder implementations
34
+ * Class containing implementation of Apache Avro serializer
31
35
*/
32
- class AvroSchemaRegistryUtils {
33
- private final ClientLogger logger = new ClientLogger ( AvroSchemaRegistryUtils . class ) ;
34
-
36
+ class AvroSerializer {
37
+ private static final Map < Class <?>, Schema > PRIMITIVE_SCHEMAS ;
38
+ private static final Schema NULL_SCHEMA = Schema . create ( Schema . Type . NULL );
35
39
private static final int V1_HEADER_LENGTH = 10 ;
36
40
private static final byte [] V1_HEADER = new byte []{-61 , 1 };
37
41
42
+ private final ClientLogger logger = new ClientLogger (AvroSerializer .class );
38
43
private final boolean avroSpecificReader ;
39
44
private final Schema .Parser parser ;
40
45
private final EncoderFactory encoderFactory ;
41
46
private final DecoderFactory decoderFactory ;
42
47
48
static {
    // Each Avro primitive type gets exactly one Schema instance, shared by every Java class that maps to it.
    final Map<Class<?>, Schema> byJavaType = new HashMap<>();

    registerPrimitive(byJavaType, Schema.Type.BOOLEAN, Boolean.class, boolean.class);
    registerPrimitive(byJavaType, Schema.Type.INT, Integer.class, int.class);
    registerPrimitive(byJavaType, Schema.Type.LONG, Long.class, long.class);
    registerPrimitive(byJavaType, Schema.Type.FLOAT, Float.class, float.class);
    registerPrimitive(byJavaType, Schema.Type.DOUBLE, Double.class, double.class);

    // ByteBuffer is abstract but not final, so this exact-class entry only matches a lookup made with
    // ByteBuffer.class itself.
    registerPrimitive(byJavaType, Schema.Type.BYTES, byte.class, Byte.class, byte[].class, Byte[].class,
        ByteBuffer.class);

    registerPrimitive(byJavaType, Schema.Type.STRING, String.class);

    PRIMITIVE_SCHEMAS = Collections.unmodifiableMap(byJavaType);
}

/**
 * Creates a single schema of the given Avro type and maps every listed Java class to it.
 *
 * @param target Map receiving the class-to-schema entries
 * @param avroType Avro primitive type to create the schema for
 * @param javaTypes Java classes that should resolve to the created schema
 */
private static void registerPrimitive(Map<Class<?>, Schema> target, Schema.Type avroType,
    Class<?>... javaTypes) {
    final Schema shared = Schema.create(avroType);
    for (Class<?> javaType : javaTypes) {
        target.put(javaType, shared);
    }
}
85
+
43
86
/**
44
87
* Instantiates an AvroSerializer instance
45
88
*
@@ -49,7 +92,7 @@ class AvroSchemaRegistryUtils {
49
92
* @param encoderFactory Encoder factory
50
93
* @param decoderFactory Decoder factory
51
94
*/
52
- AvroSchemaRegistryUtils (boolean avroSpecificReader , Schema .Parser parser , EncoderFactory encoderFactory ,
95
+ AvroSerializer (boolean avroSpecificReader , Schema .Parser parser , EncoderFactory encoderFactory ,
53
96
DecoderFactory decoderFactory ) {
54
97
55
98
this .avroSpecificReader = avroSpecificReader ;
@@ -67,41 +110,19 @@ Schema parseSchemaString(String schemaString) {
67
110
return this .parser .parse (schemaString );
68
111
}
69
112
70
- /**
71
- * @param object Schema object used to generate schema string
72
- *
73
- * @return string representation of schema
74
- *
75
- * @see AvroSchemaUtils for distinction between primitive and Avro schema generation
76
- */
77
- String getSchemaString (Object object ) {
78
- Schema schema = AvroSchemaUtils .getSchema (object );
79
- return schema .toString ();
80
- }
81
-
82
- /**
83
- * Returns schema name for storing schemas in schema registry store.
84
- *
85
- * @param object Schema object used to generate schema path
86
- *
87
- * @return schema name as string
88
- *
89
- * @throws IllegalArgumentException if {@code object} is not a primitive type and not of type {@link
90
- * GenericContainer}.
91
- */
92
- String getSchemaName (Object object ) {
93
- return AvroSchemaUtils .getSchema (object ).getFullName ();
94
- }
95
-
96
113
/**
97
114
* Returns a byte[] containing the Avro encoding of the object parameter.
98
115
*
99
116
* @param object Object to be encoded into byte stream
100
117
*
101
118
* @return A set of bytes that represent the object.
119
+ *
120
+ * @throws IllegalArgumentException If the object is not a serializable type.
121
+ * @throws IllegalStateException if the object could not be serialized to an object stream or there was a
122
+ * runtime exception during serialization.
102
123
*/
103
- <T > byte [] encode (T object ) throws IOException {
104
- final Schema schema = AvroSchemaUtils . getSchema (object );
124
+ <T > byte [] encode (T object ) {
125
+ final Schema schema = getSchema (object );
105
126
106
127
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream ()) {
107
128
if (object instanceof byte []) {
@@ -159,31 +180,31 @@ <T> T decode(byte[] bytes, byte[] schemaBytes, TypeReference<T> typeReference) {
159
180
}
160
181
161
182
/**
162
- * Returns correct reader for decoding payload.
183
+ * Returns Avro schema for specified object, including null values
163
184
*
164
- * @param writerSchema Avro schema fetched from schema registry store
185
+ * @param object object for which Avro schema is being returned
165
186
*
166
- * @return correct Avro DatumReader object given encoder configuration
187
+ * @return Avro schema for object's data structure
188
+ *
189
+ * @throws IllegalArgumentException if object type is unsupported.
167
190
*/
168
- @ SuppressWarnings ( "unchecked" )
169
- private < T > DatumReader < T > getDatumReader ( Schema writerSchema , TypeReference < T > typeReference ) {
170
- boolean writerSchemaIsPrimitive = writerSchema . getType () != null
171
- && AvroSchemaUtils . getPrimitiveSchemas (). containsKey ( writerSchema . getType ());
191
+ static Schema getSchema ( Object object ) {
192
+ if ( object instanceof GenericContainer ) {
193
+ return (( GenericContainer ) object ). getSchema ();
194
+ }
172
195
173
- if (writerSchemaIsPrimitive ) {
174
- if (avroSpecificReader ) {
175
- return new SpecificDatumReader <>(writerSchema );
176
- } else {
177
- return new GenericDatumReader <>(writerSchema );
178
- }
196
+ if (object == null ) {
197
+ return NULL_SCHEMA ;
179
198
}
180
199
181
- // Suppressing this warning because we know that the Type is a representation of the Class<T>
182
- final Class < T > clazz = ( Class < T >) typeReference . getJavaType ( );
183
- if (SpecificRecord . class . isAssignableFrom ( clazz ) ) {
184
- return new SpecificDatumReader <>( writerSchema ) ;
200
+ final Class <?> objectClass = object . getClass ();
201
+ final Schema primitiveSchema = getPrimitiveSchema ( objectClass );
202
+ if (primitiveSchema != null ) {
203
+ return primitiveSchema ;
185
204
} else {
186
- return new GenericDatumReader <>(writerSchema );
205
+ throw new IllegalArgumentException ("Unsupported Avro type. Supported types are null, GenericContainer,"
206
+ + " Boolean, Integer, Long, Float, Double, String, Byte[], Byte, ByteBuffer, and their primitive"
207
+ + " equivalents. Actual: " + objectClass );
187
208
}
188
209
}
189
210
@@ -195,15 +216,64 @@ private <T> DatumReader<T> getDatumReader(Schema writerSchema, TypeReference<T>
195
216
* </ul>
196
217
*
197
218
* @param schemaBytes Bytes to read from.
219
+ *
198
220
* @return true if the object has the single object payload header; false otherwise.
199
221
*
200
222
* @see <a href="https://avro.apache.org/docs/current/spec.html#single_object_encoding">Single Object Encoding</a>
201
223
*/
202
- private static boolean isSingleObjectEncoded (byte [] schemaBytes ) {
224
+ static boolean isSingleObjectEncoded (byte [] schemaBytes ) {
203
225
if (schemaBytes .length < V1_HEADER_LENGTH ) {
204
226
return false ;
205
227
}
206
228
207
229
return V1_HEADER [0 ] == schemaBytes [0 ] && V1_HEADER [1 ] == schemaBytes [1 ];
208
230
}
231
+
232
+ /**
233
+ * Gets a schema for the given class if it is an Avro primitive type.
234
+ *
235
+ * @param clazz Object class
236
+ *
237
+ * @return Matching primitive schema, otherwise {@code null} if it is not.
238
+ */
239
+ private static Schema getPrimitiveSchema (Class <?> clazz ) {
240
+ final Schema schema = PRIMITIVE_SCHEMAS .get (clazz );
241
+ if (schema != null ) {
242
+ return schema ;
243
+ } else if (CharSequence .class .isAssignableFrom (clazz )) {
244
+ return PRIMITIVE_SCHEMAS .get (String .class );
245
+ } else if (ByteBuffer .class .isAssignableFrom (clazz )) {
246
+ return PRIMITIVE_SCHEMAS .get (Byte [].class );
247
+ } else {
248
+ return null ;
249
+ }
250
+ }
251
+
252
+ /**
253
+ * Returns correct reader for decoding payload.
254
+ *
255
+ * @param writerSchema Avro schema fetched from schema registry store
256
+ *
257
+ * @return correct Avro DatumReader object given encoder configuration
258
+ */
259
+ @ SuppressWarnings ("unchecked" )
260
+ private <T > DatumReader <T > getDatumReader (Schema writerSchema , TypeReference <T > typeReference ) {
261
+ // Suppressing this warning because we know that the Type is a representation of the Class<T>
262
+ final Class <T > clazz = (Class <T >) typeReference .getJavaType ();
263
+ final Schema primitiveSchema = getPrimitiveSchema (clazz );
264
+
265
+ if (primitiveSchema != null ) {
266
+ if (avroSpecificReader ) {
267
+ return new SpecificDatumReader <>(writerSchema );
268
+ } else {
269
+ return new GenericDatumReader <>(writerSchema );
270
+ }
271
+ }
272
+
273
+ if (SpecificRecord .class .isAssignableFrom (clazz )) {
274
+ return new SpecificDatumReader <>(writerSchema );
275
+ } else {
276
+ return new GenericDatumReader <>(writerSchema );
277
+ }
278
+ }
209
279
}
0 commit comments