From 26c4750566d2af61f92ebd91d2b4da7e20f29c06 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Sun, 9 Nov 2025 23:58:32 +0800 Subject: [PATCH 1/6] support sink and source configs in pulsar functions --- docs/resources/function.md | 4 +- pulsar/resource_pulsar_function.go | 258 ++++++++++++++++++- pulsar/resource_pulsar_function_unit_test.go | 53 ++++ 3 files changed, 305 insertions(+), 10 deletions(-) create mode 100644 pulsar/resource_pulsar_function_unit_test.go diff --git a/docs/resources/function.md b/docs/resources/function.md index f4d99de1..0f81ce47 100644 --- a/docs/resources/function.md +++ b/docs/resources/function.md @@ -51,6 +51,8 @@ description: |- - `retain_key_ordering` (Boolean) Whether to retain key ordering when the function is restarted after failure. - `retain_ordering` (Boolean) Whether to retain ordering when the function is restarted after failure. - `secrets` (String) The secrets of the function. +- `sink_config` (Map of String) Sink configuration key/values serialized into custom_runtime_options. +- `source_config` (Map of String) Source configuration key/values serialized into custom_runtime_options. - `skip_to_latest` (Boolean) Whether to skip to the latest position when the function is restarted after failure. - `subscription_name` (String) The subscription name of the function. - `subscription_position` (String) The subscription position of the function. Possible values are `LATEST`, `EARLIEST`, and `CUSTOM`. @@ -61,5 +63,3 @@ description: |- ### Read-Only - `id` (String) The ID of this resource. - - diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index 26767216..0f18e818 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -21,6 +21,8 @@ import ( "context" "encoding/json" "fmt" + "math" + "strconv" "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" @@ -70,6 +72,13 @@ const ( resourceFunctionRAMKey = "ram_mb" resourceFunctionDiskKey = "disk_mb" resourceFunctionUserConfig = "user_config" + resourceFunctionSinkConfigKey = "sink_config" + resourceFunctionSourceConfigKey = "source_config" +) + +const ( + runtimeOptionSinkConfigKey = "sinkConfig" + runtimeOptionSourceConfigKey = "sourceConfig" ) var resourceFunctionDescriptions = make(map[string]string) @@ -114,6 +123,8 @@ func init() { resourceFunctionRAMKey: "The RAM that need to be allocated per function instance", resourceFunctionDiskKey: "The disk that need to be allocated per function instance", resourceFunctionUserConfig: "User-defined config key/values", + resourceFunctionSinkConfigKey: "Sink configuration key/values serialized into custom_runtime_options.", + resourceFunctionSourceConfigKey: "Source configuration key/values serialized into custom_runtime_options.", } } @@ -374,6 +385,20 @@ func resourcePulsarFunction() *schema.Resource { Description: resourceFunctionDescriptions[resourceFunctionUserConfig], Elem: &schema.Schema{Type: schema.TypeString}, }, + resourceFunctionSinkConfigKey: { + Type: schema.TypeMap, + Optional: true, + Computed: true, + Description: resourceFunctionDescriptions[resourceFunctionSinkConfigKey], + Elem: &schema.Schema{Type: schema.TypeString}, + }, + resourceFunctionSourceConfigKey: { + Type: schema.TypeMap, + Optional: true, + Computed: true, + Description: resourceFunctionDescriptions[resourceFunctionSourceConfigKey], + Elem: &schema.Schema{Type: schema.TypeString}, + }, }, } } @@ -638,8 +663,12 @@ func marshalFunctionConfig(d *schema.ResourceData) (*utils.FunctionConfig, error functionConfig.CustomSchemaOutputs = stringMap } - if inter, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey); ok { - functionConfig.CustomRuntimeOptions = inter.(string) + customRuntimeOptions, err := buildFunctionCustomRuntimeOptions(d) + if err != nil { + return nil, err + } + if customRuntimeOptions != "" { + functionConfig.CustomRuntimeOptions = customRuntimeOptions } if inter, ok := d.GetOk(resourceFunctionSecretsKey); ok { @@ -886,17 +915,50 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso } if functionConfig.CustomRuntimeOptions != "" { - orig, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey) - if ok { - s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), functionConfig.CustomRuntimeOptions) - if err != nil { + sanitizedOptions, sinkConfig, sinkConfigPresent, sourceConfig, sourceConfigPresent, err := splitFunctionCustomRuntimeOptions(functionConfig.CustomRuntimeOptions) + if err != nil { + return err + } + + if sinkConfigPresent { + if err = d.Set(resourceFunctionSinkConfigKey, sinkConfig); err != nil { + return err + } + } else { + if err = d.Set(resourceFunctionSinkConfigKey, nil); err != nil { return err } - err = d.Set(resourceFunctionCustomRuntimeOptionsKey, s) - if err != nil { + } + + if sourceConfigPresent { + if err = d.Set(resourceFunctionSourceConfigKey, sourceConfig); err != nil { + return err + } + } else { + if err = d.Set(resourceFunctionSourceConfigKey, nil); err != nil { return err } } + + if orig, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey); ok { + valueToSet := sanitizedOptions + if origStr := orig.(string); origStr != "" && sanitizedOptions != "" { + valueToSet, err = ignoreServerSetCustomRuntimeOptions(origStr, sanitizedOptions) + if err != nil { + return err + } + } + if err = d.Set(resourceFunctionCustomRuntimeOptionsKey, valueToSet); err != nil { + return err + } + } + } else { + if err := d.Set(resourceFunctionSinkConfigKey, nil); err != nil { + return err + } + if err := d.Set(resourceFunctionSourceConfigKey, nil); err != nil { + return err + } } if len(functionConfig.Secrets) != 0 { @@ -940,3 +1002,183 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso return nil } + +func buildFunctionCustomRuntimeOptions(d *schema.ResourceData) (string, error) { + var base string + if inter, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey); ok { + base = inter.(string) + } + + sinkConfig, sinkConfigSet := expandFunctionRuntimeConfig(d, resourceFunctionSinkConfigKey) + sourceConfig, sourceConfigSet := expandFunctionRuntimeConfig(d, resourceFunctionSourceConfigKey) + + if !sinkConfigSet && !sourceConfigSet { + return base, nil + } + + updates := make([]runtimeConfigUpdate, 0, 2) + if sinkConfigSet { + updates = append(updates, runtimeConfigUpdate{ + key: runtimeOptionSinkConfigKey, + config: sinkConfig, + }) + } + + if sourceConfigSet { + updates = append(updates, runtimeConfigUpdate{ + key: runtimeOptionSourceConfigKey, + config: sourceConfig, + }) + } + + return mergeFunctionCustomRuntimeOptions(base, updates...) +} + +func expandFunctionRuntimeConfig(d *schema.ResourceData, schemaKey string) (map[string]interface{}, bool) { + inter, ok := d.GetOkExists(schemaKey) + if !ok { + return nil, false + } + + if inter == nil { + return map[string]interface{}{}, true + } + + rawMap := inter.(map[string]interface{}) + config := make(map[string]interface{}, len(rawMap)) + for key, value := range rawMap { + config[key] = value + } + + return config, true +} + +type runtimeConfigUpdate struct { + key string + config map[string]interface{} +} + +func mergeFunctionCustomRuntimeOptions(base string, updates ...runtimeConfigUpdate) (string, error) { + if len(updates) == 0 { + return base, nil + } + + trimmed := strings.TrimSpace(base) + runtimeOptions := make(map[string]interface{}) + if trimmed != "" { + if err := json.Unmarshal([]byte(trimmed), &runtimeOptions); err != nil { + return "", errors.Wrap(err, "cannot unmarshal custom_runtime_options") + } + } + + for _, update := range updates { + delete(runtimeOptions, update.key) + if len(update.config) > 0 { + runtimeOptions[update.key] = update.config + } + } + + if len(runtimeOptions) == 0 { + return "", nil + } + + b, err := json.Marshal(runtimeOptions) + if err != nil { + return "", errors.Wrap(err, "cannot marshal custom_runtime_options") + } + + return string(b), nil +} + +func splitFunctionCustomRuntimeOptions(raw string) (string, map[string]interface{}, bool, map[string]interface{}, bool, error) { + trimmed := strings.TrimSpace(raw) + if trimmed == "" { + return "", nil, false, nil, false, nil + } + + runtimeOptions := make(map[string]interface{}) + if err := json.Unmarshal([]byte(trimmed), &runtimeOptions); err != nil { + return "", nil, false, nil, false, errors.Wrap(err, "cannot unmarshal custom_runtime_options from Pulsar") + } + + sinkConfig, sinkPresent, err := extractRuntimeConfig(runtimeOptions, runtimeOptionSinkConfigKey) + if err != nil { + return "", nil, false, nil, false, err + } + + sourceConfig, sourcePresent, err := extractRuntimeConfig(runtimeOptions, runtimeOptionSourceConfigKey) + if err != nil { + return "", nil, false, nil, false, err + } + + sanitized := "" + if len(runtimeOptions) > 0 { + b, err := json.Marshal(runtimeOptions) + if err != nil { + return "", nil, false, nil, false, errors.Wrap(err, "cannot marshal custom_runtime_options") + } + sanitized = string(b) + } + + return sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, nil +} + +func extractRuntimeConfig(runtimeOptions map[string]interface{}, runtimeKey string) (map[string]interface{}, bool, error) { + raw, ok := runtimeOptions[runtimeKey] + if !ok { + return nil, false, nil + } + + delete(runtimeOptions, runtimeKey) + configState := map[string]interface{}{} + if raw == nil { + return configState, true, nil + } + + configMap, ok := raw.(map[string]interface{}) + if !ok { + return nil, false, fmt.Errorf("%s in custom_runtime_options must be a JSON object", runtimeKey) + } + + flattened, err := flattenFunctionRuntimeConfig(configMap) + if err != nil { + return nil, false, err + } + + return flattened, true, nil +} + +func flattenFunctionRuntimeConfig(input map[string]interface{}) (map[string]interface{}, error) { + config := make(map[string]interface{}, len(input)) + for key, value := range input { + stringValue, err := stringifyRuntimeConfigValue(value) + if err != nil { + return nil, err + } + config[key] = stringValue + } + + return config, nil +} + +func stringifyRuntimeConfigValue(value interface{}) (string, error) { + switch v := value.(type) { + case string: + return v, nil + case bool: + return strconv.FormatBool(v), nil + case float64: + if math.Trunc(v) == v { + return strconv.FormatInt(int64(v), 10), nil + } + return strconv.FormatFloat(v, 'f', -1, 64), nil + case nil: + return "", nil + default: + b, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(b), nil + } +} diff --git a/pulsar/resource_pulsar_function_unit_test.go b/pulsar/resource_pulsar_function_unit_test.go new file mode 100644 index 00000000..2d4aec45 --- /dev/null +++ b/pulsar/resource_pulsar_function_unit_test.go @@ -0,0 +1,53 @@ +package pulsar + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMergeFunctionCustomRuntimeOptions(t *testing.T) { + base := `{"foo":"bar","sinkConfig":{"old":"value"},"sourceConfig":{"keep":"me"}}` + sinkConfig := map[string]interface{}{"new": "value"} + sourceConfig := map[string]interface{}{} + + merged, err := mergeFunctionCustomRuntimeOptions(base, + runtimeConfigUpdate{key: runtimeOptionSinkConfigKey, config: sinkConfig}, + runtimeConfigUpdate{key: runtimeOptionSourceConfigKey, config: sourceConfig}, + ) + require.NoError(t, err) + + var result map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(merged), &result)) + + assert.Equal(t, "bar", result["foo"]) + assert.Equal(t, map[string]interface{}{"new": "value"}, result["sinkConfig"]) + _, hasSource := result["sourceConfig"] + assert.False(t, hasSource) +} + +func TestSplitFunctionCustomRuntimeOptions(t *testing.T) { + raw := `{"foo":"bar","sinkConfig":{"alpha":"1","beta":true},"sourceConfig":{"gamma":2}}` + + sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, err := splitFunctionCustomRuntimeOptions(raw) + require.NoError(t, err) + assert.True(t, sinkPresent) + assert.Equal(t, map[string]interface{}{"alpha": "1", "beta": "true"}, sinkConfig) + assert.True(t, sourcePresent) + assert.Equal(t, map[string]interface{}{"gamma": "2"}, sourceConfig) + assert.JSONEq(t, `{"foo":"bar"}`, sanitized) +} + +func TestSplitFunctionCustomRuntimeOptionsWithoutSinkConfig(t *testing.T) { + raw := `{"foo":"bar"}` + + sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, err := splitFunctionCustomRuntimeOptions(raw) + require.NoError(t, err) + assert.False(t, sinkPresent) + assert.Nil(t, sinkConfig) + assert.False(t, sourcePresent) + assert.Nil(t, sourceConfig) + assert.JSONEq(t, `{"foo":"bar"}`, sanitized) +} From 2979d9bbb792e007e9def4fcbf06f6e8a4ae5ad1 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 10 Nov 2025 00:11:44 +0800 Subject: [PATCH 2/6] fix ci --- pulsar/resource_pulsar_function.go | 211 ++++++++++++++++--- pulsar/resource_pulsar_function_unit_test.go | 16 +- 2 files changed, 192 insertions(+), 35 deletions(-) diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index 0f18e818..4b200d3f 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -74,11 +74,39 @@ const ( resourceFunctionUserConfig = "user_config" resourceFunctionSinkConfigKey = "sink_config" resourceFunctionSourceConfigKey = "source_config" + resourceFunctionSinkConfigTypeKey = "sink_type" + resourceFunctionSourceConfigTypeKey = "source_type" + resourceFunctionRuntimeConfigConfigsKey = "configs" ) const ( - runtimeOptionSinkConfigKey = "sinkConfig" - runtimeOptionSourceConfigKey = "sourceConfig" + runtimeOptionSinkConfigKey = "sinkConfig" + runtimeOptionSourceConfigKey = "sourceConfig" + runtimeOptionSinkConfigTypeField = "sinkType" + runtimeOptionSourceConfigTypeField = "sourceType" + runtimeOptionConfigsKey = "configs" +) + +type runtimeConfigDefinition struct { + schemaKey string + typeSchemaKey string + runtimeKey string + runtimeTypeKey string +} + +var ( + sinkRuntimeConfigDefinition = runtimeConfigDefinition{ + schemaKey: resourceFunctionSinkConfigKey, + typeSchemaKey: resourceFunctionSinkConfigTypeKey, + runtimeKey: runtimeOptionSinkConfigKey, + runtimeTypeKey: runtimeOptionSinkConfigTypeField, + } + sourceRuntimeConfigDefinition = runtimeConfigDefinition{ + schemaKey: resourceFunctionSourceConfigKey, + typeSchemaKey: resourceFunctionSourceConfigTypeKey, + runtimeKey: runtimeOptionSourceConfigKey, + runtimeTypeKey: runtimeOptionSourceConfigTypeField, + } ) var resourceFunctionDescriptions = make(map[string]string) @@ -386,18 +414,52 @@ func resourcePulsarFunction() *schema.Resource { Elem: &schema.Schema{Type: schema.TypeString}, }, resourceFunctionSinkConfigKey: { - Type: schema.TypeMap, + Type: schema.TypeList, Optional: true, Computed: true, + MaxItems: 1, Description: resourceFunctionDescriptions[resourceFunctionSinkConfigKey], - Elem: &schema.Schema{Type: schema.TypeString}, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + resourceFunctionSinkConfigTypeKey: { + Type: schema.TypeString, + Optional: true, + Computed: true, + Description: "Sink implementation identifier.", + }, + resourceFunctionRuntimeConfigConfigsKey: { + Type: schema.TypeMap, + Optional: true, + Computed: true, + Description: "Sink-specific key/value options.", + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, }, resourceFunctionSourceConfigKey: { - Type: schema.TypeMap, + Type: schema.TypeList, Optional: true, Computed: true, + MaxItems: 1, Description: resourceFunctionDescriptions[resourceFunctionSourceConfigKey], - Elem: &schema.Schema{Type: schema.TypeString}, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + resourceFunctionSourceConfigTypeKey: { + Type: schema.TypeString, + Optional: true, + Computed: true, + Description: "Source implementation identifier.", + }, + resourceFunctionRuntimeConfigConfigsKey: { + Type: schema.TypeMap, + Optional: true, + Computed: true, + Description: "Source-specific key/value options.", + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, }, }, } @@ -921,7 +983,11 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso } if sinkConfigPresent { - if err = d.Set(resourceFunctionSinkConfigKey, sinkConfig); err != nil { + sinkState, err := flattenRuntimeConfigForState(sinkConfig, sinkRuntimeConfigDefinition) + if err != nil { + return err + } + if err = d.Set(resourceFunctionSinkConfigKey, sinkState); err != nil { return err } } else { @@ -931,7 +997,11 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso } if sourceConfigPresent { - if err = d.Set(resourceFunctionSourceConfigKey, sourceConfig); err != nil { + sourceState, err := flattenRuntimeConfigForState(sourceConfig, sourceRuntimeConfigDefinition) + if err != nil { + return err + } + if err = d.Set(resourceFunctionSourceConfigKey, sourceState); err != nil { return err } } else { @@ -1009,8 +1079,14 @@ func buildFunctionCustomRuntimeOptions(d *schema.ResourceData) (string, error) { base = inter.(string) } - sinkConfig, sinkConfigSet := expandFunctionRuntimeConfig(d, resourceFunctionSinkConfigKey) - sourceConfig, sourceConfigSet := expandFunctionRuntimeConfig(d, resourceFunctionSourceConfigKey) + sinkConfig, sinkConfigSet, err := expandFunctionRuntimeConfig(d, sinkRuntimeConfigDefinition) + if err != nil { + return "", err + } + sourceConfig, sourceConfigSet, err := expandFunctionRuntimeConfig(d, sourceRuntimeConfigDefinition) + if err != nil { + return "", err + } if !sinkConfigSet && !sourceConfigSet { return base, nil @@ -1034,23 +1110,58 @@ func buildFunctionCustomRuntimeOptions(d *schema.ResourceData) (string, error) { return mergeFunctionCustomRuntimeOptions(base, updates...) } -func expandFunctionRuntimeConfig(d *schema.ResourceData, schemaKey string) (map[string]interface{}, bool) { - inter, ok := d.GetOkExists(schemaKey) +func expandFunctionRuntimeConfig(d *schema.ResourceData, def runtimeConfigDefinition) (map[string]interface{}, bool, error) { + inter, ok := d.GetOkExists(def.schemaKey) if !ok { - return nil, false + return nil, false, nil } if inter == nil { - return map[string]interface{}{}, true + return map[string]interface{}{}, true, nil + } + + list, ok := inter.([]interface{}) + if !ok { + return nil, false, fmt.Errorf("%s must be a list", def.schemaKey) + } + + if len(list) == 0 || list[0] == nil { + return map[string]interface{}{}, true, nil + } + + item, ok := list[0].(map[string]interface{}) + if !ok { + return nil, false, fmt.Errorf("%s must contain an object", def.schemaKey) + } + + runtimeConfig := make(map[string]interface{}) + if typeValue, ok := item[def.typeSchemaKey].(string); ok && typeValue != "" { + runtimeConfig[def.runtimeTypeKey] = typeValue } - rawMap := inter.(map[string]interface{}) - config := make(map[string]interface{}, len(rawMap)) - for key, value := range rawMap { - config[key] = value + if configsRaw, ok := item[resourceFunctionRuntimeConfigConfigsKey].(map[string]interface{}); ok && len(configsRaw) > 0 { + runtimeConfig[runtimeOptionConfigsKey] = normalizeRuntimeConfigSchemaMap(configsRaw) + } + + return runtimeConfig, true, nil +} + +func normalizeRuntimeConfigSchemaMap(input map[string]interface{}) map[string]interface{} { + normalized := make(map[string]interface{}, len(input)) + for key, value := range input { + switch v := value.(type) { + case string: + normalized[key] = v + case fmt.Stringer: + normalized[key] = v.String() + case nil: + normalized[key] = "" + default: + normalized[key] = fmt.Sprintf("%v", v) + } } - return config, true + return normalized } type runtimeConfigUpdate struct { @@ -1101,12 +1212,12 @@ func splitFunctionCustomRuntimeOptions(raw string) (string, map[string]interface return "", nil, false, nil, false, errors.Wrap(err, "cannot unmarshal custom_runtime_options from Pulsar") } - sinkConfig, sinkPresent, err := extractRuntimeConfig(runtimeOptions, runtimeOptionSinkConfigKey) + sinkConfig, sinkPresent, err := extractRuntimeConfig(runtimeOptions, sinkRuntimeConfigDefinition) if err != nil { return "", nil, false, nil, false, err } - sourceConfig, sourcePresent, err := extractRuntimeConfig(runtimeOptions, runtimeOptionSourceConfigKey) + sourceConfig, sourcePresent, err := extractRuntimeConfig(runtimeOptions, sourceRuntimeConfigDefinition) if err != nil { return "", nil, false, nil, false, err } @@ -1123,13 +1234,13 @@ func splitFunctionCustomRuntimeOptions(raw string) (string, map[string]interface return sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, nil } -func extractRuntimeConfig(runtimeOptions map[string]interface{}, runtimeKey string) (map[string]interface{}, bool, error) { - raw, ok := runtimeOptions[runtimeKey] +func extractRuntimeConfig(runtimeOptions map[string]interface{}, def runtimeConfigDefinition) (map[string]interface{}, bool, error) { + raw, ok := runtimeOptions[def.runtimeKey] if !ok { return nil, false, nil } - delete(runtimeOptions, runtimeKey) + delete(runtimeOptions, def.runtimeKey) configState := map[string]interface{}{} if raw == nil { return configState, true, nil @@ -1137,18 +1248,31 @@ func extractRuntimeConfig(runtimeOptions map[string]interface{}, runtimeKey stri configMap, ok := raw.(map[string]interface{}) if !ok { - return nil, false, fmt.Errorf("%s in custom_runtime_options must be a JSON object", runtimeKey) + return nil, false, fmt.Errorf("%s in custom_runtime_options must be a JSON object", def.runtimeKey) } - flattened, err := flattenFunctionRuntimeConfig(configMap) - if err != nil { - return nil, false, err + if typeVal, ok := configMap[def.runtimeTypeKey]; ok { + if typeStr, ok := typeVal.(string); ok && typeStr != "" { + configState[def.runtimeTypeKey] = typeStr + } + } + + if configsRaw, ok := configMap[runtimeOptionConfigsKey]; ok { + configsMap, ok := configsRaw.(map[string]interface{}) + if !ok { + return nil, false, fmt.Errorf("configs in %s must be a JSON object", def.runtimeKey) + } + flattened, err := stringifyRuntimeConfigMap(configsMap) + if err != nil { + return nil, false, err + } + configState[runtimeOptionConfigsKey] = flattened } - return flattened, true, nil + return configState, true, nil } -func flattenFunctionRuntimeConfig(input map[string]interface{}) (map[string]interface{}, error) { +func stringifyRuntimeConfigMap(input map[string]interface{}) (map[string]interface{}, error) { config := make(map[string]interface{}, len(input)) for key, value := range input { stringValue, err := stringifyRuntimeConfigValue(value) @@ -1182,3 +1306,30 @@ func stringifyRuntimeConfigValue(value interface{}) (string, error) { return string(b), nil } } + +func flattenRuntimeConfigForState(config map[string]interface{}, def runtimeConfigDefinition) ([]interface{}, error) { + if config == nil || len(config) == 0 { + return nil, nil + } + + state := make(map[string]interface{}) + if typeVal, ok := config[def.runtimeTypeKey]; ok { + if typeStr, ok := typeVal.(string); ok && typeStr != "" { + state[def.typeSchemaKey] = typeStr + } + } + + if configsVal, ok := config[runtimeOptionConfigsKey]; ok { + configsMap, ok := configsVal.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("configs in %s must be a map of strings", def.runtimeKey) + } + state[resourceFunctionRuntimeConfigConfigsKey] = configsMap + } + + if len(state) == 0 { + return nil, nil + } + + return []interface{}{state}, nil +} diff --git a/pulsar/resource_pulsar_function_unit_test.go b/pulsar/resource_pulsar_function_unit_test.go index 2d4aec45..1dc1edd8 100644 --- a/pulsar/resource_pulsar_function_unit_test.go +++ b/pulsar/resource_pulsar_function_unit_test.go @@ -10,7 +10,7 @@ import ( func TestMergeFunctionCustomRuntimeOptions(t *testing.T) { base := `{"foo":"bar","sinkConfig":{"old":"value"},"sourceConfig":{"keep":"me"}}` - sinkConfig := map[string]interface{}{"new": "value"} + sinkConfig := map[string]interface{}{runtimeOptionSinkConfigTypeField: "kafka", runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}} sourceConfig := map[string]interface{}{} merged, err := mergeFunctionCustomRuntimeOptions(base, @@ -23,20 +23,26 @@ func TestMergeFunctionCustomRuntimeOptions(t *testing.T) { require.NoError(t, json.Unmarshal([]byte(merged), &result)) assert.Equal(t, "bar", result["foo"]) - assert.Equal(t, map[string]interface{}{"new": "value"}, result["sinkConfig"]) + assert.Equal(t, map[string]interface{}{runtimeOptionSinkConfigTypeField: "kafka", runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}}, result["sinkConfig"]) _, hasSource := result["sourceConfig"] assert.False(t, hasSource) } func TestSplitFunctionCustomRuntimeOptions(t *testing.T) { - raw := `{"foo":"bar","sinkConfig":{"alpha":"1","beta":true},"sourceConfig":{"gamma":2}}` + raw := `{"foo":"bar","sinkConfig":{"sinkType":"kafka","configs":{"alpha":"1","beta":true}},"sourceConfig":{"sourceType":"kinesis","configs":{"gamma":2}}}` sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, err := splitFunctionCustomRuntimeOptions(raw) require.NoError(t, err) assert.True(t, sinkPresent) - assert.Equal(t, map[string]interface{}{"alpha": "1", "beta": "true"}, sinkConfig) + assert.Equal(t, map[string]interface{}{ + runtimeOptionSinkConfigTypeField: "kafka", + runtimeOptionConfigsKey: map[string]interface{}{"alpha": "1", "beta": "true"}, + }, sinkConfig) assert.True(t, sourcePresent) - assert.Equal(t, map[string]interface{}{"gamma": "2"}, sourceConfig) + assert.Equal(t, map[string]interface{}{ + runtimeOptionSourceConfigTypeField: "kinesis", + runtimeOptionConfigsKey: map[string]interface{}{"gamma": "2"}, + }, sourceConfig) assert.JSONEq(t, `{"foo":"bar"}`, sanitized) } From 895f9b9387fd9179ac929eaa04c6b8770d731e52 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Mon, 10 Nov 2025 00:26:36 +0800 Subject: [PATCH 3/6] fix lint --- docs/resources/function.md | 8 ++++++-- pulsar/resource_pulsar_function.go | 19 ++++++++++++------- pulsar/resource_pulsar_function_unit_test.go | 11 +++++++++-- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/docs/resources/function.md b/docs/resources/function.md index 0f81ce47..e39ecab7 100644 --- a/docs/resources/function.md +++ b/docs/resources/function.md @@ -51,8 +51,12 @@ description: |- - `retain_key_ordering` (Boolean) Whether to retain key ordering when the function is restarted after failure. - `retain_ordering` (Boolean) Whether to retain ordering when the function is restarted after failure. - `secrets` (String) The secrets of the function. -- `sink_config` (Map of String) Sink configuration key/values serialized into custom_runtime_options. -- `source_config` (Map of String) Source configuration key/values serialized into custom_runtime_options. +- `sink_config` (Block List) Sink configuration key/values serialized into custom_runtime_options. + - `configs` (Map of String) Sink-specific key/value pairs. Non-string values are stringified during state reconciliation. + - `sink_type` (String) Sink implementation identifier, for example `kafka`. +- `source_config` (Block List) Source configuration key/values serialized into custom_runtime_options. + - `configs` (Map of String) Source-specific key/value pairs. Non-string values are stringified during state reconciliation. + - `source_type` (String) Source implementation identifier, for example `kinesis`. - `skip_to_latest` (Boolean) Whether to skip to the latest position when the function is restarted after failure. - `subscription_name` (String) The subscription name of the function. - `subscription_position` (String) The subscription position of the function. Possible values are `LATEST`, `EARLIEST`, and `CUSTOM`. diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index 4b200d3f..c14e06f4 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -977,7 +977,8 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso } if functionConfig.CustomRuntimeOptions != "" { - sanitizedOptions, sinkConfig, sinkConfigPresent, sourceConfig, sourceConfigPresent, err := splitFunctionCustomRuntimeOptions(functionConfig.CustomRuntimeOptions) + sanitizedOptions, sinkConfig, sinkConfigPresent, sourceConfig, sourceConfigPresent, + err := splitFunctionCustomRuntimeOptions(functionConfig.CustomRuntimeOptions) if err != nil { return err } @@ -1110,8 +1111,9 @@ func buildFunctionCustomRuntimeOptions(d *schema.ResourceData) (string, error) { return mergeFunctionCustomRuntimeOptions(base, updates...) } -func expandFunctionRuntimeConfig(d *schema.ResourceData, def runtimeConfigDefinition) (map[string]interface{}, bool, error) { - inter, ok := d.GetOkExists(def.schemaKey) +func expandFunctionRuntimeConfig(d *schema.ResourceData, + def runtimeConfigDefinition) (map[string]interface{}, bool, error) { + inter, ok := d.GetOk(def.schemaKey) if !ok { return nil, false, nil } @@ -1139,7 +1141,8 @@ func expandFunctionRuntimeConfig(d *schema.ResourceData, def runtimeConfigDefini runtimeConfig[def.runtimeTypeKey] = typeValue } - if configsRaw, ok := item[resourceFunctionRuntimeConfigConfigsKey].(map[string]interface{}); ok && len(configsRaw) > 0 { + if configsRaw, ok := item[resourceFunctionRuntimeConfigConfigsKey].(map[string]interface{}); ok && + len(configsRaw) > 0 { runtimeConfig[runtimeOptionConfigsKey] = normalizeRuntimeConfigSchemaMap(configsRaw) } @@ -1201,7 +1204,8 @@ func mergeFunctionCustomRuntimeOptions(base string, updates ...runtimeConfigUpda return string(b), nil } -func splitFunctionCustomRuntimeOptions(raw string) (string, map[string]interface{}, bool, map[string]interface{}, bool, error) { +func splitFunctionCustomRuntimeOptions(raw string) ( + string, map[string]interface{}, bool, map[string]interface{}, bool, error) { trimmed := strings.TrimSpace(raw) if trimmed == "" { return "", nil, false, nil, false, nil @@ -1234,7 +1238,8 @@ func splitFunctionCustomRuntimeOptions(raw string) (string, map[string]interface return sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, nil } -func extractRuntimeConfig(runtimeOptions map[string]interface{}, def runtimeConfigDefinition) (map[string]interface{}, bool, error) { +func extractRuntimeConfig(runtimeOptions map[string]interface{}, + def runtimeConfigDefinition) (map[string]interface{}, bool, error) { raw, ok := runtimeOptions[def.runtimeKey] if !ok { return nil, false, nil @@ -1308,7 +1313,7 @@ func stringifyRuntimeConfigValue(value interface{}) (string, error) { } func flattenRuntimeConfigForState(config map[string]interface{}, def runtimeConfigDefinition) ([]interface{}, error) { - if config == nil || len(config) == 0 { + if len(config) == 0 { return nil, nil } diff --git a/pulsar/resource_pulsar_function_unit_test.go b/pulsar/resource_pulsar_function_unit_test.go index 1dc1edd8..a56a1f15 100644 --- a/pulsar/resource_pulsar_function_unit_test.go +++ b/pulsar/resource_pulsar_function_unit_test.go @@ -10,7 +10,10 @@ import ( func TestMergeFunctionCustomRuntimeOptions(t *testing.T) { base := `{"foo":"bar","sinkConfig":{"old":"value"},"sourceConfig":{"keep":"me"}}` - sinkConfig := map[string]interface{}{runtimeOptionSinkConfigTypeField: "kafka", runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}} + sinkConfig := map[string]interface{}{ + runtimeOptionSinkConfigTypeField: "kafka", + runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}, + } sourceConfig := map[string]interface{}{} merged, err := mergeFunctionCustomRuntimeOptions(base, @@ -23,12 +26,16 @@ func TestMergeFunctionCustomRuntimeOptions(t *testing.T) { require.NoError(t, json.Unmarshal([]byte(merged), &result)) assert.Equal(t, "bar", result["foo"]) - assert.Equal(t, map[string]interface{}{runtimeOptionSinkConfigTypeField: "kafka", runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}}, result["sinkConfig"]) + assert.Equal(t, map[string]interface{}{ + runtimeOptionSinkConfigTypeField: "kafka", + runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}, + }, result["sinkConfig"]) _, hasSource := result["sourceConfig"] assert.False(t, hasSource) } func TestSplitFunctionCustomRuntimeOptions(t *testing.T) { + ///nolint:lll raw := `{"foo":"bar","sinkConfig":{"sinkType":"kafka","configs":{"alpha":"1","beta":true}},"sourceConfig":{"sourceType":"kinesis","configs":{"gamma":2}}}` sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, err := splitFunctionCustomRuntimeOptions(raw) From 66e8946c462fab6094b576e65f4431b7c0db2983 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 13 Nov 2025 15:32:57 +0800 Subject: [PATCH 4/6] fix drift --- pulsar/resource_pulsar_function.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index c14e06f4..305b1e16 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -244,6 +244,7 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionProcessingGuaranteesKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionProcessingGuaranteesKey], }, resourceFunctionSubscriptionNameKey: { @@ -334,10 +335,12 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionInputTypeClassNameKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionInputTypeClassNameKey], }, resourceFunctionOutputTypeClassNameKey: { Type: schema.TypeString, + Computed: true, Optional: true, Description: resourceFunctionDescriptions[resourceFunctionOutputTypeClassNameKey], }, From c1e2b15680b6c39a394191ced67e47ffcc628084 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 13 Nov 2025 20:35:01 +0800 Subject: [PATCH 5/6] add package resource --- docs/resources/package.md | 46 +++ pulsar/provider.go | 1 + pulsar/resource_pulsar_package.go | 496 +++++++++++++++++++++++++ pulsar/resource_pulsar_package_test.go | 140 +++++++ 4 files changed, 683 insertions(+) create mode 100644 docs/resources/package.md create mode 100644 pulsar/resource_pulsar_package.go create mode 100644 pulsar/resource_pulsar_package_test.go diff --git a/docs/resources/package.md b/docs/resources/package.md new file mode 100644 index 00000000..5ee0f028 --- /dev/null +++ b/docs/resources/package.md @@ -0,0 +1,46 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "pulsar_package Resource - terraform-provider-pulsar" +subcategory: "" +description: |- + Manages Pulsar packages that store runnable artifacts for functions, sources, and sinks. +--- + +# pulsar_package (Resource) + +Manages Pulsar packages that store runnable artifacts for functions, sources, and sinks. Packages are referenced by +other resources via their fully-qualified URL (`:////@`). The provider uploads +the local archive file, keeps metadata (description, contact, properties) in sync, and tracks the local file checksum +so Terraform detects archive changes. + +> **Important:** The archive file referenced by `path` must exist on the machine running `terraform plan`/`apply` +> because the provider computes the SHA256 checksum to trigger updates when the file contents change. + + +## Schema + +### Required + +- `name` (String) Package name. +- `namespace` (String) Namespace that owns the package. +- `path` (String) Local path to the archive file that should be uploaded. +- `tenant` (String) Tenant that owns the package. +- `type` (String) Package type. Supported values: `function`, `sink`, `source`. +- `version` (String) Package version. Changing the version creates a new package resource. + +### Optional + +- `contact` (String) Contact information stored alongside the package metadata. +- `description` (String) Optional package description stored alongside the package metadata. +- `properties` (Map of String) Additional user-defined metadata properties. + +### Read-Only + +- `create_time` (Number) Server-side package creation timestamp (epoch millis). +- `file_checksum` (String) Checksum reported by Pulsar for the uploaded archive. +- `file_name` (String) Archive file name stored in metadata. +- `file_size` (Number) Archive size reported by Pulsar (bytes). +- `id` (String) The ID of this resource. +- `modification_time` (Number) Server-side package modification timestamp (epoch millis). +- `package_url` (String) Fully-qualified package URL. +- `source_hash` (String) SHA256 checksum of the local archive used to trigger updates when the file changes. diff --git a/pulsar/provider.go b/pulsar/provider.go index 1dad2277..8bce1aad 100644 --- a/pulsar/provider.go +++ b/pulsar/provider.go @@ -166,6 +166,7 @@ func Provider() *schema.Provider { "pulsar_function": resourcePulsarFunction(), "pulsar_subscription": resourcePulsarSubscription(), "pulsar_permission_grant": resourcePulsarPermissionGrant(), + "pulsar_package": resourcePulsarPackage(), }, } diff --git a/pulsar/resource_pulsar_package.go b/pulsar/resource_pulsar_package.go new file mode 100644 index 00000000..e3f50d55 --- /dev/null +++ b/pulsar/resource_pulsar_package.go @@ -0,0 +1,496 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/hashicorp/terraform-plugin-log/tflog" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +const ( + resourcePackageTypeKey = "type" + resourcePackageTenantKey = "tenant" + resourcePackageNamespaceKey = "namespace" + resourcePackageNameKey = "name" + resourcePackageVersionKey = "version" + resourcePackagePathKey = "path" + resourcePackageDescriptionKey = "description" + resourcePackageContactKey = "contact" + resourcePackagePropertiesKey = "properties" + + resourcePackagePackageURLKey = "package_url" + resourcePackageCreateTimeKey = "create_time" + resourcePackageModificationTime = "modification_time" + resourcePackageFileChecksumKey = "file_checksum" + resourcePackageFileSizeKey = "file_size" + resourcePackageFileNameKey = "file_name" + resourcePackageSourceChecksumKey = "source_hash" +) + +const ( + packageMetadataPropertyTenant = "tenant" + packageMetadataPropertyNamespace = "namespace" + packageMetadataPropertyName = "functionName" + packageMetadataPropertyFileName = "fileName" + packageMetadataPropertyFileSize = "fileSize" + packageMetadataPropertyChecksum = "checksum" + packageMetadataPropertyManagedBy = "managedByMeshWorkerService" + packageMetadataPropertyLibDir = "libDir" + packageMetadataManagedByTerraform = "terraform-provider-pulsar" +) + +var reservedPackageMetadataKeys = map[string]struct{}{ + packageMetadataPropertyTenant: {}, + packageMetadataPropertyNamespace: {}, + packageMetadataPropertyName: {}, + packageMetadataPropertyFileName: {}, + packageMetadataPropertyFileSize: {}, + packageMetadataPropertyChecksum: {}, + packageMetadataPropertyManagedBy: {}, + packageMetadataPropertyLibDir: {}, +} + +type packageFileInfo struct { + checksum string + size int64 + name string +} + +func resourcePulsarPackage() *schema.Resource { + return &schema.Resource{ + CreateContext: resourcePulsarPackageCreate, + ReadContext: resourcePulsarPackageRead, + UpdateContext: resourcePulsarPackageUpdate, + DeleteContext: resourcePulsarPackageDelete, + CustomizeDiff: resourcePulsarPackageCustomizeDiff, + Description: "Manages Pulsar packages for functions, sources, and sinks. Packages bundle executable artifacts " + + "that can be referenced by other Pulsar resources. The provider uploads the package archive and keeps the " + + "metadata (description, contact, and custom properties) in sync with Terraform state.", + + Importer: &schema.ResourceImporter{ + StateContext: resourcePulsarPackageImport, + }, + + Schema: map[string]*schema.Schema{ + resourcePackageTypeKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Package type. Supported values: `function`, `sink`, `source`.", + ValidateFunc: validatePackageType, + }, + resourcePackageTenantKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Tenant that owns the package.", + }, + resourcePackageNamespaceKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Namespace that owns the package.", + }, + resourcePackageNameKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Package name.", + }, + resourcePackageVersionKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Package version. Changing the version creates a new package resource.", + }, + resourcePackagePathKey: { + Type: schema.TypeString, + Required: true, + Description: "Local path to the package archive. Changing the file contents will trigger an update.", + }, + resourcePackageDescriptionKey: { + Type: schema.TypeString, + Optional: true, + Description: "Optional package description stored in Pulsar metadata.", + }, + resourcePackageContactKey: { + Type: schema.TypeString, + Optional: true, + Description: "Contact information stored in Pulsar metadata.", + }, + resourcePackagePropertiesKey: { + Type: schema.TypeMap, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Description: "Additional metadata properties. ", + }, + resourcePackagePackageURLKey: { + Type: schema.TypeString, + Computed: true, + Description: "Fully-qualified package URL in the format `:////@`.", + }, + resourcePackageCreateTimeKey: { + Type: schema.TypeInt, + Computed: true, + Description: "Server-side package creation timestamp (epoch millis).", + }, + resourcePackageModificationTime: { + Type: schema.TypeInt, + Computed: true, + Description: "Server-side package modification timestamp (epoch millis).", + }, + resourcePackageFileChecksumKey: { + Type: schema.TypeString, + Computed: true, + Description: "Checksum reported by Pulsar for the uploaded package.", + }, + resourcePackageFileSizeKey: { + Type: schema.TypeInt, + Computed: true, + Description: "Package size reported by Pulsar (in bytes).", + }, + resourcePackageFileNameKey: { + Type: schema.TypeString, + Computed: true, + Description: "Package file name stored in metadata.", + }, + resourcePackageSourceChecksumKey: { + Type: schema.TypeString, + Computed: true, + Description: "SHA256 checksum of the local package file. Used to detect file changes.", + }, + }, + } +} + +func resourcePulsarPackageCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + packageURL, err := packageURLFromResourceData(d) + if err != nil { + return diag.FromErr(err) + } + + path := d.Get(resourcePackagePathKey).(string) + info, err := calculatePackageFileInfo(path) + if err != nil { + return diag.FromErr(err) + } + + description := d.Get(resourcePackageDescriptionKey).(string) + contact := d.Get(resourcePackageContactKey).(string) + properties := buildPackageMetadataProperties(d, &info) + + if err = client.Upload(packageURL, path, description, contact, properties); err != nil { + return diag.FromErr(err) + } + + d.SetId(packageURL) + _ = d.Set(resourcePackagePackageURLKey, packageURL) + _ = d.Set(resourcePackageSourceChecksumKey, info.checksum) + + return resourcePulsarPackageRead(ctx, d, meta) +} + +func resourcePulsarPackageRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + + packageURL, err := ensurePackageID(d) + if err != nil { + return diag.FromErr(err) + } + + metadata, err := client.GetMetadata(packageURL) + if err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + d.SetId("") + return nil + } + return diag.FromErr(err) + } + + _ = d.Set(resourcePackagePackageURLKey, packageURL) + _ = d.Set(resourcePackageDescriptionKey, metadata.Description) + _ = d.Set(resourcePackageContactKey, metadata.Contact) + _ = d.Set(resourcePackageCreateTimeKey, metadata.CreateTime) + _ = d.Set(resourcePackageModificationTime, metadata.ModificationTime) + + userProps := filterUserManagedProperties(metadata.Properties) + if err = d.Set(resourcePackagePropertiesKey, convertToInterfaceMap(userProps)); err != nil { + return diag.FromErr(err) + } + + if metadata.Properties != nil { + if checksum, ok := metadata.Properties[packageMetadataPropertyChecksum]; ok { + _ = d.Set(resourcePackageFileChecksumKey, checksum) + } + if size, ok := metadata.Properties[packageMetadataPropertyFileSize]; ok { + if parsed, parseErr := strconv.ParseInt(size, 10, 64); parseErr == nil { + _ = d.Set(resourcePackageFileSizeKey, parsed) + } + } + if name, ok := metadata.Properties[packageMetadataPropertyFileName]; ok { + _ = d.Set(resourcePackageFileNameKey, name) + } + } + + setLocalSourceChecksum(ctx, d) + + return nil +} + +func resourcePulsarPackageUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + packageURL, err := packageURLFromResourceData(d) + if err != nil { + return diag.FromErr(err) + } + + path := d.Get(resourcePackagePathKey).(string) + description := d.Get(resourcePackageDescriptionKey).(string) + contact := d.Get(resourcePackageContactKey).(string) + + reupload := d.HasChange(resourcePackagePathKey) || d.HasChange(resourcePackageSourceChecksumKey) + + if reupload { + info, calcErr := calculatePackageFileInfo(path) + if calcErr != nil { + return diag.FromErr(calcErr) + } + properties := buildPackageMetadataProperties(d, &info) + if err = client.Upload(packageURL, path, description, contact, properties); err != nil { + return diag.FromErr(err) + } + _ = d.Set(resourcePackageSourceChecksumKey, info.checksum) + } else if d.HasChanges(resourcePackageDescriptionKey, resourcePackageContactKey, resourcePackagePropertiesKey) { + properties := buildPackageMetadataProperties(d, nil) + if err = client.UpdateMetadata(packageURL, description, contact, properties); err != nil { + return diag.FromErr(err) + } + } + + return resourcePulsarPackageRead(ctx, d, meta) +} + +func resourcePulsarPackageDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + + packageURL, err := ensurePackageID(d) + if err != nil { + return diag.FromErr(err) + } + + if err = client.Delete(packageURL); err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + d.SetId("") + return nil + } + return diag.FromErr(err) + } + + return nil +} + +func resourcePulsarPackageCustomizeDiff(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error { + path := diff.Get(resourcePackagePathKey).(string) + if path == "" { + return fmt.Errorf("path must be provided") + } + + info, err := calculatePackageFileInfo(path) + if err != nil { + return err + } + + if err = diff.SetNew(resourcePackageSourceChecksumKey, info.checksum); err != nil { + return err + } + + return nil +} + +func resourcePulsarPackageImport(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) { + packageName, err := utils.GetPackageName(d.Id()) + if err != nil { + return nil, err + } + + _ = d.Set(resourcePackageTypeKey, packageName.GetType().String()) + _ = d.Set(resourcePackageTenantKey, packageName.GetTenant()) + _ = d.Set(resourcePackageNamespaceKey, packageName.GetNamespace()) + _ = d.Set(resourcePackageNameKey, packageName.GetName()) + _ = d.Set(resourcePackageVersionKey, packageName.GetVersion()) + + diags := resourcePulsarPackageRead(ctx, d, meta) + if diags.HasError() { + return nil, fmt.Errorf("failed to import %s: %s", d.Id(), diags[0].Summary) + } + + return []*schema.ResourceData{d}, nil +} + +func validatePackageType(val interface{}, key string) (warns []string, errs []error) { + value, ok := val.(string) + if !ok { + errs = append(errs, fmt.Errorf("%s must be a string", key)) + return + } + + switch value { + case utils.PackageTypeFunction.String(), utils.PackageTypeSink.String(), utils.PackageTypeSource.String(): + return + default: + errs = append(errs, fmt.Errorf("%s must be one of %q, %q, or %q", + key, utils.PackageTypeFunction, utils.PackageTypeSink, utils.PackageTypeSource)) + return + } +} + +func ensurePackageID(d *schema.ResourceData) (string, error) { + if d.Id() != "" { + return d.Id(), nil + } + return packageURLFromResourceData(d) +} + +func packageURLFromResourceData(d *schema.ResourceData) (string, error) { + packageType := utils.PackageType(d.Get(resourcePackageTypeKey).(string)) + tenant := d.Get(resourcePackageTenantKey).(string) + namespace := d.Get(resourcePackageNamespaceKey).(string) + name := d.Get(resourcePackageNameKey).(string) + version := d.Get(resourcePackageVersionKey).(string) + + packageName, err := utils.GetPackageNameWithComponents(packageType, tenant, namespace, name, version) + if err != nil { + return "", err + } + + return packageName.String(), nil +} + +func buildPackageMetadataProperties(d *schema.ResourceData, info *packageFileInfo) map[string]string { + props := make(map[string]string) + if raw, ok := d.GetOk(resourcePackagePropertiesKey); ok && raw != nil { + for k, v := range raw.(map[string]interface{}) { + props[k] = v.(string) + } + } + + props[packageMetadataPropertyTenant] = d.Get(resourcePackageTenantKey).(string) + props[packageMetadataPropertyNamespace] = d.Get(resourcePackageNamespaceKey).(string) + props[packageMetadataPropertyName] = d.Get(resourcePackageNameKey).(string) + props[packageMetadataPropertyManagedBy] = packageMetadataManagedByTerraform + + if info != nil { + props[packageMetadataPropertyFileName] = info.name + props[packageMetadataPropertyFileSize] = strconv.FormatInt(info.size, 10) + props[packageMetadataPropertyChecksum] = info.checksum + } else { + if existing := d.Get(resourcePackageFileNameKey).(string); existing != "" { + props[packageMetadataPropertyFileName] = existing + } + if sizeRaw, ok := d.GetOkExists(resourcePackageFileSizeKey); ok { + switch v := sizeRaw.(type) { + case int: + props[packageMetadataPropertyFileSize] = strconv.Itoa(v) + case int64: + props[packageMetadataPropertyFileSize] = strconv.FormatInt(v, 10) + } + } + if checksumRaw, ok := d.GetOkExists(resourcePackageFileChecksumKey); ok { + if checksum, okCast := checksumRaw.(string); okCast && checksum != "" { + props[packageMetadataPropertyChecksum] = checksum + } + } + } + + return props +} + +func calculatePackageFileInfo(path string) (packageFileInfo, error) { + file, err := os.Open(filepath.Clean(path)) + if err != nil { + return packageFileInfo{}, err + } + + defer file.Close() + + hasher := sha256.New() + size, err := io.Copy(hasher, file) + if err != nil { + return packageFileInfo{}, err + } + + return packageFileInfo{ + checksum: hex.EncodeToString(hasher.Sum(nil)), + size: size, + name: filepath.Base(path), + }, nil +} + +func filterUserManagedProperties(in map[string]string) map[string]string { + if len(in) == 0 { + return map[string]string{} + } + + out := make(map[string]string) + for key, value := range in { + if _, reserved := reservedPackageMetadataKeys[key]; reserved { + continue + } + out[key] = value + } + + return out +} + +func convertToInterfaceMap(in map[string]string) map[string]interface{} { + out := make(map[string]interface{}, len(in)) + for k, v := range in { + out[k] = v + } + return out +} + +func setLocalSourceChecksum(ctx context.Context, d *schema.ResourceData) { + path := d.Get(resourcePackagePathKey).(string) + if path == "" { + return + } + + info, err := calculatePackageFileInfo(path) + if err != nil { + tflog.Warn(ctx, "unable to read local package file for checksum", map[string]interface{}{ + "path": path, "error": err.Error(), + }) + return + } + + _ = d.Set(resourcePackageSourceChecksumKey, info.checksum) +} diff --git a/pulsar/resource_pulsar_package_test.go b/pulsar/resource_pulsar_package_test.go new file mode 100644 index 00000000..c4258d02 --- /dev/null +++ b/pulsar/resource_pulsar_package_test.go @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "os" + "path/filepath" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func TestFilterUserManagedProperties(t *testing.T) { + props := map[string]string{ + packageMetadataPropertyTenant: "tenant", + packageMetadataPropertyFileName: "pkg.nar", + "custom": "value", + } + + filtered := filterUserManagedProperties(props) + + if len(filtered) != 1 { + t.Fatalf("expected 1 property, got %d", len(filtered)) + } + + if filtered["custom"] != "value" { + t.Fatalf("expected custom property to be kept") + } +} + +func TestBuildPackageMetadataProperties(t *testing.T) { + resource := resourcePulsarPackage() + data := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{ + resourcePackageTypeKey: "function", + resourcePackageTenantKey: "tenant-a", + resourcePackageNamespaceKey: "ns", + resourcePackageNameKey: "pkg", + resourcePackageVersionKey: "v1", + resourcePackagePathKey: "/tmp/pkg.nar", + resourcePackagePropertiesKey: map[string]interface{}{"custom": "value"}, + }) + + info := &packageFileInfo{ + checksum: "abc", + size: 42, + name: "pkg.nar", + } + + props := buildPackageMetadataProperties(data, info) + + expected := map[string]string{ + packageMetadataPropertyTenant: "tenant-a", + packageMetadataPropertyNamespace: "ns", + packageMetadataPropertyName: "pkg", + packageMetadataPropertyFileName: "pkg.nar", + packageMetadataPropertyFileSize: "42", + packageMetadataPropertyChecksum: "abc", + packageMetadataPropertyManagedBy: packageMetadataManagedByTerraform, + "custom": "value", + } + + for k, v := range expected { + if props[k] != v { + t.Fatalf("expected %s=%s, got %s", k, v, props[k]) + } + } +} + +func TestBuildPackageMetadataPropertiesUsesExistingValues(t *testing.T) { + resource := resourcePulsarPackage() + data := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{ + resourcePackageTypeKey: "function", + resourcePackageTenantKey: "tenant-a", + resourcePackageNamespaceKey: "ns", + resourcePackageNameKey: "pkg", + resourcePackageVersionKey: "v1", + resourcePackagePathKey: "/tmp/pkg.nar", + }) + + _ = data.Set(resourcePackageFileNameKey, "existing.nar") + _ = data.Set(resourcePackageFileSizeKey, 100) + _ = data.Set(resourcePackageFileChecksumKey, "old") + + props := buildPackageMetadataProperties(data, nil) + + if props[packageMetadataPropertyFileName] != "existing.nar" { + t.Fatalf("expected existing file name to be reused, got %s", props[packageMetadataPropertyFileName]) + } + + if props[packageMetadataPropertyFileSize] != "100" { + t.Fatalf("expected existing file size to be reused, got %s", props[packageMetadataPropertyFileSize]) + } + + if props[packageMetadataPropertyChecksum] != "old" { + t.Fatalf("expected existing checksum to be reused, got %s", props[packageMetadataPropertyChecksum]) + } +} + +func TestCalculatePackageFileInfo(t *testing.T) { + dir := t.TempDir() + filePath := filepath.Join(dir, "pkg.nar") + content := []byte("hello pulsar package") + + if err := os.WriteFile(filePath, content, 0o600); err != nil { + t.Fatalf("failed to write temp file: %v", err) + } + + info, err := calculatePackageFileInfo(filePath) + if err != nil { + t.Fatalf("failed to calculate info: %v", err) + } + + if info.size != int64(len(content)) { + t.Fatalf("expected size %d, got %d", len(content), info.size) + } + + if info.name != "pkg.nar" { + t.Fatalf("expected name pkg.nar, got %s", info.name) + } + + expectedChecksum := "032cc2f30d10d8598ae15a53bfbf4e93724d5c70a4b6dd8167bcda59fd955d8f" + if info.checksum != expectedChecksum { + t.Fatalf("expected checksum %s, got %s", expectedChecksum, info.checksum) + } +} From 54c315512c29c98e4893273f3e5613ee9e444311 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 13 Nov 2025 21:07:43 +0800 Subject: [PATCH 6/6] fix lint --- pulsar/resource_pulsar_package.go | 44 +++++++++++++++----------- pulsar/resource_pulsar_package_test.go | 25 +++++++++++++++ 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/pulsar/resource_pulsar_package.go b/pulsar/resource_pulsar_package.go index e3f50d55..94f49c9d 100644 --- a/pulsar/resource_pulsar_package.go +++ b/pulsar/resource_pulsar_package.go @@ -335,7 +335,8 @@ func resourcePulsarPackageCustomizeDiff(_ context.Context, diff *schema.Resource return nil } -func resourcePulsarPackageImport(ctx context.Context, d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) { +func resourcePulsarPackageImport(ctx context.Context, d *schema.ResourceData, + meta interface{}) ([]*schema.ResourceData, error) { packageName, err := utils.GetPackageName(d.Id()) if err != nil { return nil, err @@ -407,32 +408,39 @@ func buildPackageMetadataProperties(d *schema.ResourceData, info *packageFileInf props[packageMetadataPropertyName] = d.Get(resourcePackageNameKey).(string) props[packageMetadataPropertyManagedBy] = packageMetadataManagedByTerraform + if info == nil { + info = existingPackageFileInfoFromState(d) + } + if info != nil { props[packageMetadataPropertyFileName] = info.name props[packageMetadataPropertyFileSize] = strconv.FormatInt(info.size, 10) props[packageMetadataPropertyChecksum] = info.checksum - } else { - if existing := d.Get(resourcePackageFileNameKey).(string); existing != "" { - props[packageMetadataPropertyFileName] = existing - } - if sizeRaw, ok := d.GetOkExists(resourcePackageFileSizeKey); ok { - switch v := sizeRaw.(type) { - case int: - props[packageMetadataPropertyFileSize] = strconv.Itoa(v) - case int64: - props[packageMetadataPropertyFileSize] = strconv.FormatInt(v, 10) - } - } - if checksumRaw, ok := d.GetOkExists(resourcePackageFileChecksumKey); ok { - if checksum, okCast := checksumRaw.(string); okCast && checksum != "" { - props[packageMetadataPropertyChecksum] = checksum - } - } } return props } +func existingPackageFileInfoFromState(d *schema.ResourceData) *packageFileInfo { + name := d.Get(resourcePackageFileNameKey).(string) + checksum := d.Get(resourcePackageFileChecksumKey).(string) + + var size int64 + if sizeVal, ok := d.Get(resourcePackageFileSizeKey).(int); ok { + size = int64(sizeVal) + } + + if name == "" && checksum == "" && size == 0 { + return nil + } + + return &packageFileInfo{ + name: name, + checksum: checksum, + size: size, + } +} + func calculatePackageFileInfo(path string) (packageFileInfo, error) { file, err := os.Open(filepath.Clean(path)) if err != nil { diff --git a/pulsar/resource_pulsar_package_test.go b/pulsar/resource_pulsar_package_test.go index c4258d02..f5c61fc2 100644 --- a/pulsar/resource_pulsar_package_test.go +++ b/pulsar/resource_pulsar_package_test.go @@ -111,6 +111,31 @@ func TestBuildPackageMetadataPropertiesUsesExistingValues(t *testing.T) { } } +func TestExistingPackageFileInfoFromState(t *testing.T) { + resource := resourcePulsarPackage() + data := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{ + resourcePackageTypeKey: "function", + resourcePackageTenantKey: "tenant-a", + resourcePackageNamespaceKey: "ns", + resourcePackageNameKey: "pkg", + resourcePackageVersionKey: "v1", + resourcePackagePathKey: "/tmp/pkg.nar", + }) + + _ = data.Set(resourcePackageFileNameKey, "pkg.nar") + _ = data.Set(resourcePackageFileSizeKey, 0) + _ = data.Set(resourcePackageFileChecksumKey, "abc") + + info := existingPackageFileInfoFromState(data) + if info == nil { + t.Fatalf("expected file info") + } + + if info.name != "pkg.nar" || info.checksum != "abc" || info.size != 0 { + t.Fatalf("unexpected info %#v", info) + } +} + func TestCalculatePackageFileInfo(t *testing.T) { dir := t.TempDir() filePath := filepath.Join(dir, "pkg.nar")