diff --git a/ci.py b/ci.py index 8349b8cafd19f..1dd550310dcc5 100644 --- a/ci.py +++ b/ci.py @@ -23,7 +23,7 @@ import subprocess import sys -from confluent.ci.scripts.ci_utils import run_cmd, regex_replace, replace +from ci_tools.utils import run_cmd, regex_replace, replace logging.basicConfig(level=logging.INFO, format='%(message)s') log = logging.getLogger(__name__) diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 08fad4535685e..77c476b49462d 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -181,7 +181,8 @@ public class TopicConfig { "When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. " + "A typical scenario would be to create a topic with a replication factor of 3, " + "set min.insync.replicas to 2, and produce with acks of \"all\". " + - "This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers." + + "This ensures that a majority of replicas must persist a write before it is considered successful by the producer and visible to consumers." + + "

Note that when the Eligible Leader Replicas feature is enabled, the semantics of this config change. Please refer to the ELR section for more information.

"; public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " + diff --git a/config/log4j2.yaml b/config/log4j2.yaml index 49bcf78d136c6..de263c57c928e 100644 --- a/config/log4j2.yaml +++ b/config/log4j2.yaml @@ -44,7 +44,7 @@ Configuration: # State Change appender - name: StateChangeAppender fileName: "${sys:kafka.logs.dir}/state-change.log" - filePattern: "${sys:kafka.logs.dir}/stage-change.log.%d{yyyy-MM-dd-HH}" + filePattern: "${sys:kafka.logs.dir}/state-change.log.%d{yyyy-MM-dd-HH}" PatternLayout: pattern: "${logPattern}" TimeBasedTriggeringPolicy: diff --git a/docs/ops.html b/docs/ops.html index 529bce159bb1e..be0bfe89e8bf9 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4499,9 +4499,16 @@

Tool

-

The ELR fields can be checked through the API DescribeTopicPartitions. The admin client can fetch the ELR info by describing the topics. - Also note that, if min.insync.replicas is updated for a topic, the ELR field will be cleaned. If cluster default min ISR is updated, - all the ELR fields will be cleaned.

+

The ELR fields can be checked through the API DescribeTopicPartitions. The admin client can fetch the ELR info by describing the topics.
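As an illustration of the describe-topics path mentioned above, a minimal admin-client sketch follows. The elr() and lastKnownElr() accessors on TopicPartitionInfo follow KIP-966; their exact names and availability are assumptions that should be verified against the client version in use. The topic name and bootstrap address are placeholders.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.List;
import java.util.Properties;

public class ElrDescribeExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicDescription description = admin.describeTopics(List.of("my-topic"))
                .allTopicNames().get().get("my-topic");
            for (TopicPartitionInfo partition : description.partitions()) {
                // elr()/lastKnownElr() are the KIP-966 accessors; names assumed here.
                System.out.printf("partition %d isr=%s elr=%s lastKnownElr=%s%n",
                    partition.partition(), partition.isr(),
                    partition.elr(), partition.lastKnownElr());
            }
        }
    }
}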

+

Note that when the ELR feature is enabled:

+ diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html index c400ca08453c6..a2d1b7209b551 100644 --- a/docs/streams/core-concepts.html +++ b/docs/streams/core-concepts.html @@ -279,7 +279,7 @@

Lambda Architecture. + to the stream processing pipeline, known as the Lambda Architecture. Prior to 0.11.0.0, Kafka only provides at-least-once delivery guarantees and hence any stream processing systems that leverage it as the backend storage could not guarantee end-to-end exactly-once semantics. In fact, even for those stream processing systems that claim to support exactly-once processing, as long as they are reading from / writing to Kafka as the source / sink, their applications cannot actually guarantee that no duplicates will be generated throughout the pipeline.
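For reference, a minimal sketch of how an application opts into these semantics (the application id and bootstrap address are placeholders): set processing.guarantee to exactly_once_v2, which requires brokers 2.5 or newer. As the config table below documents, the commit.interval.ms default drops from 30000 ms to 100 ms under exactly-once; it can still be set explicitly.

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class ExactlyOnceConfigExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Transactional, end-to-end exactly-once for Kafka-to-Kafka pipelines;
        // requires brokers 2.5+.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Optional: under exactly-once the default is already 100 ms.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        return props;
    }
}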
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 6a555ae3ccf6f..8289423d3d798 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -298,12 +298,12 @@

num.standby.replicascommit.interval.ms Low The frequency in milliseconds with which to save the position (offsets in source topics) of tasks. - 30000 (30 seconds) + 30000 (30 seconds) (at-least-once) / 100 (exactly-once) default.deserialization.exception.handler (Deprecated. Use deserialization.exception.handler instead.) Medium Exception handling class that implements the DeserializationExceptionHandler interface. - LogAndContinueExceptionHandler + LogAndFailExceptionHandler default.key.serde Medium @@ -328,11 +328,10 @@

num.standby.replicasnull - default.dsl.store + default.dsl.store (Deprecated. Use dsl.store.suppliers.class instead.) Low - [DEPRECATED] The default state store type used by DSL operators. Deprecated in - favor of dsl.store.suppliers.class + The default state store type used by DSL operators. "ROCKS_DB" @@ -492,54 +491,59 @@

num.standby.replicas-1 - retry.backoff.ms + repartition.purge.interval.ms + Low + The frequency in milliseconds with which to delete fully consumed records from repartition topics. Purging will occur after at least this value since the last purge, but may be delayed until later. + 30000 (30 seconds) + + retry.backoff.ms Low The amount of time in milliseconds, before a request is retried. 100 - rocksdb.config.setter + rocksdb.config.setter Medium The RocksDB configuration. null - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 600000 (10 minutes) - state.dir + state.dir High Directory location for state stores. /${java.io.tmpdir}/kafka-streams - task.assignor.class + task.assignor.class Medium A task assignor class or class name implementing the TaskAssignor interface. The high-availability task assignor. - task.timeout.ms + task.timeout.ms Medium The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task would raise an error for the first internal error. For any timeout larger than 0 ms, a task will retry at least once before an error is raised. 300000 (5 minutes) - topology.optimization + topology.optimization Medium A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: StreamsConfig.NO_OPTIMIZATION (none), StreamsConfig.OPTIMIZE (all) or a comma separated list of specific optimizations: StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS (reuse.ktable.source.topics), StreamsConfig.MERGE_REPARTITION_TOPICS (merge.repartition.topics), StreamsConfig.SINGLE_STORE_SELF_JOIN (single.store.self.join). "NO_OPTIMIZATION" - upgrade.from + upgrade.from Medium The version you are upgrading from during a rolling upgrade. See Upgrade From null - windowstore.changelog.additional.retention.ms + windowstore.changelog.additional.retention.ms Low Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 86400000 (1 day) - window.size.ms + window.size.ms Low Sets window size for the deserializer in order to calculate window end times. null diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 8816f3b933a35..2230ffb8aaae9 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -213,6 +213,9 @@

Early Access of the Streams Rebalance Protocol

Set unstable.feature.versions.enable=true for controllers and brokers, and set unstable.api.versions.enable=true on the brokers as well. In your Kafka Streams application configuration, set group.protocol=streams. + After the new feature is configured, run + kafka-features.sh --bootstrap-server localhost:9092 describe + and verify that `streams.version` now reports FinalizedVersionLevel 1.
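A minimal sketch of the client-side portion of that setup (application id and bootstrap address are placeholders; the group.protocol key is set as a plain string since this early-access config may not yet have a StreamsConfig constant):

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsRebalanceProtocolConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Opt this application into the early-access Streams rebalance protocol.
        props.put("group.protocol", "streams");
        return props;
    }
}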

diff --git a/docs/toc.html b/docs/toc.html index 032228de3d219..304bd1c8a53c1 100644 --- a/docs/toc.html +++ b/docs/toc.html @@ -153,6 +153,7 @@

  • 6.7 Monitoring @@ -82,9 +87,9 @@
    Notable changes in 4
  • -

    Upgrading to 4.0.0

    +

    Upgrading to 4.0.1

    -

    Upgrading Clients to 4.0.0
    +
    Upgrading Clients to 4.0.1

    For a rolling upgrade:

    @@ -95,7 +100,7 @@
    Upgrading Client or KIP-1124. -
    Upgrading Servers to 4.0.0 from any version 3.3.x through 3.9.x
    +
    Upgrading Servers to 4.0.1 from any version 3.3.x through 3.9.x

    Note: Apache Kafka 4.0 only supports KRaft mode - ZooKeeper mode has been removed. As such, broker upgrades to 4.0.0 (and higher) require KRaft mode and the software and metadata versions must be at least 3.3.x (the first version when KRaft mode was deemed production ready). For clusters in KRaft mode @@ -119,7 +124,13 @@

    Upgrading Servers to 4.0.0 from has a boolean parameter that indicates if there are metadata changes (i.e. IBP_4_0_IV1(23, "4.0", "IV1", true) means this version has metadata changes). Given your current and target versions, a downgrade is only possible if there are no metadata changes in the versions between. - +
    Notable changes in 4.0.1
    +
    Notable changes in 4.0.0
    +
  • + Rotated state-change.log files were incorrectly named stage-change.log.[date] ("state" misspelled as "stage"). This issue is corrected in 4.0.1. + See KAFKA-19576 for details. +
  • diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java index b367fda8114d5..d64695bd39b52 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java @@ -613,7 +613,7 @@ int getStaticallyConfiguredMinInsyncReplicas() { /** * Generate any configuration records that are needed to make it safe to enable ELR. - * Specifically, we need to remove all cluster-level configurations for min.insync.replicas, + * Specifically, we need to remove all broker-level configurations for min.insync.replicas, * and create a cluster-level configuration for min.insync.replicas. It is always safe to call * this function if ELR is already enabled; it will simply do nothing if the necessary * configurations already exist. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 716d8c42ec5ff..a3a44f6f02d31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.State; import org.apache.kafka.streams.processor.internals.TaskAndAction.Action; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -89,7 +90,7 @@ private class StateUpdaterThread extends Thread { private volatile KafkaFutureImpl clientInstanceIdFuture = new KafkaFutureImpl<>(); public StateUpdaterThread(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final ChangelogReader changelogReader) { super(name); this.changelogReader = changelogReader; @@ -745,7 +746,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t private final Time time; private final Logger log; private final String name; - private final Metrics metrics; + private final StreamsMetricsImpl metrics; private final Consumer restoreConsumer; private final ChangelogReader changelogReader; private final TopologyMetadata topologyMetadata; @@ -766,7 +767,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t private StateUpdaterThread stateUpdaterThread = null; public DefaultStateUpdater(final String name, - final Metrics metrics, + final StreamsMetricsImpl metrics, final StreamsConfig config, final Consumer restoreConsumer, final ChangelogReader changelogReader, @@ -1062,70 +1063,71 @@ private class StateUpdaterMetrics { private final Sensor standbyRestoreRatioSensor; private final Sensor checkpointRatioSensor; - private final Deque allSensorNames = new LinkedList<>(); + private final Deque allSensors = new LinkedList<>(); private final Deque allMetricNames = new LinkedList<>(); - private StateUpdaterMetrics(final Metrics metrics, final String threadId) { + private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) { final Map threadLevelTags = new LinkedHashMap<>(); threadLevelTags.put(THREAD_ID_TAG, threadId); + final Metrics metricsRegistry = metrics.metricsRegistry(); - MetricName metricName = metrics.metricName("active-restoring-tasks", + MetricName 
metricName = metricsRegistry.metricName("active-restoring-tasks", STATE_LEVEL_GROUP, "The number of active tasks currently undergoing restoration", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numRestoringActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-updating-tasks", + metricName = metricsRegistry.metricName("standby-updating-tasks", STATE_LEVEL_GROUP, "The number of standby tasks currently undergoing state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numUpdatingStandbyTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("active-paused-tasks", + metricName = metricsRegistry.metricName("active-paused-tasks", STATE_LEVEL_GROUP, "The number of active tasks paused restoring", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedActiveTasks() : 0); allMetricNames.push(metricName); - metricName = metrics.metricName("standby-paused-tasks", + metricName = metricsRegistry.metricName("standby-paused-tasks", STATE_LEVEL_GROUP, "The number of standby tasks paused state update", threadLevelTags); - metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? + metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ? stateUpdaterThread.numPausedStandbyTasks() : 0); allMetricNames.push(metricName); - this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO); + this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("idle-ratio"); + allSensors.add(this.idleRatioSensor); - this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO); + this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("active-restore-ratio"); + allSensors.add(this.activeRestoreRatioSensor); - this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO); + this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO); this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("standby-update-ratio"); + allSensors.add(this.standbyRestoreRatioSensor); - this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO); + this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); - allSensorNames.add("checkpoint-ratio"); + allSensors.add(this.checkpointRatioSensor); - this.restoreSensor = 
metrics.sensor("restore-records", RecordingLevel.INFO); + this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate()); this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount())); - allSensorNames.add("restore-records"); + allSensors.add(this.restoreSensor); } void clear() { - while (!allSensorNames.isEmpty()) { - metrics.removeSensor(allSensorNames.pop()); + while (!allSensors.isEmpty()) { + metrics.removeSensor(allSensors.pop()); } while (!allMetricNames.isEmpty()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 89cc988313bdf..ad66a822e7269 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -646,7 +646,7 @@ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateU final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; final StateUpdater stateUpdater = new DefaultStateUpdater( name, - streamsMetrics.metricsRegistry(), + streamsMetrics, streamsConfig, restoreConsumer, changelogReader, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index abb128698a081..b6d41966257a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask; import org.apache.kafka.streams.processor.internals.Task.State; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterEach; @@ -105,7 +106,7 @@ class DefaultStateUpdaterTest { // need an auto-tick timer to work for draining with timeout private final Time time = new MockTime(1L); - private final Metrics metrics = new Metrics(time); + private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time); private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL)); private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); @@ -1680,8 +1681,59 @@ public void shouldRecordMetrics() throws Exception { assertThat(metrics.metrics().size(), is(1)); } + @Test + public void shouldRemoveMetricsWithoutInterference() { + final DefaultStateUpdater stateUpdater2 = + new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time); + final List threadMetrics = getMetricNames("test-state-updater"); + final List threadMetrics2 = getMetricNames("test-state-updater2"); + + stateUpdater.start(); + stateUpdater2.start(); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName 
: threadMetrics2) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + + stateUpdater2.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertTrue(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + + stateUpdater.shutdown(Duration.ofMinutes(1)); + + for (final MetricName metricName : threadMetrics) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + for (final MetricName metricName : threadMetrics2) { + assertFalse(metrics.metrics().containsKey(metricName)); + } + } + + private static List getMetricNames(final String threadId) { + final Map tagMap = Map.of("thread-id", threadId); + return List.of( + new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap), + new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap), + new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap) + ); + } + @SuppressWarnings("unchecked") - private static void verifyMetric(final Metrics metrics, + private static void verifyMetric(final StreamsMetricsImpl metrics, final MetricName metricName, final Matcher matcher) { assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description())); diff --git a/terraform/kafka_runner/run-test.py b/terraform/kafka_runner/run-test.py index 256bb590e3838..203dbdcad387e 100644 --- a/terraform/kafka_runner/run-test.py +++ b/terraform/kafka_runner/run-test.py @@ -264,7 +264,9 @@ def main(): ssh_account=ssh_account, instance_name=args.instance_name, jdk_version=args.jdk_version, - jdk_arch=args.jdk_arch + jdk_arch=args.jdk_arch, + JOB_ID=JOB_ID, + enable_cloudwatch=args.enable_cloudwatch ) else: logging.info(f"using existing ami: {args.existing_ami}") diff --git a/terraform/kafka_runner/util.py b/terraform/kafka_runner/util.py index e9426fbfc7372..0f341b2bf0395 100644 --- a/terraform/kafka_runner/util.py +++ b/terraform/kafka_runner/util.py @@ -170,6 +170,7 @@ def parse_args(): parser.add_argument("--jdk-arch", action="store", type=str, default="x64", help="JDK arch to execute."), parser.add_argument("--nightly", action="store_true", default=False, help="Mark this as a nightly run") + parser.add_argument("--enable-cloudwatch", action="store", type=parse_bool, default=False, help="Enable CloudWatch metrics on the test instances") parser.add_argument("--new-globals", action="store", type=str, default=None, help="Additional global params to be passed in ducktape") parser.add_argument("--arm-image", action="store_true", help="load the ARM based image of specified distro") parser.add_argument("--existing-ami", action="store", type=str, default=None, help="AMI ID to use for the instance, skipping ami creation") diff --git a/vagrant/aws-packer.json b/vagrant/aws-packer.json index 7025202447e2e..69a356e9153a2 100644 --- a/vagrant/aws-packer.json +++ b/vagrant/aws-packer.json @@ -27,12 +27,31 @@ "max_attempts": 60 },
"security_group_id": "{{user `security_group_id`}}", + "run_tags": { + "Owner": "ce-kafka", + "Service": "ce-kafka", + "Type": "Base", + "role": "ce-kafka", + "CreatedBy": "kafka-system-test", + "distro": "{{user `linux_distro`}}", + "cflt_managed_by": "iac", + "cflt_managed_id": "kafka", + "cflt_service": "kafka-system-test" + }, "tags": { "Owner": "ce-kafka", "Service": "ce-kafka", "Type": "Base", "role": "ce-kafka", - "CreatedBy": "kafka-system-test" + "CreatedBy": "kafka-system-test", + "cflt_managed_by": "iac", + "cflt_managed_id": "kafka", + "cflt_service": "kafka-system-test" + }, + "run_volume_tags": { + "cflt_managed_by": "iac", + "cflt_managed_id": "kafka", + "cflt_service": "kafka-system-test" }, "launch_block_device_mappings": [{ "device_name": "/dev/sda1", @@ -47,15 +66,23 @@ "type": "shell", "inline": ["while [ ! -f /var/lib/cloud/instance/boot-finished ]; do echo 'Waiting for cloud-init...'; sleep 1; done"] }, + { + "type": "file", + "source": "../vagrant/cloudwatch-agent-configuration.json", + "destination": "/tmp/cloudwatch-agent-configuration.json" + }, { "environment_vars": [ "JDK_MAJOR={{ user `jdk_version` }}", - "JDK_ARCH={{ user `jdk_arch` }}" + "JDK_ARCH={{ user `jdk_arch` }}", + "SEMAPHORE_JOB_ID={{ user `JOB_ID` }}", + "ENABLE_CLOUDWATCH={{ user `enable_cloudwatch` }}" ], "execute_command": "echo 'packer' | {{ .Vars }} sudo -E -S bash '{{ .Path }}'", "type": "shell", "scripts": [ - "../vagrant/base.sh" + "../vagrant/base.sh", + "../vagrant/cloudwatch-agent-setup.sh" ] } ] diff --git a/vagrant/cloudwatch-agent-setup.sh b/vagrant/cloudwatch-agent-setup.sh index a3fde878d69ca..1ae6f3094d675 100644 --- a/vagrant/cloudwatch-agent-setup.sh +++ b/vagrant/cloudwatch-agent-setup.sh @@ -11,6 +11,13 @@ set -ex +if [ "${ENABLE_CLOUDWATCH,,}" = "false" ]; then + echo "Cloudwatch is disabled, skipping setup" + exit 0 +else + echo "CloudWatch setup enabled, installing amazon-cloudwatch-agent..." +fi + architecture=arm64 arch=$(uname -m) diff --git a/vagrant/worker-ami.json b/vagrant/worker-ami.json index 1bd2e549143db..5d7a9fe080a2c 100644 --- a/vagrant/worker-ami.json +++ b/vagrant/worker-ami.json @@ -32,12 +32,25 @@ "max_attempts": 60 }, "security_group_id": "{{user `security_group_id`}}", + "run_tags": { + "Owner": "ce-kafka", + "Service": "ce-kafka", + "CreatedBy": "kafka-system-test", + "distro": "{{user `linux_distro`}}", + "cflt_managed_by": "iac", + "cflt_managed_id": "kafka", + "cflt_service": "kafka-system-test", + "Name": "{{user `instance_name`}}" + }, "tags": { "Owner": "ce-kafka", "Service": "ce-kafka", "Type": "Worker", "role": "kafka-worker", "CreatedBy": "kafka-system-test", + "cflt_managed_by": "iac", + "cflt_managed_id": "kafka", + "cflt_service": "kafka-system-test", "Name": "{{user `instance_name`}}", "ResourceUrl": "{{user `resource_url`}}", "NightlyRun": "{{user `nightly_run`}}", @@ -45,6 +58,11 @@ "JdkVersion": "{{user `jdk_version`}}", "Arch": "{{user `jdk_arch`}}" }, + "run_volume_tags": { + "cflt_managed_by": "iac", + "cflt_managed_id": "kafka", + "cflt_service": "kafka-system-test" + }, "launch_block_device_mappings": [{ "device_name": "/dev/sda1", "volume_size": "{{user `volume_size`}}",