Commit 2848c34

add passing tests, full support for cloud file tables
1 parent 5edc871 commit 2848c34

27 files changed: 2424 additions, 412 deletions

SCHEMA_UPDATE_ANALYSIS.md

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
# Schema Evolution & Async Processing Analysis

## Overview

The "Schema Evolution" failure (where `GetTable` returns the old schema after a file update) is caused by the asynchronous nature of the file processing pipeline and by the fact that schema updates are handled differently in the **Create** and **Update** (Patch/Replace) flows.

## API Logic: Create vs. Patch

### 1. Create Table (`POST /v2/reference-tables`)

* **Endpoint**: `CreateTable` in `reference-tables-api/http/v2_endpoints.go`.
* **Flow**:
  1. Calls `ImportFileForCreate` in `reference-tables-edge`.
  2. `reference-tables-edge` calls `CreateResources`.
  3. `CreateResources` calls `upsert.Create`.
  4. `upsert.Create` uses `convertHeadersForCreate`, which **creates a new schema** from the file headers (see the sketch after this list).
  5. It then calls `produceUpsertRawFile` with `RawFile_CREATE`.
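
A minimal sketch of step 4, with illustrative type and field names (assumptions, not the actual dd-go definitions): on Create, the schema is derived from the file headers alone, so the file is the source of truth.

```go
// Illustrative sketch only: type and field names are assumptions, not the
// actual dd-go definitions. The point is that Create builds a brand-new
// schema purely from the file headers.
type SchemaField struct {
	Name       string
	Type       string
	PrimaryKey bool
}

func convertHeadersForCreate(headers []string, primaryKeys map[string]bool) []SchemaField {
	fields := make([]SchemaField, 0, len(headers))
	for _, h := range headers {
		fields = append(fields, SchemaField{
			Name:       h,
			Type:       "STRING", // assumed: file-based columns inferred as strings
			PrimaryKey: primaryKeys[h],
		})
	}
	return fields
}
```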

### 2. Patch Table (`PATCH /v2/reference-tables/{id}`)

* **Endpoint**: `PatchTable` in `reference-tables-api/http/v2_endpoints.go`.
* **Flow**:
  * Validates access details and schema (if provided).
  * Calls `ImportFileForReplace` (via `handleNonLocalFileSyncDetails` or the direct local flow).
  * `reference-tables-edge` calls `ReplaceResources`.
  * `ReplaceResources` calls `upsert.Replace`.
  * **Code:** [`Replace` in `upsert_replace.go`](https://github.com/DataDog/dd-go/blob/prod/resources/reference-tables/pkg/usecase/referencetables/upsert_replace.go#L72)

## The Core Issue: Additive-Only Schema in Replace

In the **Patch/Replace** flow (`upsert.Replace`), the schema update logic is **additive-only** and conservative.

**Logic in `convertHeadersForReplace` (`upsert.go`)**, sketched after this list:
1. It iterates over the **existing** schema fields and carries them all into the new schema definition.
2. It checks that the `primaryKey` fields are present in the new file headers.
3. It then appends any **new** headers found in the file.
4. **CRITICAL**: It does **NOT** remove fields that are missing from the file; a missing non-key field is silently kept, while a missing primary key is an error.
5. `schemaChanged` is set to true **only if new fields are added** (or labels change).
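
Taken together, those steps amount to a merge like the following minimal sketch, reusing the illustrative `SchemaField` from the Create sketch (names, signature, and error handling are assumptions, not the actual `upsert.go` code):

```go
import "fmt"

// Illustrative sketch of the additive-only merge described above; the real
// convertHeadersForReplace differs in names and details.
func convertHeadersForReplace(existing []SchemaField, headers, primaryKeys []string) ([]SchemaField, bool, error) {
	inFile := make(map[string]bool, len(headers))
	for _, h := range headers {
		inFile[h] = true
	}

	// Step 1: every existing field is carried over, even if the file dropped it.
	merged := append([]SchemaField(nil), existing...)
	known := make(map[string]bool, len(existing))
	for _, f := range existing {
		known[f.Name] = true
	}

	// Step 2: a primary key missing from the file is an error, not a removal.
	for _, pk := range primaryKeys {
		if !inFile[pk] {
			return nil, false, fmt.Errorf("primary key %q missing from file headers", pk)
		}
	}

	// Steps 3-5: only genuinely new headers are appended, and only they flip
	// schemaChanged; columns missing from the file are silently retained.
	schemaChanged := false
	for _, h := range headers {
		if !known[h] {
			merged = append(merged, SchemaField{Name: h, Type: "STRING"})
			schemaChanged = true
		}
	}
	return merged, schemaChanged, nil
}
```

Dropping a non-key column from the file therefore yields `schemaChanged == false` with the stale column still present in `merged`, which is exactly the consequence described next.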

**Consequence**:
* If you **add** a column: `schemaChanged` is true, and `UpdateTableSchema` is called synchronously. The schema evolves.
* If you **remove** a column: `schemaChanged` stays false and the field is kept. The schema **does not evolve** to reflect the removal; the old column remains in the schema.
## The "Missing Link" & File Operator
43+
44+
You asked about the `file-operator` proof. Here is the clarification:
45+
46+
1. **File Operator Role**:
47+
* The `file-operator` processes the raw file and infers the schema *exactly as it is in the file*.
48+
* It sends this exact schema in a `TableDefinition` message to the `write-operator`.
49+
* **Code:** [`Process` in `blob/writer.go`](https://github.com/DataDog/dd-go/blob/prod/resources/reference-tables/pkg/usecase/blob/writer.go#L178)
50+
51+

2. **Write Operator & Aggregator (The Proof)**:
   * The `write-operator` consumes this message and calls `WriteSchema` on the backend.
   * For file-based tables (Postgres/Cassandra), the backend is the `aggregator`.
   * **The Proof**: `Aggregator.WriteSchema` is a **NO-OP**.
   * **Code:** [`WriteSchema` (NO-OP) in `aggregator.go`](https://github.com/DataDog/dd-go/blob/prod/resources/reference-tables/pkg/repository/aggregator/aggregator.go#L131-L140)

```go
func (a *Aggregator) WriteSchema(...) error {
    // ... tracing ...
    span.Finish()
    return nil // NO-OP: Does not write to Postgres
}
```
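
For contrast with the NO-OP above, a minimal sketch of what the `file-operator` side produces, reusing the illustrative `SchemaField` from earlier (the message shape here is an assumption; the real construction lives in the linked `Process` code):

```go
// Illustrative stand-in for the message the file-operator sends to the
// write-operator; the real TableDefinition type lives in dd-go.
type TableDefinition struct {
	TableID string
	Fields  []SchemaField
}

// inferTableDefinition rebuilds the schema from the file headers alone:
// a column deleted from the file simply never appears in the message.
func inferTableDefinition(tableID string, headers []string) TableDefinition {
	fields := make([]SchemaField, 0, len(headers))
	for _, h := range headers {
		fields = append(fields, SchemaField{Name: h, Type: "STRING"})
	}
	return TableDefinition{TableID: tableID, Fields: fields}
}
```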

**Why this matters**:
* Because `upsert.Replace` (Edge service) is **additive-only**, it cannot handle column removals.
* The `file-operator` -> `write-operator` pipeline *has* the correct, exact schema (from the file).
* If `Aggregator.WriteSchema` were implemented to update Postgres, it would **overwrite** the additive schema with the *exact* schema from the file, effectively supporting full schema evolution (including removals).
* Since it is a NO-OP, we are stuck with the additive-only behavior of the synchronous Edge service.

## Conclusion

The "bug" preventing schema evolution (specifically removals or full sync) is a combination of:
1. **Edge Service Design**: `upsert.Replace` is intentionally additive/safe.
2. **Missing Async Update**: The `write-operator` (via the `aggregator`) ignores the schema inferred during file processing, which is the only place where the "true" file schema exists.

**To support full schema evolution (making the file the source of truth):**
We must implement `WriteSchema` in `aggregator.go` to update `table_metadata` in Postgres. This will make the system eventually consistent with the file content. A hedged sketch of that change follows.
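
A sketch of that fix, assuming a `database/sql` handle on the `Aggregator` and a JSON schema column, and reusing the illustrative `TableDefinition` from the file-operator sketch above; the `table_metadata` table name comes from this analysis, while the column names and the concrete `WriteSchema` signature are assumptions:

```go
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
)

// Aggregator is an illustrative stand-in; the real struct is in aggregator.go.
type Aggregator struct {
	db *sql.DB
}

// WriteSchema sketch: overwrite the stored schema with the exact schema the
// file-operator inferred, making the file the source of truth (additions AND
// removals). The real signature and table_metadata layout may differ.
func (a *Aggregator) WriteSchema(ctx context.Context, def TableDefinition) error {
	schemaJSON, err := json.Marshal(def.Fields)
	if err != nil {
		return fmt.Errorf("marshaling schema for table %s: %w", def.TableID, err)
	}
	_, err = a.db.ExecContext(ctx,
		`UPDATE table_metadata SET schema = $1, updated_at = NOW() WHERE table_id = $2`,
		schemaJSON, def.TableID)
	return err
}
```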

datadog/fwprovider/data_source_datadog_reference_table_rows.go

Lines changed: 33 additions & 4 deletions
```diff
@@ -3,6 +3,8 @@ package fwprovider
 import (
     "context"
     "fmt"
+    "net/http"
+    "time"

     "github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
     "github.com/hashicorp/terraform-plugin-framework/datasource"
@@ -100,10 +102,37 @@ func (d *datadogReferenceTableRowsDataSource) Read(ctx context.Context, request
         return
     }

-    // Call API to get rows by ID
-    ddResp, _, err := d.Api.GetRowsByID(d.Auth, tableId, rowIds)
-    if err != nil {
-        response.Diagnostics.Append(utils.FrameworkErrorDiag(err, "error getting reference table rows"))
+    // Call API to get rows by ID with retry logic
+    // Rows are written asynchronously, so we need to retry if the table hasn't synced yet
+    // Use a 5-second interval to avoid spamming the API while waiting for sync
+    var ddResp datadogV2.TableRowResourceArray
+    var httpResp *http.Response
+    var err error
+
+    retryErr := utils.Retry(5*time.Second, 10, func() error {
+        ddResp, httpResp, err = d.Api.GetRowsByID(d.Auth, tableId, rowIds)
+        if err != nil {
+            // If we get a 404, the table might not have synced yet - retry
+            if httpResp != nil && httpResp.StatusCode == 404 {
+                return &utils.RetryableError{Prob: fmt.Sprintf("rows not found (table may not have synced yet): %v", err)}
+            }
+            // For other errors, don't retry
+            return &utils.FatalError{Prob: fmt.Sprintf("error getting reference table rows: %v", err)}
+        }
+        // Success - check if we got the expected number of rows
+        if len(ddResp.Data) == len(rowIds) {
+            return nil
+        }
+        // If we got some rows but not all, the table might still be syncing - retry
+        if len(ddResp.Data) > 0 && len(ddResp.Data) < len(rowIds) {
+            return &utils.RetryableError{Prob: fmt.Sprintf("only %d of %d rows found (table may still be syncing)", len(ddResp.Data), len(rowIds))}
+        }
+        // If we got no rows, retry
+        return &utils.RetryableError{Prob: "no rows found (table may not have synced yet)"}
+    })
+
+    if retryErr != nil {
+        response.Diagnostics.Append(utils.FrameworkErrorDiag(retryErr, "error getting reference table rows"))
         return
     }
```
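
The new code relies on the provider's `utils.Retry` helper together with `utils.RetryableError` and `utils.FatalError`. As a reading aid, here is a minimal sketch of the contract the diff assumes; the real helper in the provider's `utils` package may differ in details:

```go
import (
	"errors"
	"time"
)

// RetryableError asks Retry to try again; FatalError aborts immediately.
// The field name Prob mirrors the usage in the diff above.
type RetryableError struct{ Prob string }

func (e *RetryableError) Error() string { return e.Prob }

type FatalError struct{ Prob string }

func (e *FatalError) Error() string { return e.Prob }

// Retry calls f up to maxRetries times, sleeping interval between attempts.
// nil means success; a *FatalError stops the loop; anything else retries.
func Retry(interval time.Duration, maxRetries int, f func() error) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = f(); err == nil {
			return nil
		}
		var fatal *FatalError
		if errors.As(err, &fatal) {
			return err
		}
		time.Sleep(interval)
	}
	return err
}
```

Under this contract, a 5-second interval and 10 attempts give the asynchronous row writes roughly 50 seconds to land before the data source surfaces an error.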