|
1 | 1 | import pyarrow as pa |
2 | 2 | import structlog |
3 | 3 |
|
4 | | -from cloudquery.plugin_v3 import plugin_pb2, plugin_pb2_grpc |
| 4 | +from cloudquery.plugin_v3 import plugin_pb2, plugin_pb2_grpc, arrow |
5 | 5 | from cloudquery.sdk.message import SyncInsertMessage, SyncMigrateTableMessage |
6 | 6 | from cloudquery.sdk.plugin.plugin import Plugin, SyncOptions, TableOptions |
7 | 7 | from cloudquery.sdk.schema import tables_to_arrow_schemas |
@@ -48,19 +48,14 @@ def Sync(self, request, context): |
48 | 48 |
|
49 | 49 | for msg in self._plugin.sync(options): |
50 | 50 | if isinstance(msg, SyncInsertMessage): |
51 | | - sink = pa.BufferOutputStream() |
52 | | - writer = pa.ipc.new_stream(sink, msg.record.schema) |
53 | | - writer.write_batch(msg.record) |
54 | | - writer.close() |
55 | | - buf = sink.getvalue().to_pybytes() |
| 51 | + buf = arrow.record_to_bytes(msg.record) |
56 | 52 | yield plugin_pb2.Sync.Response( |
57 | 53 | insert=plugin_pb2.Sync.MessageInsert(record=buf) |
58 | 54 | ) |
59 | 55 | elif isinstance(msg, SyncMigrateTableMessage): |
| 56 | + buf = arrow.schema_to_bytes(msg.schema) |
60 | 57 | yield plugin_pb2.Sync.Response( |
61 | | - migrate_table=plugin_pb2.Sync.MessageMigrateTable( |
62 | | - table=msg.table.to_arrow_schema().serialize().to_pybytes() |
63 | | - ) |
| 58 | + migrate_table=plugin_pb2.Sync.MessageMigrateTable(table=buf) |
64 | 59 | ) |
65 | 60 | else: |
66 | 61 | # unknown sync message type |
|
0 commit comments