Skip to content

Commit caa9dea

Browse files
committed
feat: Add source plugin version to table metadata.
1 parent 9558e99 commit caa9dea

File tree

5 files changed

+112
-1
lines changed

5 files changed

+112
-1
lines changed

internal/servers/plugin/v3/plugin.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ func (s *Server) GetTables(ctx context.Context, req *pb.GetTables_Request) (*pb.
3939
if err != nil {
4040
return nil, status.Errorf(codes.Internal, "failed to get tables: %v", err)
4141
}
42+
pluginVersion := s.Plugin.Version()
43+
tables.SetSourcePluginVersion(pluginVersion)
4244
schemas := tables.ToArrowSchemas()
4345
encoded := make([][]byte, len(schemas))
4446
for i, sc := range schemas {
@@ -203,6 +205,9 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
203205
pbMsg := &pb.Sync_Response{}
204206
switch m := msg.(type) {
205207
case *message.SyncMigrateTable:
208+
pluginVersion := s.Plugin.Version()
209+
m.Table.SourcePluginVersion = pluginVersion
210+
m.Table.Relations.SetSourcePluginVersion(pluginVersion)
206211
tableSchema := m.Table.ToArrowSchema()
207212
schemaBytes, err := pb.SchemaToBytes(tableSchema)
208213
if err != nil {

internal/servers/plugin/v3/plugin_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,32 @@ func TestGetVersion(t *testing.T) {
5050
}
5151
}
5252

53+
func TestGetTables(t *testing.T) {
54+
ctx := context.Background()
55+
pluginVersion := "v1.2.3"
56+
s := Server{
57+
Plugin: plugin.NewPlugin("test", pluginVersion, memdb.NewMemDBClient),
58+
}
59+
60+
_, err := s.Init(ctx, &pb.Init_Request{})
61+
require.NoError(t, err)
62+
63+
res, err := s.GetTables(ctx, &pb.GetTables_Request{Tables: []string{"*"}})
64+
require.NoError(t, err)
65+
require.NotNil(t, res)
66+
require.Greater(t, len(res.Tables), 0, "expected at least one table")
67+
68+
// Verify that the plugin version is included in the schema metadata
69+
for _, tableBytes := range res.Tables {
70+
sc, err := pb.NewSchemaFromBytes(tableBytes)
71+
require.NoError(t, err)
72+
73+
version, found := sc.Metadata().GetValue(schema.MetadataTablePluginVersion)
74+
require.True(t, found, "expected plugin version to be in schema metadata")
75+
require.Equal(t, pluginVersion, version, "expected plugin version to match")
76+
}
77+
}
78+
5379
type mockSyncServer struct {
5480
grpc.ServerStream
5581
messages []*pb.Sync_Response

schema/arrow.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929
MetadataTableIsPaid = "cq:table_paid"
3030
MetadataTablePermissionsNeeded = "cq:table_permissions_needed"
3131
MetadataTableSensitiveColumns = "cq:table_sensitive_columns"
32+
MetadataTablePluginVersion = "cq:table_plugin_version"
3233
)
3334

3435
type Schemas []*arrow.Schema
@@ -112,6 +113,24 @@ func ReplaceFieldInRecord(src arrow.RecordBatch, fieldName string, field arrow.A
112113
return record, nil
113114
}
114115

116+
// AddPluginVersionToSchema adds the plugin version metadata to an Arrow schema.
117+
// It returns a new schema with the version added to the metadata.
118+
func AddPluginVersionToSchema(sc *arrow.Schema, version string) *arrow.Schema {
119+
if version == "" {
120+
return sc
121+
}
122+
existingMD := sc.Metadata()
123+
md := make(map[string]string)
124+
for i := 0; i < existingMD.Len(); i++ {
125+
key := existingMD.Keys()[i]
126+
value := existingMD.Values()[i]
127+
md[key] = value
128+
}
129+
md[MetadataTablePluginVersion] = version
130+
newMetadata := arrow.MetadataFrom(md)
131+
return arrow.NewSchema(sc.Fields(), &newMetadata)
132+
}
133+
115134
func AddInternalColumnsToRecord(record arrow.RecordBatch, cqClientIDValue string) (arrow.RecordBatch, error) {
116135
schema := record.Schema()
117136
nRows := int(record.NumRows())

schema/table.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ type Table struct {
116116
// with whether the table makes use of a paid API or not.
117117
IsPaid bool `json:"is_paid"`
118118

119+
// SourcePluginVersion is the version of the source plugin that created this table.
120+
// This is set when tables are retrieved from a plugin and is included in the Arrow schema metadata.
121+
SourcePluginVersion string `json:"source_plugin_version,omitempty"`
122+
119123
// IgnorePKComponentsMismatchValidation is a flag that indicates if the table should skip validating usage of both primary key components and primary keys
120124
IgnorePKComponentsMismatchValidation bool `json:"ignore_pk_components_mismatch_validation"`
121125
}
@@ -232,6 +236,9 @@ func NewTableFromArrowSchema(sc *arrow.Schema) (*Table, error) {
232236
if isPaid, found := tableMD.GetValue(MetadataTableIsPaid); found {
233237
table.IsPaid = isPaid == MetadataTrue
234238
}
239+
if sourcePluginVersion, found := tableMD.GetValue(MetadataTablePluginVersion); found {
240+
table.SourcePluginVersion = sourcePluginVersion
241+
}
235242
return table, nil
236243
}
237244

@@ -375,6 +382,14 @@ func (tt Tables) FilterDfsFunc(include, exclude func(*Table) bool, skipDependent
375382
return filteredTables
376383
}
377384

385+
// SetSourcePluginVersion sets the SourcePluginVersion on all tables recursively, including relations.
386+
func (tt Tables) SetSourcePluginVersion(version string) {
387+
for _, t := range tt {
388+
t.SourcePluginVersion = version
389+
t.Relations.SetSourcePluginVersion(version)
390+
}
391+
}
392+
378393
func (tt Tables) ToArrowSchemas() Schemas {
379394
flattened := tt.FlattenTables()
380395
schemas := make(Schemas, len(flattened))
@@ -606,6 +621,9 @@ func (t *Table) ToArrowSchema() *arrow.Schema {
606621
md[MetadataTablePermissionsNeeded] = string(asJSON)
607622
asJSON, _ = json.Marshal(t.SensitiveColumns)
608623
md[MetadataTableSensitiveColumns] = string(asJSON)
624+
if t.SourcePluginVersion != "" {
625+
md[MetadataTablePluginVersion] = t.SourcePluginVersion
626+
}
609627

610628
schemaMd := arrow.MetadataFrom(md)
611629
for i, c := range t.Columns {

schema/table_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,8 @@ func TestTablesToAndFromArrow(t *testing.T) {
717717
Parent: &Table{
718718
Name: "parent_table",
719719
},
720-
IsIncremental: true,
720+
IsIncremental: true,
721+
SourcePluginVersion: "v1.0.0",
721722
Columns: []Column{
722723
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
723724
{Name: "int", Type: arrow.PrimitiveTypes.Int64},
@@ -747,6 +748,48 @@ func TestTablesToAndFromArrow(t *testing.T) {
747748
}
748749
}
749750

751+
func TestSetSourcePluginVersion(t *testing.T) {
752+
version := "v2.1.0"
753+
tables := Tables{
754+
&Table{
755+
Name: "table1",
756+
Relations: Tables{
757+
&Table{
758+
Name: "table2",
759+
Relations: Tables{
760+
&Table{Name: "table3"},
761+
},
762+
},
763+
},
764+
},
765+
&Table{Name: "table4"},
766+
}
767+
768+
tables.SetSourcePluginVersion(version)
769+
770+
// Verify all tables have the version set
771+
flattened := tables.FlattenTables()
772+
for _, table := range flattened {
773+
if table.SourcePluginVersion != version {
774+
t.Errorf("expected SourcePluginVersion to be %q, got %q for table %s", version, table.SourcePluginVersion, table.Name)
775+
}
776+
}
777+
778+
// Verify relations also have the version set
779+
for _, table := range tables {
780+
if table.SourcePluginVersion != version {
781+
t.Errorf("expected SourcePluginVersion to be %q, got %q for table %s", version, table.SourcePluginVersion, table.Name)
782+
}
783+
if len(table.Relations) > 0 {
784+
for _, rel := range table.Relations {
785+
if rel.SourcePluginVersion != version {
786+
t.Errorf("expected SourcePluginVersion to be %q, got %q for relation %s", version, rel.SourcePluginVersion, rel.Name)
787+
}
788+
}
789+
}
790+
}
791+
}
792+
750793
func TestValidateDuplicateTables(t *testing.T) {
751794
tests := []struct {
752795
name string

0 commit comments

Comments
 (0)