11package io .streamzi .openshift ;
22
33
4+ import com .fasterxml .jackson .core .JsonProcessingException ;
5+ import com .fasterxml .jackson .databind .ObjectMapper ;
46import io .fabric8 .kubernetes .api .model .*;
57import io .fabric8 .openshift .api .model .DeploymentConfig ;
6- import io .fabric8 .openshift .api .model .DeploymentConfigBuilder ;
78import io .streamzi .openshift .dataflow .model .ProcessorFlow ;
89import io .streamzi .openshift .dataflow .model .ProcessorNodeTemplate ;
910import io .streamzi .openshift .dataflow .model .serialization .ProcessorFlowReader ;
1516import javax .enterprise .context .ApplicationScoped ;
1617import javax .ws .rs .*;
1718import java .io .File ;
18- import java .util .ArrayList ;
19- import java .util .HashMap ;
20- import java .util .List ;
21- import java .util .Map ;
19+ import java .util .*;
2220import java .util .logging .Level ;
2321import java .util .logging .Logger ;
2422
@@ -33,6 +31,8 @@ public class API {
3331 @ EJB (beanInterface = ClientContainer .class )
3432 private ClientContainer container ;
3533
34+ private final String bootstrapServersDefault = "my-cluster-kafka" ;
35+
3636 @ GET
3737 @ Path ("/pods" )
3838 @ Produces ("application/json" )
@@ -98,6 +98,25 @@ public List<String> listProcessors() {
9898 //todo: look at applying labels to imagestreams and getting the necessary data from there.
9999 //todo: could apply special labels to the deployment configs to hold the graph structure.
100100
101+ // List<ImageStream> images = container.getOSClient().imageStreams().inAnyNamespace().withLabel("streamzi.io/kind", "processor").list().getItems();
102+ // for (ImageStream image : images) {
103+ // Map<String, String> labels = image.getMetadata().getLabels();
104+ // final ProcessorNodeTemplate template = new ProcessorNodeTemplate();
105+ // template.setId(labels.get("streamzi.io/processor/id"));
106+ // template.setDescription(labels.get("streamzi.io/processor/description"));
107+ // template.setName(labels.get("streamzi.io/processor/label"));
108+ // template.setImageName(labels.get("streamzi.io/processor/imagename"));
109+ //
110+ // String[] inputsLabel = labels.get("inputs").split(",");
111+ // List<String> inputs = new ArrayList<>(Arrays.asList(inputsLabel));
112+ //
113+ // String[] outputsLabel = labels.get("outputs").split(",");
114+ // List<String> outputs = new ArrayList<>(Arrays.asList(outputsLabel));
115+ //
116+ // template.setInputs(inputs);
117+ // template.setOutputs(outputs);
118+ // }
119+
101120 File [] templates = container .getTemplateDir ().listFiles ();
102121 if (templates != null ) {
103122 for (File f : templates ) {
@@ -202,16 +221,16 @@ public void postFlow(String flowJson) {
202221
203222 //remove DCs that are no longer required.
204223 List <DeploymentConfig > existingDCs = container .getOSClient ().deploymentConfigs ().inNamespace (container .getNamespace ()).withLabel ("app" , flow .getName ()).list ().getItems ();
205- for (DeploymentConfig existingDC : existingDCs ){
224+ for (DeploymentConfig existingDC : existingDCs ) {
206225
207226 boolean found = false ;
208- for (DeploymentConfig newDC : deploymentConfigs ){
209- if (existingDC .getMetadata ().getName ().equals (newDC .getMetadata ().getName ())){
227+ for (DeploymentConfig newDC : deploymentConfigs ) {
228+ if (existingDC .getMetadata ().getName ().equals (newDC .getMetadata ().getName ())) {
210229 found = true ;
211230 }
212231 }
213232
214- if (!found ){
233+ if (!found ) {
215234 logger .info ("Removing DeploymentConfig: " + container .getNamespace () + "/" + existingDC .getMetadata ().getName ());
216235 container .getOSClient ().deploymentConfigs ().inNamespace (container .getNamespace ()).withName (existingDC .getMetadata ().getName ()).delete ();
217236 }
@@ -221,4 +240,26 @@ public void postFlow(String flowJson) {
221240 logger .log (Level .SEVERE , "Error parsing JSON flow data: " + e .getMessage (), e );
222241 }
223242 }
243+
244+ @ GET
245+ @ Path ("/globalproperties" )
246+ @ Produces ("application/json" )
247+ public String getGlobalProperties () {
248+ final Properties props = new Properties ();
249+
250+ String bootstrapServers = EnvironmentResolver .get ("bootstrap.servers" );
251+ if (bootstrapServers != null && !bootstrapServers .equals ("" )) {
252+ props .put ("bootstrap_servers" , bootstrapServers );
253+ } else {
254+ props .put ("bootstrap_servers" , bootstrapServersDefault );
255+ }
256+
257+ ObjectMapper mapper = new ObjectMapper ();
258+ try {
259+ return mapper .writeValueAsString (props );
260+ } catch (JsonProcessingException e ) {
261+ logger .severe (e .getMessage ());
262+ return "{}" ;
263+ }
264+ }
224265}
0 commit comments