diff --git a/benchmark-runner/README.md b/benchmark-runner/README.md new file mode 100644 index 0000000..149576c --- /dev/null +++ b/benchmark-runner/README.md @@ -0,0 +1,229 @@ +# Benchmark Runner + +A comprehensive benchmarking module for testing handoff strategies between Netty event loops and virtual threads. + +## Overview + +This module provides: + +1. **HandoffHttpServer** - An HTTP server that: + - Receives requests on Netty event loops + - Hands off to virtual threads (with configurable scheduler) + - Makes blocking HTTP calls to a mock backend using JDK HttpClient + - Parses JSON with Jackson + - Returns response via event loop + +2. **run-benchmark.sh** - A complete benchmarking script with: + - Mock server management + - CPU affinity control (taskset) + - Warmup phase (no profiling) + - Load generation via jbang wrk/wrk2 + - Async-profiler integration + - pidstat monitoring + - Duration configuration with validation + +## Quick Start + +```bash +# Set JAVA_HOME to your Java 27 build +export JAVA_HOME=/path/to/jdk + +# Build the module (includes both MockHttpServer and HandoffHttpServer) +mvn package -pl benchmark-runner -am -DskipTests + +# Run a basic benchmark +cd benchmark-runner/scripts +./run-benchmark.sh +``` + +## Build Details + +Everything is packaged in a single JAR: `benchmark-runner/target/benchmark-runner.jar` + +| Class | Description | +|-------|-------------| +| `io.netty.loom.benchmark.runner.MockHttpServer` | Backend mock server (think time + JSON response) | +| `io.netty.loom.benchmark.runner.HandoffHttpServer` | Server under test (handoff logic) | + +The `run-benchmark.sh` script will automatically build the JAR if missing. 
+
+## Configuration
+
+All configuration is via environment variables:
+
+### Mock Server
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `MOCK_PORT` | 8080 | Mock server port |
+| `MOCK_THINK_TIME_MS` | 1 | Simulated processing delay (ms) |
+| `MOCK_THREADS` | 1 | Number of Netty threads |
+| `MOCK_TASKSET` | "4,5" | CPU affinity (e.g., "0-1") |
+
+### Handoff Server
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `SERVER_PORT` | 8081 | Server port |
+| `SERVER_THREADS` | 2 | Number of event loop threads |
+| `SERVER_USE_CUSTOM_SCHEDULER` | false | Use custom Netty scheduler |
+| `SERVER_IO` | epoll | I/O type: epoll or nio |
+| `SERVER_TASKSET` | "2,3" | CPU affinity (e.g., "2-5") |
+| `SERVER_JVM_ARGS` | | Additional JVM arguments |
+
+### Load Generator
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `LOAD_GEN_CONNECTIONS` | 100 | Number of connections |
+| `LOAD_GEN_THREADS` | 2 | Number of threads |
+| `LOAD_GEN_RATE` | | Target rate (empty = max throughput with wrk) |
+| `LOAD_GEN_TASKSET` | "0,1" | CPU affinity (e.g., "6-7") |
+
+### Timing
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `WARMUP_DURATION` | 10s | Warmup duration (no profiling) |
+| `TOTAL_DURATION` | 30s | Total test duration |
+
+### Profiling
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `ENABLE_PROFILER` | false | Enable async-profiler |
+| `ASYNC_PROFILER_PATH` | | Path to async-profiler installation |
+| `PROFILER_EVENT` | cpu | Profiler event type |
+| `PROFILER_OUTPUT` | profile.html | Output filename |
+
+### pidstat
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `ENABLE_PIDSTAT` | false | Enable pidstat collection |
+| `PIDSTAT_INTERVAL` | 1 | Collection interval (seconds) |
+| `PIDSTAT_OUTPUT` | pidstat.log | Output filename |
+
+## Example Runs
+
+### Basic comparison: custom vs default scheduler
+
+```bash
+# 
With custom scheduler +JAVA_HOME=/path/to/jdk \ +SERVER_USE_CUSTOM_SCHEDULER=true \ +./run-benchmark.sh + +# With default scheduler +JAVA_HOME=/path/to/jdk \ +SERVER_USE_CUSTOM_SCHEDULER=false \ +./run-benchmark.sh +``` + +### With CPU pinning + +```bash +JAVA_HOME=/path/to/jdk \ +MOCK_TASKSET="0" \ +SERVER_TASKSET="1-4" \ +LOAD_GEN_TASKSET="5-7" \ +SERVER_THREADS=4 \ +SERVER_USE_CUSTOM_SCHEDULER=true \ +./run-benchmark.sh +``` + +### With profiling + +```bash +JAVA_HOME=/path/to/jdk \ +ENABLE_PROFILER=true \ +ASYNC_PROFILER_PATH=/path/to/async-profiler \ +PROFILER_EVENT=cpu \ +SERVER_USE_CUSTOM_SCHEDULER=true \ +WARMUP_DURATION=15s \ +TOTAL_DURATION=45s \ +./run-benchmark.sh +``` + +### Rate-limited test with wrk2 + +```bash +JAVA_HOME=/path/to/jdk \ +LOAD_GEN_RATE=10000 \ +LOAD_GEN_CONNECTIONS=200 \ +TOTAL_DURATION=60s \ +WARMUP_DURATION=15s \ +./run-benchmark.sh +``` + +### With pidstat monitoring + +```bash +JAVA_HOME=/path/to/jdk \ +ENABLE_PIDSTAT=true \ +PIDSTAT_INTERVAL=1 \ +./run-benchmark.sh +``` + +## Output + +Results are saved to `./benchmark-results/` (configurable via `OUTPUT_DIR`): + +- `wrk-results.txt` - Load generator output with throughput/latency +- `profile.html` - Flamegraph (if profiling enabled) +- `pidstat.log` - Thread-level CPU usage (if pidstat enabled) + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────────────┐ ┌─────────────────┐ +│ wrk/wrk2 │────▶│ HandoffHttpServer │────▶│ MockHttpServer │ +│ (load gen) │ │ │ │ │ +└─────────────────┘ │ 1. Receive on EL │ │ Think time + │ + │ 2. Handoff to VThread │ │ JSON response │ + │ 3. Blocking HTTP call │ │ │ + │ 4. Parse JSON (Jackson) │ └─────────────────┘ + │ 5. 
Write back on EL │ + └──────────────────────────┘ +``` + +## Running Manually + +### Mock Server + +```bash +java -cp benchmark-runner/target/benchmark-runner.jar \ + io.netty.loom.benchmark.runner.MockHttpServer \ + 8080 1 1 # port, thinkTimeMs, threads +``` + +### Handoff Server (with custom scheduler) + +```bash +java \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + -XX:+UnlockExperimentalVMOptions \ + -XX:-DoJVMTIVirtualThreadTransitions \ + -Djdk.trackAllThreads=false \ + -Djdk.virtualThreadScheduler.implClass=io.netty.loom.NettyScheduler \ + -Djdk.pollerMode=3 \ + -cp benchmark-runner/target/benchmark-runner.jar \ + io.netty.loom.benchmark.runner.HandoffHttpServer \ + --port 8081 \ + --mock-url http://localhost:8080/fruits \ + --threads 2 \ + --use-custom-scheduler true \ + --io epoll +``` + +### Handoff Server (with default scheduler) + +```bash +java \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + -XX:+UnlockExperimentalVMOptions \ + -XX:-DoJVMTIVirtualThreadTransitions \ + -Djdk.trackAllThreads=false \ + -cp benchmark-runner/target/benchmark-runner.jar \ + io.netty.loom.benchmark.runner.HandoffHttpServer \ + --port 8081 \ + --mock-url http://localhost:8080/fruits \ + --threads 2 \ + --use-custom-scheduler false \ + --io epoll +``` + diff --git a/benchmark-runner/pom.xml b/benchmark-runner/pom.xml new file mode 100644 index 0000000..36e2984 --- /dev/null +++ b/benchmark-runner/pom.xml @@ -0,0 +1,99 @@ + + + 4.0.0 + + + io.netty.loom + netty-virtualthread-parent + 1.0-SNAPSHOT + + + benchmark-runner + jar + + + + io.netty.loom + netty-virtualthread-core + ${project.version} + + + io.netty + netty-all + + + + com.fasterxml.jackson.core + jackson-databind + 2.17.0 + + + org.apache.httpcomponents.client5 + httpclient5 + 5.6 + compile + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-params + test + + + io.rest-assured + rest-assured + 5.4.0 + test + + + org.awaitility + awaitility + 4.2.0 + test + + + + + + + 
org.apache.maven.plugins + maven-shade-plugin + 3.6.1 + + + package + + shade + + + benchmark-runner + false + + + io.netty.loom.benchmark.runner.HandoffHttpServer + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.14.1 + + ${java.version} + ${java.version} + + + + + + diff --git a/benchmark-runner/scripts/run-benchmark.sh b/benchmark-runner/scripts/run-benchmark.sh new file mode 100755 index 0000000..72596e3 --- /dev/null +++ b/benchmark-runner/scripts/run-benchmark.sh @@ -0,0 +1,647 @@ +#!/usr/bin/env bash +# +# Benchmark Runner Script +# +# This script orchestrates the full benchmark workflow: +# 1. Start mock HTTP server +# 2. Start handoff HTTP server (with optional profiling) +# 3. Run load generator (wrk/wrk2) +# 4. Optionally collect pidstat and async-profiler data +# +# Copyright 2026 The Netty VirtualThread Scheduler Project +# Licensed under Apache License 2.0 + +set -euo pipefail + +# ============================================================================ +# Configuration with defaults +# ============================================================================ + +# Java configuration +JAVA_HOME="${JAVA_HOME:-}" +JAVA_OPTS="${JAVA_OPTS:--Xms1g -Xmx1g}" + +# Mock server configuration +MOCK_PORT="${MOCK_PORT:-8080}" +MOCK_THINK_TIME_MS="${MOCK_THINK_TIME_MS:-1}" +MOCK_THREADS="${MOCK_THREADS:-1}" +MOCK_TASKSET="${MOCK_TASKSET:-4,5}" # CPUs for mock server + +# Handoff server configuration +SERVER_PORT="${SERVER_PORT:-8081}" +SERVER_THREADS="${SERVER_THREADS:-2}" +SERVER_USE_CUSTOM_SCHEDULER="${SERVER_USE_CUSTOM_SCHEDULER:-false}" +SERVER_IO="${SERVER_IO:-epoll}" +SERVER_TASKSET="${SERVER_TASKSET:-2,3}" # CPUs for handoff server +SERVER_JVM_ARGS="${SERVER_JVM_ARGS:-}" +SERVER_POLLER_MODE="${SERVER_POLLER_MODE:-3}" # jdk.pollerMode value (1, 2, or 3) +SERVER_FJ_PARALLELISM="${SERVER_FJ_PARALLELISM:-}" # ForkJoinPool parallelism (empty = JVM default) + +# Load generator configuration +LOAD_GEN_TASKSET="${LOAD_GEN_TASKSET:-0,1}" # 
CPUs for load generator +LOAD_GEN_CONNECTIONS="${LOAD_GEN_CONNECTIONS:-100}" +LOAD_GEN_THREADS="${LOAD_GEN_THREADS:-2}" +LOAD_GEN_DURATION="${LOAD_GEN_DURATION:-30s}" +LOAD_GEN_RATE="${LOAD_GEN_RATE:-}" # Empty = wrk (max throughput), set value = wrk2 (rate limited) +LOAD_GEN_URL="${LOAD_GEN_URL:-http://localhost:8081/fruits}" + +# Timing configuration +WARMUP_DURATION="${WARMUP_DURATION:-10s}" +TOTAL_DURATION="${TOTAL_DURATION:-30s}" + +# Profiling configuration +ENABLE_PROFILER="${ENABLE_PROFILER:-false}" +PROFILER_EVENT="${PROFILER_EVENT:-cpu}" +PROFILER_OUTPUT="${PROFILER_OUTPUT:-profile.html}" +ASYNC_PROFILER_PATH="${ASYNC_PROFILER_PATH:-}" # Path to async-profiler + +# pidstat configuration +ENABLE_PIDSTAT="${ENABLE_PIDSTAT:-false}" +PIDSTAT_INTERVAL="${PIDSTAT_INTERVAL:-1}" +PIDSTAT_OUTPUT="${PIDSTAT_OUTPUT:-pidstat.log}" + +# perf stat configuration +ENABLE_PERF_STAT="${ENABLE_PERF_STAT:-false}" +PERF_STAT_OUTPUT="${PERF_STAT_OUTPUT:-perf-stat.txt}" + +# Output directory +OUTPUT_DIR="${OUTPUT_DIR:-./benchmark-results}" + +# ============================================================================ +# Computed paths +# ============================================================================ + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../.." 
&& pwd)" +RUNNER_JAR="${PROJECT_ROOT}/benchmark-runner/target/benchmark-runner.jar" + +# ============================================================================ +# Helper functions +# ============================================================================ + +log() { + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" +} + +error() { + log "ERROR: $*" >&2 + exit 1 +} + +parse_duration_to_seconds() { + local duration="$1" + local value="${duration%[smhd]}" + local unit="${duration: -1}" + + case "$unit" in + s) echo "$value" ;; + m) echo $((value * 60)) ;; + h) echo $((value * 3600)) ;; + d) echo $((value * 86400)) ;; + *) echo "$duration" ;; # Assume seconds if no unit + esac +} + +validate_config() { + local warmup_secs=$(parse_duration_to_seconds "$WARMUP_DURATION") + local total_secs=$(parse_duration_to_seconds "$TOTAL_DURATION") + + if [[ $warmup_secs -ge $total_secs ]]; then + error "Warmup duration ($WARMUP_DURATION = ${warmup_secs}s) must be less than total duration ($TOTAL_DURATION = ${total_secs}s)" + fi + + if [[ -z "$JAVA_HOME" ]]; then + error "JAVA_HOME must be set" + fi + + if [[ ! -x "$JAVA_HOME/bin/java" ]]; then + error "Java executable not found at $JAVA_HOME/bin/java" + fi + + if [[ "$ENABLE_PROFILER" == "true" && -z "$ASYNC_PROFILER_PATH" ]]; then + error "ASYNC_PROFILER_PATH must be set when ENABLE_PROFILER=true" + fi + + log "Configuration validated" + log " Warmup: $WARMUP_DURATION (${warmup_secs}s)" + log " Total: $TOTAL_DURATION (${total_secs}s)" + log " Measurement window: $((total_secs - warmup_secs))s" +} + +check_jbang() { + if ! command -v jbang &> /dev/null; then + log "jbang not found, installing..." + curl -Ls https://sh.jbang.dev | bash -s - app setup + export PATH="$HOME/.jbang/bin:$PATH" + fi + log "jbang version: $(jbang --version)" +} + +wait_for_server() { + local url="$1" + local name="$2" + local max_attempts="${3:-30}" + local attempt=0 + + log "Waiting for $name at $url..." 
+ while [[ $attempt -lt $max_attempts ]]; do + if curl -s -o /dev/null -w "%{http_code}" "$url" | grep -q "200"; then + log "$name is ready" + return 0 + fi + attempt=$((attempt + 1)) + sleep 1 + done + + error "$name failed to start within ${max_attempts}s" +} + +build_taskset_cmd() { + local cpus="$1" + if [[ -n "$cpus" ]]; then + echo "taskset -c $cpus" + else + echo "" + fi +} + +cleanup() { + log "Cleaning up..." + + # Kill mock server + if [[ -n "${MOCK_PID:-}" ]]; then + log "Stopping mock server (PID: $MOCK_PID)" + kill "$MOCK_PID" 2>/dev/null || true + wait "$MOCK_PID" 2>/dev/null || true + fi + + # Kill handoff server + if [[ -n "${SERVER_PID:-}" ]]; then + log "Stopping handoff server (PID: $SERVER_PID)" + kill "$SERVER_PID" 2>/dev/null || true + wait "$SERVER_PID" 2>/dev/null || true + fi + + # Kill pidstat + if [[ -n "${PIDSTAT_PID:-}" ]]; then + log "Stopping pidstat (PID: $PIDSTAT_PID)" + kill "$PIDSTAT_PID" 2>/dev/null || true + fi + + # Kill perf stat (should already be done, but clean up just in case) + if [[ -n "${PERF_STAT_PID:-}" ]]; then + log "Stopping perf stat (PID: $PERF_STAT_PID)" + kill "$PERF_STAT_PID" 2>/dev/null || true + fi + + log "Cleanup complete" +} + +trap cleanup EXIT + +# ============================================================================ +# Build JARs if needed +# ============================================================================ + +build_jars() { + log "Building project JARs..." + + cd "$PROJECT_ROOT" + + + if [[ ! -f "$RUNNER_JAR" ]]; then + log "Building benchmark-runner module..." + JAVA_HOME="$JAVA_HOME" mvn package -pl benchmark-runner -am -DskipTests -q + fi + + log "JARs ready" +} + +# ============================================================================ +# Start Mock Server +# ============================================================================ + +start_mock_server() { + log "Starting mock HTTP server..." 
+ + local taskset_cmd=$(build_taskset_cmd "$MOCK_TASKSET") + local java_cmd="$JAVA_HOME/bin/java" + + local cmd="$taskset_cmd $java_cmd $JAVA_OPTS -cp $RUNNER_JAR \ + io.netty.loom.benchmark.runner.MockHttpServer \ + --port $MOCK_PORT --think-time $MOCK_THINK_TIME_MS --threads $MOCK_THREADS --silent" + + log "Mock server command: $cmd" + + $cmd & + MOCK_PID=$! + + wait_for_server "http://localhost:$MOCK_PORT/health" "Mock server" +} + +# ============================================================================ +# Start Handoff Server +# ============================================================================ + +start_handoff_server() { + log "Starting handoff HTTP server..." + + local taskset_cmd=$(build_taskset_cmd "$SERVER_TASKSET") + local java_cmd="$JAVA_HOME/bin/java" + + # Build JVM args + local jvm_args="--add-opens=java.base/java.lang=ALL-UNNAMED" + jvm_args="$jvm_args -XX:+UnlockExperimentalVMOptions" + jvm_args="$jvm_args -XX:-DoJVMTIVirtualThreadTransitions" + jvm_args="$jvm_args -Djdk.trackAllThreads=false" + + if [[ "$SERVER_USE_CUSTOM_SCHEDULER" == "true" ]]; then + jvm_args="$jvm_args -Djdk.virtualThreadScheduler.implClass=io.netty.loom.NettyScheduler" + jvm_args="$jvm_args -Djdk.pollerMode=$SERVER_POLLER_MODE" + fi + + if [[ -n "$SERVER_FJ_PARALLELISM" ]]; then + jvm_args="$jvm_args -Djdk.virtualThreadScheduler.parallelism=1" + fi + + # Add debug non-safepoints if profiling is enabled + if [[ "$ENABLE_PROFILER" == "true" ]]; then + jvm_args="$jvm_args -XX:+UnlockDiagnosticVMOptions" + jvm_args="$jvm_args -XX:+DebugNonSafepoints" + fi + + # Add custom JVM args + if [[ -n "$SERVER_JVM_ARGS" ]]; then + jvm_args="$jvm_args $SERVER_JVM_ARGS" + fi + + local cmd="$taskset_cmd $java_cmd $JAVA_OPTS $jvm_args -cp $RUNNER_JAR \ + io.netty.loom.benchmark.runner.HandoffHttpServer \ + --port $SERVER_PORT \ + --mock-url http://localhost:$MOCK_PORT/fruits \ + --threads $SERVER_THREADS \ + --use-custom-scheduler $SERVER_USE_CUSTOM_SCHEDULER \ + --io 
$SERVER_IO \ + --silent" + + log "Handoff server command: $cmd" + + $cmd & + SERVER_PID=$! + + wait_for_server "http://localhost:$SERVER_PORT/health" "Handoff server" +} + +# ============================================================================ +# Warmup Phase +# ============================================================================ + +run_warmup() { + local warmup_secs=$(parse_duration_to_seconds "$WARMUP_DURATION") + + if [[ $warmup_secs -eq 0 ]]; then + log "Skipping warmup (duration is 0)" + return + fi + + log "Running warmup for $WARMUP_DURATION..." + + local taskset_cmd=$(build_taskset_cmd "$LOAD_GEN_TASKSET") + + # Use wrk for warmup (no rate limiting) + $taskset_cmd jbang wrk@hyperfoil \ + -t "$LOAD_GEN_THREADS" \ + -c "$LOAD_GEN_CONNECTIONS" \ + -d "$WARMUP_DURATION" \ + "$LOAD_GEN_URL" > /dev/null 2>&1 || true + + log "Warmup complete" +} + +# ============================================================================ +# Start Profiler +# ============================================================================ + +start_profiler() { + if [[ "$ENABLE_PROFILER" != "true" ]]; then + return + fi + + log "Attaching async-profiler to handoff server (PID: $SERVER_PID)..." + + local asprof="$ASYNC_PROFILER_PATH/bin/asprof" + + if [[ ! -x "$asprof" ]]; then + error "async-profiler not found at $asprof" + fi + + # Start profiler + "$asprof" start --threads -e "$PROFILER_EVENT" -o flamegraph "$SERVER_PID" + + log "Profiler attached" +} + +stop_profiler() { + if [[ "$ENABLE_PROFILER" != "true" ]]; then + return + fi + + log "Stopping async-profiler..." 
+ + local asprof="$ASYNC_PROFILER_PATH/bin/asprof" + local output_file="$OUTPUT_DIR/$PROFILER_OUTPUT" + + "$asprof" stop --threads -o flamegraph -f "$output_file" "$SERVER_PID" + + log "Profiler output: $output_file" +} + +# ============================================================================ +# Start pidstat +# ============================================================================ + +start_pidstat() { + if [[ "$ENABLE_PIDSTAT" != "true" ]]; then + return + fi + + log "Starting pidstat for handoff server (PID: $SERVER_PID)..." + + local output_file="$OUTPUT_DIR/$PIDSTAT_OUTPUT" + + # add -t to see all threads + pidstat -p "$SERVER_PID" "$PIDSTAT_INTERVAL" > "$output_file" 2>&1 & + PIDSTAT_PID=$! + + log "pidstat running (PID: $PIDSTAT_PID)" +} + +stop_pidstat() { + if [[ "$ENABLE_PIDSTAT" != "true" ]]; then + return + fi + + if [[ -n "${PIDSTAT_PID:-}" ]]; then + log "Stopping pidstat..." + kill "$PIDSTAT_PID" 2>/dev/null || true + wait "$PIDSTAT_PID" 2>/dev/null || true + log "pidstat output: $OUTPUT_DIR/$PIDSTAT_OUTPUT" + fi +} + +# ============================================================================ +# Start perf stat +# ============================================================================ + +start_perf_stat() { + if [[ "$ENABLE_PERF_STAT" != "true" ]]; then + return + fi + + log "Starting perf stat for handoff server (PID: $SERVER_PID)..." + + local warmup_secs=$(parse_duration_to_seconds "$WARMUP_DURATION") + local total_secs=$(parse_duration_to_seconds "$TOTAL_DURATION") + local profiling_secs=$((total_secs - warmup_secs)) + local output_file="$OUTPUT_DIR/$PERF_STAT_OUTPUT" + + perf stat -p "$SERVER_PID" -o "$output_file" sleep "$profiling_secs" & + PERF_STAT_PID=$! 
+ + log "perf stat running (PID: $PERF_STAT_PID) for ${profiling_secs}s" +} + +# Note: perf stat stops automatically after the sleep duration, no explicit stop needed + +# ============================================================================ +# Run Load Test +# ============================================================================ + +run_load_test() { + local warmup_secs=$(parse_duration_to_seconds "$WARMUP_DURATION") + local total_secs=$(parse_duration_to_seconds "$TOTAL_DURATION") + local test_secs=$((total_secs - warmup_secs)) + + log "Running load test for ${test_secs}s..." + + local taskset_cmd=$(build_taskset_cmd "$LOAD_GEN_TASKSET") + local output_file="$OUTPUT_DIR/wrk-results.txt" + + if [[ -n "$LOAD_GEN_RATE" ]]; then + # Use wrk2 with rate limiting + log "Using wrk2 with rate: $LOAD_GEN_RATE req/s" + + $taskset_cmd jbang wrk2@hyperfoil \ + -t "$LOAD_GEN_THREADS" \ + -c "$LOAD_GEN_CONNECTIONS" \ + -d "${test_secs}s" \ + -R "$LOAD_GEN_RATE" \ + --latency \ + "$LOAD_GEN_URL" 2>&1 | tee "$output_file" + else + # Use wrk for max throughput + log "Using wrk for max throughput" + + $taskset_cmd jbang wrk@hyperfoil \ + -t "$LOAD_GEN_THREADS" \ + -c "$LOAD_GEN_CONNECTIONS" \ + -d "${test_secs}s" \ + "$LOAD_GEN_URL" 2>&1 | tee "$output_file" + fi + + log "Load test complete" + log "Results saved to: $output_file" +} + +# ============================================================================ +# Print Configuration Summary +# ============================================================================ + +print_config() { + log "==============================================" + log "Benchmark Configuration" + log "==============================================" + log "" + log "Mock Server:" + log " Port: $MOCK_PORT" + log " Think Time: ${MOCK_THINK_TIME_MS}ms" + log " Threads: $MOCK_THREADS" + log " CPU Affinity: ${MOCK_TASKSET:-}" + log "" + log "Handoff Server:" + log " Port: $SERVER_PORT" + log " Threads: $SERVER_THREADS" + log " Custom Sched: 
$SERVER_USE_CUSTOM_SCHEDULER" + log " I/O Type: $SERVER_IO" + log " Poller Mode: $SERVER_POLLER_MODE" + log " FJ Parallelism: ${SERVER_FJ_PARALLELISM:-}" + log " CPU Affinity: ${SERVER_TASKSET:-}" + log " Extra JVM Args: ${SERVER_JVM_ARGS:-}" + log "" + log "Load Generator:" + log " Connections: $LOAD_GEN_CONNECTIONS" + log " Threads: $LOAD_GEN_THREADS" + log " Rate: ${LOAD_GEN_RATE:-}" + log " CPU Affinity: ${LOAD_GEN_TASKSET:-}" + log "" + log "Timing:" + log " Warmup: $WARMUP_DURATION" + log " Total: $TOTAL_DURATION" + log "" + log "Profiling:" + log " Enabled: $ENABLE_PROFILER" + if [[ "$ENABLE_PROFILER" == "true" ]]; then + log " Event: $PROFILER_EVENT" + log " Output: $PROFILER_OUTPUT" + fi + log "" + log "pidstat:" + log " Enabled: $ENABLE_PIDSTAT" + if [[ "$ENABLE_PIDSTAT" == "true" ]]; then + log " Interval: ${PIDSTAT_INTERVAL}s" + log " Output: $PIDSTAT_OUTPUT" + fi + log "" + log "perf stat:" + log " Enabled: $ENABLE_PERF_STAT" + if [[ "$ENABLE_PERF_STAT" == "true" ]]; then + log " Output: $PERF_STAT_OUTPUT" + fi + log "" + log "Output Directory: $OUTPUT_DIR" + log "==============================================" +} + +# ============================================================================ +# Main +# ============================================================================ + +main() { + # Parse command line arguments + while [[ $# -gt 0 ]]; do + case "$1" in + --help|-h) + cat << 'EOF' +Benchmark Runner Script + +Usage: ./run-benchmark.sh [OPTIONS] + +Environment Variables (can also be set via command line options): + +Mock Server: + MOCK_PORT Mock server port (default: 8080) + MOCK_THINK_TIME_MS Response delay in ms (default: 1) + MOCK_THREADS Number of threads (default: 1) + MOCK_TASKSET CPU affinity range (default: "4,5") + +Handoff Server: + SERVER_PORT Server port (default: 8081) + SERVER_THREADS Number of event loop threads (default: 2) + SERVER_USE_CUSTOM_SCHEDULER Use custom Netty scheduler (default: false) + SERVER_IO I/O type: epoll 
or nio (default: epoll) + SERVER_TASKSET CPU affinity range (default: "2,3") + SERVER_JVM_ARGS Additional JVM arguments + SERVER_POLLER_MODE jdk.pollerMode value: 1, 2, or 3 (default: 3) + SERVER_FJ_PARALLELISM ForkJoinPool parallelism (empty = JVM default) + +Load Generator: + LOAD_GEN_CONNECTIONS Number of connections (default: 100) + LOAD_GEN_THREADS Number of threads (default: 2) + LOAD_GEN_DURATION Test duration (default: 30s) + LOAD_GEN_RATE Target rate for wrk2 (empty = use wrk) + LOAD_GEN_TASKSET CPU affinity range (default: "0,1") + +Timing: + WARMUP_DURATION Warmup duration (default: 10s) + TOTAL_DURATION Total test duration (default: 30s) + +Profiling: + ENABLE_PROFILER Enable async-profiler (default: false) + ASYNC_PROFILER_PATH Path to async-profiler installation + PROFILER_EVENT Profiler event type (default: cpu) + PROFILER_OUTPUT Profiler output file (default: profile.html) + +pidstat: + ENABLE_PIDSTAT Enable pidstat collection (default: false) + PIDSTAT_INTERVAL Collection interval in seconds (default: 1) + PIDSTAT_OUTPUT Output file (default: pidstat.log) + +perf stat: + ENABLE_PERF_STAT Enable perf stat collection (default: false) + PERF_STAT_OUTPUT Output file (default: perf-stat.txt) + +General: + JAVA_HOME Path to Java installation (required) + OUTPUT_DIR Output directory (default: ./benchmark-results) + +Examples: + + # Basic run with custom scheduler + JAVA_HOME=/path/to/jdk SERVER_USE_CUSTOM_SCHEDULER=true ./run-benchmark.sh + + # Run with CPU pinning and profiling + JAVA_HOME=/path/to/jdk \ + MOCK_TASKSET="0" \ + SERVER_TASKSET="1-2" \ + LOAD_GEN_TASKSET="3" \ + ENABLE_PROFILER=true \ + ASYNC_PROFILER_PATH=/path/to/async-profiler \ + ./run-benchmark.sh + + # Rate-limited test with wrk2 + JAVA_HOME=/path/to/jdk \ + LOAD_GEN_RATE=10000 \ + TOTAL_DURATION=60s \ + WARMUP_DURATION=15s \ + ./run-benchmark.sh + +EOF + exit 0 + ;; + *) + error "Unknown option: $1" + ;; + esac + shift + done + + # Validate configuration + validate_config + + # Print 
configuration + print_config + + # Create output directory + mkdir -p "$OUTPUT_DIR" + + # Check jbang + check_jbang + + # Build JARs + build_jars + + # Start servers + start_mock_server + start_handoff_server + + # Run warmup (no profiling/pidstat) + run_warmup + + # Start monitoring after warmup + start_profiler + start_pidstat + start_perf_stat + + # Run actual load test + run_load_test + + # Stop monitoring + stop_profiler + stop_pidstat + + log "Benchmark complete!" + log "Results in: $OUTPUT_DIR" +} + +main "$@" + diff --git a/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/Fruit.java b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/Fruit.java new file mode 100644 index 0000000..a90b89d --- /dev/null +++ b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/Fruit.java @@ -0,0 +1,24 @@ +/* + * Copyright 2026 The Netty VirtualThread Scheduler Project + * + * The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +package io.netty.loom.benchmark.runner; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +/** + * POJO representing a fruit from the mock server JSON response. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public record Fruit(String name, String color, double price) { +} diff --git a/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/FruitsResponse.java b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/FruitsResponse.java new file mode 100644 index 0000000..3bf41a6 --- /dev/null +++ b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/FruitsResponse.java @@ -0,0 +1,26 @@ +/* + * Copyright 2026 The Netty VirtualThread Scheduler Project + * + * The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +package io.netty.loom.benchmark.runner; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import java.util.List; + +/** + * POJO representing the mock server JSON response containing a list of fruits. 
+ */ +@JsonIgnoreProperties(ignoreUnknown = true) +public record FruitsResponse(List fruits) { +} diff --git a/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/HandoffHttpServer.java b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/HandoffHttpServer.java new file mode 100644 index 0000000..89d1921 --- /dev/null +++ b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/HandoffHttpServer.java @@ -0,0 +1,336 @@ +/* + * Copyright 2026 The Netty VirtualThread Scheduler Project + * + * The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ */ +package io.netty.loom.benchmark.runner; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.IoEventLoop; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.EpollIoHandler; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.loom.VirtualMultithreadIoEventLoopGroup; +import io.netty.util.AsciiString; +import io.netty.util.CharsetUtil; +import org.apache.hc.client5.http.ConnectionKeepAliveStrategy; +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; +import org.apache.hc.core5.http.HttpEntity; 
+import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.TimeValue; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; + +/** + * HTTP server that demonstrates handoff benchmark patterns. + *

+ * Processing flow:
+ * <ol>
+ * <li>Receive the HTTP request on a Netty event loop</li>
+ * <li>Hand off to a virtual thread (optionally using the custom scheduler)</li>
+ * <li>Make a blocking HTTP call to the mock server using Apache HttpClient</li>
+ * <li>Parse the JSON response with Jackson into Fruit objects</li>
+ * <li>Re-encode to JSON and write back to the client via the event loop</li>
+ * </ol>
+ *

+ * Usage: java -cp benchmark-runner.jar + * io.netty.loom.benchmark.runner.HandoffHttpServer \ --port 8081 \ --mock-url + * http://localhost:8080/fruits \ --threads 2 \ --use-custom-scheduler true \ + * --io epoll + */ +public class HandoffHttpServer { + + public enum IO { + EPOLL, NIO + } + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ByteBuf HEALTH_RESPONSE = Unpooled + .unreleasableBuffer(Unpooled.copiedBuffer("OK", CharsetUtil.UTF_8)); + + private final int port; + private final String mockUrl; + private final int threads; + private final boolean useCustomScheduler; + private final IO io; + private final boolean silent; + + private MultiThreadIoEventLoopGroup workerGroup; + private Channel serverChannel; + private Supplier threadFactorySupplier; + + public HandoffHttpServer(int port, String mockUrl, int threads, boolean useCustomScheduler, IO io) { + this(port, mockUrl, threads, useCustomScheduler, io, false); + } + + public HandoffHttpServer(int port, String mockUrl, int threads, boolean useCustomScheduler, IO io, boolean silent) { + this.port = port; + this.mockUrl = mockUrl; + this.threads = threads; + this.useCustomScheduler = useCustomScheduler; + this.io = io; + this.silent = silent; + } + + public void start() throws InterruptedException { + var ioHandlerFactory = switch (io) { + case NIO -> NioIoHandler.newFactory(); + case EPOLL -> EpollIoHandler.newFactory(); + }; + + Class serverChannelClass = switch (io) { + case NIO -> NioServerSocketChannel.class; + case EPOLL -> EpollServerSocketChannel.class; + }; + + if (useCustomScheduler) { + var group = new VirtualMultithreadIoEventLoopGroup(threads, ioHandlerFactory); + threadFactorySupplier = group::vThreadFactory; + workerGroup = group; + } else { + workerGroup = new MultiThreadIoEventLoopGroup(threads, ioHandlerFactory); + var defaultFactory = Thread.ofVirtual().factory(); + threadFactorySupplier = () -> defaultFactory; + } + ServerBootstrap b = new 
ServerBootstrap(); + b.group(workerGroup).channel(serverChannelClass).childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(65536)); + p.addLast(new HandoffHandler()); + } + }); + + serverChannel = b.bind(port).sync().channel(); + if (!silent) { + System.out.printf("Handoff HTTP Server started on port %d%n", port); + System.out.printf(" Mock URL: %s%n", mockUrl); + System.out.printf(" Threads: %d%n", threads); + System.out.printf(" Custom Scheduler: %s%n", useCustomScheduler); + System.out.printf(" I/O: %s%n", io); + } + } + + public void stop() { + if (serverChannel != null) { + serverChannel.close(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (!silent) { + System.out.println("Server stopped"); + } + } + + public void awaitTermination() throws InterruptedException { + serverChannel.closeFuture().sync(); + } + + private class HandoffHandler extends SimpleChannelInboundHandler { + + private final CloseableHttpClient httpClient; + private ExecutorService orderedExecutorService; + + HandoffHandler() { + ConnectionKeepAliveStrategy keepAliveStrategy = (HttpResponse response, + HttpContext context) -> TimeValue.NEG_ONE_MILLISECOND; + BasicHttpClientConnectionManager cm = new BasicHttpClientConnectionManager(); + httpClient = HttpClientBuilder.create().setConnectionManager(cm).setConnectionManagerShared(false) + .setKeepAliveStrategy(keepAliveStrategy).build(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + orderedExecutorService = Executors.newSingleThreadExecutor(threadFactorySupplier.get()); + super.channelActive(ctx); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + String uri = request.uri(); + boolean keepAlive = HttpUtil.isKeepAlive(request); + IoEventLoop eventLoop = 
(IoEventLoop) ctx.channel().eventLoop(); + + if (uri.equals("/health")) { + sendResponse(ctx, HEALTH_RESPONSE.duplicate(), HttpHeaderValues.TEXT_PLAIN, keepAlive); + return; + } + + if (uri.equals("/") || uri.startsWith("/fruits")) { + // Hand off to virtual thread for blocking processing + orderedExecutorService.execute(() -> { + doBlockingProcessing(ctx, eventLoop, keepAlive); + }); + return; + } + + // 404 for unknown paths + ByteBuf content = Unpooled.copiedBuffer("Not Found", CharsetUtil.UTF_8); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, + content); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN); + response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + + private void doBlockingProcessing(ChannelHandlerContext ctx, IoEventLoop eventLoop, boolean keepAlive) { + try { + // 1. Make blocking HTTP call to mock server + HttpGet httpGet = new HttpGet(mockUrl); + try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) { + HttpEntity entity = httpResponse.getEntity(); + if (entity == null) + throw new IOException("No response entity"); + try (InputStream is = entity.getContent()) { + // 2. Parse JSON into Fruit objects using Jackson + FruitsResponse fruitsResponse = OBJECT_MAPPER.readValue(is, FruitsResponse.class); + // 3. Re-encode to JSON bytes + byte[] responseBytes = OBJECT_MAPPER.writeValueAsBytes(fruitsResponse); + // 4. 
Post write back to event loop (non-blocking) + eventLoop.execute(() -> { + ByteBuf content = Unpooled.wrappedBuffer(responseBytes); + sendResponse(ctx, content, HttpHeaderValues.APPLICATION_JSON, keepAlive); + }); + } + EntityUtils.consumeQuietly(entity); + } + } catch (Throwable e) { + eventLoop.execute(() -> { + ByteBuf content = Unpooled.copiedBuffer("{\"error\":\"" + e.getMessage() + "\"}", + CharsetUtil.UTF_8); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, + HttpResponseStatus.INTERNAL_SERVER_ERROR, content); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON); + response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + }); + } + } + + private void sendResponse(ChannelHandlerContext ctx, ByteBuf content, AsciiString contentType, + boolean keepAlive) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, + content); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType); + response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); + + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ctx.writeAndFlush(response); + } else { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + try { + orderedExecutorService.execute(() -> { + try { + httpClient.close(); + } catch (IOException e) { + } finally { + orderedExecutorService.shutdown(); + } + }); + } finally { + super.channelInactive(ctx); + } + } + } + + public static void main(String[] args) throws InterruptedException { + // Parse arguments + int port = 8081; + String mockUrl = 
"http://localhost:8080/fruits"; + int threads = 1; + boolean useCustomScheduler = false; + IO io = IO.EPOLL; + boolean silent = false; + + for (int i = 0; i < args.length; i++) { + switch (args[i]) { + case "--port" -> port = Integer.parseInt(args[++i]); + case "--mock-url" -> mockUrl = args[++i]; + case "--threads" -> threads = Integer.parseInt(args[++i]); + case "--use-custom-scheduler" -> useCustomScheduler = Boolean.parseBoolean(args[++i]); + case "--io" -> io = IO.valueOf(args[++i].toUpperCase()); + case "--silent" -> silent = true; + case "--help" -> { + printUsage(); + return; + } + } + } + + HandoffHttpServer server = new HandoffHttpServer(port, mockUrl, threads, useCustomScheduler, io, silent); + server.start(); + + // Shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(server::stop)); + + server.awaitTermination(); + } + + private static void printUsage() { + System.out.println(""" + Usage: java -cp benchmark-runner.jar io.netty.loom.benchmark.runner.HandoffHttpServer [options] + + Options: + --port HTTP port (default: 8081) + --mock-url Mock server URL (default: http://localhost:8080/fruits) + --threads Number of event loop threads (default: 1) + --use-custom-scheduler Use custom Netty scheduler (default: false) + --io I/O type (default: epoll) + --silent Suppress output messages + --help Show this help + """); + } +} diff --git a/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/MockHttpServer.java b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/MockHttpServer.java new file mode 100644 index 0000000..f1b0ba7 --- /dev/null +++ b/benchmark-runner/src/main/java/io/netty/loom/benchmark/runner/MockHttpServer.java @@ -0,0 +1,228 @@ +/* + * Copyright 2026 The Netty VirtualThread Scheduler Project + * + * The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +package io.netty.loom.benchmark.runner; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; + +import java.util.concurrent.TimeUnit; + +/** + * A minimal HTTP 1.1 mock server using plain Netty. Fast startup, configurable + * thread count and think time. + *

+ * <p>Run as JAR:
+ *
+ * <pre>
+ *   java -cp benchmark-runner.jar io.netty.loom.benchmark.runner.MockHttpServer 8080 100 1
+ * </pre>
+ *

+ * Arguments: [port] [thinkTimeMs] [threads] + *

+ * <ul>
+ * <li>port - HTTP port (default: 8080)</li>
+ * <li>thinkTimeMs - delay before the response in ms (default: 100)</li>
+ * <li>threads - number of event loop threads (default: 1)</li>
+ * </ul>
+ */ +public class MockHttpServer { + + // Pre-computed cached response - a JSON list of fruits + private static final ByteBuf CACHED_RESPONSE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(""" + { + "fruits": [ + {"name": "Apple", "color": "Red", "price": 1.20}, + {"name": "Banana", "color": "Yellow", "price": 0.50}, + {"name": "Orange", "color": "Orange", "price": 0.80}, + {"name": "Grape", "color": "Purple", "price": 2.00}, + {"name": "Mango", "color": "Yellow", "price": 1.50}, + {"name": "Strawberry", "color": "Red", "price": 3.00}, + {"name": "Blueberry", "color": "Blue", "price": 4.00}, + {"name": "Pineapple", "color": "Yellow", "price": 2.50}, + {"name": "Watermelon", "color": "Green", "price": 5.00}, + {"name": "Kiwi", "color": "Brown", "price": 1.00} + ] + } + """, CharsetUtil.UTF_8)); + + private static final ByteBuf HEALTH_RESPONSE = Unpooled + .unreleasableBuffer(Unpooled.copiedBuffer("OK", CharsetUtil.UTF_8)); + + private final int port; + private final long thinkTimeMs; + private final int threads; + private final boolean silent; + private EventLoopGroup workerGroup; + private Channel serverChannel; + + public MockHttpServer(int port, long thinkTimeMs, int threads) { + this(port, thinkTimeMs, threads, false); + } + + public MockHttpServer(int port, long thinkTimeMs, int threads, boolean silent) { + this.port = port; + this.thinkTimeMs = thinkTimeMs; + this.threads = threads; + this.silent = silent; + } + + public void start() throws InterruptedException { + workerGroup = new NioEventLoopGroup(threads); + + ServerBootstrap b = new ServerBootstrap(); + b.group(workerGroup).channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(65536)); + p.addLast(new HttpHandler(thinkTimeMs)); + } + }); + + serverChannel = b.bind(port).sync().channel(); + if (!silent) { 
+ System.out.printf("Mock HTTP Server started on port %d with %dms think time using %d thread(s)%n", port, + thinkTimeMs, threads); + } + } + + public void stop() { + if (serverChannel != null) { + serverChannel.close(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (!silent) { + System.out.println("Server stopped"); + } + } + + private static class HttpHandler extends SimpleChannelInboundHandler { + private final long thinkTimeMs; + + HttpHandler(long thinkTimeMs) { + this.thinkTimeMs = thinkTimeMs; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + String uri = request.uri(); + boolean keepAlive = HttpUtil.isKeepAlive(request); + + if (uri.equals("/health")) { + sendResponse(ctx, HEALTH_RESPONSE.duplicate(), "text/plain", keepAlive); + return; + } + + if (uri.equals("/fruits") || uri.equals("/")) { + if (thinkTimeMs > 0) { + // Schedule response after think time delay + ctx.executor().schedule( + () -> sendResponse(ctx, CACHED_RESPONSE.duplicate(), "application/json", keepAlive), + thinkTimeMs, TimeUnit.MILLISECONDS); + } else { + sendResponse(ctx, CACHED_RESPONSE.duplicate(), "application/json", keepAlive); + } + return; + } + + // 404 for unknown paths + ByteBuf content = Unpooled.copiedBuffer("Not Found", CharsetUtil.UTF_8); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, + content); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); + response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + + private void sendResponse(ChannelHandlerContext ctx, ByteBuf content, String contentType, boolean keepAlive) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, + content); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType); + 
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); + + if (keepAlive) { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ctx.writeAndFlush(response); + } else { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + } + + public static void main(String[] args) throws InterruptedException { + int port = 8080; + long thinkTimeMs = 100; + int threads = 1; + boolean silent = false; + + for (int i = 0; i < args.length; i++) { + switch (args[i]) { + case "--port" -> port = Integer.parseInt(args[++i]); + case "--think-time" -> thinkTimeMs = Long.parseLong(args[++i]); + case "--threads" -> threads = Integer.parseInt(args[++i]); + case "--silent" -> silent = true; + default -> { + // Legacy positional args support + if (i == 0) + port = Integer.parseInt(args[i]); + else if (i == 1) + thinkTimeMs = Long.parseLong(args[i]); + else if (i == 2) + threads = Integer.parseInt(args[i]); + } + } + } + + MockHttpServer server = new MockHttpServer(port, thinkTimeMs, threads, silent); + server.start(); + + // Add shutdown hook for graceful shutdown + Runtime.getRuntime().addShutdownHook(new Thread(server::stop)); + + // Block main thread + server.serverChannel.closeFuture().sync(); + } +} diff --git a/benchmark-runner/src/test/java/io/netty/loom/benchmark/runner/BenchmarkIntegrationTest.java b/benchmark-runner/src/test/java/io/netty/loom/benchmark/runner/BenchmarkIntegrationTest.java new file mode 100644 index 0000000..91a2dff --- /dev/null +++ b/benchmark-runner/src/test/java/io/netty/loom/benchmark/runner/BenchmarkIntegrationTest.java @@ -0,0 +1,213 @@ +/* + * Copyright 2026 The Netty VirtualThread Scheduler Project + * + * The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may 
not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ +package io.netty.loom.benchmark.runner; + +import io.restassured.http.ContentType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.*; +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration test that verifies MockHttpServer and HandoffHttpServer work + * correctly together with different configurations. + *

+ * Tests cover: + *

+ * <ul>
+ * <li>NIO I/O with default scheduler</li>
+ * <li>NIO I/O with custom scheduler</li>
+ * <li>EPOLL I/O with default scheduler</li>
+ * <li>EPOLL I/O with custom scheduler</li>
+ * </ul>
+ */ +class BenchmarkIntegrationTest { + + private static final AtomicInteger PORT_COUNTER = new AtomicInteger(19000); + + private MockHttpServer mockServer; + private HandoffHttpServer handoffServer; + private int mockPort; + private int handoffPort; + + static Stream serverConfigurations() { + return Stream.of( + // IO type, use custom scheduler, description + Arguments.of(HandoffHttpServer.IO.NIO, false, "NIO with default scheduler"), + Arguments.of(HandoffHttpServer.IO.NIO, true, "NIO with custom scheduler"), + Arguments.of(HandoffHttpServer.IO.EPOLL, false, "EPOLL with default scheduler"), + Arguments.of(HandoffHttpServer.IO.EPOLL, true, "EPOLL with custom scheduler")); + } + + void startServers(HandoffHttpServer.IO ioType, boolean useCustomScheduler) throws Exception { + // Use unique ports for each test to avoid conflicts + mockPort = PORT_COUNTER.getAndIncrement(); + handoffPort = PORT_COUNTER.getAndIncrement(); + + // Start mock server with minimal think time for fast tests + mockServer = new MockHttpServer(mockPort, 0, 1); + mockServer.start(); + + // Wait for mock server to be ready + await().atMost(5, TimeUnit.SECONDS).until(() -> { + try { + return given().port(mockPort).when().get("/health").statusCode() == 200; + } catch (Exception e) { + return false; + } + }); + + // Start handoff server with specified configuration + handoffServer = new HandoffHttpServer(handoffPort, "http://localhost:" + mockPort + "/fruits", 1, + useCustomScheduler, ioType); + handoffServer.start(); + + // Wait for handoff server to be ready + await().atMost(5, TimeUnit.SECONDS).until(() -> { + try { + return given().port(handoffPort).when().get("/health").statusCode() == 200; + } catch (Exception e) { + return false; + } + }); + } + + @AfterEach + void stopServers() { + if (handoffServer != null) { + handoffServer.stop(); + handoffServer = null; + } + if (mockServer != null) { + mockServer.stop(); + mockServer = null; + } + } + + @ParameterizedTest(name = "{2}") + 
@MethodSource("serverConfigurations") + void mockServerHealthEndpoint(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + given().port(mockPort).when().get("/health").then().statusCode(200).body(equalTo("OK")); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void mockServerFruitsEndpoint(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + given().port(mockPort).when().get("/fruits").then().statusCode(200).contentType(ContentType.JSON) + .body("fruits", hasSize(10)).body("fruits[0].name", equalTo("Apple")) + .body("fruits[0].color", equalTo("Red")).body("fruits[0].price", equalTo(1.20f)); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void handoffServerHealthEndpoint(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + given().port(handoffPort).when().get("/health").then().statusCode(200).body(equalTo("OK")); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void handoffServerFruitsEndpoint(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + given().port(handoffPort).when().get("/fruits").then().statusCode(200).contentType(ContentType.JSON) + .body("fruits", hasSize(10)).body("fruits[0].name", equalTo("Apple")) + .body("fruits[0].color", equalTo("Red")); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void handoffServerRootEndpoint(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + 
given().port(handoffPort).when().get("/").then().statusCode(200).contentType(ContentType.JSON).body("fruits", + hasSize(10)); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void handoffServer404ForUnknownPath(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + given().port(handoffPort).when().get("/unknown").then().statusCode(404); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void handoffServerReturnsAllFruits(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + + List fruitNames = given().port(handoffPort).when().get("/fruits").then().statusCode(200).extract() + .jsonPath().getList("fruits.name", String.class); + + assertEquals(10, fruitNames.size()); + assertTrue(fruitNames.contains("Apple")); + assertTrue(fruitNames.contains("Banana")); + assertTrue(fruitNames.contains("Kiwi")); + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void handoffServerHandlesMultipleRequests(HandoffHttpServer.IO ioType, boolean useCustomScheduler, + String description) throws Exception { + startServers(ioType, useCustomScheduler); + + // Send multiple requests to verify server handles concurrent load + for (int i = 0; i < 10; i++) { + given().port(handoffPort).when().get("/fruits").then().statusCode(200).body("fruits", hasSize(10)); + } + } + + @ParameterizedTest(name = "{2}") + @MethodSource("serverConfigurations") + void verifyEndToEndJsonParsing(HandoffHttpServer.IO ioType, boolean useCustomScheduler, String description) + throws Exception { + startServers(ioType, useCustomScheduler); + + // This test verifies the complete flow: + // 1. HandoffHttpServer receives request + // 2. Makes blocking call to MockHttpServer + // 3. Parses JSON with Jackson into Fruit objects + // 4. 
Re-encodes and returns + + FruitsResponse response = given().port(handoffPort).when().get("/fruits").then().statusCode(200).extract() + .as(FruitsResponse.class); + + assertNotNull(response); + assertNotNull(response.fruits()); + assertEquals(10, response.fruits().size()); + + Fruit apple = response.fruits().stream().filter(f -> "Apple".equals(f.name())).findFirst().orElse(null); + + assertNotNull(apple); + assertEquals("Red", apple.color()); + assertEquals(1.20, apple.price(), 0.01); + } +} diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index efa129a..23bf59d 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -13,6 +13,11 @@ netty-virtualthread-benchmarks + + org.jctools + jctools-core + 4.0.5 + io.netty.loom netty-virtualthread-core diff --git a/benchmarks/src/main/java/io/netty/loom/benchmark/SchedulerHandoffBenchmark.java b/benchmarks/src/main/java/io/netty/loom/benchmark/SchedulerHandoffBenchmark.java new file mode 100644 index 0000000..a156b1a --- /dev/null +++ b/benchmarks/src/main/java/io/netty/loom/benchmark/SchedulerHandoffBenchmark.java @@ -0,0 +1,281 @@ +/* + * Copyright 2026 The Netty VirtualThread Scheduler Project + * + * The Netty VirtualThread Scheduler Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. 
+ */ +package io.netty.loom.benchmark; + +import io.netty.channel.IoEventLoop; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.loom.VirtualMultithreadIoEventLoopGroup; +import io.netty.util.concurrent.FastThreadLocal; +import org.HdrHistogram.Histogram; +import org.jctools.queues.MpscArrayQueue; +import org.jctools.util.Pow2; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.BenchmarkParams; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.*; +import java.util.function.Supplier; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 10, time = 1) +@Measurement(iterations = 10, time = 1) +@State(Scope.Benchmark) +public class SchedulerHandoffBenchmark { + + public enum IO { + EPOLL, NIO, IO_URING + } + + // default is 1000 rps per user + @Param({"100"}) + int serviceTimeUs; + + @Param({"200000"}) + int rate; + + @Param({"512"}) + int requestBytes; + + @Param({"1024"}) + int responseBytes; + + // the total throughput will be roughly concurrency * 1000 / serviceTimeMs + @Param({"50"}) + int concurrency; + + private static final int EL_THREADS = Integer.getInteger("elThreads", -1); + + @Param({"EPOLL"}) + IO io; + + MultiThreadIoEventLoopGroup group; + + Supplier threadFactory; + + record RequestData(byte[] request, byte[] response) { + } + + // let's make + Queue requestData; + + private static final CopyOnWriteArrayList histograms = new CopyOnWriteArrayList<>(); + + private static final FastThreadLocal rttHistogram = new FastThreadLocal<>() { + @Override + public Histogram initialValue() { + var histo = new Histogram(3); + histograms.add(histo); + return histo; + } + }; + + private long fireTimePeriodNs; + private boolean allOutThroughput; + private long nextFireTime; + + 
@Setup(Level.Trial) + public void resetHistograms() { + // TODO verify if the trial is the right level + histograms.forEach(Histogram::reset); + } + + @Setup + public void setup(BenchmarkParams params) throws ExecutionException, InterruptedException { + if (rate < 0) { + allOutThroughput = true; + } else { + fireTimePeriodNs = (long) (1000_000_000d / rate); + } + if (EL_THREADS <= 0) { + throw new IllegalStateException("Please set the elThreads system property to a positive integer"); + } + var ioFactory = switch (io) { + case NIO -> NioIoHandler.newFactory(); + case IO_URING -> IoUringIoHandler.newFactory(); + case EPOLL -> EpollIoHandler.newFactory(); + }; + if (params.getBenchmark().contains("custom")) { + var group = new VirtualMultithreadIoEventLoopGroup(EL_THREADS, ioFactory); + threadFactory = group::vThreadFactory; + this.group = group; + } else { + group = new MultiThreadIoEventLoopGroup(EL_THREADS, ioFactory); + var sameFactory = Thread.ofVirtual().factory(); + threadFactory = () -> sameFactory; + } + if (concurrency > 0) { + requestData = new MpscArrayQueue<>(Pow2.roundToPowerOfTwo(concurrency)); + for (int i = 0; i < concurrency; i++) { + requestData.offer(new RequestData(new byte[requestBytes], new byte[responseBytes])); + } + } else { + requestData = null; + } + nextFireTime = System.nanoTime(); + } + + private static void spinUntil(long targetTimeNs) { + while (System.nanoTime() < targetTimeNs) { + Thread.onSpinWait(); + } + } + + @Benchmark + @Fork(value = 2, jvmArgs = {"--add-opens=java.base/java.lang=ALL-UNNAMED", "-XX:+UnlockExperimentalVMOptions", + "-XX:-DoJVMTIVirtualThreadTransitions", "-Djdk.trackAllThreads=false", + "-Djdk.virtualThreadScheduler.implClass=io.netty.loom.NettyScheduler", "-Djdk.pollerMode=3", + "-DelThreads=1"}) + public void customScheduler(Blackhole bh) throws InterruptedException { + doRequest(bh); + } + + @Benchmark + @Fork(value = 2, jvmArgs = {"--add-opens=java.base/java.lang=ALL-UNNAMED", 
"-XX:+UnlockExperimentalVMOptions", + "-XX:-DoJVMTIVirtualThreadTransitions", "-Djdk.trackAllThreads=false", + "-Djdk.virtualThreadScheduler.implClass=io.netty.loom.NettyScheduler", "-Djdk.pollerMode=3", + "-DelThreads=2"}) + public void customSchedulerTwoEL(Blackhole bh) throws InterruptedException { + doRequest(bh); + } + + @Benchmark + @Fork(value = 2, jvmArgs = {"--add-opens=java.base/java.lang=ALL-UNNAMED", "-XX:+UnlockExperimentalVMOptions", + "-XX:-DoJVMTIVirtualThreadTransitions", "-Djdk.trackAllThreads=false", "-DelThreads=1", + "-Djdk.virtualThreadScheduler.parallelism=1"}) + public void defaultScheduler(Blackhole bh) throws InterruptedException { + doRequest(bh); + } + + // just burn a full core on this! + private void doRequest(Blackhole bh) { + if (!allOutThroughput) { + spinUntil(nextFireTime); + } + var data = spinWaitRequest(); + // avoid coordinated omission! + long startRequest = allOutThroughput ? System.nanoTime() : nextFireTime; + if (!allOutThroughput) { + nextFireTime += fireTimePeriodNs; + } + // write some data into the request + byte[] request = data == null ? new byte[requestBytes] : data.request; + byte[] response = data == null ? 
null : data.response; + + Arrays.fill(request, (byte) 1); + + // this is handing off this to the loom scheduler + var el = group.next(); + el.execute(() -> { + // This is running in a Netty event loop thread + // process the request by reading it + for (byte b : request) { + bh.consume(b); + } + // off-load the actual (blocking) processing to a virtual thread + threadFactory.get().newThread(() -> { + blockingProcess(bh, el, startRequest, data, response); + }).start(); + }); + } + + // this is required to make sure NONE of the fine grain ops like writeByte won't + // be inlined + @CompilerControl(CompilerControl.Mode.DONT_INLINE) + private void blockingProcess(Blackhole bh, IoEventLoop el, long startRequest, RequestData data, byte[] response) { + try { + // simulate processing time: + // NOTE: if we're using sleep here, the built-in scheduler will use the FJ + // built-in one + // but the custom scheduler, nope, see + // https://github.com/openjdk/loom/blob/3d9e866f60bdebc55b59b9fd40a4898002c35e96/src/java.base/share/classes/java/lang/VirtualThread.java#L1629 + if (serviceTimeUs > 0) { + TimeUnit.MICROSECONDS.sleep(serviceTimeUs); + } + // write the response content + if (response == null) { + response = new byte[responseBytes]; + } + for (int i = 0; i < responseBytes; i++) { + response[i] = (byte) 42; + } + byte[] responseCreated = response; + el.execute(() -> { + nonBlockingCompleteProcessing(bh, startRequest, data, responseCreated); + }); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + // this is required to make sure NONE of the fine grain ops like writeByte won't + // be inlined + @CompilerControl(CompilerControl.Mode.DONT_INLINE) + private void nonBlockingCompleteProcessing(Blackhole bh, long startRequest, RequestData data, byte[] response) { + // read the response + int toRead = this.responseBytes; + for (int i = 0; i < toRead; i++) { + bh.consume(response[i]); + } + // record RTT + long rttNs = System.nanoTime() - startRequest; + 
Histogram histogram = rttHistogram.get(); + histogram.recordValue(rttNs); + // offer it just at the end + if (data != null) { + requestData.add(data); + } + } + + private RequestData spinWaitRequest() { + Queue requestData = this.requestData; + if (requestData == null) { + return null; + } + var request = requestData.poll(); + while (request == null) { + Thread.onSpinWait(); + request = requestData.poll(); + } + return request; + } + + @TearDown + public void shutdown() throws ExecutionException, InterruptedException { + // wait for all tasks to complete + if (concurrency > 0) { + for (int i = 0; i < concurrency; i++) { + spinWaitRequest(); + } + } else { + // TODO enqueue for each EL a request waiting to complete? + } + group.shutdownGracefully().get(); + // print percentiles of RTT + Histogram combined = new Histogram(3); + histograms.forEach(combined::add); + histograms.clear(); + // Print percentile distribution + System.out.printf("RTT (µs) - Avg: %.2f, P50: %.2f, P90: %.2f, P99: %.2f, Max: %.2f%n", + combined.getMean() / 1000.0, combined.getValueAtPercentile(50) / 1000.0, + combined.getValueAtPercentile(90) / 1000.0, combined.getValueAtPercentile(99) / 1000.0, + combined.getMaxValue() / 1000.0); + } +} diff --git a/core/src/main/java/io/netty/loom/EventLoopScheduler.java b/core/src/main/java/io/netty/loom/EventLoopScheduler.java index 4e54987..1276880 100644 --- a/core/src/main/java/io/netty/loom/EventLoopScheduler.java +++ b/core/src/main/java/io/netty/loom/EventLoopScheduler.java @@ -149,15 +149,21 @@ private void nettyEventLoop() { boolean canBlock = false; while (!ioEventLoop.isShuttingDown()) { canBlock = runIO(canBlock); - Thread.yield(); + if (!runQueue.isEmpty()) { + Thread.yield(); + } // try running leftover write tasks before checking for I/O tasks canBlock &= ioEventLoop.runNonBlockingTasks(RUNNING_YIELD_US) == 0; - Thread.yield(); + if (!runQueue.isEmpty()) { + Thread.yield(); + } } // we are shutting down, it shouldn't take long so let's spin a 
bit :P while (!ioEventLoop.isTerminated()) { ioEventLoop.runNow(); - Thread.yield(); + if (!runQueue.isEmpty()) { + Thread.yield(); + } } } @@ -284,7 +290,9 @@ public boolean execute(Thread.VirtualThreadTask task) { // continuations // whilst is just woken up for I/O assert eventLoopContinuatioToRun == null; - Thread.yield(); + if (!runQueue.isEmpty()) { + Thread.yield(); + } } return true; } diff --git a/pom.xml b/pom.xml index bc9a66b..7a5097e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ core benchmarks + benchmark-runner example-echo