[LFXV2-1084] Add NATS KV Event Processing to Survey Service#4
[LFXV2-1084] Add NATS KV Event Processing to Survey Service#4
Conversation
Implement complete event processing infrastructure to consume v1 survey and survey_response data from NATS KV buckets, transform to v2 format, and publish to indexer and FGA-sync services. This mirrors the pattern from voting service PR #8. Implementation details: Event Processing Infrastructure: - EventProcessor manages NATS JetStream consumer lifecycle with Start/Stop - Watches v1-objects KV bucket with consumer pattern (DeliverLastPerSubject) - Routes events by key prefix (itx-surveys, itx-survey-responses) - Graceful shutdown with proper context cancellation Data Transformation Layer: - Converts v1 DynamoDB string fields to proper v2 types (ints, booleans) - Maps v1 SFIDs to v2 UUIDs via IDMapper for committees and projects - Processes committee arrays with deduplication of project references - Preserves SurveyMonkey question/answer data without transformation - Handles string-to-int conversions with error logging Publishing Strategy: - Dual publishing to indexer service and FGA-sync service - Indexer messages include IndexingConfig with parent refs and access control - FGA messages include committee/project references for access control - Determines create vs update by checking v1-mappings KV bucket - Delete operations publish to both services and clean up mappings Error Handling: - Transient errors (NATS timeout, connection issues) trigger NAK for retry - Permanent errors (invalid JSON, missing required fields) trigger ACK to skip - Mapping failures log warnings but don't block processing - MaxDeliver=3, AckWait=30s, MaxAckPending=1000 Configuration: - EVENT_PROCESSING_ENABLED=true (default enabled) - EVENT_CONSUMER_NAME=survey-service-kv-consumer - EVENT_STREAM_NAME=KV_v1-objects - EVENT_FILTER_SUBJECT=$KV.v1-objects.> - Runs in same binary as HTTP API, starts in background goroutine Files created: - cmd/survey-api/eventing/event_processor.go - cmd/survey-api/eventing/kv_handler.go - cmd/survey-api/eventing/survey_event_handler.go - cmd/survey-api/eventing/survey_response_event_handler.go - internal/domain/event_models.go - internal/domain/event_publisher.go - internal/infrastructure/eventing/event_config.go - internal/infrastructure/eventing/nats_publisher.go - pkg/utils/string.go Files modified: - cmd/survey-api/main.go - Event processor integration - go.mod/go.sum - Added indexer-service dependency - internal/domain/errors.go - Added error classifications - internal/service/survey_service.go - Minor adjustments - pkg/models/itx/models.go - Model updates 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> Signed-off-by: Andres Tobon <andrest2455@gmail.com>
Create detailed documentation for the NATS KV bucket event processing feature and update README to reference it. Documentation includes: - Architecture overview with diagrams - Event flow and data transformation details - Configuration reference for all environment variables - Error handling strategies (transient vs permanent) - Operations guide (monitoring, troubleshooting, lifecycle) - Deduplication mechanism explanation - Performance considerations and tuning - Development guide with code structure - Integration with IDMapper, Indexer, and FGA-Sync services README updates: - Added Event Processing to features list - Added event processing environment variables to configuration section - Updated project structure to show eventing packages - Added reference link to event-processing.md This ensures developers understand what event processing does, how to configure it, and how to troubleshoot issues. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> Signed-off-by: Andres Tobon <andrest2455@gmail.com>
Add a cleanup utility script that deletes all survey and survey_response documents from the OpenSearch index. This is useful for cleaning up test data or resetting the system during development and testing. The script: - Counts existing survey and survey_response documents - Prompts for confirmation before deletion - Deletes documents by object_type using delete_by_query - Verifies cleanup by counting remaining documents - Provides clear output at each step 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> Signed-off-by: Andres Tobon <andrest2455@gmail.com>
There was a problem hiding this comment.
Pull request overview
This pull request implements NATS KV bucket event processing for the survey service to enable real-time synchronization of survey and survey response data from v1 (DynamoDB) to v2 (indexer and FGA services). The implementation follows the architectural pattern established in the voting service PR #8.
Changes:
- Added event processing infrastructure with NATS JetStream consumer for KV bucket watching
- Implemented data transformation logic from v1 (string-based DynamoDB format) to v2 (properly typed format)
- Added ID mapping integration for converting v1 SFIDs to v2 UUIDs
- Integrated with indexer service for search functionality and FGA-sync service for access control
- Added comprehensive documentation and operational utilities
Reviewed changes
Copilot reviewed 14 out of 18 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
cmd/survey-api/eventing/event_processor.go |
Event processor lifecycle management with NATS connection and JetStream consumer |
cmd/survey-api/eventing/kv_handler.go |
Routes KV events to appropriate handlers based on key prefix and operation |
cmd/survey-api/eventing/survey_event_handler.go |
Transforms v1 survey data to v2 format, handles ID mapping and validation |
cmd/survey-api/eventing/survey_response_event_handler.go |
Transforms v1 survey response data with similar conversion logic |
internal/domain/event_models.go |
Defines v2 data models for surveys and responses with proper types |
internal/domain/event_publisher.go |
Publisher interface abstraction for event publishing |
internal/infrastructure/eventing/event_config.go |
Configuration structure for event processor |
internal/infrastructure/eventing/nats_publisher.go |
NATS publisher implementation for indexer and FGA-sync |
cmd/survey-api/main.go |
Initializes event processor on startup and handles graceful shutdown |
pkg/utils/string.go |
Custom string utility (reinvents stdlib) |
go.mod |
Adds indexer service dependency |
docs/event-processing.md |
Comprehensive event processing documentation |
README.md |
Updates with event processing feature documentation |
scripts/delete_survey_documents.sh |
Utility script for cleaning up test data from OpenSearch |
pkg/models/itx/models.go |
Whitespace/alignment formatting cleanup |
internal/service/survey_service.go |
Minor formatting adjustments |
internal/domain/errors.go |
Formatting cleanup |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Check if we've already added this project UID | ||
| projectRef := fmt.Sprintf("project:%s", committee.ProjectUID) | ||
| found := false | ||
| for _, ref := range parentRefs { | ||
| if ref == projectRef { | ||
| found = true | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| parentRefs = append(parentRefs, projectRef) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| indexingConfig := &indexerTypes.IndexingConfig{ | ||
| ObjectID: data.UID, | ||
| AccessCheckObject: fmt.Sprintf("survey:%s", data.UID), | ||
| AccessCheckRelation: "viewer", | ||
| HistoryCheckObject: fmt.Sprintf("survey:%s", data.UID), | ||
| HistoryCheckRelation: "auditor", | ||
| SortName: data.SurveyTitle, | ||
| NameAndAliases: nameAndAliases, | ||
| ParentRefs: parentRefs, | ||
| Fulltext: data.SurveyTitle, | ||
| } | ||
|
|
||
| if action == indexerConstants.ActionDeleted { | ||
| return p.sendIndexerDeleteMessage(ctx, subject, action, data.UID, indexingConfig) | ||
| } | ||
|
|
||
| return p.sendIndexerCreateUpdateMessage(ctx, subject, action, data, indexingConfig) | ||
| } | ||
|
|
||
| // sendSurveyAccessMessage sends the message to the NATS server for the survey access control | ||
| func (p *NATSPublisher) sendSurveyAccessMessage(survey *domain.SurveyData) error { | ||
| // Build committee and project references | ||
| committeeRefs := []string{} | ||
| projectRefs := []string{} | ||
|
|
||
| for _, committee := range survey.Committees { | ||
| if committee.CommitteeUID != "" { | ||
| committeeRefs = append(committeeRefs, committee.CommitteeUID) | ||
| } | ||
| if committee.ProjectUID != "" { | ||
| // Check if we've already added this project UID | ||
| found := false | ||
| for _, ref := range projectRefs { | ||
| if ref == committee.ProjectUID { | ||
| found = true | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| projectRefs = append(projectRefs, committee.ProjectUID) | ||
| } | ||
| } |
There was a problem hiding this comment.
The logic for deduplicating project UIDs (lines 118-130) and committee UIDs in access messages (lines 163-174) is duplicated. This is a maintenance burden and could lead to inconsistencies.
Consider extracting this into a helper function:
func deduplicateStrings(items []string) []string {
seen := make(map[string]bool)
result := []string{}
for _, item := range items {
if !seen[item] {
seen[item] = true
result = append(result, item)
}
}
return result
}| // Contains checks if a string contains a substring | ||
| func Contains(s, substr string) bool { | ||
| return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && (s[:len(substr)] == substr || | ||
| s[len(s)-len(substr):] == substr || | ||
| containsMiddle(s, substr))) | ||
| } | ||
|
|
||
| // containsMiddle checks if substr appears in the middle of s | ||
| func containsMiddle(s, substr string) bool { | ||
| for i := 0; i <= len(s)-len(substr); i++ { | ||
| if s[i:i+len(substr)] == substr { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| } |
There was a problem hiding this comment.
The custom Contains function reinvents the standard library's strings.Contains() function. This implementation is more complex and slower than the standard library version which uses optimized assembly on many platforms.
Replace utils.Contains(errStr, "timeout") calls with the standard strings.Contains(errStr, "timeout") throughout the codebase.
| // Contains checks if a string contains a substring | |
| func Contains(s, substr string) bool { | |
| return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && (s[:len(substr)] == substr || | |
| s[len(s)-len(substr):] == substr || | |
| containsMiddle(s, substr))) | |
| } | |
| // containsMiddle checks if substr appears in the middle of s | |
| func containsMiddle(s, substr string) bool { | |
| for i := 0; i <= len(s)-len(substr); i++ { | |
| if s[i:i+len(substr)] == substr { | |
| return true | |
| } | |
| } | |
| return false | |
| } | |
| import "strings" | |
| // Contains checks if a string contains a substring | |
| func Contains(s, substr string) bool { | |
| return strings.Contains(s, substr) | |
| } |
- Add config validation call after loadConfig() to fail fast on missing credentials - Replace os.Exit() in goroutines with shutdown channel for graceful cleanup - Add dependency checks for jq and curl in cleanup script - Add HTTP status code validation for all curl requests in script - Fix duplicate cmd/survey-api entry in README project structure Generated with Claude Code: https://claude.com/claude-code Signed-off-by: Andres Tobon <andrest2455@gmail.com>
…-service into andrest50/events-processing
Summary
Implements NATS KV bucket event processing for the survey service to automatically sync survey and survey response data from the v1 system to the v2 system. This enables real-time data synchronization, search indexing, and access control updates.
Ticket
https://linuxfoundation.atlassian.net/browse/LFXV2-1084
Implementation Details
Architecture
v1-objectsKV bucket for keys matchingitx-surveys:*anditx-survey-responses:*DeliverLastPerSubjectPolicyfor latest version processingEVENT_PROCESSING_ENABLEDData Transformation
Publishing
Publishes transformed data to two downstream services:
lfx.index.survey,lfx.index.survey_response)lfx.fga-sync.update_access,lfx.fga-sync.delete_access)Deduplication
v1-mappingsKV bucketError Handling
Files Changed
New Files
cmd/survey-api/eventing/event_processor.go- Lifecycle managementcmd/survey-api/eventing/kv_handler.go- Event routing by key prefixcmd/survey-api/eventing/survey_event_handler.go- Survey transformation logiccmd/survey-api/eventing/survey_response_event_handler.go- Response transformation logicinternal/domain/event_models.go- v2 data modelsinternal/domain/event_publisher.go- Publisher interfaceinternal/infrastructure/eventing/event_config.go- Configurationinternal/infrastructure/eventing/nats_publisher.go- NATS publishing implementationpkg/utils/string.go- String utility functionsdocs/event-processing.md- Comprehensive documentationscripts/delete_survey_documents.sh- Cleanup utility scriptModified Files
cmd/survey-api/main.go- Event processor initialization and shutdownREADME.md- Documentation updatesgo.mod- Added indexer service dependencyConfiguration
New environment variables:
EVENT_PROCESSING_ENABLED- Enable/disable event processing (default: true)EVENT_CONSUMER_NAME- JetStream consumer name (default: survey-service-kv-consumer)EVENT_STREAM_NAME- JetStream stream name (default: KV_v1-objects)EVENT_FILTER_SUBJECT- NATS subject filter (default: $KV.v1-objects.>)Reference Implementation
Follows the exact pattern from voting service PR #8: linuxfoundation/lfx-v2-voting-service#8
Uses function-based handlers (not struct-based methods) to match the voting service architecture.
Documentation
docs/event-processing.mdwith architecture, configuration, operations, and troubleshootingREADME.mdwith event processing feature and configurationChecklist