diff --git a/docs/resources/function.md b/docs/resources/function.md index f4d99de..e39ecab 100644 --- a/docs/resources/function.md +++ b/docs/resources/function.md @@ -51,6 +51,12 @@ description: |- - `retain_key_ordering` (Boolean) Whether to retain key ordering when the function is restarted after failure. - `retain_ordering` (Boolean) Whether to retain ordering when the function is restarted after failure. - `secrets` (String) The secrets of the function. +- `sink_config` (Block List) Sink configuration key/values serialized into custom_runtime_options. + - `configs` (Map of String) Sink-specific key/value pairs. Non-string values are stringified during state reconciliation. + - `sink_type` (String) Sink implementation identifier, for example `kafka`. +- `source_config` (Block List) Source configuration key/values serialized into custom_runtime_options. + - `configs` (Map of String) Source-specific key/value pairs. Non-string values are stringified during state reconciliation. + - `source_type` (String) Source implementation identifier, for example `kinesis`. - `skip_to_latest` (Boolean) Whether to skip to the latest position when the function is restarted after failure. - `subscription_name` (String) The subscription name of the function. - `subscription_position` (String) The subscription position of the function. Possible values are `LATEST`, `EARLIEST`, and `CUSTOM`. @@ -61,5 +67,3 @@ description: |- ### Read-Only - `id` (String) The ID of this resource. - - diff --git a/docs/resources/package.md b/docs/resources/package.md new file mode 100644 index 0000000..5ee0f02 --- /dev/null +++ b/docs/resources/package.md @@ -0,0 +1,46 @@ +--- +# generated by https://github.com/hashicorp/terraform-plugin-docs +page_title: "pulsar_package Resource - terraform-provider-pulsar" +subcategory: "" +description: |- + Manages Pulsar packages that store runnable artifacts for functions, sources, and sinks. +--- + +# pulsar_package (Resource) + +Manages Pulsar packages that store runnable artifacts for functions, sources, and sinks. Packages are referenced by +other resources via their fully-qualified URL (`:////@`). The provider uploads +the local archive file, keeps metadata (description, contact, properties) in sync, and tracks the local file checksum +so Terraform detects archive changes. + +> **Important:** The archive file referenced by `path` must exist on the machine running `terraform plan`/`apply` +> because the provider computes the SHA256 checksum to trigger updates when the file contents change. + + +## Schema + +### Required + +- `name` (String) Package name. +- `namespace` (String) Namespace that owns the package. +- `path` (String) Local path to the archive file that should be uploaded. +- `tenant` (String) Tenant that owns the package. +- `type` (String) Package type. Supported values: `function`, `sink`, `source`. +- `version` (String) Package version. Changing the version creates a new package resource. + +### Optional + +- `contact` (String) Contact information stored alongside the package metadata. +- `description` (String) Optional package description stored alongside the package metadata. +- `properties` (Map of String) Additional user-defined metadata properties. + +### Read-Only + +- `create_time` (Number) Server-side package creation timestamp (epoch millis). +- `file_checksum` (String) Checksum reported by Pulsar for the uploaded archive. +- `file_name` (String) Archive file name stored in metadata. +- `file_size` (Number) Archive size reported by Pulsar (bytes). +- `id` (String) The ID of this resource. +- `modification_time` (Number) Server-side package modification timestamp (epoch millis). +- `package_url` (String) Fully-qualified package URL. +- `source_hash` (String) SHA256 checksum of the local archive used to trigger updates when the file changes. diff --git a/pulsar/provider.go b/pulsar/provider.go index 1dad227..8bce1aa 100644 --- a/pulsar/provider.go +++ b/pulsar/provider.go @@ -166,6 +166,7 @@ func Provider() *schema.Provider { "pulsar_function": resourcePulsarFunction(), "pulsar_subscription": resourcePulsarSubscription(), "pulsar_permission_grant": resourcePulsarPermissionGrant(), + "pulsar_package": resourcePulsarPackage(), }, } diff --git a/pulsar/resource_pulsar_function.go b/pulsar/resource_pulsar_function.go index 2676721..305b1e1 100644 --- a/pulsar/resource_pulsar_function.go +++ b/pulsar/resource_pulsar_function.go @@ -21,6 +21,8 @@ import ( "context" "encoding/json" "fmt" + "math" + "strconv" "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" @@ -70,6 +72,41 @@ const ( resourceFunctionRAMKey = "ram_mb" resourceFunctionDiskKey = "disk_mb" resourceFunctionUserConfig = "user_config" + resourceFunctionSinkConfigKey = "sink_config" + resourceFunctionSourceConfigKey = "source_config" + resourceFunctionSinkConfigTypeKey = "sink_type" + resourceFunctionSourceConfigTypeKey = "source_type" + resourceFunctionRuntimeConfigConfigsKey = "configs" +) + +const ( + runtimeOptionSinkConfigKey = "sinkConfig" + runtimeOptionSourceConfigKey = "sourceConfig" + runtimeOptionSinkConfigTypeField = "sinkType" + runtimeOptionSourceConfigTypeField = "sourceType" + runtimeOptionConfigsKey = "configs" +) + +type runtimeConfigDefinition struct { + schemaKey string + typeSchemaKey string + runtimeKey string + runtimeTypeKey string +} + +var ( + sinkRuntimeConfigDefinition = runtimeConfigDefinition{ + schemaKey: resourceFunctionSinkConfigKey, + typeSchemaKey: resourceFunctionSinkConfigTypeKey, + runtimeKey: runtimeOptionSinkConfigKey, + runtimeTypeKey: runtimeOptionSinkConfigTypeField, + } + sourceRuntimeConfigDefinition = runtimeConfigDefinition{ + schemaKey: resourceFunctionSourceConfigKey, + typeSchemaKey: resourceFunctionSourceConfigTypeKey, + runtimeKey: runtimeOptionSourceConfigKey, + runtimeTypeKey: runtimeOptionSourceConfigTypeField, + } ) var resourceFunctionDescriptions = make(map[string]string) @@ -114,6 +151,8 @@ func init() { resourceFunctionRAMKey: "The RAM that need to be allocated per function instance", resourceFunctionDiskKey: "The disk that need to be allocated per function instance", resourceFunctionUserConfig: "User-defined config key/values", + resourceFunctionSinkConfigKey: "Sink configuration key/values serialized into custom_runtime_options.", + resourceFunctionSourceConfigKey: "Source configuration key/values serialized into custom_runtime_options.", } } @@ -205,6 +244,7 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionProcessingGuaranteesKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionProcessingGuaranteesKey], }, resourceFunctionSubscriptionNameKey: { @@ -295,10 +335,12 @@ func resourcePulsarFunction() *schema.Resource { resourceFunctionInputTypeClassNameKey: { Type: schema.TypeString, Optional: true, + Computed: true, Description: resourceFunctionDescriptions[resourceFunctionInputTypeClassNameKey], }, resourceFunctionOutputTypeClassNameKey: { Type: schema.TypeString, + Computed: true, Optional: true, Description: resourceFunctionDescriptions[resourceFunctionOutputTypeClassNameKey], }, @@ -374,6 +416,54 @@ func resourcePulsarFunction() *schema.Resource { Description: resourceFunctionDescriptions[resourceFunctionUserConfig], Elem: &schema.Schema{Type: schema.TypeString}, }, + resourceFunctionSinkConfigKey: { + Type: schema.TypeList, + Optional: true, + Computed: true, + MaxItems: 1, + Description: resourceFunctionDescriptions[resourceFunctionSinkConfigKey], + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + resourceFunctionSinkConfigTypeKey: { + Type: schema.TypeString, + Optional: true, + Computed: true, + Description: "Sink implementation identifier.", + }, + resourceFunctionRuntimeConfigConfigsKey: { + Type: schema.TypeMap, + Optional: true, + Computed: true, + Description: "Sink-specific key/value options.", + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, + }, + resourceFunctionSourceConfigKey: { + Type: schema.TypeList, + Optional: true, + Computed: true, + MaxItems: 1, + Description: resourceFunctionDescriptions[resourceFunctionSourceConfigKey], + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + resourceFunctionSourceConfigTypeKey: { + Type: schema.TypeString, + Optional: true, + Computed: true, + Description: "Source implementation identifier.", + }, + resourceFunctionRuntimeConfigConfigsKey: { + Type: schema.TypeMap, + Optional: true, + Computed: true, + Description: "Source-specific key/value options.", + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, + }, }, } } @@ -638,8 +728,12 @@ func marshalFunctionConfig(d *schema.ResourceData) (*utils.FunctionConfig, error functionConfig.CustomSchemaOutputs = stringMap } - if inter, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey); ok { - functionConfig.CustomRuntimeOptions = inter.(string) + customRuntimeOptions, err := buildFunctionCustomRuntimeOptions(d) + if err != nil { + return nil, err + } + if customRuntimeOptions != "" { + functionConfig.CustomRuntimeOptions = customRuntimeOptions } if inter, ok := d.GetOk(resourceFunctionSecretsKey); ok { @@ -886,16 +980,58 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso } if functionConfig.CustomRuntimeOptions != "" { - orig, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey) - if ok { - s, err := ignoreServerSetCustomRuntimeOptions(orig.(string), functionConfig.CustomRuntimeOptions) + sanitizedOptions, sinkConfig, sinkConfigPresent, sourceConfig, sourceConfigPresent, + err := splitFunctionCustomRuntimeOptions(functionConfig.CustomRuntimeOptions) + if err != nil { + return err + } + + if sinkConfigPresent { + sinkState, err := flattenRuntimeConfigForState(sinkConfig, sinkRuntimeConfigDefinition) if err != nil { return err } - err = d.Set(resourceFunctionCustomRuntimeOptionsKey, s) + if err = d.Set(resourceFunctionSinkConfigKey, sinkState); err != nil { + return err + } + } else { + if err = d.Set(resourceFunctionSinkConfigKey, nil); err != nil { + return err + } + } + + if sourceConfigPresent { + sourceState, err := flattenRuntimeConfigForState(sourceConfig, sourceRuntimeConfigDefinition) if err != nil { return err } + if err = d.Set(resourceFunctionSourceConfigKey, sourceState); err != nil { + return err + } + } else { + if err = d.Set(resourceFunctionSourceConfigKey, nil); err != nil { + return err + } + } + + if orig, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey); ok { + valueToSet := sanitizedOptions + if origStr := orig.(string); origStr != "" && sanitizedOptions != "" { + valueToSet, err = ignoreServerSetCustomRuntimeOptions(origStr, sanitizedOptions) + if err != nil { + return err + } + } + if err = d.Set(resourceFunctionCustomRuntimeOptionsKey, valueToSet); err != nil { + return err + } + } + } else { + if err := d.Set(resourceFunctionSinkConfigKey, nil); err != nil { + return err + } + if err := d.Set(resourceFunctionSourceConfigKey, nil); err != nil { + return err } } @@ -940,3 +1076,268 @@ func unmarshalFunctionConfig(functionConfig utils.FunctionConfig, d *schema.Reso return nil } + +func buildFunctionCustomRuntimeOptions(d *schema.ResourceData) (string, error) { + var base string + if inter, ok := d.GetOk(resourceFunctionCustomRuntimeOptionsKey); ok { + base = inter.(string) + } + + sinkConfig, sinkConfigSet, err := expandFunctionRuntimeConfig(d, sinkRuntimeConfigDefinition) + if err != nil { + return "", err + } + sourceConfig, sourceConfigSet, err := expandFunctionRuntimeConfig(d, sourceRuntimeConfigDefinition) + if err != nil { + return "", err + } + + if !sinkConfigSet && !sourceConfigSet { + return base, nil + } + + updates := make([]runtimeConfigUpdate, 0, 2) + if sinkConfigSet { + updates = append(updates, runtimeConfigUpdate{ + key: runtimeOptionSinkConfigKey, + config: sinkConfig, + }) + } + + if sourceConfigSet { + updates = append(updates, runtimeConfigUpdate{ + key: runtimeOptionSourceConfigKey, + config: sourceConfig, + }) + } + + return mergeFunctionCustomRuntimeOptions(base, updates...) +} + +func expandFunctionRuntimeConfig(d *schema.ResourceData, + def runtimeConfigDefinition) (map[string]interface{}, bool, error) { + inter, ok := d.GetOk(def.schemaKey) + if !ok { + return nil, false, nil + } + + if inter == nil { + return map[string]interface{}{}, true, nil + } + + list, ok := inter.([]interface{}) + if !ok { + return nil, false, fmt.Errorf("%s must be a list", def.schemaKey) + } + + if len(list) == 0 || list[0] == nil { + return map[string]interface{}{}, true, nil + } + + item, ok := list[0].(map[string]interface{}) + if !ok { + return nil, false, fmt.Errorf("%s must contain an object", def.schemaKey) + } + + runtimeConfig := make(map[string]interface{}) + if typeValue, ok := item[def.typeSchemaKey].(string); ok && typeValue != "" { + runtimeConfig[def.runtimeTypeKey] = typeValue + } + + if configsRaw, ok := item[resourceFunctionRuntimeConfigConfigsKey].(map[string]interface{}); ok && + len(configsRaw) > 0 { + runtimeConfig[runtimeOptionConfigsKey] = normalizeRuntimeConfigSchemaMap(configsRaw) + } + + return runtimeConfig, true, nil +} + +func normalizeRuntimeConfigSchemaMap(input map[string]interface{}) map[string]interface{} { + normalized := make(map[string]interface{}, len(input)) + for key, value := range input { + switch v := value.(type) { + case string: + normalized[key] = v + case fmt.Stringer: + normalized[key] = v.String() + case nil: + normalized[key] = "" + default: + normalized[key] = fmt.Sprintf("%v", v) + } + } + + return normalized +} + +type runtimeConfigUpdate struct { + key string + config map[string]interface{} +} + +func mergeFunctionCustomRuntimeOptions(base string, updates ...runtimeConfigUpdate) (string, error) { + if len(updates) == 0 { + return base, nil + } + + trimmed := strings.TrimSpace(base) + runtimeOptions := make(map[string]interface{}) + if trimmed != "" { + if err := json.Unmarshal([]byte(trimmed), &runtimeOptions); err != nil { + return "", errors.Wrap(err, "cannot unmarshal custom_runtime_options") + } + } + + for _, update := range updates { + delete(runtimeOptions, update.key) + if len(update.config) > 0 { + runtimeOptions[update.key] = update.config + } + } + + if len(runtimeOptions) == 0 { + return "", nil + } + + b, err := json.Marshal(runtimeOptions) + if err != nil { + return "", errors.Wrap(err, "cannot marshal custom_runtime_options") + } + + return string(b), nil +} + +func splitFunctionCustomRuntimeOptions(raw string) ( + string, map[string]interface{}, bool, map[string]interface{}, bool, error) { + trimmed := strings.TrimSpace(raw) + if trimmed == "" { + return "", nil, false, nil, false, nil + } + + runtimeOptions := make(map[string]interface{}) + if err := json.Unmarshal([]byte(trimmed), &runtimeOptions); err != nil { + return "", nil, false, nil, false, errors.Wrap(err, "cannot unmarshal custom_runtime_options from Pulsar") + } + + sinkConfig, sinkPresent, err := extractRuntimeConfig(runtimeOptions, sinkRuntimeConfigDefinition) + if err != nil { + return "", nil, false, nil, false, err + } + + sourceConfig, sourcePresent, err := extractRuntimeConfig(runtimeOptions, sourceRuntimeConfigDefinition) + if err != nil { + return "", nil, false, nil, false, err + } + + sanitized := "" + if len(runtimeOptions) > 0 { + b, err := json.Marshal(runtimeOptions) + if err != nil { + return "", nil, false, nil, false, errors.Wrap(err, "cannot marshal custom_runtime_options") + } + sanitized = string(b) + } + + return sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, nil +} + +func extractRuntimeConfig(runtimeOptions map[string]interface{}, + def runtimeConfigDefinition) (map[string]interface{}, bool, error) { + raw, ok := runtimeOptions[def.runtimeKey] + if !ok { + return nil, false, nil + } + + delete(runtimeOptions, def.runtimeKey) + configState := map[string]interface{}{} + if raw == nil { + return configState, true, nil + } + + configMap, ok := raw.(map[string]interface{}) + if !ok { + return nil, false, fmt.Errorf("%s in custom_runtime_options must be a JSON object", def.runtimeKey) + } + + if typeVal, ok := configMap[def.runtimeTypeKey]; ok { + if typeStr, ok := typeVal.(string); ok && typeStr != "" { + configState[def.runtimeTypeKey] = typeStr + } + } + + if configsRaw, ok := configMap[runtimeOptionConfigsKey]; ok { + configsMap, ok := configsRaw.(map[string]interface{}) + if !ok { + return nil, false, fmt.Errorf("configs in %s must be a JSON object", def.runtimeKey) + } + flattened, err := stringifyRuntimeConfigMap(configsMap) + if err != nil { + return nil, false, err + } + configState[runtimeOptionConfigsKey] = flattened + } + + return configState, true, nil +} + +func stringifyRuntimeConfigMap(input map[string]interface{}) (map[string]interface{}, error) { + config := make(map[string]interface{}, len(input)) + for key, value := range input { + stringValue, err := stringifyRuntimeConfigValue(value) + if err != nil { + return nil, err + } + config[key] = stringValue + } + + return config, nil +} + +func stringifyRuntimeConfigValue(value interface{}) (string, error) { + switch v := value.(type) { + case string: + return v, nil + case bool: + return strconv.FormatBool(v), nil + case float64: + if math.Trunc(v) == v { + return strconv.FormatInt(int64(v), 10), nil + } + return strconv.FormatFloat(v, 'f', -1, 64), nil + case nil: + return "", nil + default: + b, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(b), nil + } +} + +func flattenRuntimeConfigForState(config map[string]interface{}, def runtimeConfigDefinition) ([]interface{}, error) { + if len(config) == 0 { + return nil, nil + } + + state := make(map[string]interface{}) + if typeVal, ok := config[def.runtimeTypeKey]; ok { + if typeStr, ok := typeVal.(string); ok && typeStr != "" { + state[def.typeSchemaKey] = typeStr + } + } + + if configsVal, ok := config[runtimeOptionConfigsKey]; ok { + configsMap, ok := configsVal.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("configs in %s must be a map of strings", def.runtimeKey) + } + state[resourceFunctionRuntimeConfigConfigsKey] = configsMap + } + + if len(state) == 0 { + return nil, nil + } + + return []interface{}{state}, nil +} diff --git a/pulsar/resource_pulsar_function_unit_test.go b/pulsar/resource_pulsar_function_unit_test.go new file mode 100644 index 0000000..a56a1f1 --- /dev/null +++ b/pulsar/resource_pulsar_function_unit_test.go @@ -0,0 +1,66 @@ +package pulsar + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMergeFunctionCustomRuntimeOptions(t *testing.T) { + base := `{"foo":"bar","sinkConfig":{"old":"value"},"sourceConfig":{"keep":"me"}}` + sinkConfig := map[string]interface{}{ + runtimeOptionSinkConfigTypeField: "kafka", + runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}, + } + sourceConfig := map[string]interface{}{} + + merged, err := mergeFunctionCustomRuntimeOptions(base, + runtimeConfigUpdate{key: runtimeOptionSinkConfigKey, config: sinkConfig}, + runtimeConfigUpdate{key: runtimeOptionSourceConfigKey, config: sourceConfig}, + ) + require.NoError(t, err) + + var result map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(merged), &result)) + + assert.Equal(t, "bar", result["foo"]) + assert.Equal(t, map[string]interface{}{ + runtimeOptionSinkConfigTypeField: "kafka", + runtimeOptionConfigsKey: map[string]interface{}{"new": "value"}, + }, result["sinkConfig"]) + _, hasSource := result["sourceConfig"] + assert.False(t, hasSource) +} + +func TestSplitFunctionCustomRuntimeOptions(t *testing.T) { + ///nolint:lll + raw := `{"foo":"bar","sinkConfig":{"sinkType":"kafka","configs":{"alpha":"1","beta":true}},"sourceConfig":{"sourceType":"kinesis","configs":{"gamma":2}}}` + + sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, err := splitFunctionCustomRuntimeOptions(raw) + require.NoError(t, err) + assert.True(t, sinkPresent) + assert.Equal(t, map[string]interface{}{ + runtimeOptionSinkConfigTypeField: "kafka", + runtimeOptionConfigsKey: map[string]interface{}{"alpha": "1", "beta": "true"}, + }, sinkConfig) + assert.True(t, sourcePresent) + assert.Equal(t, map[string]interface{}{ + runtimeOptionSourceConfigTypeField: "kinesis", + runtimeOptionConfigsKey: map[string]interface{}{"gamma": "2"}, + }, sourceConfig) + assert.JSONEq(t, `{"foo":"bar"}`, sanitized) +} + +func TestSplitFunctionCustomRuntimeOptionsWithoutSinkConfig(t *testing.T) { + raw := `{"foo":"bar"}` + + sanitized, sinkConfig, sinkPresent, sourceConfig, sourcePresent, err := splitFunctionCustomRuntimeOptions(raw) + require.NoError(t, err) + assert.False(t, sinkPresent) + assert.Nil(t, sinkConfig) + assert.False(t, sourcePresent) + assert.Nil(t, sourceConfig) + assert.JSONEq(t, `{"foo":"bar"}`, sanitized) +} diff --git a/pulsar/resource_pulsar_package.go b/pulsar/resource_pulsar_package.go new file mode 100644 index 0000000..94f49c9 --- /dev/null +++ b/pulsar/resource_pulsar_package.go @@ -0,0 +1,504 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/hashicorp/terraform-plugin-log/tflog" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +const ( + resourcePackageTypeKey = "type" + resourcePackageTenantKey = "tenant" + resourcePackageNamespaceKey = "namespace" + resourcePackageNameKey = "name" + resourcePackageVersionKey = "version" + resourcePackagePathKey = "path" + resourcePackageDescriptionKey = "description" + resourcePackageContactKey = "contact" + resourcePackagePropertiesKey = "properties" + + resourcePackagePackageURLKey = "package_url" + resourcePackageCreateTimeKey = "create_time" + resourcePackageModificationTime = "modification_time" + resourcePackageFileChecksumKey = "file_checksum" + resourcePackageFileSizeKey = "file_size" + resourcePackageFileNameKey = "file_name" + resourcePackageSourceChecksumKey = "source_hash" +) + +const ( + packageMetadataPropertyTenant = "tenant" + packageMetadataPropertyNamespace = "namespace" + packageMetadataPropertyName = "functionName" + packageMetadataPropertyFileName = "fileName" + packageMetadataPropertyFileSize = "fileSize" + packageMetadataPropertyChecksum = "checksum" + packageMetadataPropertyManagedBy = "managedByMeshWorkerService" + packageMetadataPropertyLibDir = "libDir" + packageMetadataManagedByTerraform = "terraform-provider-pulsar" +) + +var reservedPackageMetadataKeys = map[string]struct{}{ + packageMetadataPropertyTenant: {}, + packageMetadataPropertyNamespace: {}, + packageMetadataPropertyName: {}, + packageMetadataPropertyFileName: {}, + packageMetadataPropertyFileSize: {}, + packageMetadataPropertyChecksum: {}, + packageMetadataPropertyManagedBy: {}, + packageMetadataPropertyLibDir: {}, +} + +type packageFileInfo struct { + checksum string + size int64 + name string +} + +func resourcePulsarPackage() *schema.Resource { + return &schema.Resource{ + CreateContext: resourcePulsarPackageCreate, + ReadContext: resourcePulsarPackageRead, + UpdateContext: resourcePulsarPackageUpdate, + DeleteContext: resourcePulsarPackageDelete, + CustomizeDiff: resourcePulsarPackageCustomizeDiff, + Description: "Manages Pulsar packages for functions, sources, and sinks. Packages bundle executable artifacts " + + "that can be referenced by other Pulsar resources. The provider uploads the package archive and keeps the " + + "metadata (description, contact, and custom properties) in sync with Terraform state.", + + Importer: &schema.ResourceImporter{ + StateContext: resourcePulsarPackageImport, + }, + + Schema: map[string]*schema.Schema{ + resourcePackageTypeKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Package type. Supported values: `function`, `sink`, `source`.", + ValidateFunc: validatePackageType, + }, + resourcePackageTenantKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Tenant that owns the package.", + }, + resourcePackageNamespaceKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Namespace that owns the package.", + }, + resourcePackageNameKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Package name.", + }, + resourcePackageVersionKey: { + Type: schema.TypeString, + Required: true, + ForceNew: true, + Description: "Package version. Changing the version creates a new package resource.", + }, + resourcePackagePathKey: { + Type: schema.TypeString, + Required: true, + Description: "Local path to the package archive. Changing the file contents will trigger an update.", + }, + resourcePackageDescriptionKey: { + Type: schema.TypeString, + Optional: true, + Description: "Optional package description stored in Pulsar metadata.", + }, + resourcePackageContactKey: { + Type: schema.TypeString, + Optional: true, + Description: "Contact information stored in Pulsar metadata.", + }, + resourcePackagePropertiesKey: { + Type: schema.TypeMap, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Description: "Additional metadata properties. ", + }, + resourcePackagePackageURLKey: { + Type: schema.TypeString, + Computed: true, + Description: "Fully-qualified package URL in the format `:////@`.", + }, + resourcePackageCreateTimeKey: { + Type: schema.TypeInt, + Computed: true, + Description: "Server-side package creation timestamp (epoch millis).", + }, + resourcePackageModificationTime: { + Type: schema.TypeInt, + Computed: true, + Description: "Server-side package modification timestamp (epoch millis).", + }, + resourcePackageFileChecksumKey: { + Type: schema.TypeString, + Computed: true, + Description: "Checksum reported by Pulsar for the uploaded package.", + }, + resourcePackageFileSizeKey: { + Type: schema.TypeInt, + Computed: true, + Description: "Package size reported by Pulsar (in bytes).", + }, + resourcePackageFileNameKey: { + Type: schema.TypeString, + Computed: true, + Description: "Package file name stored in metadata.", + }, + resourcePackageSourceChecksumKey: { + Type: schema.TypeString, + Computed: true, + Description: "SHA256 checksum of the local package file. Used to detect file changes.", + }, + }, + } +} + +func resourcePulsarPackageCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + packageURL, err := packageURLFromResourceData(d) + if err != nil { + return diag.FromErr(err) + } + + path := d.Get(resourcePackagePathKey).(string) + info, err := calculatePackageFileInfo(path) + if err != nil { + return diag.FromErr(err) + } + + description := d.Get(resourcePackageDescriptionKey).(string) + contact := d.Get(resourcePackageContactKey).(string) + properties := buildPackageMetadataProperties(d, &info) + + if err = client.Upload(packageURL, path, description, contact, properties); err != nil { + return diag.FromErr(err) + } + + d.SetId(packageURL) + _ = d.Set(resourcePackagePackageURLKey, packageURL) + _ = d.Set(resourcePackageSourceChecksumKey, info.checksum) + + return resourcePulsarPackageRead(ctx, d, meta) +} + +func resourcePulsarPackageRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + + packageURL, err := ensurePackageID(d) + if err != nil { + return diag.FromErr(err) + } + + metadata, err := client.GetMetadata(packageURL) + if err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + d.SetId("") + return nil + } + return diag.FromErr(err) + } + + _ = d.Set(resourcePackagePackageURLKey, packageURL) + _ = d.Set(resourcePackageDescriptionKey, metadata.Description) + _ = d.Set(resourcePackageContactKey, metadata.Contact) + _ = d.Set(resourcePackageCreateTimeKey, metadata.CreateTime) + _ = d.Set(resourcePackageModificationTime, metadata.ModificationTime) + + userProps := filterUserManagedProperties(metadata.Properties) + if err = d.Set(resourcePackagePropertiesKey, convertToInterfaceMap(userProps)); err != nil { + return diag.FromErr(err) + } + + if metadata.Properties != nil { + if checksum, ok := metadata.Properties[packageMetadataPropertyChecksum]; ok { + _ = d.Set(resourcePackageFileChecksumKey, checksum) + } + if size, ok := metadata.Properties[packageMetadataPropertyFileSize]; ok { + if parsed, parseErr := strconv.ParseInt(size, 10, 64); parseErr == nil { + _ = d.Set(resourcePackageFileSizeKey, parsed) + } + } + if name, ok := metadata.Properties[packageMetadataPropertyFileName]; ok { + _ = d.Set(resourcePackageFileNameKey, name) + } + } + + setLocalSourceChecksum(ctx, d) + + return nil +} + +func resourcePulsarPackageUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + packageURL, err := packageURLFromResourceData(d) + if err != nil { + return diag.FromErr(err) + } + + path := d.Get(resourcePackagePathKey).(string) + description := d.Get(resourcePackageDescriptionKey).(string) + contact := d.Get(resourcePackageContactKey).(string) + + reupload := d.HasChange(resourcePackagePathKey) || d.HasChange(resourcePackageSourceChecksumKey) + + if reupload { + info, calcErr := calculatePackageFileInfo(path) + if calcErr != nil { + return diag.FromErr(calcErr) + } + properties := buildPackageMetadataProperties(d, &info) + if err = client.Upload(packageURL, path, description, contact, properties); err != nil { + return diag.FromErr(err) + } + _ = d.Set(resourcePackageSourceChecksumKey, info.checksum) + } else if d.HasChanges(resourcePackageDescriptionKey, resourcePackageContactKey, resourcePackagePropertiesKey) { + properties := buildPackageMetadataProperties(d, nil) + if err = client.UpdateMetadata(packageURL, description, contact, properties); err != nil { + return diag.FromErr(err) + } + } + + return resourcePulsarPackageRead(ctx, d, meta) +} + +func resourcePulsarPackageDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + client := getV3ClientFromMeta(meta).Packages() + + packageURL, err := ensurePackageID(d) + if err != nil { + return diag.FromErr(err) + } + + if err = client.Delete(packageURL); err != nil { + if cliErr, ok := err.(rest.Error); ok && cliErr.Code == 404 { + d.SetId("") + return nil + } + return diag.FromErr(err) + } + + return nil +} + +func resourcePulsarPackageCustomizeDiff(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error { + path := diff.Get(resourcePackagePathKey).(string) + if path == "" { + return fmt.Errorf("path must be provided") + } + + info, err := calculatePackageFileInfo(path) + if err != nil { + return err + } + + if err = diff.SetNew(resourcePackageSourceChecksumKey, info.checksum); err != nil { + return err + } + + return nil +} + +func resourcePulsarPackageImport(ctx context.Context, d *schema.ResourceData, + meta interface{}) ([]*schema.ResourceData, error) { + packageName, err := utils.GetPackageName(d.Id()) + if err != nil { + return nil, err + } + + _ = d.Set(resourcePackageTypeKey, packageName.GetType().String()) + _ = d.Set(resourcePackageTenantKey, packageName.GetTenant()) + _ = d.Set(resourcePackageNamespaceKey, packageName.GetNamespace()) + _ = d.Set(resourcePackageNameKey, packageName.GetName()) + _ = d.Set(resourcePackageVersionKey, packageName.GetVersion()) + + diags := resourcePulsarPackageRead(ctx, d, meta) + if diags.HasError() { + return nil, fmt.Errorf("failed to import %s: %s", d.Id(), diags[0].Summary) + } + + return []*schema.ResourceData{d}, nil +} + +func validatePackageType(val interface{}, key string) (warns []string, errs []error) { + value, ok := val.(string) + if !ok { + errs = append(errs, fmt.Errorf("%s must be a string", key)) + return + } + + switch value { + case utils.PackageTypeFunction.String(), utils.PackageTypeSink.String(), utils.PackageTypeSource.String(): + return + default: + errs = append(errs, fmt.Errorf("%s must be one of %q, %q, or %q", + key, utils.PackageTypeFunction, utils.PackageTypeSink, utils.PackageTypeSource)) + return + } +} + +func ensurePackageID(d *schema.ResourceData) (string, error) { + if d.Id() != "" { + return d.Id(), nil + } + return packageURLFromResourceData(d) +} + +func packageURLFromResourceData(d *schema.ResourceData) (string, error) { + packageType := utils.PackageType(d.Get(resourcePackageTypeKey).(string)) + tenant := d.Get(resourcePackageTenantKey).(string) + namespace := d.Get(resourcePackageNamespaceKey).(string) + name := d.Get(resourcePackageNameKey).(string) + version := d.Get(resourcePackageVersionKey).(string) + + packageName, err := utils.GetPackageNameWithComponents(packageType, tenant, namespace, name, version) + if err != nil { + return "", err + } + + return packageName.String(), nil +} + +func buildPackageMetadataProperties(d *schema.ResourceData, info *packageFileInfo) map[string]string { + props := make(map[string]string) + if raw, ok := d.GetOk(resourcePackagePropertiesKey); ok && raw != nil { + for k, v := range raw.(map[string]interface{}) { + props[k] = v.(string) + } + } + + props[packageMetadataPropertyTenant] = d.Get(resourcePackageTenantKey).(string) + props[packageMetadataPropertyNamespace] = d.Get(resourcePackageNamespaceKey).(string) + props[packageMetadataPropertyName] = d.Get(resourcePackageNameKey).(string) + props[packageMetadataPropertyManagedBy] = packageMetadataManagedByTerraform + + if info == nil { + info = existingPackageFileInfoFromState(d) + } + + if info != nil { + props[packageMetadataPropertyFileName] = info.name + props[packageMetadataPropertyFileSize] = strconv.FormatInt(info.size, 10) + props[packageMetadataPropertyChecksum] = info.checksum + } + + return props +} + +func existingPackageFileInfoFromState(d *schema.ResourceData) *packageFileInfo { + name := d.Get(resourcePackageFileNameKey).(string) + checksum := d.Get(resourcePackageFileChecksumKey).(string) + + var size int64 + if sizeVal, ok := d.Get(resourcePackageFileSizeKey).(int); ok { + size = int64(sizeVal) + } + + if name == "" && checksum == "" && size == 0 { + return nil + } + + return &packageFileInfo{ + name: name, + checksum: checksum, + size: size, + } +} + +func calculatePackageFileInfo(path string) (packageFileInfo, error) { + file, err := os.Open(filepath.Clean(path)) + if err != nil { + return packageFileInfo{}, err + } + + defer file.Close() + + hasher := sha256.New() + size, err := io.Copy(hasher, file) + if err != nil { + return packageFileInfo{}, err + } + + return packageFileInfo{ + checksum: hex.EncodeToString(hasher.Sum(nil)), + size: size, + name: filepath.Base(path), + }, nil +} + +func filterUserManagedProperties(in map[string]string) map[string]string { + if len(in) == 0 { + return map[string]string{} + } + + out := make(map[string]string) + for key, value := range in { + if _, reserved := reservedPackageMetadataKeys[key]; reserved { + continue + } + out[key] = value + } + + return out +} + +func convertToInterfaceMap(in map[string]string) map[string]interface{} { + out := make(map[string]interface{}, len(in)) + for k, v := range in { + out[k] = v + } + return out +} + +func setLocalSourceChecksum(ctx context.Context, d *schema.ResourceData) { + path := d.Get(resourcePackagePathKey).(string) + if path == "" { + return + } + + info, err := calculatePackageFileInfo(path) + if err != nil { + tflog.Warn(ctx, "unable to read local package file for checksum", map[string]interface{}{ + "path": path, "error": err.Error(), + }) + return + } + + _ = d.Set(resourcePackageSourceChecksumKey, info.checksum) +} diff --git a/pulsar/resource_pulsar_package_test.go b/pulsar/resource_pulsar_package_test.go new file mode 100644 index 0000000..f5c61fc --- /dev/null +++ b/pulsar/resource_pulsar_package_test.go @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "os" + "path/filepath" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func TestFilterUserManagedProperties(t *testing.T) { + props := map[string]string{ + packageMetadataPropertyTenant: "tenant", + packageMetadataPropertyFileName: "pkg.nar", + "custom": "value", + } + + filtered := filterUserManagedProperties(props) + + if len(filtered) != 1 { + t.Fatalf("expected 1 property, got %d", len(filtered)) + } + + if filtered["custom"] != "value" { + t.Fatalf("expected custom property to be kept") + } +} + +func TestBuildPackageMetadataProperties(t *testing.T) { + resource := resourcePulsarPackage() + data := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{ + resourcePackageTypeKey: "function", + resourcePackageTenantKey: "tenant-a", + resourcePackageNamespaceKey: "ns", + resourcePackageNameKey: "pkg", + resourcePackageVersionKey: "v1", + resourcePackagePathKey: "/tmp/pkg.nar", + resourcePackagePropertiesKey: map[string]interface{}{"custom": "value"}, + }) + + info := &packageFileInfo{ + checksum: "abc", + size: 42, + name: "pkg.nar", + } + + props := buildPackageMetadataProperties(data, info) + + expected := map[string]string{ + packageMetadataPropertyTenant: "tenant-a", + packageMetadataPropertyNamespace: "ns", + packageMetadataPropertyName: "pkg", + packageMetadataPropertyFileName: "pkg.nar", + packageMetadataPropertyFileSize: "42", + packageMetadataPropertyChecksum: "abc", + packageMetadataPropertyManagedBy: packageMetadataManagedByTerraform, + "custom": "value", + } + + for k, v := range expected { + if props[k] != v { + t.Fatalf("expected %s=%s, got %s", k, v, props[k]) + } + } +} + +func TestBuildPackageMetadataPropertiesUsesExistingValues(t *testing.T) { + resource := resourcePulsarPackage() + data := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{ + resourcePackageTypeKey: "function", + resourcePackageTenantKey: "tenant-a", + resourcePackageNamespaceKey: "ns", + resourcePackageNameKey: "pkg", + resourcePackageVersionKey: "v1", + resourcePackagePathKey: "/tmp/pkg.nar", + }) + + _ = data.Set(resourcePackageFileNameKey, "existing.nar") + _ = data.Set(resourcePackageFileSizeKey, 100) + _ = data.Set(resourcePackageFileChecksumKey, "old") + + props := buildPackageMetadataProperties(data, nil) + + if props[packageMetadataPropertyFileName] != "existing.nar" { + t.Fatalf("expected existing file name to be reused, got %s", props[packageMetadataPropertyFileName]) + } + + if props[packageMetadataPropertyFileSize] != "100" { + t.Fatalf("expected existing file size to be reused, got %s", props[packageMetadataPropertyFileSize]) + } + + if props[packageMetadataPropertyChecksum] != "old" { + t.Fatalf("expected existing checksum to be reused, got %s", props[packageMetadataPropertyChecksum]) + } +} + +func TestExistingPackageFileInfoFromState(t *testing.T) { + resource := resourcePulsarPackage() + data := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{ + resourcePackageTypeKey: "function", + resourcePackageTenantKey: "tenant-a", + resourcePackageNamespaceKey: "ns", + resourcePackageNameKey: "pkg", + resourcePackageVersionKey: "v1", + resourcePackagePathKey: "/tmp/pkg.nar", + }) + + _ = data.Set(resourcePackageFileNameKey, "pkg.nar") + _ = data.Set(resourcePackageFileSizeKey, 0) + _ = data.Set(resourcePackageFileChecksumKey, "abc") + + info := existingPackageFileInfoFromState(data) + if info == nil { + t.Fatalf("expected file info") + } + + if info.name != "pkg.nar" || info.checksum != "abc" || info.size != 0 { + t.Fatalf("unexpected info %#v", info) + } +} + +func TestCalculatePackageFileInfo(t *testing.T) { + dir := t.TempDir() + filePath := filepath.Join(dir, "pkg.nar") + content := []byte("hello pulsar package") + + if err := os.WriteFile(filePath, content, 0o600); err != nil { + t.Fatalf("failed to write temp file: %v", err) + } + + info, err := calculatePackageFileInfo(filePath) + if err != nil { + t.Fatalf("failed to calculate info: %v", err) + } + + if info.size != int64(len(content)) { + t.Fatalf("expected size %d, got %d", len(content), info.size) + } + + if info.name != "pkg.nar" { + t.Fatalf("expected name pkg.nar, got %s", info.name) + } + + expectedChecksum := "032cc2f30d10d8598ae15a53bfbf4e93724d5c70a4b6dd8167bcda59fd955d8f" + if info.checksum != expectedChecksum { + t.Fatalf("expected checksum %s, got %s", expectedChecksum, info.checksum) + } +}