Skip to content

Commit c8f27ea

Browse files
taherkltaherklshoaibcldcvr
authored
Map datatype support (#89)
* Mapdatatypesupport with list (#40) Co-authored-by: taherkl <[email protected]> Co-authored-by: Shoaibmohammad Jarman <[email protected]>
1 parent c75f73b commit c8f27ea

File tree

13 files changed

+784
-132
lines changed

13 files changed

+784
-132
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ DynamoDB Adapter currently supports the following DynamoDB data types
6060
| `NS` (number set) | `ARRAY<FLOAT64>` |
6161
| `BS` (binary set) | `ARRAY<BYTES(MAX)>` |
6262
| `L` (List Type) | `JSON` |
63+
| `M` (Map Type) | `JSON` |
64+
65+
Note: Map and List datatypes does not support the Set datatypes.
6366

6467
## Configuration
6568

api/v1/condition.go

Lines changed: 57 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
var operations = map[string]string{"SET": "(?i) SET ", "DELETE": "(?i) DELETE ", "ADD": "(?i) ADD ", "REMOVE": "(?i) REMOVE "}
3939
var byteSliceType = reflect.TypeOf([]byte(nil))
40+
var defaultLevel int16 = 1
4041
var (
4142
listRegex = regexp.MustCompile(`list_append\(([^,]+),\s*([^\)]+)\)`)
4243
listIndexRegex = regexp.MustCompile(`(\w+)\[(\d+)\]`)
@@ -421,7 +422,7 @@ func parseUpdateExpresstion(actionValue string) *models.UpdateExpressionConditio
421422
return expr
422423
}
423424

424-
func performOperation(ctx context.Context, action string, actionValue string, updateAtrr models.UpdateAttr, oldRes map[string]interface{}) (map[string]interface{}, map[string]interface{}, error) {
425+
func performOperation(ctx context.Context, action string, actionValue string, updateAtrr models.UpdateAttr, oldRes map[string]interface{}, spannerRow map[string]interface{}) (map[string]interface{}, map[string]interface{}, error) {
425426
switch {
426427
case action == "DELETE":
427428
// perform delete
@@ -432,12 +433,12 @@ func performOperation(ctx context.Context, action string, actionValue string, up
432433
if strings.Contains(actionValue, "list_append") {
433434
// parse list_append operation here
434435
m, expr := parseActionValue(actionValue, updateAtrr, false, oldRes)
435-
res, err := services.Put(ctx, updateAtrr.TableName, m, expr, updateAtrr.ConditionExpression, updateAtrr.ExpressionAttributeMap, oldRes)
436+
res, err := services.Put(ctx, updateAtrr.TableName, m, expr, updateAtrr.ConditionExpression, updateAtrr.ExpressionAttributeMap, oldRes, spannerRow)
436437
return res, m, err
437438
}
438439
// Update data in table
439440
m, expr := parseActionValue(actionValue, updateAtrr, false, oldRes)
440-
res, err := services.Put(ctx, updateAtrr.TableName, m, expr, updateAtrr.ConditionExpression, updateAtrr.ExpressionAttributeMap, oldRes)
441+
res, err := services.Put(ctx, updateAtrr.TableName, m, expr, updateAtrr.ConditionExpression, updateAtrr.ExpressionAttributeMap, oldRes, spannerRow)
441442
return res, m, err
442443
case action == "ADD":
443444
// Add data in table
@@ -457,8 +458,9 @@ func performOperation(ctx context.Context, action string, actionValue string, up
457458
func UpdateExpression(ctx context.Context, updateAtrr models.UpdateAttr) (interface{}, error) {
458459
updateAtrr.ExpressionAttributeNames = ChangeColumnToSpannerExpressionName(updateAtrr.TableName, updateAtrr.ExpressionAttributeNames)
459460
var oldRes map[string]interface{}
461+
var spannerRow map[string]interface{}
460462
if updateAtrr.ReturnValues != "NONE" {
461-
oldRes, _ = services.GetWithProjection(ctx, updateAtrr.TableName, updateAtrr.PrimaryKeyMap, "", nil)
463+
oldRes, spannerRow, _ = services.GetWithProjection(ctx, updateAtrr.TableName, updateAtrr.PrimaryKeyMap, "", nil)
462464
}
463465
var resp map[string]interface{}
464466
var actVal = make(map[string]interface{})
@@ -470,7 +472,7 @@ func UpdateExpression(ctx context.Context, updateAtrr models.UpdateAttr) (interf
470472
m := extractOperations(updateAtrr.UpdateExpression)
471473

472474
for k, v := range m {
473-
res, acVal, err := performOperation(ctx, k, v, updateAtrr, oldRes)
475+
res, acVal, err := performOperation(ctx, k, v, updateAtrr, oldRes, spannerRow)
474476
resp = res
475477
er = err
476478
for k, v := range acVal {
@@ -680,7 +682,7 @@ func ChangeColumnToSpanner(obj map[string]interface{}) map[string]interface{} {
680682
return rs
681683
}
682684

683-
func convertFrom(a *dynamodb.AttributeValue, tableName string) interface{} {
685+
func convertFrom(a *dynamodb.AttributeValue, tableName string, level int16) interface{} {
684686
if a.S != nil {
685687
return *a.S
686688
}
@@ -715,15 +717,20 @@ func convertFrom(a *dynamodb.AttributeValue, tableName string) interface{} {
715717
if a.M != nil {
716718
m := make(map[string]interface{})
717719
for k, v := range a.M {
718-
m[k] = convertFrom(v, tableName)
720+
if level == 0 {
721+
level = 2
722+
} else {
723+
level++
724+
}
725+
m[k] = convertFrom(v, tableName, level)
719726
}
720727
return m
721728
}
722729

723730
if a.L != nil {
724731
l := make([]interface{}, len(a.L))
725732
for index, v := range a.L {
726-
l[index] = convertFrom(v, tableName)
733+
l[index] = convertFrom(v, tableName, 0)
727734
}
728735
return l
729736
}
@@ -732,46 +739,58 @@ func convertFrom(a *dynamodb.AttributeValue, tableName string) interface{} {
732739
return a.B
733740
}
734741
if a.SS != nil {
735-
uniqueStrings := make(map[string]struct{})
736-
for _, v := range a.SS {
737-
uniqueStrings[*v] = struct{}{}
738-
}
742+
if level > 1 {
743+
panic("The Map and List types do not support String Set (SS) values. Please use the List type instead.")
744+
} else {
745+
uniqueStrings := make(map[string]struct{})
746+
for _, v := range a.SS {
747+
uniqueStrings[*v] = struct{}{}
748+
}
739749

740-
// Convert map keys to a slice
741-
l := make([]string, 0, len(uniqueStrings))
742-
for str := range uniqueStrings {
743-
l = append(l, str)
750+
// Convert map keys to a slice
751+
l := make([]string, 0, len(uniqueStrings))
752+
for str := range uniqueStrings {
753+
l = append(l, str)
754+
}
755+
return l
744756
}
745-
746-
return l
747757
}
748758
if a.NS != nil {
749-
l := []float64{}
750-
numberMap := make(map[string]struct{})
751-
for _, v := range a.NS {
752-
if _, exists := numberMap[*v]; !exists {
753-
numberMap[*v] = struct{}{}
754-
n, err := strconv.ParseFloat(*v, 64)
755-
if err != nil {
756-
panic(fmt.Sprintf("Invalid number in NS: %s", *v))
759+
if level > 1 {
760+
panic("The Map and List types do not support Number Set (NS) values. Please use the List type instead.")
761+
} else {
762+
l := []float64{}
763+
numberMap := make(map[string]struct{})
764+
for _, v := range a.NS {
765+
if _, exists := numberMap[*v]; !exists {
766+
numberMap[*v] = struct{}{}
767+
n, err := strconv.ParseFloat(*v, 64)
768+
if err != nil {
769+
panic(fmt.Sprintf("Invalid number in NS: %s", *v))
770+
}
771+
l = append(l, n)
757772
}
758-
l = append(l, n)
759773
}
774+
return l
760775
}
761-
return l
762776
}
763777
if a.BS != nil {
764-
// Handle Binary Set
765-
binarySet := [][]byte{}
766-
binaryMap := make(map[string]struct{})
767-
for _, v := range a.BS {
768-
key := string(v)
769-
if _, exists := binaryMap[key]; !exists {
770-
binaryMap[key] = struct{}{}
771-
binarySet = append(binarySet, v)
778+
if level > 1 {
779+
panic("The Map and List types do not support Binary Set (BS) values. Please use the List type instead.")
780+
} else {
781+
// Handle Binary Set
782+
binarySet := [][]byte{}
783+
binaryMap := make(map[string]struct{})
784+
for _, v := range a.BS {
785+
key := string(v)
786+
if _, exists := binaryMap[key]; !exists {
787+
binaryMap[key] = struct{}{}
788+
binarySet = append(binarySet, v)
789+
}
772790
}
791+
return binarySet
773792
}
774-
return binarySet
793+
775794
}
776795
panic(fmt.Sprintf("%#v is not a supported dynamodb.AttributeValue", a))
777796
}
@@ -807,7 +826,7 @@ func ConvertFromMap(item map[string]*dynamodb.AttributeValue, v interface{}, tab
807826

808827
m := make(map[string]interface{})
809828
for k, v := range item {
810-
m[k] = convertFrom(v, tableName)
829+
m[k] = convertFrom(v, tableName, defaultLevel)
811830
}
812831

813832
if isTyped(reflect.TypeOf(v)) {

api/v1/db.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,11 @@ func put(ctx context.Context, tableName string, putObj map[string]interface{}, e
147147
pKey := tableConf.PartitionKey
148148
var oldResp map[string]interface{}
149149

150-
oldResp, err = storage.GetStorageInstance().SpannerGet(ctx, tableName, putObj[pKey], putObj[sKey], nil)
150+
oldResp, spannerRow, err := storage.GetStorageInstance().SpannerGet(ctx, tableName, putObj[pKey], putObj[sKey], nil)
151151
if err != nil {
152152
return nil, err
153153
}
154-
res, err := services.Put(ctx, tableName, putObj, nil, conditionExp, expressionAttr, oldResp)
154+
res, err := services.Put(ctx, tableName, putObj, nil, conditionExp, expressionAttr, oldResp, spannerRow)
155155
if err != nil {
156156
return nil, err
157157
}
@@ -212,7 +212,6 @@ func queryResponse(query models.Query, c *gin.Context) {
212212
c.JSON(errors.HTTPResponse(err, "LastEvaluatedKeyChangeError"))
213213
}
214214
}
215-
216215
c.JSON(http.StatusOK, finalResult)
217216
} else {
218217
c.JSON(errors.HTTPResponse(err, query))
@@ -291,7 +290,7 @@ func GetItemMeta(c *gin.Context) {
291290
return
292291
}
293292
getItemMeta.ExpressionAttributeNames = ChangeColumnToSpannerExpressionName(getItemMeta.TableName, getItemMeta.ExpressionAttributeNames)
294-
res, rowErr := services.GetWithProjection(c.Request.Context(), getItemMeta.TableName, getItemMeta.PrimaryKeyMap, getItemMeta.ProjectionExpression, getItemMeta.ExpressionAttributeNames)
293+
res, _, rowErr := services.GetWithProjection(c.Request.Context(), getItemMeta.TableName, getItemMeta.PrimaryKeyMap, getItemMeta.ProjectionExpression, getItemMeta.ExpressionAttributeNames)
295294
if rowErr == nil {
296295
changedColumns := ChangeResponseToOriginalColumns(getItemMeta.TableName, res)
297296
output, err := ChangeMaptoDynamoMap(changedColumns)
@@ -432,7 +431,7 @@ func DeleteItem(c *gin.Context) {
432431
deleteItem.ConditionExpression = strings.ReplaceAll(deleteItem.ConditionExpression, k, v)
433432
}
434433

435-
oldRes, _ := services.GetWithProjection(c.Request.Context(), deleteItem.TableName, deleteItem.PrimaryKeyMap, "", nil)
434+
oldRes, _, _ := services.GetWithProjection(c.Request.Context(), deleteItem.TableName, deleteItem.PrimaryKeyMap, "", nil)
436435
err := services.Delete(c.Request.Context(), deleteItem.TableName, deleteItem.PrimaryKeyMap, deleteItem.ConditionExpression, deleteItem.ExpressionAttributeMap, nil)
437436
if err == nil {
438437
output, _ := ChangeMaptoDynamoMap(ChangeResponseToOriginalColumns(deleteItem.TableName, oldRes))
@@ -618,6 +617,15 @@ func BatchWriteItem(c *gin.Context) {
618617
if err != nil {
619618
for _, v := range value {
620619
if v.PutReq.Item != nil {
620+
if unprocessedBatchWriteItems.UnprocessedItems == nil {
621+
unprocessedBatchWriteItems.UnprocessedItems = make(map[string][]models.BatchWriteSubItems) // Adjust type as needed
622+
}
623+
624+
// Ensure that the specific key's slice is initialized
625+
if _, exists := unprocessedBatchWriteItems.UnprocessedItems[key]; !exists {
626+
unprocessedBatchWriteItems.UnprocessedItems[key] = []models.BatchWriteSubItems{} // Instantiate the slice
627+
}
628+
621629
unprocessedBatchWriteItems.UnprocessedItems[key] = append(unprocessedBatchWriteItems.UnprocessedItems[key], v)
622630
}
623631
}
@@ -659,7 +667,7 @@ func batchUpdateItems(con context.Context, batchMetaUpdate models.BatchMetaUpdat
659667
if err != nil {
660668
return err
661669
}
662-
err = services.BatchPut(con, batchMetaUpdate.TableName, batchMetaUpdate.ArrAttrMap)
670+
err = services.BatchPut(con, batchMetaUpdate.TableName, batchMetaUpdate.ArrAttrMap, nil)
663671
if err != nil {
664672
return err
665673
}

config-files/init.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ var (
5151
spannerIndexName STRING(MAX),
5252
actualTable STRING(MAX),
5353
spannerDataType STRING(MAX)
54-
) PRIMARY KEY (tableName, column);`
54+
) PRIMARY KEY (tableName, column)`
5555
)
5656

5757
// Entry point for the application
@@ -146,7 +146,7 @@ func generateTableDDL(tableName string, client *dynamodb.Client, limit int32) st
146146
}())
147147

148148
return fmt.Sprintf(
149-
"CREATE TABLE %s (\n\t%s\n) %s;",
149+
"CREATE TABLE %s (\n\t%s\n) %s",
150150
tableName, strings.Join(columns, ",\n\t"), primaryKey,
151151
)
152152
}
@@ -162,8 +162,8 @@ func generateInsertQueries(tableName string, client *dynamodb.Client, limit int3
162162
for column, dataType := range attributes {
163163
spannerDataType := utils.ConvertDynamoTypeToSpannerType(dataType)
164164
query := fmt.Sprintf(
165-
`INSERT INTO dynamodb_adapter_table_ddl
166-
(column, tableName, dynamoDataType, originalColumn, partitionKey, sortKey, spannerIndexName, actualTable, spannerDataType)
165+
`INSERT INTO dynamodb_adapter_table_ddl
166+
(column, tableName, dynamoDataType, originalColumn, partitionKey, sortKey, spannerIndexName, actualTable, spannerDataType)
167167
VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');`,
168168
column, tableName, dataType, column, partitionKey, sortKey, column, tableName, spannerDataType,
169169
)

config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ spanner:
33
instance_id: ${INSTANCE_ID}
44
database_name: ${DATABASE_ID}
55
query_limit: ${QUERY_LIMIT}
6-
dynamo_query_limit: ${DYNAMODB_QUERY_LIMIT}
6+
dynamo_query_limit: ${DYNAMODB_QUERY_LIMIT}

0 commit comments

Comments
 (0)