-
Notifications
You must be signed in to change notification settings - Fork 737
Description
Confirm by changing [ ] to [x] below:
- I've gone though the API reference
- I've checked AWS Forums and StackOverflow for answers
When working with AWS SDK for Go prior to v2, I had a solution to consume a DynamoDB Streams Events. NewItem and OldItem if present are from a configurations table and an Item is defined as follows:
// Item represents a dynamoDB record in the configurations table
type Item struct {
PK string `json:"pk"`
Version int `json:"version,string"`
ManifestVersion string `json:"manifestVersion"`
Region string `json:"region"`
K8sClusterPrefix string `json:"k8sClusterPrefix"`
AZDependent map[string]interface{} `json:"azDependent,omitempty"`
Parameters map[string]interface{} `json:"parameters"`
AuthoredBy string `json:"authoredBy"`
AuthoredWhen string `json:"authoredWhen"`
ItemState ItemState `json:"itemState"`
ItemStateWhen string `json:"itemStateWhen"`
ItemStateAZ string `json:"itemStateAZ,omitempty"`
Message string `json:"message,omitempty"`
}I then had the following to consume a Raw DynamoDB Streams Event
// Event represents a DynamoDB Streams Event written to SQS
type Event struct {
Records []*EventRecord `json:"Records"`
}
// EventRecord represents a DynamoDB Streams Event Record
type EventRecord struct {
Change StreamRecord `json:"dynamodb"`
EventName string `json:"eventName"`
}
// StreamRecord represents a DynamoDB Streams Record
type StreamRecord struct {
Keys map[string]*dynamodb.AttributeValue `json:"Keys,omitempty"`
NewImage map[string]*dynamodb.AttributeValue `json:"NewImage,omitempty"`
OldImage map[string]*dynamodb.AttributeValue `json:"OldImage,omitempty"`
SequenceNumber string `json:"SequenceNumber"`
SizeBytes int64 `json:"SizeBytes"`
}And finally, to consume messages from the SQS queue
for {
result, err := q.client.ReceiveMessageWithContext(q.ctx, &sqs.ReceiveMessageInput{
QueueUrl: sdkaws.String(env.SQSQueueURL()),
WaitTimeSeconds: sdkaws.Int64(maxLongPollSeconds),
MaxNumberOfMessages: sdkaws.Int64(1),
VisibilityTimeout: sdkaws.Int64(visibilityTimeoutSeconds),
})
if err != nil {
q.lasterr = err
return
}
for _, m := range result.Messages {
if m.Body == nil {
q.deleteMessage(m.ReceiptHandle)
continue
}
var event configurations.Event
if err = json.Unmarshal([]byte(*m.Body), &event); err != nil {
q.deleteMessage(m.ReceiptHandle)
continue
}
for _, eventRecord := range event.Records {
select {
case q.eventRecordCh <- eventRecord:
case <-q.ctx.Done():
logrus.Info("queues.processor: context terminated")
q.lasterr = q.ctx.Err()
return
}
}
q.deleteMessage(m.ReceiptHandle)
}
}Then the consumer of the eventRecordCh channel would use
err := dynamodbattribute.UnmarshalMap(eventRecord.Change.NewImage, &item)if a NewImage was expected; the populated map[string]*dynamodb.AttributeValue could then easily be converted to an Item.
When porting over to AWS SDK for go v2 the Event structures became
// Event represents a DynamoDB Streams Event written to SQS
type Event struct {
Records []*EventRecord `json:"Records"`
}
// EventRecord represents a DynamoDB Streams Event Record
type EventRecord struct {
Change StreamRecord `json:"dynamodb"`
EventName string `json:"eventName"`
}
// StreamRecord represents a DynamoDB Streams Record
type StreamRecord struct {
Keys map[string]types.AttributeValue `json:"Keys,omitempty"`
NewImage map[string]types.AttributeValue `json:"NewImage,omitempty"`
OldImage map[string]types.AttributeValue `json:"OldImage,omitempty"`
SequenceNumber string `json:"SequenceNumber"`
SizeBytes int64 `json:"SizeBytes"`
}And with the following test driver
result, err := client.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{
QueueUrl: aws.String(env.SQSQueueURL()),
WaitTimeSeconds: 20,
MaxNumberOfMessages: 1,
VisibilityTimeout: 30,
})
if err != nil {
panic(err)
}
for i, m := range result.Messages {
var event configurations.Event
if err = json.Unmarshal([]byte(*m.Body), &event); err != nil {
fmt.Printf("Error: %v\n", err)
continue
}
fmt.Printf("result.Message[%d]: %#v\n", i, event)
}json.Unmarshal() returns the error
Error: json: cannot unmarshal object into Go struct field StreamRecord.Records.dynamodb.Keys of type types.AttributeValueMind you, I have working ported dynamodb code to read and write the table, Item changing to (yes, PK changed to a partition key 'feed' and a sort key 'microservice'; 'pk' was these two fields # concatenated)
// Item represents a dynamoDB record in the configurations table
type Item struct {
Feed string `json:"feed" dynamodbav:"feed"`
Microservice string `json:"microservice" dynamodbav:"microservice"`
Version int `json:"version,string" dynamodbav:"version,string"`
ManifestVersion string `json:"manifestVersion" dynamodbav:"manifestVersion"`
Region string `json:"region" dynamodbav:"region"`
K8sCluster string `json:"k8sCluster" dynamodbav:"k8sCluster"`
AZDependent map[string]interface{} `json:"azDependent,omitempty" dynamodbav:"azDependent,omitempty"`
Parameters map[string]interface{} `json:"parameters" dynamodbav:"parameters"`
AuthoredBy string `json:"authoredBy" dynamodbav:"authoredBy"`
AuthoredWhen string `json:"authoredWhen" dynamodbav:"authoredWhen"`
ItemState ItemState `json:"itemState" dynamodbav:"itemState"`
ItemStateWhen string `json:"itemStateWhen" dynamodbav:"itemStateWhen"`
ItemStateAZ string `json:"itemStateAZ,omitempty" dynamodbav:"itemStateAZ,omitempty"`
Message string `json:"message,omitempty" dynamodbav:"message,omitempty"`
}
// ItemState is string
type ItemState stringThe magic that *dynamodb.AttributeValue processed allowed for json.Unmarshal() to just work. Apparently that magic wasn't passed down. ;-(
In AWS SDK for go v2, is there a public way to go from DynamoDB JSON to JSON?