Skip to content

Commit d10918c

Browse files
mateczaganygyfora
authored andcommitted
[FLINK-35265][snapshot] Create FlinkStateSnapshot in the namespace of job
1 parent 8479f22 commit d10918c

File tree

3 files changed

+42
-3
lines changed

3 files changed

+42
-3
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtils.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,12 @@ public static String getValidatedFlinkStateSnapshotPath(
122122

123123
protected static FlinkStateSnapshot createFlinkStateSnapshot(
124124
KubernetesClient kubernetesClient,
125+
String namespace,
125126
String name,
126127
FlinkStateSnapshotSpec spec,
127128
SnapshotTriggerType triggerType) {
128129
var metadata = new ObjectMeta();
130+
metadata.setNamespace(namespace);
129131
metadata.setName(name);
130132
metadata.getLabels().put(CrdConstants.LABEL_SNAPSHOT_TYPE, triggerType.name());
131133

@@ -169,7 +171,12 @@ public static FlinkStateSnapshot createSavepointResource(
169171
.build();
170172

171173
var resourceName = getFlinkStateSnapshotName(SAVEPOINT, triggerType, resource);
172-
return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType);
174+
return createFlinkStateSnapshot(
175+
kubernetesClient,
176+
resource.getMetadata().getNamespace(),
177+
resourceName,
178+
snapshotSpec,
179+
triggerType);
173180
}
174181

175182
/**
@@ -191,7 +198,12 @@ public static FlinkStateSnapshot createCheckpointResource(
191198
.build();
192199

193200
var resourceName = getFlinkStateSnapshotName(CHECKPOINT, triggerType, resource);
194-
return createFlinkStateSnapshot(kubernetesClient, resourceName, snapshotSpec, triggerType);
201+
return createFlinkStateSnapshot(
202+
kubernetesClient,
203+
resource.getMetadata().getNamespace(),
204+
resourceName,
205+
snapshotSpec,
206+
triggerType);
195207
}
196208

197209
/**

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,12 @@ public KubernetesClient getClient() {
286286
public static <CR extends AbstractFlinkResource<?, ?>>
287287
List<FlinkStateSnapshot> getFlinkStateSnapshotsForResource(
288288
KubernetesClient kubernetesClient, CR resource) {
289-
return kubernetesClient.resources(FlinkStateSnapshot.class).list().getItems().stream()
289+
return kubernetesClient
290+
.resources(FlinkStateSnapshot.class)
291+
.inAnyNamespace()
292+
.list()
293+
.getItems()
294+
.stream()
290295
.filter(
291296
s ->
292297
s.getSpec()

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,28 @@ public void testCreatePeriodicSavepointResource() {
229229
false);
230230
}
231231

232+
@Test
233+
public void testCreateSnapshotInSameNamespace() {
234+
var namespace = "different-namespace";
235+
var deployment = initDeployment();
236+
deployment.getMetadata().setNamespace(namespace);
237+
238+
var savepoint =
239+
FlinkStateSnapshotUtils.createSavepointResource(
240+
client,
241+
deployment,
242+
SAVEPOINT_PATH,
243+
SnapshotTriggerType.PERIODIC,
244+
SavepointFormatType.CANONICAL,
245+
true);
246+
assertThat(savepoint.getMetadata().getNamespace()).isEqualTo(namespace);
247+
248+
var checkpoint =
249+
FlinkStateSnapshotUtils.createCheckpointResource(
250+
client, deployment, SnapshotTriggerType.MANUAL);
251+
assertThat(checkpoint.getMetadata().getNamespace()).isEqualTo(namespace);
252+
}
253+
232254
@Test
233255
public void testCreateCheckpointResource() {
234256
var deployment = initDeployment();

0 commit comments

Comments
 (0)