55import com .google .protobuf .ByteString ;
66import io .cloudquery .schema .Column ;
77import io .cloudquery .schema .Table ;
8+ import io .cloudquery .schema .Table .TableBuilder ;
89import java .io .ByteArrayOutputStream ;
910import java .io .IOException ;
1011import java .nio .channels .Channels ;
12+ import java .util .ArrayList ;
1113import java .util .HashMap ;
1214import java .util .List ;
1315import java .util .Map ;
1416import org .apache .arrow .memory .BufferAllocator ;
1517import org .apache .arrow .memory .RootAllocator ;
1618import org .apache .arrow .vector .VectorSchemaRoot ;
19+ import org .apache .arrow .vector .ipc .ArrowReader ;
20+ import org .apache .arrow .vector .ipc .ArrowStreamReader ;
1721import org .apache .arrow .vector .ipc .ArrowStreamWriter ;
1822import org .apache .arrow .vector .types .pojo .Field ;
1923import org .apache .arrow .vector .types .pojo .Schema ;
2024
2125public class ArrowHelper {
26+ public static final String CQ_TABLE_NAME = "cq:table_name" ;
27+ public static final String CQ_TABLE_TITLE = "cq:table_title" ;
28+ public static final String CQ_TABLE_DESCRIPTION = "cq:table_description" ;
29+ public static final String CQ_TABLE_DEPENDS_ON = "cq:table_depends_on" ;
30+
2231 public static ByteString encode (Table table ) throws IOException {
2332 try (BufferAllocator bufferAllocator = new RootAllocator ()) {
2433 Schema schema = toArrowSchema (table );
@@ -34,6 +43,15 @@ public static ByteString encode(Table table) throws IOException {
3443 }
3544 }
3645
46+ public static Table decode (ByteString byteString ) throws IOException {
47+ try (BufferAllocator bufferAllocator = new RootAllocator ()) {
48+ try (ArrowReader reader = new ArrowStreamReader (byteString .newInput (), bufferAllocator )) {
49+ VectorSchemaRoot vectorSchemaRoot = reader .getVectorSchemaRoot ();
50+ return fromArrowSchema (vectorSchemaRoot .getSchema ());
51+ }
52+ }
53+ }
54+
3755 public static Schema toArrowSchema (Table table ) {
3856 List <Column > columns = table .getColumns ();
3957 Field [] fields = new Field [columns .size ()];
@@ -43,16 +61,42 @@ public static Schema toArrowSchema(Table table) {
4361 fields [i ] = field ;
4462 }
4563 Map <String , String > metadata = new HashMap <>();
46- metadata .put ("cq:table_name" , table .getName ());
64+ metadata .put (CQ_TABLE_NAME , table .getName ());
4765 if (table .getTitle () != null ) {
48- metadata .put ("cq:table_title" , table .getTitle ());
66+ metadata .put (CQ_TABLE_TITLE , table .getTitle ());
4967 }
5068 if (table .getDescription () != null ) {
51- metadata .put ("cq:table_description" , table .getDescription ());
69+ metadata .put (CQ_TABLE_DESCRIPTION , table .getDescription ());
5270 }
5371 if (table .getParent () != null ) {
54- metadata .put ("cq:table_depends_on" , table .getParent ().getName ());
72+ metadata .put (CQ_TABLE_DEPENDS_ON , table .getParent ().getName ());
5573 }
5674 return new Schema (asList (fields ), metadata );
5775 }
76+
77+ public static Table fromArrowSchema (Schema schema ) {
78+ List <Column > columns = new ArrayList <>();
79+ for (Field field : schema .getFields ()) {
80+ columns .add (Column .builder ().name (field .getName ()).type (field .getType ()).build ());
81+ }
82+
83+ Map <String , String > metaData = schema .getCustomMetadata ();
84+ String name = metaData .get (CQ_TABLE_NAME );
85+ String title = metaData .get (CQ_TABLE_TITLE );
86+ String description = metaData .get (CQ_TABLE_DESCRIPTION );
87+ String parent = metaData .get (CQ_TABLE_DEPENDS_ON );
88+
89+ TableBuilder tableBuilder = Table .builder ().name (name ).columns (columns );
90+ if (title != null ) {
91+ tableBuilder .title (title );
92+ }
93+ if (description != null ) {
94+ tableBuilder .description (description );
95+ }
96+ if (parent != null ) {
97+ tableBuilder .parent (Table .builder ().name (parent ).build ());
98+ }
99+
100+ return tableBuilder .build ();
101+ }
58102}
0 commit comments