Skip to content
This repository was archived by the owner on Dec 20, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.security.access.prepost.PreAuthorize
import org.springframework.web.bind.annotation.*
import org.springframework.web.util.UriComponents
import org.springframework.web.util.UriComponentsBuilder
import retrofit.RetrofitError

import static net.logstash.logback.argument.StructuredArguments.value
Expand Down Expand Up @@ -69,10 +71,11 @@ class PipelineController {
@CompileDynamic
@ApiOperation(value = "Save a pipeline definition")
@PostMapping('')
void savePipeline(
ResponseEntity<Void> savePipeline(
@RequestBody Map pipeline,
@RequestParam(value = "staleCheck", required = false, defaultValue = "false")
Boolean staleCheck) {
@RequestParam(value = "staleCheck", required = false, defaultValue = "false") Boolean staleCheck,
@RequestParam(value = "waitForCompletion", required = false, defaultValue = "true") Boolean waitForCompletion,
UriComponentsBuilder uriComponentsBuilder) {
def operation = [
description: (String) "Save pipeline '${pipeline.get("name") ?: "Unknown"}'",
application: pipeline.get('application'),
Expand All @@ -85,15 +88,43 @@ class PipelineController {
]
]
]
def result = taskService.createAndWaitForCompletion(operation)
String resultStatus = result.get("status")
def result = waitForCompletion ? taskService.createAndWaitForCompletion(operation) : taskService.create(operation)
return processTaskResponse(result, waitForCompletion, uriComponentsBuilder)
}

@CompileDynamic
private ResponseEntity<Void> processTaskResponse(Map<String, Object> result,
Boolean waitForCompletion,
UriComponentsBuilder uriComponentsBuilder) {

if (!waitForCompletion) {
if (result.get("ref") == null) {
log.warn("No ref field found in result")
return ResponseEntity.accepted().build()
}

String taskId = ((String) result.get("ref")).split("/")[2]
//Let api client poll the task status URL.
UriComponents uriComponents = uriComponentsBuilder.path("/tasks/{id}").buildAndExpand(taskId)
URI location = uriComponents.toUri();
return ResponseEntity.accepted().location(location).build()
}

// Process status of the operation and send response accordingly.
String resultStatus = result.get("status")
// Send 400 bad request
if (!"SUCCEEDED".equalsIgnoreCase(resultStatus)) {
String exception = result.variables.find { it.key == "exception" }?.value?.details?.errors?.getAt(0)
Map variables = result.get("variables") ? (Map)result.get("variables") : [:]
Map exceptionDetails = (variables.find { it.key == "exception" }?.value as Map)?.get('details') as Map
String exception = (exceptionDetails?.get('errors') as List)?.getAt(0)
throw new PipelineException(
exception ?: "Pipeline save operation did not succeed: ${result.get("id", "unknown task id")} (status: ${resultStatus})"
)
}

//Send 200 if Succeeds
return ResponseEntity.ok().build()

}

@ApiOperation(value = "Rename a pipeline definition")
Expand All @@ -117,7 +148,7 @@ class PipelineController {
@CompileDynamic
@ApiOperation(value = "Update a pipeline definition", response = HashMap.class)
@PutMapping("{id}")
Map updatePipeline(@PathVariable("id") String id, @RequestBody Map pipeline) {
ResponseEntity<?> updatePipeline(@PathVariable("id") String id, @RequestBody Map pipeline, @RequestParam(value = "waitForCompletion", required = false, defaultValue = "true") Boolean waitForCompletion, UriComponentsBuilder uriComponentsBuilder) {
def operation = [
description: (String) "Update pipeline '${pipeline.get("name") ?: 'Unknown'}'",
application: (String) pipeline.get('application'),
Expand All @@ -130,19 +161,17 @@ class PipelineController {
]
]

def result = taskService.createAndWaitForCompletion(operation)
String resultStatus = result.get("status")
def result = waitForCompletion ? taskService.createAndWaitForCompletion(operation) : taskService.create(operation)
ResponseEntity responseEntity = processTaskResponse(result, waitForCompletion, uriComponentsBuilder)

if (!"SUCCEEDED".equalsIgnoreCase(resultStatus)) {
String exception = result.variables.find { it.key == "exception" }?.value?.details?.errors?.getAt(0)
throw new PipelineException(
exception ?: "Pipeline save operation did not succeed: ${result.get("id", "unknown task id")} (status: ${resultStatus})"
)
if (responseEntity.getStatusCodeValue() == HttpStatus.OK.value()) {
Map body = front50Service.getPipelineConfigsForApplication((String) pipeline.get("application"), true)?.find {
id == (String) it.get("id")
}
return ResponseEntity.ok(body)
}

return front50Service.getPipelineConfigsForApplication((String) pipeline.get("application"), true)?.find {
id == (String) it.get("id")
}
return responseEntity
}

@ApiOperation(value = "Cancel a pipeline execution")
Expand Down Expand Up @@ -346,4 +375,5 @@ class PipelineController {
super(message)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,59 +27,134 @@ import retrofit.RetrofitError
import retrofit.client.Response
import retrofit.converter.JacksonConverter
import spock.lang.Specification
import spock.lang.Unroll

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put

class PipelineControllerSpec extends Specification {

def "should update a pipeline"() {
private Map pipeline = [
id: "id",
name: "test pipeline",
stages: [],
triggers: [],
limitConcurrent: true,
parallel: true,
index: 4,
application: "application"
]

@Unroll
def "should create a pipeline and return status #result for task status #taskStatus"() {
given:
def taskSerivce = Mock(TaskService)
def front50Service = Mock(Front50Service)
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new PipelineController(objectMapper: new ObjectMapper(), taskService: taskSerivce, front50Service: front50Service)).build()

and:
def pipeline = [
id: "id",
name: "test pipeline",
stages: [],
triggers: [],
limitConcurrent: true,
parallel: true,
index: 4,
application: "application"
Map inputMap = [
description: "Save pipeline 'test pipeline'" as String,
application: 'application',
job : [
[
type : 'savePipeline',
pipeline : Base64.encoder.encodeToString(new ObjectMapper().writeValueAsString(pipeline).bytes),
user : 'anonymous',
'staleCheck': false
]
]
]
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new PipelineController(objectMapper: new ObjectMapper(), taskService: taskSerivce, front50Service: front50Service)).build()

when:
def response = mockMvc.perform(
put("/pipelines/${pipeline.id}").contentType(MediaType.APPLICATION_JSON)
post("/pipelines/").contentType(MediaType.APPLICATION_JSON).param('waitForCompletion', waitForCompletion.toString())
.content(new ObjectMapper().writeValueAsString(pipeline))
).andReturn().response

then:
response.status == 200
1 * taskSerivce.createAndWaitForCompletion([
response.status == result

if (waitForCompletion) {
1 * taskSerivce.createAndWaitForCompletion(inputMap) >> { [id: 'task-id', application: 'application', status: taskStatus] }
}

if (!waitForCompletion) {
1 * taskSerivce.create(inputMap) >> { ['ref': "/pipelines/task-id"] }
}

if (result == 200) { // check for empty response body.
assert response.getContentAsString().length() == 0
}
if (result == 202) { // check location header exists.
assert response.getHeader('Location').equalsIgnoreCase('http://localhost/tasks/task-id')
}
0 * _

where:
taskStatus | waitForCompletion || result
'SUCCEEDED' | true || 200
'BUFFERED' | true || 400
'TERMINAL' | true || 400
'SKIPPED' | true || 400
'STOPPED' | true || 400
'CANCELED' | true || 400
'ASYNC' | false || 202
}

@Unroll
def "should update a pipeline and return status #result for task status #taskStatus"() {
given:
def taskSerivce = Mock(TaskService)
def front50Service = Mock(Front50Service)
def inputMap = [
description: "Update pipeline 'test pipeline'" as String,
application: 'application',
job: [
[
type: 'updatePipeline',
pipeline: Base64.encoder.encodeToString(new ObjectMapper().writeValueAsString([
id: 'id',
name: 'test pipeline',
stages: [],
triggers: [],
limitConcurrent: true,
parallel: true,
index: 4,
application: 'application'
]).bytes),
pipeline: Base64.encoder.encodeToString(new ObjectMapper().writeValueAsString(pipeline).bytes),
user: 'anonymous'
]
]
]) >> { [id: 'task-id', application: 'application', status: 'SUCCEEDED'] }
1 * front50Service.getPipelineConfigsForApplication('application', true) >> []
]
MockMvc mockMvc = MockMvcBuilders.standaloneSetup(new PipelineController(objectMapper: new ObjectMapper(), taskService: taskSerivce, front50Service: front50Service)).build()

when:
def response = mockMvc.perform(
put("/pipelines/${pipeline.id}").contentType(MediaType.APPLICATION_JSON).param('waitForCompletion', waitForCompletion.toString())
.content(new ObjectMapper().writeValueAsString(pipeline))
).andReturn().response

then:
response.status == result

if (waitForCompletion) {
1 * taskSerivce.createAndWaitForCompletion(inputMap) >> { [id: 'task-id', application: 'application', status: taskStatus] }
}

if (!waitForCompletion) {
1 * taskSerivce.create(inputMap) >> { ['ref': "/pipelines/task-id"] }
}
if (result == 200) {
1 * front50Service.getPipelineConfigsForApplication('application', true) >> [['id': 'id']]
}
if (result == 200) { // check body exists.
assert response.getContentAsString().equalsIgnoreCase('{"id":"id"}')
}
if (result == 202) { // check location header exists.
assert response.getHeader('Location').equalsIgnoreCase('http://localhost/tasks/task-id')
}
0 * _

where:
taskStatus | waitForCompletion || result
'SUCCEEDED' | true || 200
'BUFFERED' | true || 400
'TERMINAL' | true || 400
'SKIPPED' | true || 400
'STOPPED' | true || 400
'CANCELED' | true || 400
'ASYNC' | false || 202

}

def "should propagate pipeline template errors"() {
Expand Down