Skip to content

Commit b06ad5c

Browse files
committed
feat: Enable storing _cq_client_id.
1 parent 3878f90 commit b06ad5c

File tree

6 files changed

+28
-0
lines changed

6 files changed

+28
-0
lines changed

examples/simple_plugin/plugin/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func getTables() schema.Tables {
123123
}
124124
for _, t := range tables {
125125
schema.AddCqIDs(t)
126+
schema.AddCqClientID(t)
126127
}
127128
return tables
128129
}

scheduler/queue/worker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
147147
atomic.AddUint64(&tableMetrics.Errors, 1)
148148
return
149149
}
150+
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
151+
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
152+
}
150153
if err := resolvedResource.Validate(); err != nil {
151154
switch err.(type) {
152155
case *schema.PKError:

scheduler/scheduler_dfs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
188188
atomic.AddUint64(&tableMetrics.Errors, 1)
189189
return
190190
}
191+
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
192+
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
193+
}
191194
if err := resolvedResource.Validate(); err != nil {
192195
switch err.(type) {
193196
case *schema.PKError:

schema/meta.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ var CqParentIDColumn = Column{
2929
IgnoreInTests: true,
3030
}
3131

32+
var CqClientIDColumn = Column{
33+
Name: "_cq_client_id",
34+
Type: arrow.BinaryTypes.String,
35+
Description: "Internal CQ ID of the multiplexed client",
36+
NotNull: true,
37+
}
38+
3239
// These columns are managed and populated by the destination plugin.
3340
var CqSyncTimeColumn = Column{
3441
Name: "_cq_sync_time",

schema/resource.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,14 @@ func (r *Resource) storeCQID(value uuid.UUID) error {
123123
return r.Set(CqIDColumn.Name, b)
124124
}
125125

126+
func (r *Resource) StoreCQClientID(clientID string) error {
127+
// We skip if _cq_client_id is not present.
128+
if r.Table.Columns.Get(CqClientIDColumn.Name) == nil {
129+
return nil
130+
}
131+
return r.Set(CqClientIDColumn.Name, clientID)
132+
}
133+
126134
type PKError struct {
127135
MissingPKs []string
128136
}

schema/table.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ func AddCqIDs(table *Table) {
127127
}
128128
}
129129

130+
// AddCqClientID adds the cq_client_id column to the table,
131+
// which is used to identify the multiplexed client that fetched the resource
132+
func AddCqClientID(t *Table) {
133+
t.Columns = append(ColumnList{CqClientIDColumn}, t.Columns...)
134+
}
135+
130136
// CqIDAsPK sets the cq_id column as primary key if it exists
131137
// and removes the primary key from all other columns
132138
func CqIDAsPK(t *Table) {

0 commit comments

Comments
 (0)