Skip to content

Commit d7faf78

Browse files
Use resize API to scale running clusters (#1541)
1 parent 828db64 commit d7faf78

File tree

5 files changed

+561
-1
lines changed

5 files changed

+561
-1
lines changed

clusters/acceptance/clusters_api_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,64 @@ func TestAccListClustersIntegration(t *testing.T) {
6464
assert.NoError(t, err, err)
6565
assert.True(t, clusterReadInfo.State == clusters.ClusterStateRunning)
6666
}
67+
68+
func TestAccListClustersResizeIntegrationTest(t *testing.T) {
69+
qa.RequireAnyCloudEnv(t)
70+
t.Parallel()
71+
client := common.CommonEnvironmentClient()
72+
ctx := context.Background()
73+
clustersAPI := clusters.NewClustersAPI(ctx, client)
74+
randomName := qa.RandomName()
75+
76+
cluster := clusters.Cluster{
77+
NumWorkers: 1,
78+
ClusterName: "Terraform Integration Test " + randomName,
79+
SparkVersion: clustersAPI.LatestSparkVersionOrDefault(
80+
clusters.SparkVersionRequest{
81+
Latest: true,
82+
LongTermSupport: true,
83+
}),
84+
InstancePoolID: compute.CommonInstancePoolID(),
85+
IdempotencyToken: "acc-list-" + randomName,
86+
AutoterminationMinutes: 15,
87+
}
88+
89+
clusterReadInfo, err := clustersAPI.Create(cluster)
90+
require.NoError(t, err, err)
91+
assert.True(t, clusterReadInfo.NumWorkers == cluster.NumWorkers)
92+
assert.True(t, clusterReadInfo.ClusterName == cluster.ClusterName)
93+
assert.True(t, reflect.DeepEqual(clusterReadInfo.SparkEnvVars, cluster.SparkEnvVars))
94+
assert.True(t, clusterReadInfo.SparkVersion == cluster.SparkVersion)
95+
assert.True(t, clusterReadInfo.AutoterminationMinutes == cluster.AutoterminationMinutes)
96+
assert.True(t, clusterReadInfo.State == clusters.ClusterStateRunning)
97+
98+
// Resize num workers
99+
clusterReadInfo, err = clustersAPI.Resize(clusters.ResizeRequest{ClusterID: clusterReadInfo.ClusterID, NumWorkers: 2})
100+
require.NoError(t, err, err)
101+
assert.True(t, clusterReadInfo.NumWorkers == 2)
102+
assert.True(t, clusterReadInfo.State == clusters.ClusterStateRunning)
103+
104+
// Resize cluster to become autoscaling
105+
clusterReadInfo, err = clustersAPI.Resize(clusters.ResizeRequest{ClusterID: clusterReadInfo.ClusterID, AutoScale: &clusters.AutoScale{
106+
MinWorkers: 1,
107+
MaxWorkers: 2,
108+
}})
109+
require.NoError(t, err, err)
110+
assert.True(t, clusterReadInfo.AutoScale.MinWorkers == 1)
111+
assert.True(t, clusterReadInfo.AutoScale.MaxWorkers == 2)
112+
assert.True(t, clusterReadInfo.State == clusters.ClusterStateRunning)
113+
114+
// cleanup
115+
err = clustersAPI.Terminate(clusterReadInfo.ClusterID)
116+
assert.NoError(t, err, err)
117+
118+
clusterReadInfo, err = clustersAPI.Get(clusterReadInfo.ClusterID)
119+
assert.NoError(t, err, err)
120+
assert.True(t, clusterReadInfo.State == clusters.ClusterStateTerminated)
121+
122+
err = clustersAPI.Unpin(clusterReadInfo.ClusterID)
123+
assert.NoError(t, err, err)
124+
125+
err = clustersAPI.PermanentDelete(clusterReadInfo.ClusterID)
126+
assert.NoError(t, err, err)
127+
}

clusters/clusters_api.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,12 @@ type ClusterSize struct {
311311
AutoScale *AutoScale `json:"autoscale"`
312312
}
313313

314+
type ResizeRequest struct {
315+
ClusterID string `json:"cluster_id"`
316+
NumWorkers int32 `json:"num_workers"`
317+
AutoScale *AutoScale `json:"autoscale,omitempty"`
318+
}
319+
314320
// ResizeCause holds reason for resizing
315321
type ResizeCause string
316322

@@ -533,6 +539,24 @@ func (a ClustersAPI) Create(cluster Cluster) (info ClusterInfo, err error) {
533539
return
534540
}
535541

542+
// Resize api can only be used when the cluster is in Running State
543+
func (a ClustersAPI) Resize(resizeRequest ResizeRequest) (info ClusterInfo, err error) {
544+
info, err = a.Get(resizeRequest.ClusterID)
545+
if err != nil {
546+
return info, err
547+
}
548+
if info.State != ClusterStateRunning {
549+
return info, fmt.Errorf("resize: Cluster %v is in %v state. RUNNING state required to use resize API", info.ClusterID, info.State)
550+
}
551+
552+
err = a.client.Post(a.context, "/clusters/resize", resizeRequest, &info)
553+
if err != nil {
554+
return info, fmt.Errorf("resize: %w", err)
555+
}
556+
info, err = a.waitForClusterStatus(resizeRequest.ClusterID, ClusterStateRunning)
557+
return info, err
558+
}
559+
536560
// Edit edits the configuration of a cluster to match the provided attributes and size
537561
func (a ClustersAPI) Edit(cluster Cluster) (info ClusterInfo, err error) {
538562
info, err = a.Get(cluster.ClusterID)

clusters/clusters_api_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,92 @@ func TestEditCluster_Pending(t *testing.T) {
313313
assert.Equal(t, ClusterStateRunning, string(clusterInfo.State))
314314
}
315315

316+
func TestResizeCluster_FailsForNonRunningCluster(t *testing.T) {
317+
clusterStates := []ClusterState{ClusterStateUnknown,
318+
ClusterStateError,
319+
ClusterStatePending,
320+
ClusterStateRestarting,
321+
ClusterStateResizing,
322+
ClusterStateTerminating,
323+
ClusterStateTerminated,
324+
}
325+
for _, clusterState := range clusterStates {
326+
t.Run(fmt.Sprintf("CLUSTER STATE %s", clusterState), func(t *testing.T) {
327+
client, server, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{
328+
{
329+
Method: "GET",
330+
Resource: "/api/2.0/clusters/get?cluster_id=abc",
331+
Response: ClusterInfo{
332+
State: clusterState,
333+
ClusterID: "abc",
334+
},
335+
},
336+
})
337+
require.NoError(t, err)
338+
339+
ctx := context.Background()
340+
_, err = NewClustersAPI(ctx, client).Resize(ResizeRequest{
341+
ClusterID: "abc",
342+
NumWorkers: 10,
343+
})
344+
require.Error(t, err)
345+
assert.Contains(t, err.Error(), "resize: Cluster abc is in "+clusterState+" state. RUNNING state required to use resize API")
346+
server.Close()
347+
})
348+
}
349+
}
350+
351+
func TestResizeCluster_NormalRun(t *testing.T) {
352+
client, server, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{
353+
{
354+
Method: "GET",
355+
Resource: "/api/2.0/clusters/get?cluster_id=abc",
356+
Response: ClusterInfo{
357+
State: ClusterStateRunning,
358+
ClusterID: "abc",
359+
NumWorkers: 4,
360+
},
361+
},
362+
{
363+
Method: "POST",
364+
Resource: "/api/2.0/clusters/resize",
365+
ExpectedRequest: ResizeRequest{
366+
ClusterID: "abc",
367+
NumWorkers: 10,
368+
},
369+
},
370+
{
371+
Method: "GET",
372+
Resource: "/api/2.0/clusters/get?cluster_id=abc",
373+
Response: ClusterInfo{
374+
State: ClusterStateResizing,
375+
ClusterID: "abc",
376+
NumWorkers: 10,
377+
},
378+
},
379+
{
380+
Method: "GET",
381+
Resource: "/api/2.0/clusters/get?cluster_id=abc",
382+
Response: ClusterInfo{
383+
State: ClusterStateRunning,
384+
ClusterID: "abc",
385+
NumWorkers: 10,
386+
},
387+
},
388+
})
389+
defer server.Close()
390+
require.NoError(t, err)
391+
392+
ctx := context.Background()
393+
clusterInfo, err := NewClustersAPI(ctx, client).Resize(ResizeRequest{
394+
ClusterID: "abc",
395+
NumWorkers: 10,
396+
})
397+
require.NoError(t, err)
398+
assert.Equal(t, ClusterStateRunning, string(clusterInfo.State))
399+
assert.Equal(t, 10, int(clusterInfo.NumWorkers))
400+
}
401+
316402
func TestEditCluster_Terminating(t *testing.T) {
317403
client, server, err := qa.HttpFixtureClient(t, []qa.HTTPFixture{
318404
{

clusters/resource_cluster.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,17 +241,76 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
241241
common.DataToStructPointer(d, clusterSchema, &cluster)
242242
var clusterInfo ClusterInfo
243243
var err error
244+
244245
if hasClusterConfigChanged(d) {
245246
log.Printf("[DEBUG] Cluster state has changed!")
246247
if err := cluster.Validate(); err != nil {
247248
return err
248249
}
249250
cluster.ModifyRequestOnInstancePool()
250251
fixInstancePoolChangeIfAny(d, &cluster)
251-
clusterInfo, err = clusters.Edit(cluster)
252+
253+
// We can only call the resize api if the cluster is in the running state
254+
// and only the cluster size (ie num_workers OR autoscale) is being changed
255+
hasNumWorkersChanged := d.HasChange("num_workers")
256+
hasAutoscaleChanged := d.HasChange("autoscale")
257+
hasOnlyResizeClusterConfigChanged := true
258+
for k := range clusterSchema {
259+
if k == "library" ||
260+
k == "is_pinned" ||
261+
k == "num_workers" ||
262+
k == "autoscale" {
263+
continue
264+
}
265+
if d.HasChange(k) {
266+
hasOnlyResizeClusterConfigChanged = false
267+
}
268+
}
269+
clusterInfo, err = clusters.Get(clusterID)
270+
if err != nil {
271+
return err
272+
}
273+
274+
isNumWorkersResizeForNonAutoscalingCluster := hasOnlyResizeClusterConfigChanged &&
275+
hasNumWorkersChanged &&
276+
!hasAutoscaleChanged &&
277+
clusterInfo.State == ClusterStateRunning
278+
isAutoScalingToNonAutoscalingResize := hasOnlyResizeClusterConfigChanged &&
279+
hasAutoscaleChanged &&
280+
hasNumWorkersChanged &&
281+
cluster.Autoscale == nil &&
282+
clusterInfo.State == ClusterStateRunning
283+
isAutoscaleConfigResizeForAutoscalingCluster := hasOnlyResizeClusterConfigChanged &&
284+
hasAutoscaleChanged &&
285+
!hasNumWorkersChanged &&
286+
clusterInfo.State == ClusterStateRunning
287+
isNonAutoScalingToAutoscalingResize := hasOnlyResizeClusterConfigChanged &&
288+
hasAutoscaleChanged &&
289+
hasNumWorkersChanged &&
290+
cluster.Autoscale != nil &&
291+
clusterInfo.State == ClusterStateRunning
292+
293+
// We prefer to use the resize API in cases when only the number of
294+
// workers is changed because a resizing cluster can still serve queries
295+
if isNumWorkersResizeForNonAutoscalingCluster ||
296+
isAutoScalingToNonAutoscalingResize {
297+
clusterInfo, err = clusters.Resize(ResizeRequest{
298+
ClusterID: clusterID,
299+
NumWorkers: cluster.NumWorkers,
300+
})
301+
} else if isAutoscaleConfigResizeForAutoscalingCluster ||
302+
isNonAutoScalingToAutoscalingResize {
303+
clusterInfo, err = clusters.Resize(ResizeRequest{
304+
ClusterID: clusterID,
305+
AutoScale: cluster.Autoscale,
306+
})
307+
} else {
308+
clusterInfo, err = clusters.Edit(cluster)
309+
}
252310
if err != nil {
253311
return err
254312
}
313+
255314
} else {
256315
clusterInfo, err = clusters.Get(clusterID)
257316
if err != nil {

0 commit comments

Comments
 (0)