@@ -16,7 +16,7 @@ import (
1616
1717// client is mostly used for testing the destination plugin.
1818type client struct {
19- memoryDB map [string ][]arrow.Record
19+ memoryDB map [string ][]arrow.RecordBatch
2020 tables map [string ]* schema.Table
2121 memoryDBLock sync.RWMutex
2222 errOnWrite bool
@@ -42,7 +42,7 @@ func WithBlockingWrite() Option {
4242
4343func GetNewClient (options ... Option ) plugin.NewClientFunc {
4444 c := & client {
45- memoryDB : make (map [string ][]arrow.Record ),
45+ memoryDB : make (map [string ][]arrow.RecordBatch ),
4646 memoryDBLock : sync.RWMutex {},
4747 tables : map [string ]* schema.Table {
4848 "table1" : {
@@ -112,13 +112,13 @@ func NewMemDBClientErrOnNew(context.Context, zerolog.Logger, []byte, plugin.NewC
112112 return nil , errors .New ("newTestDestinationMemDBClientErrOnNew" )
113113}
114114
115- func (c * client ) overwrite (table * schema.Table , record arrow.Record ) {
115+ func (c * client ) overwrite (table * schema.Table , record arrow.RecordBatch ) {
116116 for i := int64 (0 ); i < record .NumRows (); i ++ {
117117 c .overwriteRow (table , record .NewSlice (i , i + 1 ))
118118 }
119119}
120120
121- func (c * client ) overwriteRow (table * schema.Table , data arrow.Record ) {
121+ func (c * client ) overwriteRow (table * schema.Table , data arrow.RecordBatch ) {
122122 tableName := table .Name
123123 pksIndex := table .PrimaryKeysIndexes ()
124124 if len (pksIndex ) == 0 {
@@ -152,7 +152,7 @@ func (*client) GetSpec() any {
152152 return & Spec {}
153153}
154154
155- func (c * client ) Read (_ context.Context , table * schema.Table , res chan <- arrow.Record ) error {
155+ func (c * client ) Read (_ context.Context , table * schema.Table , res chan <- arrow.RecordBatch ) error {
156156 c .memoryDBLock .RLock ()
157157 defer c .memoryDBLock .RUnlock ()
158158
@@ -196,7 +196,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) {
196196 tableName := table .Name
197197 memTable := c .memoryDB [tableName ]
198198 if memTable == nil {
199- c .memoryDB [tableName ] = make ([]arrow.Record , 0 )
199+ c .memoryDB [tableName ] = make ([]arrow.RecordBatch , 0 )
200200 c .tables [tableName ] = table
201201 return
202202 }
@@ -206,7 +206,7 @@ func (c *client) migrate(_ context.Context, table *schema.Table) {
206206 if changes == nil {
207207 return
208208 }
209- c .memoryDB [tableName ] = make ([]arrow.Record , 0 )
209+ c .memoryDB [tableName ] = make ([]arrow.RecordBatch , 0 )
210210 c .tables [tableName ] = table
211211}
212212
@@ -253,7 +253,7 @@ func (c *client) Close(context.Context) error {
253253}
254254
255255func (c * client ) deleteStale (_ context.Context , msg * message.WriteDeleteStale ) {
256- var filteredTable []arrow.Record
256+ var filteredTable []arrow.RecordBatch
257257 tableName := msg .TableName
258258 for i , row := range c .memoryDB [tableName ] {
259259 sc := row .Schema ()
@@ -280,7 +280,7 @@ func (c *client) deleteStale(_ context.Context, msg *message.WriteDeleteStale) {
280280}
281281
282282func (c * client ) deleteRecord (_ context.Context , msg * message.WriteDeleteRecord ) {
283- var filteredTable []arrow.Record
283+ var filteredTable []arrow.RecordBatch
284284 tableName := msg .TableName
285285 for i , row := range c .memoryDB [tableName ] {
286286 isMatch := true
@@ -308,15 +308,15 @@ func (c *client) deleteRecord(_ context.Context, msg *message.WriteDeleteRecord)
308308 c .memoryDB [tableName ] = filteredTable
309309}
310310
311- func (* client ) Transform (_ context.Context , _ <- chan arrow.Record , _ chan <- arrow.Record ) error {
311+ func (* client ) Transform (_ context.Context , _ <- chan arrow.RecordBatch , _ chan <- arrow.RecordBatch ) error {
312312 return nil
313313}
314314
315315func (* client ) TransformSchema (_ context.Context , _ * arrow.Schema ) (* arrow.Schema , error ) {
316316 return nil , nil
317317}
318318
319- func evaluatePredicate (pred message.Predicate , record arrow.Record ) bool {
319+ func evaluatePredicate (pred message.Predicate , record arrow.RecordBatch ) bool {
320320 sc := record .Schema ()
321321 indices := sc .FieldIndices (pred .Column )
322322 if len (indices ) == 0 {
0 commit comments