Skip to content

Commit 634fd69

Browse files
authored
fix: align glossaryTerms and institutionalMemory aspects with DataHub PDL schemas (#60)
- Add required auditStamp field to glossaryTermsAspect (GlossaryTerms.pdl) - Fix linkElement JSON tag from "created" to "createStamp" (InstitutionalMemoryMetadata.pdl) - Use time.Now().UnixMilli() for audit stamps instead of hardcoded 0 - Add newAuditStamp() helper for consistent stamp creation Fixes #59
1 parent 063b27d commit 634fd69

File tree

2 files changed

+49
-13
lines changed

2 files changed

+49
-13
lines changed

pkg/client/write.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"time"
89
)
910

1011
// entityTypeFromURN derives the DataHub entity type string from a parsed URN.
@@ -169,8 +170,10 @@ func (c *Client) readGlobalTags(ctx context.Context, urn string) (*globalTagsAsp
169170
}
170171

171172
// glossaryTermsAspect represents the glossaryTerms aspect structure.
173+
// Per GlossaryTerms.pdl, auditStamp is a required field.
172174
type glossaryTermsAspect struct {
173-
Terms []termAssociation `json:"terms"`
175+
Terms []termAssociation `json:"terms"`
176+
AuditStamp auditStampRaw `json:"auditStamp"`
174177
}
175178

176179
// termAssociation represents a glossary term association.
@@ -198,6 +201,7 @@ func (c *Client) AddGlossaryTerm(ctx context.Context, urn, termURN string) error
198201
}
199202

200203
terms.Terms = append(terms.Terms, termAssociation{URN: termURN})
204+
terms.AuditStamp = newAuditStamp()
201205

202206
return c.postIngestProposal(ctx, ingestProposal{
203207
EntityType: entityType,
@@ -226,6 +230,7 @@ func (c *Client) RemoveGlossaryTerm(ctx context.Context, urn, termURN string) er
226230
}
227231
}
228232
terms.Terms = filtered
233+
terms.AuditStamp = newAuditStamp()
229234

