diff --git a/src/main/java/org/jgroups/protocols/kubernetes/Client.java b/src/main/java/org/jgroups/protocols/kubernetes/Client.java index e1cb5da..8c3baa6 100644 --- a/src/main/java/org/jgroups/protocols/kubernetes/Client.java +++ b/src/main/java/org/jgroups/protocols/kubernetes/Client.java @@ -8,6 +8,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -21,7 +22,6 @@ */ public class Client { protected final String masterUrl; - protected final Map headers; protected final int connectTimeout; protected final int readTimeout; protected final int operationAttempts; @@ -30,29 +30,19 @@ public class Client { protected final String info; protected final Log log; - public Client(String masterUrl, Map headers, int connectTimeout, int readTimeout, int operationAttempts, + public Client(String masterUrl, int connectTimeout, int readTimeout, int operationAttempts, long operationSleep, StreamProvider streamProvider, Log log) { this.masterUrl = masterUrl; - this.headers = headers; this.connectTimeout = connectTimeout; this.readTimeout = readTimeout; this.operationAttempts = operationAttempts; this.operationSleep = operationSleep; this.streamProvider = streamProvider; this.log=log; - Map maskedHeaders=new TreeMap<>(); - if (headers != null) { - for (Map.Entry header : headers.entrySet()) { - String key = header.getKey(); - String value = header.getValue(); - if ("Authorization".equalsIgnoreCase(key) && value != null) - value = "#MASKED:" + value.length() + "#"; - maskedHeaders.put(key, value); - } - } - info=String.format("%s[masterUrl=%s, headers=%s, connectTimeout=%s, readTimeout=%s, operationAttempts=%s, " + + + info=String.format("%s[masterUrl=%s, connectTimeout=%s, readTimeout=%s, operationAttempts=%s, " + "operationSleep=%s, streamProvider=%s]", - getClass().getSimpleName(), masterUrl, maskedHeaders, connectTimeout, readTimeout, + getClass().getSimpleName(), masterUrl, connectTimeout, readTimeout, operationAttempts, operationSleep, streamProvider); } @@ -71,7 +61,7 @@ protected String fetchFromKubernetes(String op, String namespace, String labels, InputStream stream=null; String retval=null; try { - stream=openStream(url, headers, connectTimeout, readTimeout, operationAttempts, operationSleep, streamProvider); + stream=openStream(url, new HashMap<>(), connectTimeout, readTimeout, operationAttempts, operationSleep, streamProvider); retval=Util.readContents(stream); if(dump_requests) System.out.printf("--> %s\n<-- %s\n", url, retval); @@ -231,19 +221,16 @@ protected boolean podRunning(Json podStatus) { return false; } // 5. ready condition must be "True" - Boolean readyCondition = Boolean.FALSE; + boolean readyCondition = Boolean.FALSE; List conditions = podStatus.at("conditions").asJsonList(); // walk through all the conditions and find type=="Ready" and get the value of the status property for(Json condition: conditions) { String type = condition.at("type").asString(); if(type.equalsIgnoreCase("Ready")) { - readyCondition = new Boolean(condition.at("status").asString()); + readyCondition = Boolean.parseBoolean(condition.at("status").asString()); } } - log.trace( "conditions with type==\"Ready\" has status property value = %s", readyCondition.toString()); - if(!readyCondition.booleanValue()) { - return false; - } - return true; + log.trace( "conditions with type==\"Ready\" has status property value = %s", Boolean.toString(readyCondition)); + return readyCondition; } } diff --git a/src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java b/src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java index 3b565ca..8d2f7d0 100644 --- a/src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java +++ b/src/main/java/org/jgroups/protocols/kubernetes/KUBE_PING.java @@ -11,6 +11,7 @@ import org.jgroups.protocols.PingHeader; import org.jgroups.protocols.kubernetes.stream.CertificateStreamProvider; import org.jgroups.protocols.kubernetes.stream.StreamProvider; +import org.jgroups.protocols.kubernetes.stream.TokenProvider; import org.jgroups.protocols.kubernetes.stream.TokenStreamProvider; import org.jgroups.stack.IpAddress; import org.jgroups.util.NameCache; @@ -149,7 +150,6 @@ public void init() throws Exception { return; // no further initialization necessary } log.info("namespace %s set; clustering enabled", namespace); - Map headers=new HashMap<>(); StreamProvider streamProvider; if(clientCertFile != null) { if(masterProtocol == null) @@ -157,16 +157,10 @@ public void init() throws Exception { streamProvider=new CertificateStreamProvider(clientCertFile, clientKeyFile, clientKeyPassword, clientKeyAlgo, caCertFile); } else { - String saToken=readFileToString(saTokenFile); - if(saToken != null) { - // curl -k -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" \ - // https://172.30.0.2:443/api/v1/namespaces/dward/pods?labelSelector=application%3Deap-app - headers.put("Authorization", "Bearer " + saToken); - } - streamProvider = new TokenStreamProvider(saToken, caCertFile); + streamProvider = new TokenStreamProvider(new TokenProvider(saTokenFile), caCertFile); } String url=String.format("%s://%s:%s/api/%s", masterProtocol, masterHost, masterPort, apiVersion); - client=new Client(url, headers, connectTimeout, readTimeout, operationAttempts, operationSleep, streamProvider, log); + client=new Client(url, connectTimeout, readTimeout, operationAttempts, operationSleep, streamProvider, log); log.debug("KubePING configuration: " + toString()); } diff --git a/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenProvider.java b/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenProvider.java new file mode 100644 index 0000000..3659be1 --- /dev/null +++ b/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenProvider.java @@ -0,0 +1,50 @@ +package org.jgroups.protocols.kubernetes.stream; + +import mjson.Json; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.logging.Logger; + +import static org.jgroups.protocols.kubernetes.Utils.readFileToString; + +public class TokenProvider +{ + private static final Logger log = Logger.getLogger(TokenProvider.class.getName()); + private final String tokenFile; + private volatile long tokenExpiry; + private volatile String token; + + public TokenProvider(String tokenFile) { + this.tokenFile = tokenFile; + } + + public String getToken() throws IOException { + long currentTime = System.currentTimeMillis() / 1000; + if (token == null || (tokenExpiry > 0 && tokenExpiry < currentTime)) { + synchronized (this) { + if (token == null || (tokenExpiry > 0 && tokenExpiry < currentTime)) { + log.info("Refreshing token from file " + tokenFile); + token = readFileToString(tokenFile).trim(); + tokenExpiry = getExpiry(token); + } + } + } + return token; + } + + static long getExpiry(String jwtToken) throws IOException { + try { + String[] parts = jwtToken.split("\\."); + if (parts.length < 2) throw new IOException("Invalid JWT token"); + String payloadJson = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8); + Json payload = Json.read(payloadJson); + if (payload.has("exp")) return payload.at("exp").asLong(); + log.info("No 'exp' claim found."); + } catch (Exception e) { + throw new IOException("Error decoding JWT: " + e.getMessage()); + } + return -1; + } +} diff --git a/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenStreamProvider.java b/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenStreamProvider.java index 4029fef..85587fb 100644 --- a/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenStreamProvider.java +++ b/src/main/java/org/jgroups/protocols/kubernetes/stream/TokenStreamProvider.java @@ -1,12 +1,14 @@ package org.jgroups.protocols.kubernetes.stream; import static org.jgroups.protocols.kubernetes.Utils.openFile; +import static org.jgroups.protocols.kubernetes.Utils.readFileToString; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -31,20 +33,27 @@ public class TokenStreamProvider extends BaseStreamProvider { private static final Logger log = Logger.getLogger(TokenStreamProvider.class.getName()); - private String token; + private final TokenProvider tokenProvider; - private String caCertFile; + private final String caCertFile; - private SSLSocketFactory factory; + private volatile SSLSocketFactory factory; - public TokenStreamProvider(String token, String caCertFile) { - this.token = token; + public TokenStreamProvider(TokenProvider tokenProvider, String caCertFile) { + this.tokenProvider = tokenProvider; this.caCertFile = caCertFile; } @Override public InputStream openStream(String url, Map headers, int connectTimeout, int readTimeout) throws IOException { + String saToken = tokenProvider.getToken(); + if (saToken != null) { + // curl -k -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" \ + // https://172.30.0.2:443/api/v1/namespaces/dward/pods?labelSelector=application%3Deap-app + headers.put("Authorization", "Bearer " + saToken); + } + URLConnection connection = openConnection(url, headers, connectTimeout, readTimeout); if (connection instanceof HttpsURLConnection) { @@ -60,15 +69,10 @@ public InputStream openStream(String url, Map headers, int conne } } - if (token != null) { - // curl -k -H "Authorization: Bearer $(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" \ - // https://172.30.0.2:443/api/v1/namespaces/dward/pods?labelSelector=application%3Deap-app - headers.put("Authorization", "Bearer " + token); - } return connection.getInputStream(); } - static TrustManager[] configureCaCert(String caCertFile) throws Exception { + static TrustManager[] configureCaCert(String caCertFile) throws Exception { if (caCertFile != null && !caCertFile.isEmpty()) { try { InputStream pemInputStream = openFile(caCertFile); diff --git a/src/test/java/org/jgroups/ping/kube/test/TestClient.java b/src/test/java/org/jgroups/ping/kube/test/TestClient.java index f08c743..f2dcfda 100644 --- a/src/test/java/org/jgroups/ping/kube/test/TestClient.java +++ b/src/test/java/org/jgroups/ping/kube/test/TestClient.java @@ -24,7 +24,7 @@ public TestClient() throws URISyntaxException, IOException { } public TestClient(String jsonFile) throws URISyntaxException, IOException { - super(null, null, 0, 0, 0, 0, + super(null, 0, 0, 0, 0, null, LogFactory.getLog(TestClient.class)); String json = readFileToString(new File(TestClient.class.getResource(jsonFile).toURI())); OPS.put("pods", json); diff --git a/src/test/java/org/jgroups/protocols/kubernetes/stream/TokenProviderTest.java b/src/test/java/org/jgroups/protocols/kubernetes/stream/TokenProviderTest.java new file mode 100644 index 0000000..5f0f899 --- /dev/null +++ b/src/test/java/org/jgroups/protocols/kubernetes/stream/TokenProviderTest.java @@ -0,0 +1,112 @@ +package org.jgroups.protocols.kubernetes.stream; + +import mjson.Json; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Base64; + +import static org.jgroups.protocols.kubernetes.Utils.readFileToString; +import static org.junit.Assert.*; + +public class TokenProviderTest +{ + private static final String JWT_PREFIX = "eyJhbGciOiJIUzI1NiJ9."; + private String validJwtWithExpiry; + private String validJwtWithoutExpiry; + private static final String INVALID_JWT = "invalid.jwt.token"; + private File tempTokenFile; + private TokenProvider tokenProvider; + + @Before + public void setUp() throws IOException, URISyntaxException { + validJwtWithExpiry = readFileToString(new File(TokenProvider.class.getResource("/tokenWithExpiry.txt").toURI())); + validJwtWithoutExpiry = readFileToString(new File(TokenProvider.class.getResource("/tokenWithoutExpiry.txt").toURI())); + tempTokenFile = File.createTempFile("token", ".txt"); + tempTokenFile.deleteOnExit(); + tokenProvider = new TokenProvider(tempTokenFile.getAbsolutePath()); + } + + @Test + public void returnsTokenWhenFileContainsValidJWT() throws IOException { + Files.writeString(tempTokenFile.toPath(), validJwtWithExpiry); + String token = tokenProvider.getToken(); + assertEquals(validJwtWithExpiry, token); + } + + @Test + public void throwsIOExceptionWhenFileContainsInvalidJWT() throws IOException { + Files.writeString(tempTokenFile.toPath(), INVALID_JWT); + assertThrows(IOException.class, () -> tokenProvider.getToken()); + } + + @Test + public void returnsSameTokenIfNotExpired() throws IOException { + Files.writeString(tempTokenFile.toPath(), validJwtWithExpiry); + String token1 = tokenProvider.getToken(); + String token2 = tokenProvider.getToken(); + assertEquals(token1, token2); + } + + @Test + public void refreshesTokenWhenExpired() throws IOException, InterruptedException { + Files.writeString(tempTokenFile.toPath(), validJwtWithExpiry); + updateTokenExpiryInFile(tempTokenFile, (System.currentTimeMillis() + 1000) / 1000); // Set expiry in the past + String token1 = tokenProvider.getToken(); + Thread.sleep(2000L); + updateTokenExpiryInFile(tempTokenFile, (System.currentTimeMillis() + 1000) / 1000); // Set expiry in the past + String toekn2 = tokenProvider.getToken(); + assertNotEquals(token1, toekn2); + } + + @Test + public void throwsIOExceptionForInvalidJWTExpiry() { + assertThrows(IOException.class, () -> TokenProvider.getExpiry(INVALID_JWT)); + } + + @Test + public void returnsExpiryForValidJWT() throws IOException { + Files.writeString(tempTokenFile.toPath(), validJwtWithExpiry); + long tokenExpiry = (System.currentTimeMillis() + 10000) / 1000; + updateTokenExpiryInFile(tempTokenFile, tokenExpiry); // JWT 'exp' is in seconds + long expiry = TokenProvider.getExpiry(tokenProvider.getToken()); + assertEquals(tokenExpiry, expiry); + } + + @Test + public void returnsMinusOneForJWTWithoutExpiry() throws IOException { + Files.writeString(tempTokenFile.toPath(), validJwtWithoutExpiry); + long expiry = TokenProvider.getExpiry(tokenProvider.getToken()); + assertEquals(-1, expiry); + } + + public static void updateTokenExpiryInFile(File tokenFile, long newExpiry) throws IOException { + String token = Files.readString(tokenFile.toPath()).trim(); + String[] parts = token.split("\\."); + if (parts.length < 3) throw new IOException("Invalid JWT token format"); + + // Decode payload + String payloadJson = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8); + Json payload = Json.read(payloadJson); + + // Update expiry + payload.set("exp", newExpiry); + + // Re-encode payload + String newPayload = Base64.getUrlEncoder().withoutPadding() + .encodeToString(payload.toString().getBytes(StandardCharsets.UTF_8)); + + // Reconstruct token + String newToken = parts[0] + "." + newPayload + "." + parts[2]; + + // Save to file + Files.write(tokenFile.toPath(), newToken.getBytes(StandardCharsets.UTF_8)); + } + +} + diff --git a/src/test/resources/tokenWithExpiry.txt b/src/test/resources/tokenWithExpiry.txt new file mode 100644 index 0000000..b0dd8de --- /dev/null +++ b/src/test/resources/tokenWithExpiry.txt @@ -0,0 +1 @@ +eyJhbGciOiJSUzI1NiIsImtpZCI6ImQyNWM2ZjQyODRhMTZlNzA0N2JkOWEwMWNmOWIxNGFkOWFmNWY3NzUifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjIl0sImV4cCI6MTc5MjY1ODc5MywiaWF0IjoxNzYxMTIyNzkzLCJpc3MiOiJodHRwczovL29pZGMuZWtzLnVzLXdlc3QtMi5hbWF6b25hd3MuY29tL2lkLzFDMEU0M0U4QUZBOTI2RDg4REUyMUJCMUI5QkU3QzlCIiwianRpIjoiMjkwNjViNWItYTg4ZC00ZGE2LWJhNjktZGRjYzA2OTdiZGQ1Iiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJjYWktcWEtbWwiLCJub2RlIjp7Im5hbWUiOiJpcC0xMC0yMC0yMDQtMjkudXMtd2VzdC0yLmNvbXB1dGUuaW50ZXJuYWwiLCJ1aWQiOiJkMzE5ZmYwYi03YjAxLTQxYWMtOWMxYS1hODc2NmNmMzFjYWMifSwicG9kIjp7Im5hbWUiOiJhcHBsaWNhdGlvbi1pbnRlZ3JhdGlvbi1vYm0tMCIsInVpZCI6IjNiNjM3MjBhLTc1YTctNDllNS1iOTFhLTZmNTZhNjJhZDJjMiJ9LCJzZXJ2aWNlYWNjb3VudCI6eyJuYW1lIjoiY2Fpc2VydmljZS1zZXJ2aWNlLWFjY291bnQiLCJ1aWQiOiJkMzE1MWE1My1mODk4LTQ5MTMtYjczNC00MTU1ZmY0MWY0NjAifSwid2FybmFmdGVyIjoxNzYxMTI2NDAwfSwibmJmIjoxNzYxMTIyNzkzLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6Y2FpLXFhLW1sOmNhaXNlcnZpY2Utc2VydmljZS1hY2NvdW50In0.dW2Ekvy9ltpmLGHrE7JrS7F4VE2lqdY2PtKiQqdbca9ED7s7njq9QnP8STolxT7z3MNlee7ayZC2RGIoz_Hs84dKncQZDvxmVgHTgWI9ohaA2tFLUWBfEwx1WTGrlN6pL_DN_7kJlxZkAQeTAmmCPTYI6-mFyqZ9sjCuavzVvQBJ5cH7sqRg7R8FQDMaq45GF-G8I2QfWg7Pqz298F8AU8891AtxTh5K2jawFoWBqcp5BieozNgmdR76amktcL45AxvKj1nW5zW53zSEPRVxUgYoEq8r6rnRGULRW2mqohpFLS-ZbPfRJZM9FPpRA4LYNX_84tlaKdUqB8t4BjQfTQa \ No newline at end of file diff --git a/src/test/resources/tokenWithoutExpiry.txt b/src/test/resources/tokenWithoutExpiry.txt new file mode 100644 index 0000000..fc66dc8 --- /dev/null +++ b/src/test/resources/tokenWithoutExpiry.txt @@ -0,0 +1 @@ +eyJhbGciOiJSUzI1NiIsImtpZCI6IlJIeHZ6Z1d5NWFiekhLakFGczBwYjlxemNIdjdhWVN5S0ZfdTF3czFENVkifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJjYWktcHJvZC1hemNhYzIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlY3JldC5uYW1lIjoiY2FpLXNlcnZpY2UtYWNjb3VudC1rdWJlcGluZyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJjYWktc2VydmljZS1hY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiOTEwYzk5OTMtOTZmMS00OTQ4LTk2OWMtM2ZiNjZmNDk4NDRmIiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50OmNhaS1wcm9kLWF6Y2FjMjpjYWktc2VydmljZS1hY2NvdW50In0.Ko3nOwk-aumtdCMomNcunV_7GD1_gK2pbR1L9IKRsfs_0srgzc3UyqNL-XrbdFhuy-VHF708299ArjSpUzhkdaD1zx1PvoHzMLS5ivgTPfnhSYrAmBOSFTFQjgUZzndDkxHcxwDhq3x7yRZsizKJ5V000cgHT5JGPWEpGmwPQkY4GsaT5ZLv324ZxtggUHoqXWeYYWciZKERenVtz1V5TJofEpmmau7fitb2iM5DnpJH_W6KYaEzQozDBqJ-EhaWjQn8bFJgi08N_ZA_qDOsCtNwIphrLPiUnmLS6F2TU_584njNawBE_vrcJL82J7M2f2g6eroE_DeEoDMmd9d77ZCF3Zy94fW_9x_CFF242g2EbD6yLGNZXvsNrf0qA6-DMA2Ihft6QxeNJX9ETDDroJ6IPUc13Zq_SmZdqLeCCwAQpajBbCCyU6ih8xsyxa1Os_bjmTdT2OUGQ8FBIkmJfTFnmMCrqsd60CZ--cPY9OGBmloJbZ3OEDNWPmb75pTulIgAy1b79bzco3y4qSiWeQ1-WI6hNaogf-sIVlkm1Q7Sh0IczlalJGcqIHbkFjYezY3ZR6PnjUKjGXDqqDpFMwjGo-F-tNwt3AgPjEJD0LdhUBFtnsW5T38RQIYFEuHyJT4YHKOCQ3iSDMyb1EY_sm3qnj6vEnjdRh_jthat3e8 \ No newline at end of file