Skip to content

Commit 16e0e29

Browse files
authored
Add vertical scaling options to scaling api for clickpipes terraform (#322)
1 parent 654756b commit 16e0e29

File tree

6 files changed

+140
-84
lines changed

6 files changed

+140
-84
lines changed

examples/clickpipe/multiple_pipes_example/main.tf

Lines changed: 59 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ variable "number_of_pipes" {
2222
resource "clickhouse_clickpipe" "multiple" {
2323
for_each = toset([for i in range(1, var.number_of_pipes + 1) : tostring(i)])
2424

25-
name = "📈 multiple pipe ${each.key}"
25+
name = "📈 multiple pipe ${each.key}"
2626

2727
service_id = var.service_id
2828

2929
scaling = {
30-
replicas = tonumber(each.key)
30+
replicas = tonumber(each.key)
31+
replica_cpu_millicores = 250
32+
replica_memory_gb = 1.0
3133
}
3234

3335
state = "Running"
@@ -47,7 +49,7 @@ resource "clickhouse_clickpipe" "multiple" {
4749
}
4850

4951
destination = {
50-
table = "multiple_pipes_example_${each.key}"
52+
table = "multiple_pipes_example_${each.key}"
5153
managed_table = true
5254

5355
table_definition = {
@@ -58,116 +60,116 @@ resource "clickhouse_clickpipe" "multiple" {
5860

5961
columns = [
6062
{
61-
name: "area",
62-
type: "Int64"
63+
name : "area",
64+
type : "Int64"
6365
},
6466
{
65-
name: "averageSignal",
66-
type: "Int64"
67+
name : "averageSignal",
68+
type : "Int64"
6769
},
6870
{
69-
name: "cell",
70-
type: "Int64"
71+
name : "cell",
72+
type : "Int64"
7173
},
7274
{
73-
name: "changeable",
74-
type: "Int64"
75+
name : "changeable",
76+
type : "Int64"
7577
},
7678
{
77-
name: "created",
78-
type: "DateTime64(9)"
79+
name : "created",
80+
type : "DateTime64(9)"
7981
},
8082
{
81-
name: "lat",
82-
type: "Float64"
83+
name : "lat",
84+
type : "Float64"
8385
},
8486
{
85-
name: "lon",
86-
type: "Float64"
87+
name : "lon",
88+
type : "Float64"
8789
},
8890
{
89-
name: "mcc",
90-
type: "Int64"
91+
name : "mcc",
92+
type : "Int64"
9193
},
9294
{
93-
name: "net",
94-
type: "Int64"
95+
name : "net",
96+
type : "Int64"
9597
},
9698
{
97-
name: "radio",
98-
type: "String"
99+
name : "radio",
100+
type : "String"
99101
},
100102
{
101-
name: "range",
102-
type: "Int64"
103+
name : "range",
104+
type : "Int64"
103105
},
104106
{
105-
name: "samples-renamed",
106-
type: "Int64"
107+
name : "samples-renamed",
108+
type : "Int64"
107109
},
108110
{
109-
name: "unit",
110-
type: "Int64"
111+
name : "unit",
112+
type : "Int64"
111113
},
112114
{
113-
name: "updated",
114-
type: "DateTime64(9)"
115+
name : "updated",
116+
type : "DateTime64(9)"
115117
}
116118
]
117119
}
118120

119121
field_mappings = [
120122
{
121-
source_field: "averageSignal",
122-
destination_field: "averageSignal"
123+
source_field : "averageSignal",
124+
destination_field : "averageSignal"
123125
},
124126
{
125-
source_field: "cell",
126-
destination_field: "cell"
127+
source_field : "cell",
128+
destination_field : "cell"
127129
},
128130
{
129-
source_field: "changeable",
130-
destination_field: "changeable"
131+
source_field : "changeable",
132+
destination_field : "changeable"
131133
},
132134
{
133-
source_field: "created",
134-
destination_field: "created"
135+
source_field : "created",
136+
destination_field : "created"
135137
},
136138
{
137-
source_field: "lat",
138-
destination_field: "lat"
139+
source_field : "lat",
140+
destination_field : "lat"
139141
},
140142
{
141-
source_field: "lon",
142-
destination_field: "lon"
143+
source_field : "lon",
144+
destination_field : "lon"
143145
},
144146
{
145-
source_field: "mcc",
146-
destination_field: "mcc"
147+
source_field : "mcc",
148+
destination_field : "mcc"
147149
},
148150
{
149-
source_field: "net",
150-
destination_field: "net"
151+
source_field : "net",
152+
destination_field : "net"
151153
},
152154
{
153-
source_field: "radio",
154-
destination_field: "radio"
155+
source_field : "radio",
156+
destination_field : "radio"
155157
},
156158
{
157-
source_field: "range",
158-
destination_field: "range"
159+
source_field : "range",
160+
destination_field : "range"
159161
},
160162
{
161-
source_field: "samples",
162-
destination_field: "samples-renamed"
163+
source_field : "samples",
164+
destination_field : "samples-renamed"
163165
},
164166
{
165-
source_field: "unit",
166-
destination_field: "unit"
167+
source_field : "unit",
168+
destination_field : "unit"
167169
},
168170
{
169-
source_field: "updated",
170-
destination_field: "updated"
171+
source_field : "updated",
172+
destination_field : "updated"
171173
}
172174
]
173175
}

examples/resources/clickhouse_clickpipe/resource.tf

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
resource "clickhouse_clickpipe" "kafka_clickpipe" {
2-
name = "My Kafka ClickPipe"
3-
description = "Data pipeline from Kafka to ClickHouse"
2+
name = "My Kafka ClickPipe"
3+
description = "Data pipeline from Kafka to ClickHouse"
44

5-
service_id = "e9465b4b-f7e5-4937-8e21-8d508b02843d"
5+
service_id = "e9465b4b-f7e5-4937-8e21-8d508b02843d"
66

77
scaling {
8-
replicas = 1
8+
replicas = 2
9+
replica_cpu_millicores = 250
10+
replica_memory_gb = 1.0
911
}
1012

1113
state = "Running"
1214

1315
source {
1416
kafka {
15-
type = "confluent"
16-
format = "JSONEachRow"
17+
type = "confluent"
18+
format = "JSONEachRow"
1719
brokers = "my-kafka-broker:9092"
18-
topics = "my_topic"
20+
topics = "my_topic"
1921

2022
consumer_group = "clickpipe-test"
2123

@@ -27,9 +29,9 @@ resource "clickhouse_clickpipe" "kafka_clickpipe" {
2729
}
2830

2931
destination {
30-
table = "my_table"
32+
table = "my_table"
3133
managed_table = true
32-
34+
3335
tableDefinition {
3436
engine {
3537
type = "MergeTree"
@@ -49,7 +51,7 @@ resource "clickhouse_clickpipe" "kafka_clickpipe" {
4951

5052
field_mappings = [
5153
{
52-
source_field = "my_field"
54+
source_field = "my_field"
5355
destination_field = "my_field1"
5456
}
5557
]

pkg/internal/api/clickpipe.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,10 @@ var ClickPipeKafkaOffsetStrategies = []string{
152152
}
153153

154154
type ClickPipeScalingRequest struct {
155-
Replicas *int64 `json:"replicas,omitempty"`
156-
Concurrency *int64 `json:"concurrency,omitempty"`
155+
Replicas *int64 `json:"replicas,omitempty"`
156+
ReplicaCpuMillicores *int64 `json:"replicaCpuMillicores,omitempty"`
157+
ReplicaMemoryGb *float64 `json:"replicaMemoryGb,omitempty"`
158+
Concurrency *int64 `json:"concurrency,omitempty"`
157159
}
158160

159161
type ClickPipeStateRequest struct {

pkg/internal/api/clickpipe_models.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package api
33
import "time"
44

55
type ClickPipeScaling struct {
6-
Replicas *int64 `json:"replicas,omitempty"`
6+
Replicas *int64 `json:"replicas,omitempty"`
7+
ReplicaCpuMillicores *int64 `json:"replicaCpuMillicores,omitempty"`
8+
ReplicaMemoryGb *float64 `json:"replicaMemoryGb,omitempty"`
79
}
810

911
type ClickPipeSourceCredentials struct {

pkg/resource/clickpipe.go

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ClickHouse/terraform-provider-clickhouse/pkg/internal/api"
1111
"github.com/ClickHouse/terraform-provider-clickhouse/pkg/resource/models"
12+
"github.com/hashicorp/terraform-plugin-framework-validators/float64validator"
1213
"github.com/hashicorp/terraform-plugin-framework-validators/int64validator"
1314
"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
1415
"github.com/hashicorp/terraform-plugin-framework/attr"
@@ -110,6 +111,20 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
110111
int64validator.Between(1, 10),
111112
},
112113
},
114+
"replica_cpu_millicores": schema.Int64Attribute{
115+
Description: "The CPU allocation per replica in millicores. Must be between 125 and 2000.",
116+
Optional: true,
117+
Validators: []validator.Int64{
118+
int64validator.Between(125, 2000),
119+
},
120+
},
121+
"replica_memory_gb": schema.Float64Attribute{
122+
Description: "The memory allocation per replica in GB. Must be between 0.5 and 8.0.",
123+
Optional: true,
124+
Validators: []validator.Float64{
125+
float64validator.Between(0.5, 8.0),
126+
},
127+
},
113128
},
114129
Optional: true,
115130
Computed: true,
@@ -741,17 +756,40 @@ func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateR
741756
}
742757

743758
if !plan.Scaling.IsUnknown() && !plan.Scaling.IsNull() {
744-
replicasModel := models.ClickPipeScalingModel{}
745-
response.Diagnostics.Append(plan.Scaling.As(ctx, &replicasModel, basetypes.ObjectAsOptions{})...)
759+
scalingModel := models.ClickPipeScalingModel{}
760+
response.Diagnostics.Append(plan.Scaling.As(ctx, &scalingModel, basetypes.ObjectAsOptions{})...)
746761

747762
var desiredReplicas *int64
748-
if !replicasModel.Replicas.IsUnknown() && !replicasModel.Replicas.IsNull() && createdClickPipe.Scaling.Replicas != nil && *createdClickPipe.Scaling.Replicas != replicasModel.Replicas.ValueInt64() {
749-
desiredReplicas = replicasModel.Replicas.ValueInt64Pointer()
763+
var desiredCpuMillicores *int64
764+
var desiredMemoryGb *float64
765+
needsScaling := false
766+
767+
if !scalingModel.Replicas.IsUnknown() && !scalingModel.Replicas.IsNull() &&
768+
(createdClickPipe.Scaling == nil || createdClickPipe.Scaling.Replicas == nil ||
769+
*createdClickPipe.Scaling.Replicas != scalingModel.Replicas.ValueInt64()) {
770+
desiredReplicas = scalingModel.Replicas.ValueInt64Pointer()
771+
needsScaling = true
772+
}
773+
774+
if !scalingModel.ReplicaCpuMillicores.IsUnknown() && !scalingModel.ReplicaCpuMillicores.IsNull() &&
775+
(createdClickPipe.Scaling == nil || createdClickPipe.Scaling.ReplicaCpuMillicores == nil ||
776+
*createdClickPipe.Scaling.ReplicaCpuMillicores != scalingModel.ReplicaCpuMillicores.ValueInt64()) {
777+
desiredCpuMillicores = scalingModel.ReplicaCpuMillicores.ValueInt64Pointer()
778+
needsScaling = true
779+
}
780+
781+
if !scalingModel.ReplicaMemoryGb.IsUnknown() && !scalingModel.ReplicaMemoryGb.IsNull() &&
782+
(createdClickPipe.Scaling == nil || createdClickPipe.Scaling.ReplicaMemoryGb == nil ||
783+
*createdClickPipe.Scaling.ReplicaMemoryGb != scalingModel.ReplicaMemoryGb.ValueFloat64()) {
784+
desiredMemoryGb = scalingModel.ReplicaMemoryGb.ValueFloat64Pointer()
785+
needsScaling = true
750786
}
751787

752-
if desiredReplicas != nil {
788+
if needsScaling {
753789
scalingRequest := api.ClickPipeScaling{
754-
Replicas: desiredReplicas,
790+
Replicas: desiredReplicas,
791+
ReplicaCpuMillicores: desiredCpuMillicores,
792+
ReplicaMemoryGb: desiredMemoryGb,
755793
}
756794

757795
if createdClickPipe, err = c.client.ScalingClickPipe(ctx, serviceID, createdClickPipe.ID, scalingRequest); err != nil {
@@ -1014,9 +1052,11 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model
10141052

10151053
state.State = types.StringValue(clickPipe.State)
10161054

1017-
if clickPipe.Scaling != nil && clickPipe.Scaling.Replicas != nil {
1055+
if clickPipe.Scaling != nil {
10181056
scalingModel := models.ClickPipeScalingModel{
1019-
Replicas: types.Int64PointerValue(clickPipe.Scaling.Replicas),
1057+
Replicas: types.Int64PointerValue(clickPipe.Scaling.Replicas),
1058+
ReplicaCpuMillicores: types.Int64PointerValue(clickPipe.Scaling.ReplicaCpuMillicores),
1059+
ReplicaMemoryGb: types.Float64PointerValue(clickPipe.Scaling.ReplicaMemoryGb),
10201060
}
10211061

10221062
state.Scaling = scalingModel.ObjectValue()
@@ -1394,11 +1434,13 @@ func (c *ClickPipeResource) Update(ctx context.Context, req resource.UpdateReque
13941434
}
13951435

13961436
if !plan.Scaling.Equal(state.Scaling) {
1397-
replicasModel := models.ClickPipeScalingModel{}
1398-
response.Diagnostics.Append(plan.Scaling.As(ctx, &replicasModel, basetypes.ObjectAsOptions{})...)
1437+
scalingModel := models.ClickPipeScalingModel{}
1438+
response.Diagnostics.Append(plan.Scaling.As(ctx, &scalingModel, basetypes.ObjectAsOptions{})...)
13991439

14001440
scalingRequest := api.ClickPipeScaling{
1401-
Replicas: replicasModel.Replicas.ValueInt64Pointer(),
1441+
Replicas: scalingModel.Replicas.ValueInt64Pointer(),
1442+
ReplicaCpuMillicores: scalingModel.ReplicaCpuMillicores.ValueInt64Pointer(),
1443+
ReplicaMemoryGb: scalingModel.ReplicaMemoryGb.ValueFloat64Pointer(),
14021444
}
14031445

14041446
if _, err := c.client.ScalingClickPipe(ctx, state.ServiceID.ValueString(), state.ID.ValueString(), scalingRequest); err != nil {

0 commit comments

Comments
 (0)