Skip to content

Commit 3b28991

Browse files
authored
feat: Enable storing _cq_client_id. (#2046)
We need to be able to store `_cq_client_id` on every table for use cases where we need to know which multiplexed client a row came from. This PR adds the ability for the SDK and all of its schedulers to store `_cq_client_id` on all tables. I added this feature to the example plugin in the SDK, and it stores the value correctly: <img width="861" alt="Screenshot 2025-01-08 at 10 45 04" src="https://github.com/user-attachments/assets/58705cf2-3e61-4585-bfbc-63abbccef555" /> If you want to try this yourself, please note that the simple plugin unfortunately also stores an unrelated `client_id` column (which doesn't work, because the plugin treats it as an integer).
1 parent 3878f90 commit 3b28991

File tree

6 files changed

+33
-0
lines changed

6 files changed

+33
-0
lines changed

examples/simple_plugin/plugin/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ func getTables() schema.Tables {
123123
}
124124
for _, t := range tables {
125125
schema.AddCqIDs(t)
126+
schema.AddCqClientID(t)
126127
}
127128
return tables
128129
}

scheduler/queue/worker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
147147
atomic.AddUint64(&tableMetrics.Errors, 1)
148148
return
149149
}
150+
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
151+
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
152+
}
150153
if err := resolvedResource.Validate(); err != nil {
151154
switch err.(type) {
152155
case *schema.PKError:

scheduler/scheduler_dfs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
188188
atomic.AddUint64(&tableMetrics.Errors, 1)
189189
return
190190
}
191+
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
192+
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
193+
}
191194
if err := resolvedResource.Validate(); err != nil {
192195
switch err.(type) {
193196
case *schema.PKError:

schema/meta.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ var CqParentIDColumn = Column{
2929
IgnoreInTests: true,
3030
}
3131

32+
var CqClientIDColumn = Column{
33+
Name: "_cq_client_id",
34+
Type: arrow.BinaryTypes.String,
35+
Description: "Internal CQ ID of the multiplexed client",
36+
NotNull: true,
37+
}
38+
3239
// These columns are managed and populated by the destination plugin.
3340
var CqSyncTimeColumn = Column{
3441
Name: "_cq_sync_time",

schema/resource.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,14 @@ func (r *Resource) storeCQID(value uuid.UUID) error {
123123
return r.Set(CqIDColumn.Name, b)
124124
}
125125

126+
func (r *Resource) StoreCQClientID(clientID string) error {
127+
// We skip if _cq_client_id is not present.
128+
if r.Table.Columns.Get(CqClientIDColumn.Name) == nil {
129+
return nil
130+
}
131+
return r.Set(CqClientIDColumn.Name, clientID)
132+
}
133+
126134
type PKError struct {
127135
MissingPKs []string
128136
}

schema/table.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,17 @@ func AddCqIDs(table *Table) {
127127
}
128128
}
129129

130+
// AddCqClientID adds the _cq_client_id column to the table and, recursively, to all of its relations.
131+
// The column is used to identify the multiplexed client that fetched the resource.
132+
func AddCqClientID(t *Table) {
133+
if t.Columns.Get(CqClientIDColumn.Name) == nil {
134+
t.Columns = append(ColumnList{CqClientIDColumn}, t.Columns...)
135+
}
136+
for _, rel := range t.Relations {
137+
AddCqClientID(rel)
138+
}
139+
}
140+
130141
// CqIDAsPK sets the cq_id column as primary key if it exists
131142
// and removes the primary key from all other columns
132143
func CqIDAsPK(t *Table) {

0 commit comments

Comments
 (0)