Skip to content

Commit 927057e

Browse files
authored
Gather attributes of downstream resources (#61)
1 parent 1755ea7 commit 927057e

18 files changed

+223
-21
lines changed

deploy/subscriptions.crd.yaml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,23 @@ spec:
6868
type: object
6969
additionalProperties:
7070
type: string
71+
attributes:
72+
description: Physical attributes of the job and sink/output table.
73+
type: object
74+
additionalProperties:
75+
type: string
7176
resources:
72-
description: The YAML generated to implement this pipeline.
77+
description: The yaml generated to implement this pipeline.
78+
type: array
79+
items:
80+
type: string
81+
jobResources:
82+
description: The yaml generated to implement the job.
83+
type: array
84+
items:
85+
type: string
86+
downstreamResources:
87+
description: The yaml generated to implement the sink/output table.
7388
type: array
7489
items:
7590
type: string

hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/operator/kafka/KafkaTopicReconciler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.linkedin.hoptimator.operator.Operator;
44
import com.linkedin.hoptimator.operator.ConfigAssembler;
55
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
6+
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicStatus;
67

78
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
89
import io.kubernetes.client.extended.controller.reconciler.Request;
@@ -52,6 +53,10 @@ public Result reconcile(Request request) {
5253
return new Result(false);
5354
}
5455

56+
if (object.getStatus() == null) {
57+
object.setStatus(new V1alpha1KafkaTopicStatus());
58+
}
59+
5560
String topicName = object.getSpec().getTopicName();
5661
Integer desiredPartitions = object.getSpec().getNumPartitions();
5762
Integer desiredReplicationFactor = object.getSpec().getReplicationFactor();
@@ -72,22 +77,29 @@ public Result reconcile(Request request) {
7277

7378
log.info("Found existing topic {}", topicName);
7479
int actualPartitions = topicDescription.partitions().size();
80+
object.getStatus().setNumPartitions(actualPartitions);
7581
if (desiredPartitions != null && desiredPartitions > actualPartitions) {
7682
log.info("Desired partitions {} > actual partitions {}. Creating additional partitions.",
7783
desiredPartitions, actualPartitions);
7884
admin.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(desiredPartitions))).all().get();
85+
object.getStatus().setNumPartitions(desiredPartitions);
7986
}
8087
} catch(ExecutionException e) {
8188
if (e.getCause() instanceof UnknownTopicOrPartitionException ) {
8289
log.info("No existing topic {}. Will create it.", topicName);
8390
admin.createTopics(Collections.singleton(new NewTopic(topicName, Optional.ofNullable(desiredPartitions),
8491
Optional.ofNullable(desiredReplicationFactor).map(x -> x.shortValue())))).all().get();
92+
object.getStatus().setNumPartitions(desiredPartitions);
8593
} else {
8694
throw e;
8795
}
8896
} finally {
8997
admin.close();
9098
}
99+
100+
operator.apiFor(KAFKATOPIC).updateStatus(object, x -> object.getStatus())
101+
.onFailure((x, y) -> log.error("Failed to update status of KafkaTopic {}/{}: {}.", namespace, name,
102+
y.getMessage()));
91103
} catch (Exception e) {
92104
log.error("Encountered exception while reconciling KafkaTopic {}/{}", namespace, name, e);
93105
return new Result(true, operator.failureRetryDuration());

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Access control rule (colloquially, an Acl)
3232
*/
3333
@ApiModel(description = "Access control rule (colloquially, an Acl)")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3535
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* AclList is a list of Acl
3333
*/
3434
@ApiModel(description = "AclList is a list of Acl")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3636
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* A set of related ACL rules.
3030
*/
3131
@ApiModel(description = "A set of related ACL rules.")
32-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
32+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3333
public class V1alpha1AclSpec {
3434
/**
3535
* The resource access method.

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* The resource being controlled.
2929
*/
3030
@ApiModel(description = "The resource being controlled.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3232
public class V1alpha1AclSpecResource {
3333
public static final String SERIALIZED_NAME_KIND = "kind";
3434
@SerializedName(SERIALIZED_NAME_KIND)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Status, as set by the operator.
2929
*/
3030
@ApiModel(description = "Status, as set by the operator.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3232
public class V1alpha1AclStatus {
3333
public static final String SERIALIZED_NAME_MESSAGE = "message";
3434
@SerializedName(SERIALIZED_NAME_MESSAGE)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Kafka Topic
3232
*/
3333
@ApiModel(description = "Kafka Topic")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3535
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* KafkaTopicList is a list of KafkaTopic
3333
*/
3434
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3636
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* Desired Kafka topic configuration.
3434
*/
3535
@ApiModel(description = "Desired Kafka topic configuration.")
36-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
36+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
3737
public class V1alpha1KafkaTopicSpec {
3838
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
3939
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)

0 commit comments

Comments
 (0)