230235
return c.postIngestProposal(ctx, ingestProposal{
231236
EntityType: entityType,
@@ -258,18 +263,28 @@ type institutionalMemoryAspect struct {
258263
}
259264

260265
// linkElement represents a link in the institutionalMemory aspect.
266+
// Per InstitutionalMemoryMetadata.pdl, the audit stamp field is "createStamp".
261267
type linkElement struct {
262268
URL string `json:"url"`
263269
Description string `json:"description"`
264-
Created auditStampRaw `json:"created"`
270+
CreateStamp auditStampRaw `json:"createStamp"`
265271
}
266272

267273
// auditStampRaw represents an audit stamp with millisecond timestamp.
274+
// Per AuditStamp.pdl: time (epoch ms) and actor (entity URN) are required.
268275
type auditStampRaw struct {
269276
Time int64 `json:"time"`
270277
Actor string `json:"actor"`
271278
}
272279

280+
// newAuditStamp creates an audit stamp with the current time.
281+
func newAuditStamp() auditStampRaw {
282+
return auditStampRaw{
283+
Time: time.Now().UnixMilli(),
284+
Actor: "urn:li:corpuser:datahub",
285+
}
286+
}
287+
273288
// AddLink adds a link to an entity using read-modify-write on institutionalMemory.
274289
func (c *Client) AddLink(ctx context.Context, urn, linkURL, description string) error {
275290
entityType, err := entityTypeFromURN(urn)
@@ -292,10 +307,7 @@ func (c *Client) AddLink(ctx context.Context, urn, linkURL, description string)
292307
memory.Elements = append(memory.Elements, linkElement{
293308
URL: linkURL,
294309
Description: description,
295-
Created: auditStampRaw{
296-
Time: 0, // DataHub will fill this in
297-
Actor: "urn:li:corpuser:datahub",
298-
},
310+
CreateStamp: newAuditStamp(),
299311
})
300312

301313
return c.postIngestProposal(ctx, ingestProposal{

pkg/client/write_test.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ func TestAddGlossaryTerm(t *testing.T) {
325325
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
326326
if r.Method == http.MethodGet {
327327
resp := aspectResponse{
328-
Value: json.RawMessage(`{"terms":[{"urn":"urn:li:glossaryTerm:existing"}]}`),
328+
Value: json.RawMessage(`{"terms":[{"urn":"urn:li:glossaryTerm:existing"}],"auditStamp":{"time":1000,"actor":"urn:li:corpuser:admin"}}`),
329329
}
330330
w.Header().Set("Content-Type", "application/json")
331331
_ = json.NewEncoder(w).Encode(resp)
@@ -342,6 +342,13 @@ func TestAddGlossaryTerm(t *testing.T) {
342342
if proposal["aspectName"] != "glossaryTerms" {
343343
t.Errorf("expected aspect 'glossaryTerms', got %v", proposal["aspectName"])
344344
}
345+
// Per GlossaryTerms.pdl, auditStamp is required and must have valid time
346+
if terms.AuditStamp.Time <= 0 {
347+
t.Error("expected auditStamp.time > 0")
348+
}
349+
if terms.AuditStamp.Actor == "" {
350+
t.Error("expected auditStamp.actor to be non-empty")
351+
}
345352
w.WriteHeader(http.StatusOK)
346353
}))
347354
defer server.Close()
@@ -365,7 +372,7 @@ func TestAddGlossaryTerm_Duplicate(t *testing.T) {
365372
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
366373
if r.Method == http.MethodGet {
367374
resp := aspectResponse{
368-
Value: json.RawMessage(`{"terms":[{"urn":"urn:li:glossaryTerm:existing"}]}`),
375+
Value: json.RawMessage(`{"terms":[{"urn":"urn:li:glossaryTerm:existing"}],"auditStamp":{"time":1000,"actor":"urn:li:corpuser:admin"}}`),
369376
}
370377
w.Header().Set("Content-Type", "application/json")
371378
_ = json.NewEncoder(w).Encode(resp)
@@ -394,8 +401,11 @@ func TestAddGlossaryTerm_Duplicate(t *testing.T) {
394401
func TestRemoveGlossaryTerm(t *testing.T) {
395402
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
396403
if r.Method == http.MethodGet {
404+
termsJSON := `{"terms":[{"urn":"urn:li:glossaryTerm:keep"},` +
405+
`{"urn":"urn:li:glossaryTerm:remove"}],` +
406+
`"auditStamp":{"time":1000,"actor":"urn:li:corpuser:admin"}}`
397407
resp := aspectResponse{
398-
Value: json.RawMessage(`{"terms":[{"urn":"urn:li:glossaryTerm:keep"},{"urn":"urn:li:glossaryTerm:remove"}]}`),
408+
Value: json.RawMessage(termsJSON),
399409
}
400410
w.Header().Set("Content-Type", "application/json")
401411
_ = json.NewEncoder(w).Encode(resp)
@@ -409,6 +419,9 @@ func TestRemoveGlossaryTerm(t *testing.T) {
409419
if len(terms.Terms) != 1 {
410420
t.Errorf("expected 1 term after removal, got %d", len(terms.Terms))
411421
}
422+
if terms.AuditStamp.Time <= 0 {
423+
t.Error("expected auditStamp.time > 0 on write")
424+
}
412425
w.WriteHeader(http.StatusOK)
413426
}))
414427
defer server.Close()
@@ -433,7 +446,7 @@ func TestAddLink(t *testing.T) {
433446
if r.Method == http.MethodGet {
434447
linkJSON := `{"elements":[{"url":"https://existing.com",` +
435448
`"description":"existing",` +
436-
`"created":{"time":0,"actor":"urn:li:corpuser:datahub"}}]}`
449+
`"createStamp":{"time":1000,"actor":"urn:li:corpuser:datahub"}}]}`
437450
resp := aspectResponse{
438451
Value: json.RawMessage(linkJSON),
439452
}
@@ -452,6 +465,14 @@ func TestAddLink(t *testing.T) {
452465
if len(memory.Elements) != 2 {
453466
t.Errorf("expected 2 links, got %d", len(memory.Elements))
454467
}
468+
// New link should have a real timestamp (not 0)
469+
newLink := memory.Elements[1]
470+
if newLink.CreateStamp.Time <= 0 {
471+
t.Error("expected createStamp.time > 0 for new link")
472+
}
473+
if newLink.CreateStamp.Actor != "urn:li:corpuser:datahub" {
474+
t.Errorf("expected actor 'urn:li:corpuser:datahub', got %q", newLink.CreateStamp.Actor)
475+
}
455476
w.WriteHeader(http.StatusOK)
456477
}))
457478
defer server.Close()
@@ -476,7 +497,7 @@ func TestAddLink_Duplicate(t *testing.T) {
476497
if r.Method == http.MethodGet {
477498
linkJSON := `{"elements":[{"url":"https://existing.com",` +
478499
`"description":"existing",` +
479-
`"created":{"time":0,"actor":"urn:li:corpuser:datahub"}}]}`
500+
`"createStamp":{"time":0,"actor":"urn:li:corpuser:datahub"}}]}`
480501
resp := aspectResponse{
481502
Value: json.RawMessage(linkJSON),
482503
}
@@ -509,9 +530,9 @@ func TestRemoveLink(t *testing.T) {
509530
if r.Method == http.MethodGet {
510531
linkJSON := `{"elements":[` +
511532
`{"url":"https://keep.com","description":"keep",` +
512-
`"created":{"time":0,"actor":"urn:li:corpuser:datahub"}},` +
533+
`"createStamp":{"time":0,"actor":"urn:li:corpuser:datahub"}},` +
513534
`{"url":"https://remove.com","description":"remove",` +
514-
`"created":{"time":0,"actor":"urn:li:corpuser:datahub"}}]}`
535+
`"createStamp":{"time":0,"actor":"urn:li:corpuser:datahub"}}]}`
515536
resp := aspectResponse{
516537
Value: json.RawMessage(linkJSON),
517538
}
@@ -815,6 +836,9 @@ func TestAddGlossaryTerm_NoExistingTerms(t *testing.T) {
815836
if len(terms.Terms) != 1 {
816837
t.Errorf("expected 1 term, got %d", len(terms.Terms))
817838
}
839+
if terms.AuditStamp.Time <= 0 {
840+
t.Error("expected auditStamp.time > 0")
841+
}
818842
w.WriteHeader(http.StatusOK)
819843
}))
820844
defer server.Close()

0 commit comments

Comments
 (0)