Skip to content

Commit 661976a

Browse files
authored
Introduce an ENABLE_VIRTUAL_THREADS env variable to enable virtual threads, default to real threads for now (#3882)
* Fix profiler job
* Use async logger
* Use real threads for profiling job
* Info-level logging for profiler job
* Format java
* Artifact log per event
* Enable virtual threads flag
* Use and handle consumer wakeup
* Allow closing consumer only once

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
1 parent 4109c94 commit 661976a

File tree

11 files changed

+185
-16
lines changed

11 files changed

+185
-16
lines changed

.github/workflows/knative-profile-data-plane.yaml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,18 @@ jobs:
7373
./data-plane/profiler/run.sh || exit 1
7474
ls -al
7575
76-
- uses: actions/upload-artifact@v2
76+
- uses: actions/upload-artifact@v4
7777
with:
78+
name: profile-${{ matrix.event }}-receiver.html
7879
path: profile-${{ matrix.event }}-receiver.html
7980

80-
- uses: actions/upload-artifact@v2
81+
- uses: actions/upload-artifact@v4
8182
with:
83+
name: profile-${{ matrix.event }}-dispatcher.html
8284
path: profile-${{ matrix.event }}-dispatcher.html
85+
86+
- uses: actions/upload-artifact@v4
87+
if: always()
88+
with:
89+
name: logs-${{ matrix.event }}
90+
path: /tmp/eventing-kafka-broker-logs/profiler/

data-plane/dispatcher-loom/pom.xml

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,68 @@
109109
</container>
110110
</configuration>
111111
</plugin>
112+
<plugin>
113+
<groupId>org.apache.maven.plugins</groupId>
114+
<artifactId>maven-shade-plugin</artifactId>
115+
<version>${maven.shade.plugin.version}</version>
116+
<configuration>
117+
<minimizeJar>true</minimizeJar>
118+
</configuration>
119+
<executions>
120+
<execution>
121+
<phase>package</phase>
122+
<goals>
123+
<goal>shade</goal>
124+
</goals>
125+
<configuration>
126+
<transformers>
127+
<!-- This merges all the META-INF/services -->
128+
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
129+
<transformer
130+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
131+
<mainClass>dev.knative.eventing.kafka.broker.dispatcherloom.Main</mainClass>
132+
</transformer>
133+
</transformers>
134+
<filters>
135+
<filter>
136+
<artifact>*:*</artifact>
137+
<excludeDefaults>false</excludeDefaults>
138+
</filter>
139+
<filter>
140+
<artifact>net.logstash.logback:logstash-logback-encoder</artifact>
141+
<includes>
142+
<include>**</include>
143+
</includes>
144+
</filter>
145+
<filter>
146+
<artifact>ch.qos.logback:logback-core</artifact>
147+
<includes>
148+
<include>**</include>
149+
</includes>
150+
</filter>
151+
<filter>
152+
<artifact>ch.qos.logback:logback-classic</artifact>
153+
<includes>
154+
<include>**</include>
155+
</includes>
156+
</filter>
157+
<filter>
158+
<artifact>org.apache.kafka:kafka-clients</artifact>
159+
<includes>
160+
<include>**</include>
161+
</includes>
162+
</filter>
163+
<filter>
164+
<artifact>io.fabric8:kubernetes-client</artifact>
165+
<includes>
166+
<include>**</include>
167+
</includes>
168+
</filter>
169+
</filters>
170+
</configuration>
171+
</execution>
172+
</executions>
173+
</plugin>
112174
</plugins>
113175
</build>
114176
</project>

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,19 @@ public class LoomKafkaConsumer<K, V> implements ReactiveKafkaConsumer<K, V> {
4545
private final BlockingQueue<Runnable> taskQueue;
4646
private final AtomicBoolean isClosed;
4747
private final Thread taskRunnerThread;
48+
private final Promise<Void> closePromise = Promise.promise();
4849

4950
public LoomKafkaConsumer(Vertx vertx, Consumer<K, V> consumer) {
5051
this.consumer = consumer;
5152
this.taskQueue = new LinkedBlockingQueue<>();
5253
this.isClosed = new AtomicBoolean(false);
5354

54-
this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
55+
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
56+
this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
57+
} else {
58+
this.taskRunnerThread = new Thread(this::processTaskQueue);
59+
this.taskRunnerThread.start();
60+
}
5561
}
5662

5763
private void addTask(Runnable task, Promise<?> promise) {
@@ -92,19 +98,21 @@ public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition,
9298

9399
@Override
94100
public Future<Void> close() {
101+
if (!this.isClosed.compareAndSet(false, true)) {
102+
return closePromise.future();
103+
}
95104

96-
final Promise<Void> promise = Promise.promise();
97105
taskQueue.add(() -> {
98106
try {
99107
logger.debug("Closing underlying Kafka consumer client");
108+
consumer.wakeup();
100109
consumer.close();
101110
} catch (Exception e) {
102-
promise.tryFail(e);
111+
closePromise.tryFail(e);
103112
}
104113
});
105114

106115
logger.debug("Closing consumer {}", keyValue("size", taskQueue.size()));
107-
isClosed.set(true);
108116

109117
Thread.ofVirtual().start(() -> {
110118
try {
@@ -116,7 +124,7 @@ public Future<Void> close() {
116124

117125
taskRunnerThread.interrupt();
118126
taskRunnerThread.join();
119-
promise.tryComplete();
127+
closePromise.tryComplete();
120128

121129
logger.debug("Background thread completed");
122130

@@ -126,11 +134,11 @@ public Future<Void> close() {
126134
"Interrupted while waiting for taskRunnerThread to finish {}",
127135
keyValue("taskQueueSize", size),
128136
e);
129-
promise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage()));
137+
closePromise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage()));
130138
}
131139
});
132140

