Skip to content

Commit ce16191

Browse files
authored
Merge branch 'main' into andrest50/unmarshal-json
2 parents 0a20b8e + cda11e8 commit ce16191

File tree

6 files changed

+120
-149
lines changed

6 files changed

+120
-149
lines changed

cmd/dynamodb-stream-consumer/publisher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ import (
1818

1919
// DynamoDBStreamEvent is the event payload published to NATS for each DynamoDB stream record.
2020
type DynamoDBStreamEvent struct {
21-
EventID string `json:"event_id"`
22-
EventName string `json:"event_name"` // INSERT, MODIFY, REMOVE
23-
TableName string `json:"table_name"`
24-
SequenceNumber string `json:"sequence_number"`
25-
ApproximateCreationTime time.Time `json:"approximate_creation_time"`
21+
EventID string `json:"event_id"`
22+
EventName string `json:"event_name"` // INSERT, MODIFY, REMOVE
23+
TableName string `json:"table_name"`
24+
SequenceNumber string `json:"sequence_number"`
25+
ApproximateCreationTime time.Time `json:"approximate_creation_time"`
2626
// Keys contains only the primary key attribute(s) of the item (partition key +
2727
// optional sort key). Consumers can use this to construct a stable record
2828
// identifier without needing to know the full item schema.

cmd/lfx-v1-sync-helper/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ func LoadConfig() (*Config, error) {
130130
Auth0ClientID: os.Getenv("AUTH0_CLIENT_ID"),
131131
Auth0PrivateKey: os.Getenv("AUTH0_PRIVATE_KEY"),
132132
// Other configuration
133-
NATSURL: os.Getenv("NATS_URL"),
134-
Port: os.Getenv("PORT"),
135-
Bind: os.Getenv("BIND"),
133+
NATSURL: os.Getenv("NATS_URL"),
134+
Port: os.Getenv("PORT"),
135+
Bind: os.Getenv("BIND"),
136136
Debug: parseBooleanEnv("DEBUG"),
137137
HTTPDebug: parseBooleanEnv("HTTP_DEBUG"),
138138
UseMsgpack: parseBooleanEnv("USE_MSGPACK"),

cmd/lfx-v1-sync-helper/handlers_meetings.go

Lines changed: 32 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -416,18 +416,15 @@ func handleZoomMeetingMappingUpdate(ctx context.Context, key string, v1Data map[
416416
return
417417
}
418418

419-
// Fetch the meeting object from v1-objects KV bucket
419+
// Fetch and parse the meeting data.
420420
meetingKey := fmt.Sprintf("itx-zoom-meetings-v2.%s", meetingID)
421-
meetingEntry, err := v1KV.Get(ctx, meetingKey)
421+
meetingData, exists, err := getV1ObjectData(ctx, meetingKey)
422422
if err != nil {
423-
funcLogger.With(errKey, err, "meeting_id", meetingID).WarnContext(ctx, "failed to fetch meeting from KV bucket, cannot trigger re-index")
423+
funcLogger.With(errKey, err, "meeting_id", meetingID).ErrorContext(ctx, "failed to get meeting data from KV bucket")
424424
return
425425
}
426-
427-
// Parse the meeting data
428-
var meetingData map[string]any
429-
if err := json.Unmarshal(meetingEntry.Value(), &meetingData); err != nil {
430-
funcLogger.With(errKey, err, "meeting_id", meetingID).ErrorContext(ctx, "failed to unmarshal meeting data")
426+
if !exists {
427+
funcLogger.With("meeting_id", meetingID).WarnContext(ctx, "meeting data not found or deleted in KV bucket")
431428
return
432429
}
433430

@@ -1073,18 +1070,15 @@ func handleZoomPastMeetingMappingUpdate(ctx context.Context, key string, v1Data
10731070
return
10741071
}
10751072

1076-
// Fetch the past meeting object from v1-objects KV bucket
1073+
// Fetch and parse the past meeting data.
10771074
pastMeetingKey := fmt.Sprintf("itx-zoom-past-meetings.%s", meetingAndOccurrenceID)
1078-
pastMeetingEntry, err := v1KV.Get(ctx, pastMeetingKey)
1075+
pastMeetingData, exists, err := getV1ObjectData(ctx, pastMeetingKey)
10791076
if err != nil {
1080-
funcLogger.With(errKey, err).WarnContext(ctx, "failed to fetch past meeting from KV bucket, cannot trigger re-index")
1077+
funcLogger.With(errKey, err).ErrorContext(ctx, "failed to get past meeting data from KV bucket")
10811078
return
10821079
}
1083-
1084-
// Parse the past meeting data
1085-
var pastMeetingData map[string]any
1086-
if err := json.Unmarshal(pastMeetingEntry.Value(), &pastMeetingData); err != nil {
1087-
funcLogger.With(errKey, err).ErrorContext(ctx, "failed to unmarshal past meeting data")
1080+
if !exists {
1081+
funcLogger.WarnContext(ctx, "past meeting data not found or deleted in KV bucket")
10881082
return
10891083
}
10901084

@@ -1262,22 +1256,16 @@ func handleZoomPastMeetingInviteeUpdate(ctx context.Context, key string, v1Data
12621256
isHost := false
12631257
registrantID := invitee.RegistrantID
12641258
if registrantID != "" {
1265-
// Look up the registrant in the v1-objects KV bucket
1259+
// Look up the registrant in the v1-objects KV bucket.
12661260
registrantKey := fmt.Sprintf("itx-zoom-meetings-registrants-v2.%s", registrantID)
1267-
registrantEntry, err := v1KV.Get(ctx, registrantKey)
1268-
if err == nil && registrantEntry != nil {
1269-
// Parse the registrant data
1270-
var registrantData map[string]any
1271-
if err := json.Unmarshal(registrantEntry.Value(), &registrantData); err == nil {
1272-
// Check if the registrant has the host field set to true
1273-
if hostValue, ok := registrantData["host"].(bool); ok {
1274-
isHost = hostValue
1275-
}
1276-
} else {
1277-
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to unmarshal registrant data")
1261+
registrantData, exists, err := getV1ObjectData(ctx, registrantKey)
1262+
if err != nil {
1263+
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to get registrant data")
1264+
} else if exists {
1265+
// Check if the registrant has the host field set to true
1266+
if hostValue, ok := registrantData["host"].(bool); ok {
1267+
isHost = hostValue
12781268
}
1279-
} else {
1280-
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to fetch registrant from KV bucket")
12811269
}
12821270
}
12831271

@@ -1457,23 +1445,17 @@ func handleZoomPastMeetingAttendeeUpdate(ctx context.Context, key string, v1Data
14571445
isRegistrant := false
14581446
registrantID := attendee.RegistrantID
14591447
if registrantID != "" {
1460-
// Look up the registrant in the v1-objects KV bucket
1448+
// Look up the registrant in the v1-objects KV bucket.
14611449
registrantKey := fmt.Sprintf("itx-zoom-meetings-registrants-v2.%s", registrantID)
1462-
registrantEntry, err := v1KV.Get(ctx, registrantKey)
1463-
if err == nil && registrantEntry != nil {
1450+
registrantData, exists, err := getV1ObjectData(ctx, registrantKey)
1451+
if err != nil {
1452+
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to get registrant data")
1453+
} else if exists {
14641454
isRegistrant = true
1465-
// Parse the registrant data
1466-
var registrantData map[string]any
1467-
if err := json.Unmarshal(registrantEntry.Value(), &registrantData); err == nil {
1468-
// Check if the registrant has the host field set to true
1469-
if hostValue, ok := registrantData["host"].(bool); ok {
1470-
isHost = hostValue
1471-
}
1472-
} else {
1473-
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to unmarshal registrant data")
1455+
// Check if the registrant has the host field set to true
1456+
if hostValue, ok := registrantData["host"].(bool); ok {
1457+
isHost = hostValue
14741458
}
1475-
} else {
1476-
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to fetch registrant from KV bucket")
14771459
}
14781460
}
14791461

@@ -2011,18 +1993,13 @@ func handleZoomPastMeetingSummaryUpdate(ctx context.Context, key string, v1Data
20111993
aiSummaryAccess := ""
20121994
if summaryInput.PastMeetingUID != "" {
20131995
pastMeetingKey := fmt.Sprintf("itx-zoom-past-meetings.%s", summaryInput.PastMeetingUID)
2014-
pastMeetingEntry, err := v1KV.Get(ctx, pastMeetingKey)
2015-
if err == nil && pastMeetingEntry != nil {
2016-
var pastMeetingData map[string]any
2017-
if err := json.Unmarshal(pastMeetingEntry.Value(), &pastMeetingData); err == nil {
2018-
if aiSummaryAccessValue, ok := pastMeetingData["ai_summary_access"].(string); ok && aiSummaryAccessValue != "" {
2019-
aiSummaryAccess = aiSummaryAccessValue
2020-
}
2021-
} else {
2022-
funcLogger.With(errKey, err).WarnContext(ctx, "failed to unmarshal past meeting data")
1996+
pastMeetingData, exists, err := getV1ObjectData(ctx, pastMeetingKey)
1997+
if err != nil {
1998+
funcLogger.With(errKey, err).WarnContext(ctx, "failed to get past meeting data")
1999+
} else if exists {
2000+
if aiSummaryAccessValue, ok := pastMeetingData["ai_summary_access"].(string); ok && aiSummaryAccessValue != "" {
2001+
aiSummaryAccess = aiSummaryAccessValue
20232002
}
2024-
} else {
2025-
funcLogger.With(errKey, err).WarnContext(ctx, "failed to fetch past meeting from KV bucket")
20262003
}
20272004
}
20282005

cmd/lfx-v1-sync-helper/lfx_v1_client.go

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -138,33 +138,12 @@ func lookupV1User(ctx context.Context, platformID string) (*V1User, error) {
138138
// Look up user in the salesforce-merged_user table via v1-objects KV bucket
139139
userKey := fmt.Sprintf("salesforce-merged_user.%s", platformID)
140140

141-
entry, err := v1KV.Get(ctx, userKey)
141+
userData, exists, err := getV1ObjectData(ctx, userKey)
142142
if err != nil {
143-
if err == jetstream.ErrKeyNotFound {
144-
return nil, fmt.Errorf("user %s not found in v1-objects KV bucket", platformID)
145-
}
146-
return nil, fmt.Errorf("failed to get user from v1-objects KV bucket: %w", err)
147-
}
148-
149-
// Parse the merged_user data
150-
var userData map[string]any
151-
if err := json.Unmarshal(entry.Value(), &userData); err != nil {
152-
// Try msgpack if JSON fails.
153-
if msgpackErr := msgpack.Unmarshal(entry.Value(), &userData); msgpackErr != nil {
154-
return nil, fmt.Errorf("failed to unmarshal user data (json: %w, msgpack: %w)", err, msgpackErr)
155-
}
143+
return nil, fmt.Errorf("failed to get user data: %w", err)
156144
}
157-
158-
// Check if the user record is deleted
159-
if isDeleted, ok := userData["isdeleted"].(bool); ok && isDeleted {
160-
return nil, fmt.Errorf("user %s is deleted (isdeleted)", platformID)
161-
}
162-
163-
// Also treat WAL-based soft deletes (indicated by _sdc_deleted_at) as deleted.
164-
if deletedAt, ok := userData["_sdc_deleted_at"]; ok {
165-
if s, okStr := deletedAt.(string); (okStr && strings.TrimSpace(s) != "") || (!okStr && deletedAt != nil) {
166-
return nil, fmt.Errorf("user %s is deleted (_sdc_deleted_at))", platformID)
167-
}
145+
if !exists {
146+
return nil, fmt.Errorf("user %s not found or is deleted in v1-objects KV bucket", platformID)
168147
}
169148

170149
// Extract user fields from the merged_user record
@@ -264,42 +243,15 @@ func getPrimaryEmailForUser(ctx context.Context, userSfid string) (string, error
264243
func getAlternateEmailDetails(ctx context.Context, emailSfid string) (email string, isPrimary bool, isTombstoned bool, err error) {
265244
emailKey := fmt.Sprintf("salesforce-alternate_email__c.%s", emailSfid)
266245

267-
entry, err := v1KV.Get(ctx, emailKey)
246+
// Parse the alternate email record.
247+
emailData, exists, err := getV1ObjectData(ctx, emailKey)
268248
if err != nil {
269-
if err == jetstream.ErrKeyNotFound || err == jetstream.ErrKeyDeleted {
270-
// Key not found or deleted could mean it was tombstoned/deleted
271-
return "", false, true, nil
272-
}
273-
return "", false, false, fmt.Errorf("failed to get email record %s from v1-objects: %w", emailSfid, err)
274-
}
275-
276-
// Check if this is a tombstone marker
277-
if isTombstonedMapping(entry.Value()) {
278-
return "", false, true, nil
279-
}
280-
281-
// Parse the alternate email record
282-
var emailData map[string]any
283-
if err := json.Unmarshal(entry.Value(), &emailData); err != nil {
284-
// Try msgpack if JSON fails.
285-
if msgpackErr := msgpack.Unmarshal(entry.Value(), &emailData); msgpackErr != nil {
286-
return "", false, false, fmt.Errorf("failed to unmarshal email data (json: %w, msgpack: %w)", err, msgpackErr)
287-
}
249+
return "", false, false, fmt.Errorf("failed to get email data: %w", err)
288250
}
289-
290-
// Check if the email record is deleted.
291-
if isDeleted, ok := emailData["isdeleted"].(bool); ok && isDeleted {
251+
if !exists {
292252
return "", false, true, nil
293253
}
294254

295-
// Also check for WAL-based soft deletes (indicated by _sdc_deleted_at).
296-
// This is expected when soft-deleted email records are preserved in v1-objects KV bucket.
297-
if deletedAt, ok := emailData["_sdc_deleted_at"]; ok {
298-
if s, okStr := deletedAt.(string); (okStr && strings.TrimSpace(s) != "") || (!okStr && deletedAt != nil) {
299-
return "", false, true, nil
300-
}
301-
}
302-
303255
// Also check if the email is inactive (active__c is not true).
304256
if isActive, ok := emailData["active__c"].(bool); !ok || !isActive {
305257
return "", false, true, nil
@@ -563,3 +515,45 @@ func parseWebsiteURL(website string) string {
563515

564516
return ""
565517
}
518+
519+
// getV1ObjectData retrieves and unmarshals data from the v1-objects KV bucket with dual-format support.
520+
// It attempts JSON decoding first, then falls back to msgpack if JSON fails.
521+
// Returns (data, exists, error) where exists indicates if the record exists and is not deleted/tombstoned.
522+
// This abstraction should be used for all v1-objects bucket reads to ensure consistent
523+
// dual-format handling across the codebase.
524+
func getV1ObjectData(ctx context.Context, key string) (map[string]any, bool, error) {
525+
entry, err := v1KV.Get(ctx, key)
526+
if err != nil {
527+
if err == jetstream.ErrKeyNotFound || err == jetstream.ErrKeyDeleted {
528+
return nil, false, nil
529+
}
530+
return nil, false, fmt.Errorf("failed to get data from v1-objects KV bucket: %w", err)
531+
}
532+
533+
// Check if this is a tombstone marker.
534+
if isTombstonedMapping(entry.Value()) {
535+
return nil, false, nil
536+
}
537+
538+
var data map[string]any
539+
if err := json.Unmarshal(entry.Value(), &data); err != nil {
540+
// Try msgpack if JSON fails.
541+
if msgpackErr := msgpack.Unmarshal(entry.Value(), &data); msgpackErr != nil {
542+
return nil, false, fmt.Errorf("failed to unmarshal data (json: %w, msgpack: %w)", err, msgpackErr)
543+
}
544+
}
545+
546+
// Check if the record is deleted.
547+
if isDeleted, ok := data["isdeleted"].(bool); ok && isDeleted {
548+
return nil, false, nil
549+
}
550+
551+
// Also check for WAL-based soft deletes (indicated by _sdc_deleted_at).
552+
if deletedAt, ok := data["_sdc_deleted_at"]; ok {
553+
if s, okStr := deletedAt.(string); (okStr && strings.TrimSpace(s) != "") || (!okStr && deletedAt != nil) {
554+
return nil, false, nil
555+
}
556+
}
557+
558+
return data, true, nil
559+
}

go.mod

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,24 @@ go 1.25.6
66

77
require (
88
github.com/akamensky/base58 v0.0.0-20210829145138-ce8bf8802e8f
9-
github.com/auth0/go-auth0 v1.32.1
9+
github.com/auth0/go-auth0 v1.34.0
1010
github.com/aws/aws-sdk-go-v2 v1.41.1
1111
github.com/aws/aws-sdk-go-v2/config v1.32.9
1212
github.com/aws/aws-sdk-go-v2/credentials v1.19.9
1313
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.55.0
1414
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.10
1515
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6
16-
github.com/golang-jwt/jwt/v5 v5.3.0
16+
github.com/golang-jwt/jwt/v5 v5.3.1
1717
github.com/google/uuid v1.6.0
1818
github.com/linuxfoundation/lfx-v2-committee-service v0.2.19
1919
github.com/linuxfoundation/lfx-v2-indexer-service v0.4.14
20-
github.com/linuxfoundation/lfx-v2-project-service v0.5.4
20+
github.com/linuxfoundation/lfx-v2-project-service v0.5.6
2121
github.com/nats-io/nats.go v1.48.0
2222
github.com/patrickmn/go-cache v2.1.0+incompatible
2323
github.com/teambition/rrule-go v1.8.2
2424
github.com/vmihailenco/msgpack/v5 v5.4.1
25-
goa.design/goa/v3 v3.24.1
26-
golang.org/x/oauth2 v0.34.0
25+
goa.design/goa/v3 v3.25.3
26+
golang.org/x/oauth2 v0.35.0
2727
)
2828

2929
require (
@@ -41,22 +41,22 @@ require (
4141
github.com/aws/smithy-go v1.24.0 // indirect
4242
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4343
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
44-
github.com/go-chi/chi/v5 v5.2.4 // indirect
44+
github.com/go-chi/chi/v5 v5.2.5 // indirect
4545
github.com/goccy/go-json v0.10.5 // indirect
4646
github.com/gorilla/websocket v1.5.3 // indirect
47-
github.com/klauspost/compress v1.18.3 // indirect
47+
github.com/klauspost/compress v1.18.4 // indirect
4848
github.com/lestrrat-go/blackmagic v1.0.4 // indirect
4949
github.com/lestrrat-go/httpcc v1.0.1 // indirect
5050
github.com/lestrrat-go/httprc v1.0.6 // indirect
5151
github.com/lestrrat-go/iter v1.0.2 // indirect
5252
github.com/lestrrat-go/jwx/v2 v2.1.6 // indirect
5353
github.com/lestrrat-go/option v1.0.1 // indirect
54-
github.com/nats-io/nkeys v0.4.14 // indirect
54+
github.com/nats-io/nkeys v0.4.15 // indirect
5555
github.com/nats-io/nuid v1.0.1 // indirect
5656
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5757
github.com/segmentio/asm v1.2.1 // indirect
5858
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
5959
go.devnw.com/structs v1.0.0 // indirect
60-
golang.org/x/crypto v0.47.0 // indirect
61-
golang.org/x/sys v0.40.0 // indirect
60+
golang.org/x/crypto v0.48.0 // indirect
61+
golang.org/x/sys v0.41.0 // indirect
6262
)

0 commit comments

Comments
 (0)