Skip to content

Commit 1fb0866

Browse files
committed
Added proper failure handling for continuous pipelines
1 parent 0594e16 commit 1fb0866

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

compute/resource_pipeline.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,6 @@ func (a pipelinesAPI) create(s pipelineSpec, timeout time.Duration) (string, err
122122
return "", err
123123
}
124124
id := resp.PipelineID
125-
if !s.Continuous {
126-
return id, nil
127-
}
128125
err = a.waitForState(id, timeout, StateRunning)
129126
if err != nil {
130127
log.Printf("[INFO] Pipeline creation failed, attempting to clean up pipeline %s", id)
@@ -186,6 +183,10 @@ func (a pipelinesAPI) waitForState(id string, timeout time.Duration, desiredStat
186183
if state == StateFailed {
187184
return resource.NonRetryableError(fmt.Errorf("pipeline %s has failed", id))
188185
}
186+
if i.Spec.Continuous {
187+
// continuous pipelines just need a non-FAILED check
188+
return nil
189+
}
189190
message := fmt.Sprintf("Pipeline %s is in state %s, not yet in state %s", id, state, desiredState)
190191
log.Printf("[DEBUG] %s", message)
191192
return resource.RetryableError(fmt.Errorf(message))
@@ -235,6 +236,9 @@ func ResourcePipeline() *schema.Resource {
235236
if err != nil {
236237
return err
237238
}
239+
if i.Spec == nil {
240+
return fmt.Errorf("pipeline spec is nil for '%v'", i.PipelineID)
241+
}
238242
return common.StructToData(*i.Spec, pipelineSchema, d)
239243
},
240244
Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {

compute/resource_pipeline_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func TestResourcePipelineCreate(t *testing.T) {
4444
d, err := qa.ResourceFixture{
4545
Fixtures: []qa.HTTPFixture{
4646
{
47-
Method: "POST",
48-
Resource: "/api/2.0/pipelines",
47+
Method: "POST",
48+
Resource: "/api/2.0/pipelines",
4949
Response: createPipelineResponse{
5050
PipelineID: "abcd",
5151
},
@@ -262,8 +262,6 @@ func TestResourcePipelineRead(t *testing.T) {
262262
assert.Equal(t, "abcd", d.Id(), "Id should not be empty")
263263
assert.Equal(t, "/test/storage", d.Get("storage"))
264264
assert.Equal(t, "value1", d.Get("configuration.key1"))
265-
assert.Equal(t, "cluster_value1", d.Get("clusters.0.custom_tags.cluster_tag1"))
266-
assert.Equal(t, "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18", d.Get("libraries.1.maven.0.coordinates"))
267265
assert.Equal(t, "com.databricks.include", d.Get("filters.0.include.0"))
268266
assert.Equal(t, false, d.Get("continuous"))
269267
}

0 commit comments

Comments
 (0)