@@ -136,40 +136,3 @@ def __len__(self) -> int:
136136
def __bool__(self) -> bool:
    """Return True when at least one step has been recorded."""
    if self._steps:
        return True
    return False
139-
140-
# IPC write options shared by all encoders in this module.
# Do not use multiple threads to encode record batches, as parallelism
# should be managed by beam.
_ARROW_CODER_IPC_OPTIONS = pa.ipc.IpcWriteOptions(use_threads=False)
144-
145-
# TODO(b/190756453): Make this into the upstream
# (preference: Arrow, Beam, tfx_bsl).
class _ArrowRecordBatchCoder(beam.coders.Coder):
  """Custom Beam coder for a single Arrow `pa.RecordBatch`.

  Each element is serialized with the Arrow IPC stream format (schema
  followed by exactly one record batch). `decode` enforces the
  one-batch-per-payload invariant.
  """

  def encode(self, value: pa.RecordBatch) -> bytes:
    """Serializes `value` as Arrow IPC stream bytes.

    Args:
      value: The record batch to encode.

    Returns:
      The IPC stream payload (schema message + one batch) as bytes.
    """
    sink = pa.BufferOutputStream()
    # Use the writer as a context manager so it is always closed (writing
    # the end-of-stream marker) even if write_batch raises, instead of
    # leaking an unterminated stream writer.
    with pa.ipc.new_stream(
        sink, value.schema, options=_ARROW_CODER_IPC_OPTIONS) as writer:
      writer.write_batch(value)
    return sink.getvalue().to_pybytes()

  def decode(self, encoded: bytes) -> pa.RecordBatch:
    """Deserializes bytes produced by `encode` into one record batch.

    Args:
      encoded: An Arrow IPC stream payload.

    Returns:
      The single record batch contained in the stream.

    Raises:
      ValueError: If the stream contains more than one record batch.
    """
    reader = pa.ipc.open_stream(encoded)
    result = reader.read_next_batch()
    # EAFP: a second read must hit end-of-stream; anything else means the
    # payload violated the one-batch invariant.
    try:
      reader.read_next_batch()
    except StopIteration:
      pass
    else:
      raise ValueError("Expected only one RecordBatch in the stream.")
    return result

  def to_type_hint(self):
    """Type hint Beam uses to associate this coder with its element type."""
    return pa.RecordBatch
172-
173-
174- beam .coders .typecoders .registry .register_coder (pa .RecordBatch ,
175- _ArrowRecordBatchCoder )
0 commit comments