|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | + |
| 18 | +package org.apache.cloudstack.vnf.broker; |
| 19 | + |
| 20 | +import com.cloud.exception.CloudException; |
| 21 | +import org.apache.cloudstack.vnf.VnfFrameworkConfig; |
| 22 | +import org.apache.logging.log4j.LogManager; |
| 23 | +import org.apache.logging.log4j.Logger; |
| 24 | +import org.apache.http.client.config.RequestConfig; |
| 25 | +import org.apache.http.client.methods.CloseableHttpResponse; |
| 26 | +import org.apache.http.client.methods.HttpDelete; |
| 27 | +import org.apache.http.client.methods.HttpGet; |
| 28 | +import org.apache.http.client.methods.HttpPost; |
| 29 | +import org.apache.http.client.methods.HttpPut; |
| 30 | +import org.apache.http.entity.StringEntity; |
| 31 | +import org.apache.http.impl.client.CloseableHttpClient; |
| 32 | +import org.apache.http.impl.client.HttpClients; |
| 33 | +import org.apache.http.util.EntityUtils; |
| 34 | + |
| 35 | +import java.io.Closeable; |
| 36 | +import java.io.IOException; |
| 37 | +import java.nio.charset.StandardCharsets; |
| 38 | + |
| 39 | +/** |
| 40 | + * Thin HTTP client wrapper around the VNF Broker. |
| 41 | + * Handles timeouts, retries (simple exponential backoff), authentication, and correlation IDs. |
| 42 | + */ |
| 43 | +public class VnfBrokerClient implements Closeable { |
| 44 | + private static final Logger LOGGER = LogManager.getLogger(VnfBrokerClient.class); |
| 45 | + |
| 46 | + private final CloseableHttpClient httpClient; |
| 47 | + private final String baseUrl; |
| 48 | + private final int maxRetries; |
| 49 | + private final int initialDelayMs; |
| 50 | + private final int maxDelayMs; |
| 51 | + private final String authType; |
| 52 | + private final String authToken; |
| 53 | + private final String authUser; |
| 54 | + private final String authPass; |
| 55 | + |
| 56 | + public VnfBrokerClient() { |
| 57 | + this.baseUrl = VnfFrameworkConfig.VnfBrokerUrl.value(); |
| 58 | + int requestTimeout = VnfFrameworkConfig.VnfBrokerTimeout.value() * 1000; |
| 59 | + int connectTimeout = VnfFrameworkConfig.VnfBrokerConnectTimeout.value() * 1000; |
| 60 | + this.maxRetries = VnfFrameworkConfig.VnfMaxRetries.value(); |
| 61 | + this.initialDelayMs = VnfFrameworkConfig.VnfRetryDelayMs.value(); |
| 62 | + this.maxDelayMs = VnfFrameworkConfig.VnfRetryMaxDelayMs.value(); |
| 63 | + this.authType = VnfFrameworkConfig.VnfBrokerAuthType.value(); |
| 64 | + this.authToken = VnfFrameworkConfig.VnfBrokerAuthToken.value(); |
| 65 | + this.authUser = VnfFrameworkConfig.VnfBrokerUsername.value(); |
| 66 | + this.authPass = VnfFrameworkConfig.VnfBrokerPassword.value(); |
| 67 | + |
| 68 | + RequestConfig config = RequestConfig.custom() |
| 69 | + .setConnectTimeout(connectTimeout) |
| 70 | + .setSocketTimeout(requestTimeout) |
| 71 | + .setConnectionRequestTimeout(connectTimeout) |
| 72 | + .build(); |
| 73 | + this.httpClient = HttpClients.custom().setDefaultRequestConfig(config).build(); |
| 74 | + } |
| 75 | + |
| 76 | + public String testConnectivity(String correlationId) throws CloudException { |
| 77 | + return executeGet("/health", correlationId); |
| 78 | + } |
| 79 | + |
| 80 | + public String createFirewallRule(String payload, String correlationId) throws CloudException { |
| 81 | + return executePost("/rules/firewall", payload, correlationId); |
| 82 | + } |
| 83 | + |
| 84 | + public String updateFirewallRule(String ruleId, String payload, String correlationId) throws CloudException { |
| 85 | + return executePut("/rules/firewall/" + ruleId, payload, correlationId); |
| 86 | + } |
| 87 | + |
| 88 | + public String deleteFirewallRule(String ruleId, String correlationId) throws CloudException { |
| 89 | + return executeDelete("/rules/firewall/" + ruleId, correlationId); |
| 90 | + } |
| 91 | + |
| 92 | + public String createNatRule(String payload, String correlationId) throws CloudException { |
| 93 | + return executePost("/rules/nat", payload, correlationId); |
| 94 | + } |
| 95 | + |
| 96 | + public String reconcileNetwork(String networkIdentifier, String payload, String correlationId) throws CloudException { |
| 97 | + return executePost("/reconcile/" + networkIdentifier, payload, correlationId); |
| 98 | + } |
| 99 | + |
| 100 | + private String executeGet(String path, String correlationId) throws CloudException { |
| 101 | + HttpGet get = new HttpGet(composeUrl(path)); |
| 102 | + decorateHeaders(get, correlationId); |
| 103 | + return perform(get, correlationId); |
| 104 | + } |
| 105 | + |
| 106 | + private String executePost(String path, String payload, String correlationId) throws CloudException { |
| 107 | + HttpPost post = new HttpPost(composeUrl(path)); |
| 108 | + decorateHeaders(post, correlationId); |
| 109 | + if (payload != null) { |
| 110 | + post.setEntity(new StringEntity(payload, StandardCharsets.UTF_8)); |
| 111 | + post.setHeader("Content-Type", "application/json"); |
| 112 | + } |
| 113 | + return perform(post, correlationId); |
| 114 | + } |
| 115 | + |
| 116 | + private String executePut(String path, String payload, String correlationId) throws CloudException { |
| 117 | + HttpPut put = new HttpPut(composeUrl(path)); |
| 118 | + decorateHeaders(put, correlationId); |
| 119 | + if (payload != null) { |
| 120 | + put.setEntity(new StringEntity(payload, StandardCharsets.UTF_8)); |
| 121 | + put.setHeader("Content-Type", "application/json"); |
| 122 | + } |
| 123 | + return perform(put, correlationId); |
| 124 | + } |
| 125 | + |
| 126 | + private String executeDelete(String path, String correlationId) throws CloudException { |
| 127 | + HttpDelete del = new HttpDelete(composeUrl(path)); |
| 128 | + decorateHeaders(del, correlationId); |
| 129 | + return perform(del, correlationId); |
| 130 | + } |
| 131 | + |
| 132 | + private String composeUrl(String path) throws CloudException { |
| 133 | + if (baseUrl == null || baseUrl.isEmpty()) { |
| 134 | + throw new CloudException("VNF Broker URL not configured (vnf.broker.url)"); |
| 135 | + } |
| 136 | + if (!path.startsWith("/")) { |
| 137 | + path = "/" + path; |
| 138 | + } |
| 139 | + return baseUrl + path; |
| 140 | + } |
| 141 | + |
| 142 | + private void decorateHeaders(org.apache.http.client.methods.HttpRequestBase request, String correlationId) { |
| 143 | + request.setHeader("X-Correlation-ID", correlationId); |
| 144 | + switch (authType == null ? "none" : authType.toLowerCase()) { |
| 145 | + case "bearer": |
| 146 | + case "jwt": |
| 147 | + if (authToken != null && !authToken.isEmpty()) { |
| 148 | + request.setHeader("Authorization", "Bearer " + authToken); |
| 149 | + } |
| 150 | + break; |
| 151 | + case "basic": |
| 152 | + if (authUser != null && authPass != null && !authUser.isEmpty()) { |
| 153 | + String basic = java.util.Base64.getEncoder().encodeToString((authUser + ":" + authPass).getBytes(StandardCharsets.UTF_8)); |
| 154 | + request.setHeader("Authorization", "Basic " + basic); |
| 155 | + } |
| 156 | + break; |
| 157 | + default: |
| 158 | + // no auth |
| 159 | + } |
| 160 | + } |
| 161 | + |
| 162 | + private String perform(org.apache.http.client.methods.HttpRequestBase request, String correlationId) throws CloudException { |
| 163 | + int attempt = 0; |
| 164 | + int delay = initialDelayMs; |
| 165 | + while (true) { |
| 166 | + attempt++; |
| 167 | + try (CloseableHttpResponse response = httpClient.execute(request)) { |
| 168 | + int status = response.getStatusLine().getStatusCode(); |
| 169 | + String body = response.getEntity() != null ? EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8) : ""; |
| 170 | + if (status >= 200 && status < 300) { |
| 171 | + LOGGER.info("Broker call success path=" + request.getURI() + " status=" + status + " corrId=" + correlationId); |
| 172 | + return body; |
| 173 | + } |
| 174 | + LOGGER.warn("Broker call failed path=" + request.getURI() + " status=" + status + " attempt=" + attempt + " corrId=" + correlationId); |
| 175 | + if (attempt > maxRetries || status < 500) { // don't retry client errors |
| 176 | + throw new CloudException("Broker call failed (status=" + status + ") body=" + body); |
| 177 | + } |
| 178 | + } catch (IOException ioe) { |
| 179 | + LOGGER.warn("Broker IO error attempt=" + attempt + " corrId=" + correlationId + " msg=" + ioe.getMessage()); |
| 180 | + if (attempt > maxRetries) { |
| 181 | + throw new CloudException("Broker call IO failure: " + ioe.getMessage(), ioe); |
| 182 | + } |
| 183 | + } |
| 184 | + try { |
| 185 | + Thread.sleep(delay); |
| 186 | + } catch (InterruptedException ie) { |
| 187 | + Thread.currentThread().interrupt(); |
| 188 | + throw new CloudException("Interrupted during broker retry", ie); |
| 189 | + } |
| 190 | + delay = Math.min(delay * 2, maxDelayMs); |
| 191 | + } |
| 192 | + } |
| 193 | + |
| 194 | + @Override |
| 195 | + public void close() throws IOException { |
| 196 | + httpClient.close(); |
| 197 | + } |
| 198 | +} |
0 commit comments