Skip to content

Commit 7f6aa98

Browse files
authored
Make serverless lakehouse storage enabled for computed (#146)
1 parent 96ed9a6 commit 7f6aa98

File tree

2 files changed

+104
-24
lines changed

2 files changed

+104
-24
lines changed

cloud/data_source_pulsar_cluster.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,10 +430,16 @@ func dataSourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, me
430430
_ = d.Set("instance_name", pulsarInstance.Name)
431431

432432
// Set lakehouse_storage_enabled
433-
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
434-
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
433+
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless {
434+
// For serverless clusters, always set to true (computed)
435+
_ = d.Set("lakehouse_storage_enabled", true)
435436
} else {
436-
_ = d.Set("lakehouse_storage_enabled", false)
437+
// For non-serverless clusters, use the actual value
438+
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
439+
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
440+
} else {
441+
_ = d.Set("lakehouse_storage_enabled", false)
442+
}
437443
}
438444

439445
// Set catalog information

cloud/resource_pulsar_cluster.go

Lines changed: 95 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ func resourcePulsarCluster() *schema.Resource {
4545
oldOrg, _ := diff.GetChange("organization")
4646
oldName, newName := diff.GetChange("name")
4747
if oldOrg.(string) == "" && oldName.(string) == "" {
48-
// This is create event, so we don't need to check the diff.
48+
// For serverless clusters, make lakehouse_storage_enabled computed
49+
makeLakehouseStorageComputedForServerless(ctx, diff, i)
4950
return nil
5051
}
5152
if oldName != "" && newName == "" {
@@ -56,6 +57,8 @@ func resourcePulsarCluster() *schema.Resource {
5657
return fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
5758
"The pulsar cluster organization, name, instance_name, location, pool_member_name does not support updates, please recreate it")
5859
}
60+
// For serverless clusters, make lakehouse_storage_enabled computed
61+
makeLakehouseStorageComputedForServerless(ctx, diff, i)
5962
return nil
6063
},
6164
Importer: &schema.ResourceImporter{
@@ -364,7 +367,7 @@ func resourcePulsarCluster() *schema.Resource {
364367
"lakehouse_storage_enabled": {
365368
Type: schema.TypeBool,
366369
Optional: true,
367-
Default: false,
370+
Computed: true,
368371
Description: descriptions["lakehouse_storage"],
369372
},
370373
"apply_lakehouse_to_all_topics": {
@@ -571,18 +574,29 @@ func resourcePulsarClusterCreate(ctx context.Context, d *schema.ResourceData, me
571574
getPulsarClusterChanged(ctx, pulsarCluster, d)
572575
}
573576

574-
if d.Get("lakehouse_storage_enabled").(bool) {
575-
if ursaEnabled {
576-
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: " +
577-
"you don't set this option for ursa engine cluster"))
578-
}
577+
// Handle lakehouse_storage_enabled
578+
if pulsarInstance.IsServerless() {
579+
// For serverless clusters, automatically enable lakehouse storage
579580
if pulsarCluster.Spec.Config == nil {
580581
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
581582
}
582583
pulsarCluster.Spec.Config.LakehouseStorage = &cloudv1alpha1.LakehouseStorageConfig{
583584
Enabled: pointer.Bool(true),
584585
}
585-
586+
} else {
587+
// For non-serverless clusters, check user input
588+
if d.Get("lakehouse_storage_enabled").(bool) {
589+
if ursaEnabled {
590+
return diag.FromErr(fmt.Errorf("ERROR_CREATE_PULSAR_CLUSTER: " +
591+
"you don't set this option for ursa engine cluster"))
592+
}
593+
if pulsarCluster.Spec.Config == nil {
594+
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
595+
}
596+
pulsarCluster.Spec.Config.LakehouseStorage = &cloudv1alpha1.LakehouseStorageConfig{
597+
Enabled: pointer.Bool(true),
598+
}
599+
}
586600
}
587601

588602
// Handle catalog configuration
@@ -838,10 +852,16 @@ func resourcePulsarClusterRead(ctx context.Context, d *schema.ResourceData, meta
838852
_ = d.Set("storage_unit_per_bookie", storageUnit)
839853

840854
// Set lakehouse_storage_enabled
841-
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
842-
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
855+
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless {
856+
// For serverless clusters, always set to true (computed)
857+
_ = d.Set("lakehouse_storage_enabled", true)
843858
} else {
844-
_ = d.Set("lakehouse_storage_enabled", false)
859+
// For non-serverless clusters, use the actual value
860+
if pulsarCluster.Spec.Config != nil && pulsarCluster.Spec.Config.LakehouseStorage != nil && pulsarCluster.Spec.Config.LakehouseStorage.Enabled != nil {
861+
_ = d.Set("lakehouse_storage_enabled", *pulsarCluster.Spec.Config.LakehouseStorage.Enabled)
862+
} else {
863+
_ = d.Set("lakehouse_storage_enabled", false)
864+
}
845865
}
846866

847867
// Set catalog information
@@ -901,11 +921,15 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
901921
serverless := d.Get("type")
902922
displayNameChanged := d.HasChange("display_name")
903923
lakehouseStorageChanged := d.HasChange("lakehouse_storage_enabled")
904-
lakehouseStorageEnabled := d.Get("lakehouse_storage_enabled").(bool)
905-
if !displayNameChanged && serverless == string(cloudv1alpha1.PulsarInstanceTypeServerless) &&
906-
lakehouseStorageChanged && !lakehouseStorageEnabled {
907-
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
908-
"Disabling lakehouse_storage_enabled or changed display_name is not allowed for serverless pulsar cluster"))
924+
925+
// For serverless clusters, lakehouse_storage_enabled is computed and cannot be changed
926+
if serverless == string(cloudv1alpha1.PulsarInstanceTypeServerless) {
927+
if lakehouseStorageChanged {
928+
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
929+
"lakehouse_storage_enabled cannot be set for serverless pulsar cluster, it is automatically computed"))
930+
}
931+
// Always set to true for serverless clusters
932+
_ = d.Set("lakehouse_storage_enabled", true)
909933
}
910934
if d.HasChange("organization") {
911935
return diag.FromErr(fmt.Errorf("ERROR_UPDATE_PULSAR_CLUSTER: " +
@@ -949,8 +973,19 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
949973
}
950974

951975
// Validate lakehouse_storage_enabled update: once enabled, cannot be disabled
952-
if diagErr := validateLakehouseStorageUpdate(d, pulsarCluster); diagErr != nil {
953-
return diagErr
976+
// For serverless clusters, skip validation as it's computed
977+
if serverless != string(cloudv1alpha1.PulsarInstanceTypeServerless) {
978+
if diagErr := validateLakehouseStorageUpdate(d, pulsarCluster); diagErr != nil {
979+
return diagErr
980+
}
981+
} else {
982+
// For serverless clusters, ensure lakehouse storage is enabled
983+
if pulsarCluster.Spec.Config == nil {
984+
pulsarCluster.Spec.Config = &cloudv1alpha1.Config{}
985+
}
986+
pulsarCluster.Spec.Config.LakehouseStorage = &cloudv1alpha1.LakehouseStorageConfig{
987+
Enabled: pointer.Bool(true),
988+
}
954989
}
955990
if d.HasChange("bookie_replicas") {
956991
bookieReplicas := int32(d.Get("bookie_replicas").(int))
@@ -999,10 +1034,10 @@ func resourcePulsarClusterUpdate(ctx context.Context, d *schema.ResourceData, me
9991034

10001035
// Handle table format determination when catalog or lakehouse storage changes
10011036
if (pulsarCluster.Spec.TableFormat == "" || pulsarCluster.Spec.TableFormat == "none") &&
1002-
d.HasChange("catalog") || d.HasChange("lakehouse_storage_enabled") || pulsarCluster.IsUsingUrsaEngine() {
1037+
(d.HasChange("catalog") || d.HasChange("lakehouse_storage_enabled") || pulsarCluster.IsUsingUrsaEngine()) {
10031038
catalogName := d.Get("catalog").(string)
1004-
lakehouseStorageEnabled = d.Get("lakehouse_storage_enabled").(bool)
1005-
1039+
// For serverless clusters, lakehouse storage is always enabled
1040+
// Determine table format based on catalog (lakehouse storage is always enabled for serverless)
10061041
tableFormat, err := determineTableFormat(ctx, clientSet, namespace, catalogName)
10071042
if err != nil {
10081043
return diag.FromErr(fmt.Errorf("ERROR_DETERMINE_TABLE_FORMAT: %w", err))
@@ -1143,6 +1178,7 @@ func getPulsarClusterChanged(ctx context.Context, pulsarCluster *cloudv1alpha1.P
11431178
}
11441179

11451180
// Handle lakehouse_storage_enabled at the top level
1181+
// Note: For serverless clusters, this should not be changed by user, but we handle it here for completeness
11461182
if d.HasChange("lakehouse_storage_enabled") {
11471183
enabledBool := d.Get("lakehouse_storage_enabled").(bool)
11481184
if enabledBool {
@@ -1358,6 +1394,44 @@ func convertCpuAndMemoryToStorageUnit(pc *cloudv1alpha1.PulsarCluster) float64 {
13581394
return 0.5 // default value
13591395
}
13601396

1397+
// makeLakehouseStorageComputedForServerless makes lakehouse_storage_enabled computed for serverless clusters
1398+
func makeLakehouseStorageComputedForServerless(ctx context.Context, diff *schema.ResourceDiff, meta interface{}) {
1399+
// Get instance information to check type
1400+
instanceName := diff.Get("instance_name").(string)
1401+
namespace := diff.Get("organization").(string)
1402+
if instanceName == "" || namespace == "" {
1403+
return
1404+
}
1405+
1406+
clientSet, err := getClientSet(getFactoryFromMeta(meta))
1407+
if err != nil {
1408+
// If we can't get client, skip
1409+
return
1410+
}
1411+
1412+
pulsarInstance, err := clientSet.CloudV1alpha1().
1413+
PulsarInstances(namespace).
1414+
Get(ctx, instanceName, metav1.GetOptions{})
1415+
if err != nil {
1416+
// If we can't get instance, skip
1417+
return
1418+
}
1419+
1420+
// Check if instance is serverless
1421+
if pulsarInstance.Spec.Type == cloudv1alpha1.PulsarInstanceTypeServerless {
1422+
// For serverless clusters, always set lakehouse_storage_enabled to computed
1423+
// and set its value to true
1424+
if diff.HasChange("lakehouse_storage_enabled") {
1425+
// If user tries to set it, clear the change and set as computed with value true
1426+
diff.Clear("lakehouse_storage_enabled")
1427+
}
1428+
// Always set as computed with value true for serverless
1429+
diff.SetNewComputed("lakehouse_storage_enabled")
1430+
// Set the value to true for serverless clusters
1431+
diff.SetNew("lakehouse_storage_enabled", true)
1432+
}
1433+
}
1434+
13611435
// determineTableFormat determines the table format based on catalog type and configuration
13621436
func determineTableFormat(ctx context.Context, cloudClientSet *cloudclient.Clientset, namespace, catalogName string) (string, error) {
13631437

0 commit comments

Comments
 (0)