@@ -73,12 +73,16 @@ class AvroDataIO
7373 */
7474 const DEFLATE_CODEC = 'deflate ' ;
7575
76+ /**
77+ * @var string codec value for snappy codec
78+ * cf: http://google.github.io/snappy/
79+ */
80+ const SNAPPY_CODEC = 'snappy ' ;
81+
7682 /**
7783 * @var array array of valid codec names
78- * @todo Avro implementations are required to implement deflate codec as well,
79- * so implement it already!
8084 */
81- private static $ valid_codecs = array (self ::NULL_CODEC );
85+ private static $ valid_codecs = array (self ::NULL_CODEC , self :: DEFLATE_CODEC , self :: SNAPPY_CODEC );
8286
8387 /**
8488 * @var AvroSchema cached version of metadata schema object
@@ -111,13 +115,14 @@ public static function metadata_schema()
111115 * @param string $file_path file_path of file to open
112116 * @param string $mode one of AvroFile::READ_MODE or AvroFile::WRITE_MODE
113117 * @param string $schema_json JSON of writer's schema
118+ * @param string $codec compression codec
114119 * @return AvroDataIOWriter instance of AvroDataIOWriter
115120 *
116121 * @throws AvroDataIOException if $writers_schema is not provided
117122 * or if an invalid $mode is given.
118123 */
119124 public static function open_file ($ file_path , $ mode =AvroFile::READ_MODE ,
120- $ schema_json =null )
125+ $ schema_json =null , $ codec = self :: NULL_CODEC )
121126 {
122127 $ schema = !is_null ($ schema_json )
123128 ? AvroSchema::parse ($ schema_json ) : null ;
@@ -128,7 +133,7 @@ public static function open_file($file_path, $mode=AvroFile::READ_MODE,
128133 if (is_null ($ schema ))
129134 throw new AvroDataIOException ('Writing an Avro file requires a schema. ' );
130135 $ file = new AvroFile ($ file_path , AvroFile::WRITE_MODE );
131- $ io = self ::open_writer ($ file , $ schema );
136+ $ io = self ::open_writer ($ file , $ schema, $ codec );
132137 break ;
133138 case AvroFile::READ_MODE :
134139 $ file = new AvroFile ($ file_path , AvroFile::READ_MODE );
@@ -145,7 +150,7 @@ public static function open_file($file_path, $mode=AvroFile::READ_MODE,
145150 /**
146151 * @return array array of valid codecs
147152 */
148- private static function valid_codecs ()
153+ public static function valid_codecs ()
149154 {
150155 return self ::$ valid_codecs ;
151156 }
@@ -162,12 +167,13 @@ public static function is_valid_codec($codec)
162167 /**
163168 * @param AvroIO $io
164169 * @param AvroSchema $schema
170+ * @param string $codec
165171 * @return AvroDataIOWriter
166172 */
167- protected static function open_writer ($ io , $ schema )
173+ protected static function open_writer ($ io , $ schema, $ codec = self :: NULL_CODEC )
168174 {
169175 $ writer = new AvroIODatumWriter ($ schema );
170- return new AvroDataIOWriter ($ io , $ writer , $ schema );
176+ return new AvroDataIOWriter ($ io , $ writer , $ schema, $ codec );
171177 }
172178
173179 /**
@@ -220,11 +226,18 @@ class AvroDataIOReader
220226 */
221227 private $ block_count ;
222228
229+ /**
230+ * @var string compression codec
231+ */
232+ private $ codec ;
233+
223234 /**
224235 * @param AvroIO $io source from which to read
225236 * @param AvroIODatumReader $datum_reader reader that understands
226237 * the data schema
227238 * @throws AvroDataIOException if $io is not an instance of AvroIO
239+ * or the codec specified in the header
240+ * is not supported
228241 * @uses read_header()
229242 */
230243 public function __construct ($ io , $ datum_reader )
@@ -242,6 +255,7 @@ public function __construct($io, $datum_reader)
242255 AvroDataIO::METADATA_CODEC_ATTR );
243256 if ($ codec && !AvroDataIO::is_valid_codec ($ codec ))
244257 throw new AvroDataIOException (sprintf ('Uknown codec: %s ' , $ codec ));
258+ $ this ->codec = $ codec ;
245259
246260 $ this ->block_count = 0 ;
247261 // FIXME: Seems unsanitary to set writers_schema here.
@@ -276,8 +290,10 @@ private function read_header()
276290 }
277291
278292 /**
279- * @internal Would be nice to implement data() as an iterator, I think
280293 * @return array of data from object container.
294+ * @throws AvroDataIOException
295+ * @throws AvroIOException
296+ * @internal Would be nice to implement data() as an iterator, I think
281297 */
282298 public function data ()
283299 {
@@ -293,9 +309,29 @@ public function data()
293309 if ($ this ->is_eof ())
294310 break ;
295311
296- $ this ->read_block_header ();
312+ $ length = $ this ->read_block_header ();
313+ $ decoder = $ this ->decoder ;
314+ if ($ this ->codec == AvroDataIO::DEFLATE_CODEC ) {
315+ if (!function_exists ('gzinflate ' )) {
316+ throw new AvroDataIOException ('"gzinflate" function not available, "zlib" extension required. ' );
317+ }
318+ $ compressed = $ decoder ->read ($ length );
319+ $ datum = gzinflate ($ compressed );
320+ $ decoder = new AvroIOBinaryDecoder (new AvroStringIO ($ datum ));
321+ } elseif ($ this ->codec == AvroDataIO::SNAPPY_CODEC ) {
322+ if (!function_exists ('snappy_uncompress ' )) {
323+ throw new AvroDataIOException ('"snappy_uncompress" function not available, "snappy" extension required. ' );
324+ }
325+ $ compressed = $ decoder ->read ($ length -4 );
326+ $ datum = snappy_uncompress ($ compressed );
327+ $ crc32 = unpack ('N ' , $ decoder ->read (4 ));
328+ if ($ crc32 [1 ] != crc32 ($ datum )) {
329+ throw new AvroDataIOException ('Invalid CRC32 checksum. ' );
330+ }
331+ $ decoder = new AvroIOBinaryDecoder (new AvroStringIO ($ datum ));
332+ }
297333 }
298- $ data []= $ this ->datum_reader ->read ($ this -> decoder );
334+ $ data []= $ this ->datum_reader ->read ($ decoder );
299335 $ this ->block_count -= 1 ;
300336 }
301337 return $ data ;
@@ -414,13 +450,19 @@ private static function generate_sync_marker()
414450 */
415451 private $ metadata ;
416452
453+ /**
454+ * @var string compression codec
455+ */
456+ private $ codec ;
457+
417458 /**
418459 * @param AvroIO $io
419460 * @param AvroIODatumWriter $datum_writer
420461 * @param AvroSchema $writers_schema
462+ * @param string $codec
421463 * @throws AvroDataIOException
422464 */
423- public function __construct ($ io , $ datum_writer , $ writers_schema =null )
465+ public function __construct ($ io , $ datum_writer , $ writers_schema =null , $ codec =AvroDataIO:: NULL_CODEC )
424466 {
425467 if (!($ io instanceof AvroIO))
426468 throw new AvroDataIOException ('io must be instance of AvroIO ' );
@@ -435,16 +477,21 @@ public function __construct($io, $datum_writer, $writers_schema=null)
435477
436478 if ($ writers_schema )
437479 {
480+ if (!AvroDataIO::is_valid_codec ($ codec ))
481+ throw new AvroDataIOException (
482+ sprintf ('codec %s is not supported ' , $ codec ));
483+
438484 $ this ->sync_marker = self ::generate_sync_marker ();
439- $ this ->metadata [AvroDataIO::METADATA_CODEC_ATTR ] = AvroDataIO:: NULL_CODEC ;
485+ $ this ->metadata [AvroDataIO::METADATA_CODEC_ATTR ] = $ this -> codec = $ codec ;
440486 $ this ->metadata [AvroDataIO::METADATA_SCHEMA_ATTR ] = strval ($ writers_schema );
441487 $ this ->write_header ();
442488 }
443489 else
444490 {
445491 $ dfr = new AvroDataIOReader ($ this ->io , new AvroIODatumReader ());
446492 $ this ->sync_marker = $ dfr ->sync_marker ;
447- $ this ->metadata [AvroDataIO::METADATA_CODEC_ATTR ] = $ dfr ->metadata [AvroDataIO::METADATA_CODEC_ATTR ];
493+ $ this ->metadata [AvroDataIO::METADATA_CODEC_ATTR ] = $ this ->codec
494+ = $ dfr ->metadata [AvroDataIO::METADATA_CODEC_ATTR ];
448495
449496 $ schema_from_file = $ dfr ->metadata [AvroDataIO::METADATA_SCHEMA_ATTR ];
450497 $ this ->metadata [AvroDataIO::METADATA_SCHEMA_ATTR ] = $ schema_from_file ;
@@ -489,27 +536,29 @@ private function flush()
489536
490537 /**
491538 * Writes a block of data to the AvroIO object container.
492- * @throws AvroDataIOException if the codec provided by the encoder
493- * is not supported
494- * @internal Should the codec check happen in the constructor?
495- * Why wait until we're writing data?
496539 */
497540 private function write_block ()
498541 {
499542 if ($ this ->block_count > 0 )
500543 {
501544 $ this ->encoder ->write_long ($ this ->block_count );
502545 $ to_write = strval ($ this ->buffer );
503- $ this ->encoder ->write_long (strlen ($ to_write ));
504546
505- if (AvroDataIO::is_valid_codec (
506- $ this ->metadata [AvroDataIO::METADATA_CODEC_ATTR ]))
507- $ this ->write ($ to_write );
508- else
509- throw new AvroDataIOException (
510- sprintf ('codec %s is not supported ' ,
511- $ this ->metadata [AvroDataIO::METADATA_CODEC_ATTR ]));
547+ if ($ this ->codec == AvroDataIO::DEFLATE_CODEC ) {
548+ if (!function_exists ('gzinflate ' )) {
549+ throw new AvroDataIOException ('"gzinflate" function not available, "zlib" extension required. ' );
550+ }
551+ $ to_write = gzdeflate ($ to_write );
552+ } elseif ($ this ->codec == AvroDataIO::SNAPPY_CODEC ) {
553+ if (!function_exists ('snappy_compress ' )) {
554+ throw new AvroDataIOException ('"snappy_compress" function not available, "snappy" extension required. ' );
555+ }
556+ $ crc32 = pack ('N ' , crc32 ($ to_write ));
557+ $ to_write = snappy_compress ($ to_write ) . $ crc32 ;
558+ }
512559
560+ $ this ->encoder ->write_long (strlen ($ to_write ));
561+ $ this ->write ($ to_write );
513562 $ this ->write ($ this ->sync_marker );
514563 $ this ->buffer ->truncate ();
515564 $ this ->block_count = 0 ;
0 commit comments