Skip to content

Commit 854cdb7

Browse files
authored
Merge pull request #44540 from tabito-hara/f-aws_lambda_event_source_mapping-add_schema_registry_config
[Enhancement] r/aws_lambda_event_source_mapping: Add `schema_registry_config` to both MSK and self-managed Kafka event sources
2 parents a73675d + 47a9f25 commit 854cdb7

File tree

4 files changed

+645
-3
lines changed

4 files changed

+645
-3
lines changed

.changelog/44540.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
resource/aws_lambda_event_source_mapping: Add `schema_registry_config` configuration blocks to `amazon_managed_kafka_event_source_config` and `self_managed_kafka_event_source_config` blocks
3+
```

internal/service/lambda/event_source_mapping.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212
"time"
1313

14+
"github.com/YakDriver/regexache"
1415
"github.com/aws/aws-sdk-go-v2/aws"
1516
"github.com/aws/aws-sdk-go-v2/aws/arn"
1617
"github.com/aws/aws-sdk-go-v2/service/lambda"
@@ -61,6 +62,7 @@ func resourceEventSourceMapping() *schema.Resource {
6162
ForceNew: true,
6263
ValidateFunc: validation.StringLenBetween(1, 200),
6364
},
65+
"schema_registry_config": kafkaSchemaRegistryConfigSchema(),
6466
},
6567
},
6668
},
@@ -352,6 +354,7 @@ func resourceEventSourceMapping() *schema.Resource {
352354
ForceNew: true,
353355
ValidateFunc: validation.StringLenBetween(1, 200),
354356
},
357+
"schema_registry_config": kafkaSchemaRegistryConfigSchema(),
355358
},
356359
},
357360
},
@@ -417,6 +420,70 @@ func resourceEventSourceMapping() *schema.Resource {
417420
}
418421
}
419422

423+
func kafkaSchemaRegistryConfigSchema() *schema.Schema {
424+
return &schema.Schema{
425+
Type: schema.TypeList,
426+
Optional: true,
427+
MaxItems: 1,
428+
ForceNew: true,
429+
Elem: &schema.Resource{
430+
Schema: map[string]*schema.Schema{
431+
"access_config": {
432+
Type: schema.TypeSet,
433+
Optional: true,
434+
ForceNew: true,
435+
Elem: &schema.Resource{
436+
Schema: map[string]*schema.Schema{
437+
names.AttrType: {
438+
Type: schema.TypeString,
439+
Optional: true,
440+
ForceNew: true,
441+
ValidateDiagFunc: enum.Validate[awstypes.KafkaSchemaRegistryAuthType](),
442+
},
443+
names.AttrURI: {
444+
Type: schema.TypeString,
445+
Optional: true,
446+
ForceNew: true,
447+
ValidateFunc: verify.ValidARN,
448+
},
449+
},
450+
},
451+
},
452+
"event_record_format": {
453+
Type: schema.TypeString,
454+
Optional: true,
455+
ForceNew: true,
456+
ValidateDiagFunc: enum.Validate[awstypes.SchemaRegistryEventRecordFormat](),
457+
},
458+
"schema_registry_uri": {
459+
Type: schema.TypeString,
460+
Optional: true,
461+
ForceNew: true,
462+
ValidateFunc: validation.All(
463+
validation.StringLenBetween(1, 10000),
464+
validation.StringMatch(regexache.MustCompile(`[a-zA-Z0-9-/*:_+=.@-]*`), "must be ARN or URL of the registry"),
465+
),
466+
},
467+
"schema_validation_config": {
468+
Type: schema.TypeSet,
469+
Optional: true,
470+
ForceNew: true,
471+
Elem: &schema.Resource{
472+
Schema: map[string]*schema.Schema{
473+
"attribute": {
474+
Type: schema.TypeString,
475+
Optional: true,
476+
ForceNew: true,
477+
ValidateDiagFunc: enum.Validate[awstypes.KafkaSchemaValidationAttribute](),
478+
},
479+
},
480+
},
481+
},
482+
},
483+
},
484+
}
485+
}
486+
420487
func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
421488
var diags diag.Diagnostics
422489
conn := meta.(*conns.AWSClient).LambdaClient(ctx)
@@ -1134,6 +1201,9 @@ func expandAmazonManagedKafkaEventSourceConfig(tfMap map[string]any) *awstypes.A
11341201
if v, ok := tfMap["consumer_group_id"].(string); ok && v != "" {
11351202
apiObject.ConsumerGroupId = aws.String(v)
11361203
}
1204+
if v, ok := tfMap["schema_registry_config"].([]any); ok && len(v) > 0 && v[0] != nil {
1205+
apiObject.SchemaRegistryConfig = expandKafkaSchemaRegistryConfig(v[0].(map[string]any))
1206+
}
11371207

11381208
return apiObject
11391209
}
@@ -1149,6 +1219,10 @@ func flattenAmazonManagedKafkaEventSourceConfig(apiObject *awstypes.AmazonManage
11491219
tfMap["consumer_group_id"] = aws.ToString(v)
11501220
}
11511221

1222+
if v := apiObject.SchemaRegistryConfig; v != nil {
1223+
tfMap["schema_registry_config"] = []any{flattenKafkaSchemaRegistryConfig(v)}
1224+
}
1225+
11521226
return tfMap
11531227
}
11541228

@@ -1163,6 +1237,10 @@ func expandSelfManagedKafkaEventSourceConfig(tfMap map[string]any) *awstypes.Sel
11631237
apiObject.ConsumerGroupId = aws.String(v)
11641238
}
11651239

1240+
if v, ok := tfMap["schema_registry_config"].([]any); ok && len(v) > 0 && v[0] != nil {
1241+
apiObject.SchemaRegistryConfig = expandKafkaSchemaRegistryConfig(v[0].(map[string]any))
1242+
}
1243+
11661244
return apiObject
11671245
}
11681246

@@ -1177,6 +1255,10 @@ func flattenSelfManagedKafkaEventSourceConfig(apiObject *awstypes.SelfManagedKaf
11771255
tfMap["consumer_group_id"] = aws.ToString(v)
11781256
}
11791257

1258+
if v := apiObject.SchemaRegistryConfig; v != nil {
1259+
tfMap["schema_registry_config"] = []any{flattenKafkaSchemaRegistryConfig(v)}
1260+
}
1261+
11801262
return tfMap
11811263
}
11821264

@@ -1442,3 +1524,137 @@ func flattenEventSourceMappingMetricsConfig(apiObject *awstypes.EventSourceMappi
14421524

14431525
return tfMap
14441526
}
1527+
1528+
func expandKafkaSchemaRegistryConfig(tfMap map[string]any) *awstypes.KafkaSchemaRegistryConfig {
1529+
if tfMap == nil {
1530+
return nil
1531+
}
1532+
1533+
apiObject := &awstypes.KafkaSchemaRegistryConfig{}
1534+
1535+
if v, ok := tfMap["access_config"].(*schema.Set); ok && v != nil && v.Len() > 0 {
1536+
apiObject.AccessConfigs = expandKafkaSchemaRegistryAccessConfig(v.List())
1537+
}
1538+
1539+
if v, ok := tfMap["event_record_format"].(string); ok && v != "" {
1540+
apiObject.EventRecordFormat = awstypes.SchemaRegistryEventRecordFormat(v)
1541+
}
1542+
1543+
if v, ok := tfMap["schema_registry_uri"].(string); ok && v != "" {
1544+
apiObject.SchemaRegistryURI = aws.String(v)
1545+
}
1546+
1547+
if v, ok := tfMap["schema_validation_config"].(*schema.Set); ok && v != nil && v.Len() > 0 {
1548+
apiObject.SchemaValidationConfigs = expandKafkaSchemaValidationConfig(v.List())
1549+
}
1550+
1551+
return apiObject
1552+
}
1553+
1554+
func expandKafkaSchemaRegistryAccessConfig(tfList []any) []awstypes.KafkaSchemaRegistryAccessConfig {
1555+
if len(tfList) == 0 || tfList[0] == nil {
1556+
return nil
1557+
}
1558+
1559+
var apiObjects []awstypes.KafkaSchemaRegistryAccessConfig
1560+
1561+
for _, tfMapRaw := range tfList {
1562+
tfMap, ok := tfMapRaw.(map[string]any)
1563+
1564+
if !ok {
1565+
continue
1566+
}
1567+
1568+
apiObject := awstypes.KafkaSchemaRegistryAccessConfig{}
1569+
if v, ok := tfMap[names.AttrType].(string); ok && v != "" {
1570+
apiObject.Type = awstypes.KafkaSchemaRegistryAuthType(v)
1571+
}
1572+
if v, ok := tfMap[names.AttrURI].(string); ok && v != "" {
1573+
apiObject.URI = aws.String(v)
1574+
}
1575+
1576+
apiObjects = append(apiObjects, apiObject)
1577+
}
1578+
return apiObjects
1579+
}
1580+
1581+
func expandKafkaSchemaValidationConfig(tfList []any) []awstypes.KafkaSchemaValidationConfig {
1582+
if len(tfList) == 0 || tfList[0] == nil {
1583+
return nil
1584+
}
1585+
1586+
var apiObjects []awstypes.KafkaSchemaValidationConfig
1587+
1588+
for _, tfMapRaw := range tfList {
1589+
tfMap, ok := tfMapRaw.(map[string]any)
1590+
1591+
if !ok {
1592+
continue
1593+
}
1594+
1595+
apiObject := awstypes.KafkaSchemaValidationConfig{}
1596+
if v, ok := tfMap["attribute"].(string); ok && v != "" {
1597+
apiObject.Attribute = awstypes.KafkaSchemaValidationAttribute(v)
1598+
}
1599+
1600+
apiObjects = append(apiObjects, apiObject)
1601+
}
1602+
return apiObjects
1603+
}
1604+
1605+
func flattenKafkaSchemaRegistryConfig(apiObject *awstypes.KafkaSchemaRegistryConfig) map[string]any {
1606+
if apiObject == nil {
1607+
return nil
1608+
}
1609+
1610+
tfMap := map[string]any{}
1611+
if v := apiObject.AccessConfigs; len(v) > 0 {
1612+
tfMap["access_config"] = flattenKafkaSchemaRegistryAccessConfig(v)
1613+
}
1614+
if v := apiObject.EventRecordFormat; v != "" {
1615+
tfMap["event_record_format"] = v
1616+
}
1617+
if v := apiObject.SchemaRegistryURI; v != nil {
1618+
tfMap["schema_registry_uri"] = aws.ToString(v)
1619+
}
1620+
if v := apiObject.SchemaValidationConfigs; len(v) > 0 {
1621+
tfMap["schema_validation_config"] = flattenSchemaValidationConfig(v)
1622+
}
1623+
1624+
return tfMap
1625+
}
1626+
1627+
func flattenKafkaSchemaRegistryAccessConfig(apiObjects []awstypes.KafkaSchemaRegistryAccessConfig) []any {
1628+
if len(apiObjects) == 0 {
1629+
return nil
1630+
}
1631+
1632+
var tfList []any
1633+
for _, apiObject := range apiObjects {
1634+
tfMap := map[string]any{}
1635+
if v := apiObject.Type; v != "" {
1636+
tfMap[names.AttrType] = v
1637+
}
1638+
if v := apiObject.URI; v != nil {
1639+
tfMap[names.AttrURI] = aws.ToString(v)
1640+
}
1641+
tfList = append(tfList, tfMap)
1642+
}
1643+
return tfList
1644+
}
1645+
1646+
func flattenSchemaValidationConfig(apiObjects []awstypes.KafkaSchemaValidationConfig) []any {
1647+
if len(apiObjects) == 0 {
1648+
return nil
1649+
}
1650+
1651+
var tfList []any
1652+
for _, apiObject := range apiObjects {
1653+
tfMap := map[string]any{}
1654+
if v := apiObject.Attribute; v != "" {
1655+
tfMap["attribute"] = v
1656+
}
1657+
tfList = append(tfList, tfMap)
1658+
}
1659+
return tfList
1660+
}

0 commit comments

Comments
 (0)