Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/dynamodb-stream-consumer/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (

// DynamoDBStreamEvent is the event payload published to NATS for each DynamoDB stream record.
type DynamoDBStreamEvent struct {
EventID string `json:"event_id"`
EventName string `json:"event_name"` // INSERT, MODIFY, REMOVE
TableName string `json:"table_name"`
SequenceNumber string `json:"sequence_number"`
ApproximateCreationTime time.Time `json:"approximate_creation_time"`
EventID string `json:"event_id"`
EventName string `json:"event_name"` // INSERT, MODIFY, REMOVE
TableName string `json:"table_name"`
SequenceNumber string `json:"sequence_number"`
ApproximateCreationTime time.Time `json:"approximate_creation_time"`
// Keys contains only the primary key attribute(s) of the item (partition key +
// optional sort key). Consumers can use this to construct a stable record
// identifier without needing to know the full item schema.
Expand Down
6 changes: 3 additions & 3 deletions cmd/lfx-v1-sync-helper/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func LoadConfig() (*Config, error) {
Auth0ClientID: os.Getenv("AUTH0_CLIENT_ID"),
Auth0PrivateKey: os.Getenv("AUTH0_PRIVATE_KEY"),
// Other configuration
NATSURL: os.Getenv("NATS_URL"),
Port: os.Getenv("PORT"),
Bind: os.Getenv("BIND"),
NATSURL: os.Getenv("NATS_URL"),
Port: os.Getenv("PORT"),
Bind: os.Getenv("BIND"),
Debug: parseBooleanEnv("DEBUG"),
HTTPDebug: parseBooleanEnv("HTTP_DEBUG"),
UseMsgpack: parseBooleanEnv("USE_MSGPACK"),
Expand Down
87 changes: 32 additions & 55 deletions cmd/lfx-v1-sync-helper/handlers_meetings.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,18 +536,15 @@ func handleZoomMeetingMappingUpdate(ctx context.Context, key string, v1Data map[
return
}

// Fetch the meeting object from v1-objects KV bucket
// Fetch and parse the meeting data using unified abstraction
meetingKey := fmt.Sprintf("itx-zoom-meetings-v2.%s", meetingID)
meetingEntry, err := v1KV.Get(ctx, meetingKey)
meetingData, exists, err := getV1ObjectData(ctx, meetingKey)
if err != nil {
funcLogger.With(errKey, err, "meeting_id", meetingID).WarnContext(ctx, "failed to fetch meeting from KV bucket, cannot trigger re-index")
funcLogger.With(errKey, err, "meeting_id", meetingID).ErrorContext(ctx, "failed to get meeting data from KV bucket")
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This failure is handled by logging and returning (i.e., it’s a recoverable condition that just prevents re-index). Logging it at Error level may create noisy alerts; consider using Warn level here to match other recoverable KV/mapping misses in this service.

Suggested change
funcLogger.With(errKey, err, "meeting_id", meetingID).ErrorContext(ctx, "failed to get meeting data from KV bucket")
funcLogger.With(errKey, err, "meeting_id", meetingID).WarnContext(ctx, "failed to get meeting data from KV bucket")

Copilot uses AI. Check for mistakes.
return
}

// Parse the meeting data
var meetingData map[string]any
if err := json.Unmarshal(meetingEntry.Value(), &meetingData); err != nil {
funcLogger.With(errKey, err, "meeting_id", meetingID).ErrorContext(ctx, "failed to unmarshal meeting data")
if !exists {
funcLogger.With("meeting_id", meetingID).WarnContext(ctx, "meeting data not found or deleted in KV bucket")
return
}

Expand Down Expand Up @@ -1251,18 +1248,15 @@ func handleZoomPastMeetingMappingUpdate(ctx context.Context, key string, v1Data
return
}

// Fetch the past meeting object from v1-objects KV bucket
// Fetch and parse the past meeting data using unified abstraction
pastMeetingKey := fmt.Sprintf("itx-zoom-past-meetings.%s", meetingAndOccurrenceID)
pastMeetingEntry, err := v1KV.Get(ctx, pastMeetingKey)
pastMeetingData, exists, err := getV1ObjectData(ctx, pastMeetingKey)
if err != nil {
funcLogger.With(errKey, err).WarnContext(ctx, "failed to fetch past meeting from KV bucket, cannot trigger re-index")
funcLogger.With(errKey, err).ErrorContext(ctx, "failed to get past meeting data from KV bucket")
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the meeting mapping handler, this KV read failure is recovered by returning early. Consider logging at Warn instead of Error to avoid treating transient KV issues/out-of-order events as an application error.

Suggested change
funcLogger.With(errKey, err).ErrorContext(ctx, "failed to get past meeting data from KV bucket")
funcLogger.With(errKey, err).WarnContext(ctx, "failed to get past meeting data from KV bucket")

Copilot uses AI. Check for mistakes.
return
}

// Parse the past meeting data
var pastMeetingData map[string]any
if err := json.Unmarshal(pastMeetingEntry.Value(), &pastMeetingData); err != nil {
funcLogger.With(errKey, err).ErrorContext(ctx, "failed to unmarshal past meeting data")
if !exists {
funcLogger.WarnContext(ctx, "past meeting data not found or deleted in KV bucket")
return
}

Expand Down Expand Up @@ -1440,22 +1434,16 @@ func handleZoomPastMeetingInviteeUpdate(ctx context.Context, key string, v1Data
isHost := false
registrantID := invitee.RegistrantID
if registrantID != "" {
// Look up the registrant in the v1-objects KV bucket
// Look up the registrant in the v1-objects KV bucket using unified abstraction
registrantKey := fmt.Sprintf("itx-zoom-meetings-registrants-v2.%s", registrantID)
registrantEntry, err := v1KV.Get(ctx, registrantKey)
if err == nil && registrantEntry != nil {
// Parse the registrant data
var registrantData map[string]any
if err := json.Unmarshal(registrantEntry.Value(), &registrantData); err == nil {
// Check if the registrant has the host field set to true
if hostValue, ok := registrantData["host"].(bool); ok {
isHost = hostValue
}
} else {
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to unmarshal registrant data")
registrantData, exists, err := getV1ObjectData(ctx, registrantKey)
if err != nil {
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to get registrant data")
} else if exists {
// Check if the registrant has the host field set to true
if hostValue, ok := registrantData["host"].(bool); ok {
isHost = hostValue
}
} else {
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to fetch registrant from KV bucket")
}
}

Expand Down Expand Up @@ -1642,23 +1630,17 @@ func handleZoomPastMeetingAttendeeUpdate(ctx context.Context, key string, v1Data
isRegistrant := false
registrantID := attendee.RegistrantID
if registrantID != "" {
// Look up the registrant in the v1-objects KV bucket
// Look up the registrant in the v1-objects KV bucket using unified abstraction
registrantKey := fmt.Sprintf("itx-zoom-meetings-registrants-v2.%s", registrantID)
registrantEntry, err := v1KV.Get(ctx, registrantKey)
if err == nil && registrantEntry != nil {
registrantData, exists, err := getV1ObjectData(ctx, registrantKey)
if err != nil {
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to get registrant data")
} else if exists {
isRegistrant = true
// Parse the registrant data
var registrantData map[string]any
if err := json.Unmarshal(registrantEntry.Value(), &registrantData); err == nil {
// Check if the registrant has the host field set to true
if hostValue, ok := registrantData["host"].(bool); ok {
isHost = hostValue
}
} else {
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to unmarshal registrant data")
// Check if the registrant has the host field set to true
if hostValue, ok := registrantData["host"].(bool); ok {
isHost = hostValue
}
} else {
funcLogger.With(errKey, err, "registrant_id", registrantID).WarnContext(ctx, "failed to fetch registrant from KV bucket")
}
}

Expand Down Expand Up @@ -2241,18 +2223,13 @@ func handleZoomPastMeetingSummaryUpdate(ctx context.Context, key string, v1Data
aiSummaryAccess := ""
if summaryInput.PastMeetingUID != "" {
pastMeetingKey := fmt.Sprintf("itx-zoom-past-meetings.%s", summaryInput.PastMeetingUID)
pastMeetingEntry, err := v1KV.Get(ctx, pastMeetingKey)
if err == nil && pastMeetingEntry != nil {
var pastMeetingData map[string]any
if err := json.Unmarshal(pastMeetingEntry.Value(), &pastMeetingData); err == nil {
if aiSummaryAccessValue, ok := pastMeetingData["ai_summary_access"].(string); ok && aiSummaryAccessValue != "" {
aiSummaryAccess = aiSummaryAccessValue
}
} else {
funcLogger.With(errKey, err).WarnContext(ctx, "failed to unmarshal past meeting data")
pastMeetingData, exists, err := getV1ObjectData(ctx, pastMeetingKey)
if err != nil {
funcLogger.With(errKey, err).WarnContext(ctx, "failed to get past meeting data")
} else if exists {
if aiSummaryAccessValue, ok := pastMeetingData["ai_summary_access"].(string); ok && aiSummaryAccessValue != "" {
aiSummaryAccess = aiSummaryAccessValue
}
} else {
funcLogger.With(errKey, err).WarnContext(ctx, "failed to fetch past meeting from KV bucket")
}
}

Expand Down
106 changes: 50 additions & 56 deletions cmd/lfx-v1-sync-helper/lfx_v1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,33 +138,12 @@ func lookupV1User(ctx context.Context, platformID string) (*V1User, error) {
// Look up user in the salesforce-merged_user table via v1-objects KV bucket
userKey := fmt.Sprintf("salesforce-merged_user.%s", platformID)

entry, err := v1KV.Get(ctx, userKey)
userData, exists, err := getV1ObjectData(ctx, userKey)
if err != nil {
if err == jetstream.ErrKeyNotFound {
return nil, fmt.Errorf("user %s not found in v1-objects KV bucket", platformID)
}
return nil, fmt.Errorf("failed to get user from v1-objects KV bucket: %w", err)
}

// Parse the merged_user data
var userData map[string]any
if err := json.Unmarshal(entry.Value(), &userData); err != nil {
// Try msgpack if JSON fails.
if msgpackErr := msgpack.Unmarshal(entry.Value(), &userData); msgpackErr != nil {
return nil, fmt.Errorf("failed to unmarshal user data (json: %w, msgpack: %w)", err, msgpackErr)
}
return nil, fmt.Errorf("failed to get user data: %w", err)
}

// Check if the user record is deleted
if isDeleted, ok := userData["isdeleted"].(bool); ok && isDeleted {
return nil, fmt.Errorf("user %s is deleted (isdeleted)", platformID)
}

// Also treat WAL-based soft deletes (indicated by _sdc_deleted_at) as deleted.
if deletedAt, ok := userData["_sdc_deleted_at"]; ok {
if s, okStr := deletedAt.(string); (okStr && strings.TrimSpace(s) != "") || (!okStr && deletedAt != nil) {
return nil, fmt.Errorf("user %s is deleted (_sdc_deleted_at))", platformID)
}
if !exists {
return nil, fmt.Errorf("user %s not found or is deleted in v1-objects KV bucket", platformID)
}
Comment on lines +141 to 147
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error returned here loses useful context (platformID/userKey and the bucket). Since getV1ObjectData already abstracts the fetch, consider including the key or platformID in the wrapped error so production logs can be tied back to a specific record more easily.

Copilot uses AI. Check for mistakes.

// Extract user fields from the merged_user record
Expand Down Expand Up @@ -264,42 +243,15 @@ func getPrimaryEmailForUser(ctx context.Context, userSfid string) (string, error
func getAlternateEmailDetails(ctx context.Context, emailSfid string) (email string, isPrimary bool, isTombstoned bool, err error) {
emailKey := fmt.Sprintf("salesforce-alternate_email__c.%s", emailSfid)

entry, err := v1KV.Get(ctx, emailKey)
// Parse the alternate email record using the unified abstraction
emailData, exists, err := getV1ObjectData(ctx, emailKey)
if err != nil {
if err == jetstream.ErrKeyNotFound || err == jetstream.ErrKeyDeleted {
// Key not found or deleted could mean it was tombstoned/deleted
return "", false, true, nil
}
return "", false, false, fmt.Errorf("failed to get email record %s from v1-objects: %w", emailSfid, err)
}

// Check if this is a tombstone marker
if isTombstonedMapping(entry.Value()) {
return "", false, true, nil
}

// Parse the alternate email record
var emailData map[string]any
if err := json.Unmarshal(entry.Value(), &emailData); err != nil {
// Try msgpack if JSON fails.
if msgpackErr := msgpack.Unmarshal(entry.Value(), &emailData); msgpackErr != nil {
return "", false, false, fmt.Errorf("failed to unmarshal email data (json: %w, msgpack: %w)", err, msgpackErr)
}
return "", false, false, fmt.Errorf("failed to get email data: %w", err)
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wrapped error is very generic (no email SFID / key). Including emailSfid or emailKey in the error message would make troubleshooting KV/data issues much easier.

Suggested change
return "", false, false, fmt.Errorf("failed to get email data: %w", err)
return "", false, false, fmt.Errorf("failed to get email data for emailSfid %s (key %s): %w", emailSfid, emailKey, err)

Copilot uses AI. Check for mistakes.
}

// Check if the email record is deleted.
if isDeleted, ok := emailData["isdeleted"].(bool); ok && isDeleted {
if !exists {
return "", false, true, nil
}

// Also check for WAL-based soft deletes (indicated by _sdc_deleted_at).
// This is expected when soft-deleted email records are preserved in v1-objects KV bucket.
if deletedAt, ok := emailData["_sdc_deleted_at"]; ok {
if s, okStr := deletedAt.(string); (okStr && strings.TrimSpace(s) != "") || (!okStr && deletedAt != nil) {
return "", false, true, nil
}
}

// Also check if the email is inactive (active__c is not true).
if isActive, ok := emailData["active__c"].(bool); !ok || !isActive {
return "", false, true, nil
Expand Down Expand Up @@ -563,3 +515,45 @@ func parseWebsiteURL(website string) string {

return ""
}

// getV1ObjectData retrieves and unmarshals data from the v1-objects KV bucket with dual-format support.
// It attempts JSON decoding first, then falls back to msgpack if JSON fails.
// Returns (data, exists, error) where exists indicates if the record exists and is not deleted/tombstoned.
// This abstraction should be used for all v1-objects bucket reads to ensure consistent
// dual-format handling across the codebase.
func getV1ObjectData(ctx context.Context, key string) (map[string]any, bool, error) {
entry, err := v1KV.Get(ctx, key)
if err != nil {
if err == jetstream.ErrKeyNotFound || err == jetstream.ErrKeyDeleted {
return nil, false, nil
}
return nil, false, fmt.Errorf("failed to get data from v1-objects KV bucket: %w", err)
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When KV.Get fails with a non-notfound error, this message doesn’t include the KV key being fetched. Including the key in the returned error would significantly improve debuggability (especially since many call sites only wrap with a generic "failed to get ... data").

Suggested change
return nil, false, fmt.Errorf("failed to get data from v1-objects KV bucket: %w", err)
return nil, false, fmt.Errorf("failed to get data from v1-objects KV bucket for key %q: %w", key, err)

Copilot uses AI. Check for mistakes.
}

// Check if this is a tombstone marker.
if isTombstonedMapping(entry.Value()) {
return nil, false, nil
}

var data map[string]any
if err := json.Unmarshal(entry.Value(), &data); err != nil {
// Try msgpack if JSON fails.
if msgpackErr := msgpack.Unmarshal(entry.Value(), &data); msgpackErr != nil {
return nil, false, fmt.Errorf("failed to unmarshal data (json: %w, msgpack: %w)", err, msgpackErr)
}
}

// Check if the record is deleted.
if isDeleted, ok := data["isdeleted"].(bool); ok && isDeleted {
return nil, false, nil
}

// Also check for WAL-based soft deletes (indicated by _sdc_deleted_at).
if deletedAt, ok := data["_sdc_deleted_at"]; ok {
if s, okStr := deletedAt.(string); (okStr && strings.TrimSpace(s) != "") || (!okStr && deletedAt != nil) {
return nil, false, nil
}
}

return data, true, nil
}
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ go 1.25.6

require (
github.com/akamensky/base58 v0.0.0-20210829145138-ce8bf8802e8f
github.com/auth0/go-auth0 v1.32.1
github.com/auth0/go-auth0 v1.34.0
github.com/aws/aws-sdk-go-v2 v1.41.1
github.com/aws/aws-sdk-go-v2/config v1.32.9
github.com/aws/aws-sdk-go-v2/credentials v1.19.9
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.55.0
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.10
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/golang-jwt/jwt/v5 v5.3.1
github.com/google/uuid v1.6.0
github.com/linuxfoundation/lfx-v2-committee-service v0.2.19
github.com/linuxfoundation/lfx-v2-indexer-service v0.4.14
github.com/linuxfoundation/lfx-v2-project-service v0.5.4
github.com/linuxfoundation/lfx-v2-project-service v0.5.6
github.com/nats-io/nats.go v1.48.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/teambition/rrule-go v1.8.2
github.com/vmihailenco/msgpack/v5 v5.4.1
goa.design/goa/v3 v3.24.1
golang.org/x/oauth2 v0.34.0
goa.design/goa/v3 v3.25.3
golang.org/x/oauth2 v0.35.0
)

require (
Expand All @@ -41,22 +41,22 @@ require (
github.com/aws/smithy-go v1.24.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/go-chi/chi/v5 v5.2.4 // indirect
github.com/go-chi/chi/v5 v5.2.5 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/klauspost/compress v1.18.3 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/lestrrat-go/blackmagic v1.0.4 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/httprc v1.0.6 // indirect
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/jwx/v2 v2.1.6 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/nats-io/nkeys v0.4.14 // indirect
github.com/nats-io/nkeys v0.4.15 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.devnw.com/structs v1.0.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/sys v0.41.0 // indirect
)
Loading