@@ -76,17 +76,17 @@ public PipelineConnectionsResource(PipelineStreamConnectionsService connectionsS
7676 this .entityScopeService = entityScopeService ;
7777 }
7878
79- @ ApiOperation (value = "Connect processing pipelines to a stream" , notes = "" )
79+ @ ApiOperation (value = "Connect processing pipelines to a stream" )
8080 @ POST
8181 @ Path ("/to_stream" )
8282 @ RequiresPermissions (PipelineRestPermissions .PIPELINE_CONNECTION_EDIT )
8383 @ AuditEvent (type = PipelineProcessorAuditEventTypes .PIPELINE_CONNECTION_UPDATE )
8484 public PipelineConnections connectPipelines (@ ApiParam (name = "Json body" , required = true ) @ NotNull PipelineConnections connection ) throws NotFoundException {
8585 final String streamId = connection .streamId ();
8686
87+ // verify the stream exists and is editable
8788 checkNotEditable (streamId , "Cannot connect pipeline to non editable stream" );
88- // verify the stream exists
89- checkPermission (RestPermissions .STREAMS_READ , streamId );
89+ checkPermission (RestPermissions .STREAMS_EDIT , streamId );
9090 streamService .load (streamId );
9191
9292 // verify the pipelines exist
@@ -97,7 +97,7 @@ public PipelineConnections connectPipelines(@ApiParam(name = "Json body", requir
9797 return connectionsService .save (connection );
9898 }
9999
100- @ ApiOperation (value = "Connect streams to a processing pipeline" , notes = "" )
100+ @ ApiOperation (value = "Connect streams to a processing pipeline" )
101101 @ POST
102102 @ Path ("/to_pipeline" )
103103 @ RequiresPermissions (PipelineRestPermissions .PIPELINE_CONNECTION_EDIT )
@@ -106,7 +106,7 @@ public Set<PipelineConnections> connectStreams(@ApiParam(name = "Json body", req
106106 final String pipelineId = connection .pipelineId ();
107107 final Set <PipelineConnections > updatedConnections = Sets .newHashSet ();
108108
109- // verify the pipeline exists and is editable
109+ // verify the pipeline exists
110110 checkPermission (PipelineRestPermissions .PIPELINE_READ , pipelineId );
111111 checkScope (pipelineService .load (pipelineId ));
112112
@@ -118,6 +118,7 @@ public Set<PipelineConnections> connectStreams(@ApiParam(name = "Json body", req
118118 connection .streamIds ().forEach (streamId ->
119119 checkNotEditable (streamId , "Cannot connect pipeline to non editable stream" )
120120 );
121+
121122 // remove deleted pipeline connections
122123 for (PipelineConnections pipelineConnection : pipelineConnections ) {
123124 if (!connection .streamIds ().contains (pipelineConnection .streamId ())) {
@@ -132,8 +133,8 @@ public Set<PipelineConnections> connectStreams(@ApiParam(name = "Json body", req
132133
133134 // update pipeline connections
134135 for (String streamId : connection .streamIds ()) {
135- // verify the stream exist
136- checkPermission (RestPermissions .STREAMS_READ , streamId );
136+ // verify the stream exists and is editable
137+ checkPermission (RestPermissions .STREAMS_EDIT , streamId );
137138 streamService .load (streamId );
138139
139140 PipelineConnections updatedConnection ;
@@ -177,7 +178,7 @@ public PipelineConnections getPipelinesForStream(@ApiParam(name = "streamId") @P
177178 @ ApiOperation ("Get all pipeline connections" )
178179 @ GET
179180 @ RequiresPermissions (PipelineRestPermissions .PIPELINE_CONNECTION_READ )
180- public Set <PipelineConnections > getAll () throws NotFoundException {
181+ public Set <PipelineConnections > getAll () {
181182 final Set <PipelineConnections > pipelineConnections = connectionsService .loadAll ();
182183
183184 final Set <PipelineConnections > filteredConnections = Sets .newHashSetWithExpectedSize (pipelineConnections .size ());
0 commit comments