133-
return promise.future();
141+
return closePromise.future();
134142
}
135143

136144
@Override

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.kafka.clients.consumer.ConsumerRecord;
3939
import org.apache.kafka.clients.consumer.ConsumerRecords;
4040
import org.apache.kafka.common.TopicPartition;
41+
import org.apache.kafka.common.errors.WakeupException;
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
4344

@@ -143,7 +144,7 @@ private void poll() {
143144
.poll(POLLING_TIMEOUT)
144145
.onSuccess(records -> vertx.runOnContext(v -> this.recordsHandler(records)))
145146
.onFailure(t -> {
146-
if (this.closed.get()) {
147+
if (this.closed.get() || t instanceof WakeupException) {
147148
// The failure might have been caused by stopping the consumer, so we just ignore it
148149
return;
149150
}

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import org.apache.kafka.clients.consumer.ConsumerConfig;
3131
import org.apache.kafka.clients.consumer.ConsumerRecords;
32+
import org.apache.kafka.common.errors.WakeupException;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

@@ -86,7 +87,7 @@ private synchronized void poll() {
8687
return;
8788
}
8889
if (inFlightRecords.get() >= getConsumerVerticleContext().getMaxPollRecords()) {
89-
logger.info(
90+
logger.debug(
9091
"In flight records exceeds " + ConsumerConfig.MAX_POLL_RECORDS_CONFIG
9192
+ " waiting for response from subscriber before polling for new records {} {} {}",
9293
keyValue(
@@ -101,6 +102,10 @@ private synchronized void poll() {
101102
.poll(POLL_TIMEOUT)
102103
.onSuccess(records -> vertx.runOnContext(v -> this.handleRecords(records)))
103104
.onFailure(cause -> {
105+
if (cause instanceof WakeupException) {
106+
return; // Do nothing we're shutting down
107+
}
108+
104109
isPollInFlight.set(false);
105110
logger.error(
106111
"Failed to poll messages {}",

data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@
5454
public class Main {
5555

5656
static {
57-
System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
57+
if (System.getProperty("logback.configurationFile") == null
58+
|| System.getProperty("logback.configurationFile").isEmpty()) {
59+
System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
60+
}
5861
}
5962

6063
private static final Logger logger = LoggerFactory.getLogger(Main.class);

data-plane/profiler/resources/config-logging.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
2020
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
2121
</appender>
22+
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
23+
<appender-ref ref="jsonConsoleAppender" />
24+
<neverBlock>true</neverBlock>
25+
<maxFlushTime>1000</maxFlushTime>
26+
</appender>
2227
<root level="INFO">
23-
<appender-ref ref="jsonConsoleAppender"/>
28+
<appender-ref ref="async"/>
2429
</root>
2530
</configuration>

data-plane/profiler/run.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ export METRICS_PUBLISH_QUANTILES="false"
9696
export EGRESSES_INITIAL_CAPACITY="1"
9797
export HTTP2_DISABLE="true"
9898
export WAIT_STARTUP_SECONDS="8"
99+
export CONFIG_FEATURES_PATH=""
100+
export ENABLE_VIRTUAL_THREADS="true"
99101

100102
# Define receiver specific env variables.
101103
export SERVICE_NAME="kafka-broker-receiver"
@@ -109,6 +111,8 @@ export INSTANCE_ID="receiver"
109111
java \
110112
-XX:+UnlockDiagnosticVMOptions \
111113
-XX:+DebugNonSafepoints \
114+
-XX:+EnableDynamicAgentLoading \
115+
-Djdk.tracePinnedThreads=full \
112116
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
113117
-jar "${PROJECT_ROOT_DIR}"/receiver-loom/target/receiver-loom-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" &
114118
receiver_pid=$!
@@ -125,6 +129,8 @@ export INSTANCE_ID="dispatcher"
125129
java \
126130
-XX:+UnlockDiagnosticVMOptions \
127131
-XX:+DebugNonSafepoints \
132+
-XX:+EnableDynamicAgentLoading \
133+
-Djdk.tracePinnedThreads=full \
128134
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
129135
-jar "${PROJECT_ROOT_DIR}"/dispatcher-loom/target/dispatcher-loom-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" &
130136
dispatcher_pid=$!

data-plane/receiver-loom/pom.xml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,69 @@
126126
</container>
127127
</configuration>
128128
</plugin>
129+
<plugin>
130+
<groupId>org.apache.maven.plugins</groupId>
131+
<artifactId>maven-shade-plugin</artifactId>
132+
<version>${maven.shade.plugin.version}</version>
133+
<configuration>
134+
<minimizeJar>true</minimizeJar>
135+
</configuration>
136+
<executions>
137+
<execution>
138+
<phase>package</phase>
139+
<goals>
140+
<goal>shade</goal>
141+
</goals>
142+
<configuration>
143+
<transformers>
144+
<!-- This merges all the META-INF/services -->
145+
<transformer
146+
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
147+
<transformer
148+
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
149+
<mainClass>dev.knative.eventing.kafka.broker.receiverloom.Main</mainClass>
150+
</transformer>
151+
</transformers>
152+
<filters>
153+
<filter>
154+
<artifact>*:*</artifact>
155+
<excludeDefaults>false</excludeDefaults>
156+
</filter>
157+
<filter>
158+
<artifact>net.logstash.logback:logstash-logback-encoder</artifact>
159+
<includes>
160+
<include>**</include>
161+
</includes>
162+
</filter>
163+
<filter>
164+
<artifact>ch.qos.logback:logback-core</artifact>
165+
<includes>
166+
<include>**</include>
167+
</includes>
168+
</filter>
169+
<filter>
170+
<artifact>ch.qos.logback:logback-classic</artifact>
171+
<includes>
172+
<include>**</include>
173+
</includes>
174+
</filter>
175+
<filter>
176+
<artifact>org.apache.kafka:kafka-clients</artifact>
177+
<includes>
178+
<include>**</include>
179+
</includes>
180+
</filter>
181+
<filter>
182+
<artifact>io.fabric8:kubernetes-client</artifact>
183+
<includes>
184+
<include>**</include>
185+
</includes>
186+
</filter>
187+
</filters>
188+
</configuration>
189+
</execution>
190+
</executions>
191+
</plugin>
129192
</plugins>
130193
</build>
131194
</project>

data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,12 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
6161
this.tracer = null;
6262
}
6363

64-
sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
64+
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
65+
this.sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
66+
} else {
67+
this.sendFromQueueThread = new Thread(this::sendFromQueue);
68+
this.sendFromQueueThread.start();
69+
}
6570
}
6671

6772
@Override

0 commit comments

Comments
 (0)