|
| 1 | +package com.clickhouse.flink; |
| 2 | + |
| 3 | +import com.google.gson.Gson; |
| 4 | +import com.google.gson.JsonArray; |
| 5 | +import com.google.gson.JsonElement; |
| 6 | +import com.google.gson.JsonObject; |
| 7 | +import okhttp3.*; |
| 8 | +import org.testcontainers.containers.GenericContainer; |
| 9 | +import org.testcontainers.containers.Network; |
| 10 | +import org.testcontainers.images.builder.Transferable; |
| 11 | +import org.testcontainers.utility.DockerImageName; |
| 12 | +import org.testcontainers.utility.MountableFile; |
| 13 | + |
| 14 | +import java.io.File; |
| 15 | +import java.io.IOException; |
| 16 | +import java.util.ArrayList; |
| 17 | +import java.util.List; |
| 18 | + |
| 19 | +public class Cluster { |
| 20 | + |
| 21 | + private static final int INTERNAL_REST_PORT = 8081; |
| 22 | + private static final int INTERNAL_JOB_MANAGER_RCP_PORT = 6123; |
| 23 | + |
| 24 | + private GenericContainer<?> containerJobManager; |
| 25 | + private List<GenericContainer<?>> containerTaskManagerList = new ArrayList<>(); |
| 26 | + |
| 27 | + public static class Builder { |
| 28 | + private String flinkVersion; |
| 29 | + private int taskManagers; |
| 30 | + private String sourcePath; |
| 31 | + private String dataFilename; |
| 32 | + private String targetPath; |
| 33 | + private Network network; |
| 34 | + |
| 35 | + public Builder() { |
| 36 | + taskManagers = 1; |
| 37 | + flinkVersion = "latest"; |
| 38 | + sourcePath = null; |
| 39 | + dataFilename = null; |
| 40 | + targetPath = null; |
| 41 | + network = null; |
| 42 | + } |
| 43 | + public Builder withTaskManagers(int taskManagers) { |
| 44 | + this.taskManagers = taskManagers; |
| 45 | + return this; |
| 46 | + } |
| 47 | + |
| 48 | + public Builder withFlinkVersion(String flinkVersion) { |
| 49 | + this.flinkVersion = flinkVersion; |
| 50 | + return this; |
| 51 | + } |
| 52 | + |
| 53 | + public Builder withDataFile(String sourcePath, String dataFilename, String targetPath) { |
| 54 | + this.sourcePath = sourcePath; |
| 55 | + this.dataFilename = dataFilename; |
| 56 | + this.targetPath = targetPath; |
| 57 | + return this; |
| 58 | + } |
| 59 | + |
| 60 | + public Builder withNetwork(Network network) { |
| 61 | + this.network = network; |
| 62 | + return this; |
| 63 | + } |
| 64 | + |
| 65 | + public Cluster build() { |
| 66 | + // when we are not specifying a network we should create one |
| 67 | + if (network == null) { |
| 68 | + network = Network.newNetwork(); |
| 69 | + } |
| 70 | + Cluster cluster = new Cluster(flinkVersion, taskManagers, sourcePath, dataFilename, targetPath, network); |
| 71 | + return cluster; |
| 72 | + } |
| 73 | + |
| 74 | + } |
| 75 | + |
| 76 | + public Cluster(String flinkVersion, int taskManagers, String sourcePath, String dataFilename, String targetPath, Network network) { |
| 77 | + MountableFile mountableFile = MountableFile.forHostPath(sourcePath + dataFilename); |
| 78 | + String dataFileInContainer = String.format("%s/%s", targetPath, dataFilename); |
| 79 | + String flinkImageTag = String.format("flink:%s", flinkVersion); |
| 80 | + DockerImageName FLINK_IMAGE = DockerImageName.parse(flinkImageTag); |
| 81 | + containerJobManager = new GenericContainer<>(FLINK_IMAGE) |
| 82 | + .withCommand("jobmanager") |
| 83 | + .withNetwork(network) |
| 84 | + .withExposedPorts(INTERNAL_REST_PORT, INTERNAL_JOB_MANAGER_RCP_PORT) |
| 85 | + .withNetworkAliases("jobmanager") |
| 86 | + .withEnv("FLINK_PROPERTIES","jobmanager.rpc.address: jobmanager"); |
| 87 | + |
| 88 | + if (sourcePath != null) { |
| 89 | + containerJobManager.withCopyFileToContainer(mountableFile, dataFileInContainer); |
| 90 | + } |
| 91 | + for (int i = 0; i < taskManagers; i++) { |
| 92 | + GenericContainer<?> containerTaskManager = new GenericContainer<>(FLINK_IMAGE) |
| 93 | + .withCommand("taskmanager") |
| 94 | + .withNetwork(network) |
| 95 | + .dependsOn(containerJobManager) |
| 96 | + .withEnv("FLINK_PROPERTIES","jobmanager.rpc.address: jobmanager"); |
| 97 | + if (sourcePath != null) { |
| 98 | + containerTaskManager.withCopyFileToContainer(mountableFile, dataFileInContainer); |
| 99 | + } |
| 100 | + containerTaskManagerList.add(containerTaskManager); |
| 101 | + } |
| 102 | + |
| 103 | + containerJobManager.start(); |
| 104 | + for (int i = 0; i < taskManagers; i++) { |
| 105 | + containerTaskManagerList.get(i).start(); |
| 106 | + } |
| 107 | + // TODO: add strategy for wait |
| 108 | + } |
| 109 | + |
| 110 | + public int getDashboardPort() { |
| 111 | + return containerJobManager.getMappedPort(INTERNAL_REST_PORT); |
| 112 | + } |
| 113 | + |
| 114 | + public String getDashboardUrl() { |
| 115 | + return String.format("%s:%s",containerJobManager.getContainerIpAddress(), getDashboardPort()); |
| 116 | + } |
| 117 | + |
| 118 | + public GenericContainer<?> getContainerJobManager() { |
| 119 | + return containerJobManager; |
| 120 | + } |
| 121 | + |
| 122 | + public List<GenericContainer<?>> getContainerTaskManagerList() { |
| 123 | + return containerTaskManagerList; |
| 124 | + } |
| 125 | + |
| 126 | + public String uploadJar(String jarFilePath) throws IOException { |
| 127 | + File jarFile = new File(jarFilePath); |
| 128 | + String clusterURLJarUploadAPI = String.format("http://%s/jars/upload", getDashboardUrl()); |
| 129 | + OkHttpClient client = new OkHttpClient(); |
| 130 | + RequestBody fileBody = RequestBody.create(jarFile, MediaType.parse("application/java-archive")); |
| 131 | + MultipartBody requestBody = new MultipartBody.Builder() |
| 132 | + .setType(MultipartBody.FORM) |
| 133 | + .addFormDataPart("jarfile", jarFile.getName(), fileBody) |
| 134 | + .build(); |
| 135 | + Request request = new Request.Builder() |
| 136 | + .url(clusterURLJarUploadAPI) |
| 137 | + .post(requestBody).build(); |
| 138 | + Response response = client.newCall(request).execute(); |
| 139 | + if (!response.isSuccessful()) { |
| 140 | + System.err.println("Unexpected code " + response); |
| 141 | + return null; |
| 142 | + } else { |
| 143 | + Gson gson = new Gson(); |
| 144 | + System.out.println("Upload successful!"); |
| 145 | + String responseBody = response.body().string(); |
| 146 | + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); |
| 147 | + if (jsonObject.has("status") & |
| 148 | + jsonObject.get("status").getAsString().equalsIgnoreCase("success") & |
| 149 | + jsonObject.has("filename") ) { |
| 150 | + String filename = jsonObject.get("filename").getAsString(); |
| 151 | + System.out.println("filename: " + filename); |
| 152 | + return filename; |
| 153 | + } |
| 154 | + return null; |
| 155 | + } |
| 156 | + |
| 157 | + |
| 158 | + } |
| 159 | + public List<String> listAllJars() throws IOException { |
| 160 | + String clusterURLListJars = String.format("http://%s/jars", getDashboardUrl()); |
| 161 | + |
| 162 | + OkHttpClient client = new OkHttpClient(); |
| 163 | + Request request = new Request.Builder() |
| 164 | + .url(clusterURLListJars) |
| 165 | + .build(); |
| 166 | + Response response = client.newCall(request).execute(); |
| 167 | + if (!response.isSuccessful()) { |
| 168 | + System.err.println("Unexpected code " + response); |
| 169 | + return null; |
| 170 | + } else { |
| 171 | + Gson gson = new Gson(); |
| 172 | + String responseBody = response.body().string(); |
| 173 | + System.out.println(responseBody); |
| 174 | + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); |
| 175 | + List<String> jars = new ArrayList<>(); |
| 176 | + if (jsonObject.has("files")) { |
| 177 | + JsonArray jsonArray = jsonObject.getAsJsonArray("files"); |
| 178 | + for (JsonElement element : jsonArray) { |
| 179 | + if (element.getAsJsonObject().has("id")) { |
| 180 | + jars.add(element.getAsJsonObject().get("id").getAsString()); |
| 181 | + System.out.println("id: " + element.getAsJsonObject().get("id").getAsString()); |
| 182 | + } |
| 183 | + } |
| 184 | + |
| 185 | + } |
| 186 | + return jars; |
| 187 | + } |
| 188 | + |
| 189 | + |
| 190 | + } |
| 191 | + |
| 192 | + public String runJob(String jarId, String entryClass, int parallelism, String... args) throws IOException { |
| 193 | + String programArg = String.join(",", args); |
| 194 | + String clusterURLJarRunAPI = String.format("http://%s/jars/%s/run?programArg=%s", getDashboardUrl(), jarId, programArg); |
| 195 | + RequestBody body = RequestBody.create( |
| 196 | + "", |
| 197 | + MediaType.get("application/json; charset=utf-8") |
| 198 | + ); |
| 199 | + |
| 200 | + OkHttpClient client = new OkHttpClient(); |
| 201 | + Request request = new Request.Builder() |
| 202 | + .url(clusterURLJarRunAPI) |
| 203 | + .post(body) |
| 204 | + .build(); |
| 205 | + |
| 206 | + Response response = client.newCall(request).execute(); |
| 207 | + if (!response.isSuccessful()) { |
| 208 | + System.err.println("Unexpected code " + response); |
| 209 | + return null; |
| 210 | + } else { |
| 211 | + Gson gson = new Gson(); |
| 212 | + System.out.println("Upload successful!"); |
| 213 | + String responseBody = response.body().string(); |
| 214 | + System.out.println(responseBody); |
| 215 | + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); |
| 216 | + if (jsonObject.has("jobid") ) { |
| 217 | + String jobid = jsonObject.get("jobid").getAsString(); |
| 218 | + System.out.println("jobid: " + jobid); |
| 219 | + return jobid; |
| 220 | + } |
| 221 | + return null; |
| 222 | + } |
| 223 | + } |
| 224 | + |
| 225 | + public String jobStatus(String jobId) throws IOException { |
| 226 | + String clusterURLJobStatusAPI = String.format("http://%s/jobs/%s", getDashboardUrl(), jobId); |
| 227 | + OkHttpClient client = new OkHttpClient(); |
| 228 | + Request request = new Request.Builder() |
| 229 | + .url(clusterURLJobStatusAPI) |
| 230 | + .build(); |
| 231 | + Response response = client.newCall(request).execute(); |
| 232 | + if (!response.isSuccessful()) { |
| 233 | + System.err.println("Unexpected code " + response); |
| 234 | + return null; |
| 235 | + } else { |
| 236 | + Gson gson = new Gson(); |
| 237 | + System.out.println("Upload successful!"); |
| 238 | + String responseBody = response.body().string(); |
| 239 | + System.out.println(responseBody); |
| 240 | + JsonObject jsonObject = gson.fromJson(responseBody, JsonObject.class); |
| 241 | + if (jsonObject.has("state")) { |
| 242 | + String state = jsonObject.get("state").getAsString(); |
| 243 | + System.out.println("state: " + state); |
| 244 | + return state; |
| 245 | + } |
| 246 | + return null; |
| 247 | + } |
| 248 | + } |
| 249 | + |
| 250 | + public void tearDown() { |
| 251 | + |
| 252 | + } |
| 253 | + |
| 254 | +} |
0 commit comments