@@ -11,16 +11,16 @@ import (
1111 "github.com/apache/arrow/go/v13/arrow"
1212 "github.com/apache/arrow/go/v13/arrow/array"
1313 "github.com/cloudquery/plugin-pb-go/specs"
14- "github.com/cloudquery/plugin-sdk/v2 /plugins/destination"
15- "github.com/cloudquery/plugin-sdk/v2 /schema"
14+ "github.com/cloudquery/plugin-sdk/v3 /plugins/destination"
15+ "github.com/cloudquery/plugin-sdk/v3 /schema"
1616 "github.com/rs/zerolog"
1717)
1818
1919// client is mostly used for testing the destination plugin.
2020type client struct {
2121 spec specs.Destination
2222 memoryDB map [string ][]arrow.Record
23- tables map [string ]* arrow. Schema
23+ tables map [string ]* schema. Table
2424 memoryDBLock sync.RWMutex
2525 errOnWrite bool
2626 blockingWrite bool
@@ -64,7 +64,7 @@ func getTestLogger(t *testing.T) zerolog.Logger {
6464func NewClient (_ context.Context , _ zerolog.Logger , spec specs.Destination ) (destination.Client , error ) {
6565 return & client {
6666 memoryDB : make (map [string ][]arrow.Record ),
67- tables : make (map [string ]* arrow. Schema ),
67+ tables : make (map [string ]* schema. Table ),
6868 spec : spec ,
6969 }, nil
7070}
@@ -73,9 +73,9 @@ func NewClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (dest
7373 return nil , fmt .Errorf ("newTestDestinationMemDBClientErrOnNew" )
7474}
7575
76- func (c * client ) overwrite (table * arrow. Schema , data arrow.Record ) {
77- pksIndex := schema . PrimaryKeyIndices ( table )
78- tableName := schema . TableName ( table )
76+ func (c * client ) overwrite (table * schema. Table , data arrow.Record ) {
77+ pksIndex := table . PrimaryKeysIndexes ( )
78+ tableName := table . Name
7979 for i , row := range c .memoryDB [tableName ] {
8080 found := true
8181 for _ , pkIndex := range pksIndex {
@@ -94,16 +94,17 @@ func (c *client) overwrite(table *arrow.Schema, data arrow.Record) {
9494 c .memoryDB [tableName ] = append (c .memoryDB [tableName ], data )
9595}
9696
97- func (c * client ) Migrate (_ context.Context , tables schema.Schemas ) error {
97+ func (c * client ) Migrate (_ context.Context , tables schema.Tables ) error {
9898 for _ , table := range tables {
99- tableName := schema . TableName ( table )
99+ tableName := table . Name
100100 memTable := c .memoryDB [tableName ]
101101 if memTable == nil {
102102 c .memoryDB [tableName ] = make ([]arrow.Record , 0 )
103103 c .tables [tableName ] = table
104104 continue
105105 }
106- changes := schema .GetSchemaChanges (table , c .tables [tableName ])
106+
107+ changes := table .GetChanges (c .tables [tableName ])
107108 // memdb doesn't support any auto-migrate
108109 if changes == nil {
109110 continue
@@ -114,16 +115,15 @@ func (c *client) Migrate(_ context.Context, tables schema.Schemas) error {
114115 return nil
115116}
116117
117- func (c * client ) Read (_ context.Context , table * arrow. Schema , source string , res chan <- arrow.Record ) error {
118- tableName := schema . TableName ( table )
118+ func (c * client ) Read (_ context.Context , table * schema. Table , source string , res chan <- arrow.Record ) error {
119+ tableName := table . Name
119120 if c .memoryDB [tableName ] == nil {
120121 return nil
121122 }
122- indices := table .FieldIndices (schema .CqSourceNameColumn .Name )
123- if len ( indices ) == 0 {
123+ sourceColIndex := table .Columns . Index (schema .CqSourceNameColumn .Name )
124+ if sourceColIndex == - 1 {
124125 return fmt .Errorf ("table %s doesn't have source column" , tableName )
125126 }
126- sourceColIndex := indices [0 ]
127127 var sortedRes []arrow.Record
128128 c .memoryDBLock .RLock ()
129129 for _ , row := range c .memoryDB [tableName ] {
@@ -140,7 +140,7 @@ func (c *client) Read(_ context.Context, table *arrow.Schema, source string, res
140140 return nil
141141}
142142
143- func (c * client ) Write (ctx context.Context , _ schema.Schemas , resources <- chan arrow.Record ) error {
143+ func (c * client ) Write (ctx context.Context , _ schema.Tables , resources <- chan arrow.Record ) error {
144144 if c .errOnWrite {
145145 return fmt .Errorf ("errOnWrite" )
146146 }
@@ -154,21 +154,23 @@ func (c *client) Write(ctx context.Context, _ schema.Schemas, resources <-chan a
154154
155155 for resource := range resources {
156156 c .memoryDBLock .Lock ()
157- tableName , err := schema .TableNameFromSchema (resource .Schema ())
158- if err != nil {
159- return err
157+ sc := resource .Schema ()
158+ tableName , ok := sc .Metadata ().GetValue (schema .MetadataTableName )
159+ if ! ok {
160+ return fmt .Errorf ("table name not found in schema metadata" )
160161 }
162+ table := c .tables [tableName ]
161163 if c .spec .WriteMode == specs .WriteModeAppend {
162164 c .memoryDB [tableName ] = append (c .memoryDB [tableName ], resource )
163165 } else {
164- c .overwrite (resource . Schema () , resource )
166+ c .overwrite (table , resource )
165167 }
166168 c .memoryDBLock .Unlock ()
167169 }
168170 return nil
169171}
170172
171- func (c * client ) WriteTableBatch (ctx context.Context , table * arrow. Schema , resources []arrow.Record ) error {
173+ func (c * client ) WriteTableBatch (ctx context.Context , table * schema. Table , resources []arrow.Record ) error {
172174 if c .errOnWrite {
173175 return fmt .Errorf ("errOnWrite" )
174176 }
@@ -179,7 +181,7 @@ func (c *client) WriteTableBatch(ctx context.Context, table *arrow.Schema, resou
179181 }
180182 return nil
181183 }
182- tableName := schema . TableName ( table )
184+ tableName := table . Name
183185 for _ , resource := range resources {
184186 c .memoryDBLock .Lock ()
185187 if c .spec .WriteMode == specs .WriteModeAppend {
@@ -201,17 +203,17 @@ func (c *client) Close(context.Context) error {
201203 return nil
202204}
203205
204- func (c * client ) DeleteStale (ctx context.Context , tables schema.Schemas , source string , syncTime time.Time ) error {
206+ func (c * client ) DeleteStale (ctx context.Context , tables schema.Tables , source string , syncTime time.Time ) error {
205207 for _ , table := range tables {
206208 c .deleteStaleTable (ctx , table , source , syncTime )
207209 }
208210 return nil
209211}
210212
211- func (c * client ) deleteStaleTable (_ context.Context , table * arrow. Schema , source string , syncTime time.Time ) {
212- sourceColIndex := table .FieldIndices (schema .CqSourceNameColumn .Name )[ 0 ]
213- syncColIndex := table .FieldIndices (schema .CqSyncTimeColumn .Name )[ 0 ]
214- tableName := schema . TableName ( table )
213+ func (c * client ) deleteStaleTable (_ context.Context , table * schema. Table , source string , syncTime time.Time ) {
214+ sourceColIndex := table .Columns . Index (schema .CqSourceNameColumn .Name )
215+ syncColIndex := table .Columns . Index (schema .CqSyncTimeColumn .Name )
216+ tableName := table . Name
215217 var filteredTable []arrow.Record
216218 for i , row := range c .memoryDB [tableName ] {
217219 if row .Column (sourceColIndex ).(* array.String ).Value (0 ) == source {
0 commit comments