Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
private final Map<String, Map<String, Double>> deploymentCpuUsage = new ConcurrentHashMap<>();
// map(namespace, map(deployment, memory))
private final Map<String, Map<String, Long>> deploymentMemoryUsage = new ConcurrentHashMap<>();
// map(namespace, map(deployment, stateSize))
private final Map<String, Map<String, Long>> deploymentStateSize = new ConcurrentHashMap<>();
public static final String FLINK_VERSION_GROUP_NAME = "FlinkVersion";
public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion";
public static final String UNKNOWN_VERSION = "UNKNOWN";
Expand All @@ -58,6 +60,7 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
public static final String COUNTER_NAME = "Count";
public static final String CPU_NAME = "Cpu";
public static final String MEMORY_NAME = "Memory";
public static final String STATE_SIZE = "StateSize";

public FlinkDeploymentMetrics(
KubernetesOperatorMetricGroup parentMetricGroup, Configuration configuration) {
Expand Down Expand Up @@ -144,6 +147,18 @@ public void onUpdate(FlinkDeployment flinkApp) {
NumberUtils.toLong(
clusterInfo.getOrDefault(
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));

long stateSize =
NumberUtils.toLong(
clusterInfo.getOrDefault(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "0"));
deploymentStateSize
.computeIfAbsent(
namespace,
ns -> {
initNamespaceStateSize(ns);
return new ConcurrentHashMap<>();
})
.put(deploymentName, stateSize > 0 ? stateSize : 0);
}

public void onRemove(FlinkDeployment flinkApp) {
Expand All @@ -168,6 +183,9 @@ public void onRemove(FlinkDeployment flinkApp) {
if (deploymentMemoryUsage.containsKey(namespace)) {
deploymentMemoryUsage.get(namespace).remove(name);
}
if (deploymentStateSize.containsKey(namespace)) {
deploymentStateSize.get(namespace).remove(name);
}
}

private void initNamespaceDeploymentCounts(String ns) {
Expand Down Expand Up @@ -231,6 +249,15 @@ private void initNamespaceMemoryUsage(String ns) {
.reduce(0L, Long::sum));
}

/**
 * Registers the {@code StateSize} gauge for the given namespace under the resource usage
 * metric group. The gauge reports the sum of the state sizes recorded for every deployment
 * tracked in that namespace.
 *
 * <p>This method is invoked from inside the {@code computeIfAbsent} mapping function in
 * {@code onUpdate}, i.e. before the namespace's map has been stored in
 * {@code deploymentStateSize}. The gauge therefore must not assume the entry exists: if it is
 * evaluated before the entry is published, it reports {@code 0} instead of failing with a
 * {@link NullPointerException}.
 *
 * @param ns the namespace whose aggregated state size should be exposed
 */
private void initNamespaceStateSize(String ns) {
    parentMetricGroup
            .createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
            .addGroup(RESOURCE_USAGE_GROUP_NAME)
            .gauge(
                    STATE_SIZE,
                    () ->
                            // Null-safe lookup: fall back to an empty map when the namespace
                            // entry is not present.
                            deploymentStateSize.getOrDefault(ns, Map.of()).values().stream()
                                    .reduce(0L, Long::sum));
}

private Map<JobManagerDeploymentStatus, Set<String>> createDeploymentStatusMap() {
Map<JobManagerDeploymentStatus, Set<String>> statuses = new ConcurrentHashMap<>();
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ private void observeClusterInfo(FlinkResourceContext<FlinkDeployment> ctx) {
var flinkApp = ctx.getResource();
try {
Map<String, String> clusterInfo =
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
ctx.getFlinkService()
.getClusterInfo(
ctx.getObserveConfig(),
flinkApp.getStatus().getJobStatus().getJobId());
flinkApp.getStatus().getClusterInfo().putAll(clusterInfo);
logger.debug("ClusterInfo: {}", flinkApp.getStatus().getClusterInfo());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void observeFlinkCluster(FlinkResourceContext<FlinkDeployment> ctx) {
// Check if session cluster can serve rest calls following our practice in JobObserver
try {
logger.debug("Observing session cluster");
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig());
ctx.getFlinkService().getClusterInfo(ctx.getObserveConfig(), null);
var rs = ctx.getResource().getStatus().getReconciliationStatus();
if (rs.getState() == ReconciliationState.DEPLOYED) {
rs.markReconciledSpecAsStable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
Expand Down Expand Up @@ -161,6 +163,7 @@ public abstract class AbstractFlinkService implements FlinkService {
private static final String EMPTY_JAR_FILENAME = "empty.jar";
public static final String FIELD_NAME_TOTAL_CPU = "total-cpu";
public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory";
public static final String FIELD_NAME_STATE_SIZE = "state-size";

protected final KubernetesClient kubernetesClient;
protected final ExecutorService executorService;
Expand Down Expand Up @@ -735,9 +738,27 @@ public CheckpointStatsResult fetchCheckpointStats(
}

/**
 * Collects cluster information into a fresh map of string key/value pairs.
 *
 * <p>The map is filled in three steps: Flink version details via
 * {@code populateFlinkVersion}, the total CPU and total memory computed by {@code FlinkUtils}
 * from the configuration and the number of task managers returned by
 * {@code getTaskManagersInfo}, and finally the state size via {@code populateStateSize}.
 *
 * @param conf configuration used to reach the cluster and to compute resource usage
 * @param jobId id of the job whose state size should be reported; may be {@code null}, in
 *     which case {@code populateStateSize} adds no state size entry
 * @return the collected cluster information
 * @throws Exception if any of the lookups above fails; no exception is handled here
 */
@Override
// NOTE(review): the next line looks like the pre-change signature shown by the diff view;
// the two lines after it carry the new signature.
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
public Map<String, String> getClusterInfo(Configuration conf, @Nullable String jobId)
throws Exception {
Map<String, String> clusterInfo = new HashMap<>();

populateFlinkVersion(conf, clusterInfo);

// The task manager count drives both the CPU and the memory totals below.
var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
clusterInfo.put(
FIELD_NAME_TOTAL_CPU,
String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
clusterInfo.put(
FIELD_NAME_TOTAL_MEMORY,
String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));

// Runs last; an exception thrown here propagates and the map built so far is not returned.
populateStateSize(conf, jobId, clusterInfo);

return clusterInfo;
}

private void populateFlinkVersion(Configuration conf, Map<String, String> clusterInfo)
throws Exception {
try (var clusterClient = getClusterClient(conf)) {

CustomDashboardConfiguration dashboardConfiguration =
Expand All @@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
dashboardConfiguration.getFlinkRevision());
}
}

var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size();
clusterInfo.put(
FIELD_NAME_TOTAL_CPU,
String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas)));
clusterInfo.put(
FIELD_NAME_TOTAL_MEMORY,
String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas)));
/**
 * Adds the state size of the job's latest completed checkpoint to {@code clusterInfo}.
 *
 * <p>Does nothing when {@code jobId} is {@code null}. Otherwise it requests the checkpointing
 * statistics for the job through a cluster client and, if the response contains a latest
 * completed checkpoint, stores that checkpoint's state size (as a string) under
 * {@code FIELD_NAME_STATE_SIZE}. When there is no completed checkpoint, no entry is added.
 *
 * @param conf configuration used to create the cluster client
 * @param jobId hex string id of the job to query, or {@code null} to skip the lookup
 * @param clusterInfo map that receives the state size entry
 * @throws Exception if creating the client or executing the request fails; nothing is caught
 *     here, so the failure propagates to the caller
 */
private void populateStateSize(
Configuration conf, @Nullable String jobId, Map<String, String> clusterInfo)
throws Exception {
if (jobId != null) {
try (RestClusterClient<String> clusterClient = getClusterClient(conf)) {
var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance();
var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
// Bind the job path parameter of the request to the given job.
parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));

// NOTE(review): the next line looks like a removed line interleaved by the diff view
// (this method returns void) - confirm it is not part of this method.
return clusterInfo;
// NOTE(review): get() is called without a timeout, so this waits for the response
// indefinitely - confirm whether a client timeout should bound it.
CheckpointingStatistics checkpointingStatistics =
clusterClient
.sendRequest(
checkpointingStatisticsHeaders,
parameters,
EmptyRequestBody.getInstance())
.get();
CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics =
checkpointingStatistics
.getLatestCheckpoints()
.getCompletedCheckpointStatistics();
// No completed checkpoint in the response means there is no state size to report.
if (completedCheckpointStatistics != null) {
clusterInfo.put(
FIELD_NAME_STATE_SIZE,
String.valueOf(completedCheckpointStatistics.getStateSize()));
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ String triggerCheckpoint(

void disposeSavepoint(String savepointPath, Configuration conf) throws Exception;

Map<String, String> getClusterInfo(Configuration conf) throws Exception;
/**
 * Returns cluster information as string key/value pairs.
 *
 * @param conf configuration of the cluster to query
 * @param jobId id of a job on the cluster, or {@code null} when no job id is available
 *     (presumably used by implementations to add job-specific entries; confirm per
 *     implementation)
 * @return map of cluster information entries
 * @throws Exception if the information cannot be retrieved
 */
Map<String, String> getClusterInfo(Configuration conf, @Nullable String jobId) throws Exception;

PodList getJmPodList(FlinkDeployment deployment, Configuration conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ public void markApplicationJobFailedWithError(JobID jobID, String error) throws
}

@Override
public Map<String, String> getClusterInfo(Configuration conf) throws TimeoutException {
public Map<String, String> getClusterInfo(Configuration conf, String jobId)
throws TimeoutException {
if (!isPortReady) {
throw new TimeoutException("JM port is unavailable");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.FLINK_VERSION_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.MEMORY_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.RESOURCE_USAGE_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATE_SIZE;
import static org.apache.flink.kubernetes.operator.metrics.FlinkDeploymentMetrics.STATUS_GROUP_NAME;
import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.OPERATOR_RESOURCE_METRICS_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -339,50 +340,63 @@ public void testResourceMetrics() {
.putAll(
Map.of(
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024"));
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "1024",
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "4096"));

deployment2
.getStatus()
.getClusterInfo()
.putAll(
Map.of(
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "10",
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048"));
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "2048",
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "8192"));

deployment3
.getStatus()
.getClusterInfo()
.putAll(
Map.of(
AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "13",
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096"));
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "4096",
AbstractFlinkService.FIELD_NAME_STATE_SIZE, "16384"));

var cpuGroupId1 =
listener.getNamespaceMetricId(
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
var memoryGroupId1 =
listener.getNamespaceMetricId(
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
var stateSizeGroupId1 =
listener.getNamespaceMetricId(
FlinkDeployment.class, namespace1, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE);
var cpuGroupId2 =
listener.getNamespaceMetricId(
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, CPU_NAME);
var memoryGroupId2 =
listener.getNamespaceMetricId(
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, MEMORY_NAME);
var stateSizeGroupId2 =
listener.getNamespaceMetricId(
FlinkDeployment.class, namespace2, RESOURCE_USAGE_GROUP_NAME, STATE_SIZE);

assertTrue(listener.getGauge(cpuGroupId1).isEmpty());
assertTrue(listener.getGauge(memoryGroupId1).isEmpty());
assertTrue(listener.getGauge(stateSizeGroupId1).isEmpty());
assertTrue(listener.getGauge(cpuGroupId2).isEmpty());
assertTrue(listener.getGauge(memoryGroupId2).isEmpty());
assertTrue(listener.getGauge(stateSizeGroupId2).isEmpty());

metricManager.onUpdate(deployment1);
metricManager.onUpdate(deployment2);
metricManager.onUpdate(deployment3);

assertEquals(15D, listener.getGauge(cpuGroupId1).get().getValue());
assertEquals(3072L, listener.getGauge(memoryGroupId1).get().getValue());
assertEquals(12288L, listener.getGauge(stateSizeGroupId1).get().getValue());
assertEquals(13D, listener.getGauge(cpuGroupId2).get().getValue());
assertEquals(4096L, listener.getGauge(memoryGroupId2).get().getValue());
assertEquals(16384L, listener.getGauge(stateSizeGroupId2).get().getValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.HardwareDescription;
Expand All @@ -64,8 +65,11 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
Expand Down Expand Up @@ -114,6 +118,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -988,13 +993,44 @@ public void getClusterInfoTest() throws Exception {
null);
var tmsInfo = new TaskManagersInfo(List.of(tmInfo));

var checkpointingStatistics =
new CheckpointingStatistics(
new CheckpointingStatistics.Counts(-1, -1, -1, -1, -1),
new CheckpointingStatistics.Summary(null, null, null, null, null, null),
new CheckpointingStatistics.LatestCheckpoints(
new CheckpointStatistics.CompletedCheckpointStatistics(
42,
CheckpointStatsStatus.COMPLETED,
false,
null,
123,
1234,
-1,
42424242,
-1,
-1,
-1,
-1,
0,
0,
CheckpointStatistics.RestAPICheckpointType.CHECKPOINT,
Map.of(),
"path",
false),
null,
null,
null),
List.of());

var flinkService =
getTestingService(
(h, p, r) -> {
if (h instanceof CustomDashboardConfigurationHeaders) {
return CompletableFuture.completedFuture(config);
} else if (h instanceof TaskManagersHeaders) {
return CompletableFuture.completedFuture(tmsInfo);
} else if (h instanceof CheckpointingStatisticsHeaders) {
return CompletableFuture.completedFuture(checkpointingStatistics);
}
fail("unknown request");
return null;
Expand All @@ -1004,7 +1040,7 @@ public void getClusterInfoTest() throws Exception {
conf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000));
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(1000));

assertEquals(
Map<String, String> expectedEntries =
Map.of(
DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
testVersion,
Expand All @@ -1013,8 +1049,16 @@ public void getClusterInfoTest() throws Exception {
AbstractFlinkService.FIELD_NAME_TOTAL_CPU,
"2.0",
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY,
"" + MemorySize.ofMebiBytes(1000).getBytes() * 2),
flinkService.getClusterInfo(conf));
"" + MemorySize.ofMebiBytes(1000).getBytes() * 2);

assertEquals(expectedEntries, flinkService.getClusterInfo(conf, null));

assertEquals(
ImmutableMap.<String, String>builder()
.putAll(expectedEntries)
.put(AbstractFlinkService.FIELD_NAME_STATE_SIZE, "42424242")
.build(),
flinkService.getClusterInfo(conf, JobID.generate().toHexString()));
}

@Test
Expand Down
Loading