diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java index dc91353ee..600619404 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java @@ -19,12 +19,12 @@ import io.openmessaging.benchmark.utils.PaddingDecimalFormat; import io.openmessaging.benchmark.utils.RandomGenerator; import io.openmessaging.benchmark.utils.Timer; -import io.openmessaging.benchmark.utils.payload.FilePayloadReader; import io.openmessaging.benchmark.utils.payload.PayloadReader; import io.openmessaging.benchmark.worker.Worker; import io.openmessaging.benchmark.worker.commands.ConsumerAssignment; import io.openmessaging.benchmark.worker.commands.CountersStats; import io.openmessaging.benchmark.worker.commands.CumulativeLatencies; +import io.openmessaging.benchmark.worker.commands.Payload; import io.openmessaging.benchmark.worker.commands.PeriodStats; import io.openmessaging.benchmark.worker.commands.ProducerWorkAssignment; import io.openmessaging.benchmark.worker.commands.TopicSubscription; @@ -95,8 +95,6 @@ public TestResult run() throws Exception { }); } - final PayloadReader payloadReader = new FilePayloadReader(workload.messageSize); - ProducerWorkAssignment producerWorkAssignment = new ProducerWorkAssignment(); producerWorkAssignment.keyDistributorType = workload.keyDistributor; producerWorkAssignment.publishRate = targetPublishRate; @@ -113,10 +111,11 @@ public TestResult run() throws Exception { r.nextBytes(randArray); byte[] zerodArray = new byte[zerodBytes]; byte[] combined = ArrayUtils.addAll(randArray, zerodArray); - producerWorkAssignment.payloadData.add(combined); + producerWorkAssignment.payloadData.add(new Payload(combined)); } } else { - producerWorkAssignment.payloadData.add(payloadReader.load(workload.payloadFile)); + final PayloadReader payloadReader = new PayloadReader(workload.messageSize); + producerWorkAssignment.payloadData = payloadReader.load(workload.payloadFile); } worker.startLoad(producerWorkAssignment); diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java deleted file mode 100644 index 2fe60e052..000000000 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/FilePayloadReader.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.benchmark.utils.payload; - -import static java.nio.file.Files.readAllBytes; - -import java.io.File; -import java.io.IOException; -import java.text.MessageFormat; - -public class FilePayloadReader implements PayloadReader { - - private final int expectedLength; - - public FilePayloadReader(int expectedLength) { - this.expectedLength = expectedLength; - } - - @Override - public byte[] load(String resourceName) { - byte[] payload; - try { - payload = readAllBytes(new File(resourceName).toPath()); - checkPayloadLength(payload); - return payload; - } catch (IOException e) { - throw new PayloadException(e.getMessage()); - } - } - - private void checkPayloadLength(byte[] payload) { - if (expectedLength != payload.length) { - throw new PayloadException( - MessageFormat.format( - "Payload length mismatch. Actual is: {0}, but expected: {1} ", - payload.length, expectedLength)); - } - } -} diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java index 97d82aa80..e835ec995 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/payload/PayloadReader.java @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +26,94 @@ */ package io.openmessaging.benchmark.utils.payload; -public interface PayloadReader { +import static java.nio.file.Files.readAllBytes; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.openmessaging.benchmark.worker.commands.Payload; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PayloadReader { + private static final Logger log = LoggerFactory.getLogger(PayloadReader.class); + public final int expectedLength; + private static final ObjectMapper mapper = + new ObjectMapper( + new YAMLFactory().configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + static { + mapper.enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE); + } + + public PayloadReader(int expectedLength) { + this.expectedLength = expectedLength; + } + + private void checkPayloadLength(List payload) { + if (payload.size() == 1 && payload.get(0).data.length != expectedLength) { + throw new PayloadException( + MessageFormat.format( + "payload length mismatch. Actual payload size is: {0}, but expected: {1} ", + payload.get(0).data.length, expectedLength)); + } + int total = 0; + for (Payload p : payload) { + total += p.data.length; + } + int avg = total / payload.size(); + int expect10p = (int) (expectedLength * 0.1); + if (expectedLength - expect10p < avg || expectedLength + expect10p > avg) { + log.warn( + "Average payload length {} differs from expected length {} by over 10% " + + "this means that throughput target maybe incorrect", + avg, expectedLength); + } + } - byte[] load(String resourceName); + public List load(String payloadFile) { + List out = new ArrayList<>(); + try { + File f = new File(payloadFile); + if (Files.isDirectory(f.toPath())) { + File[] files = f.listFiles(); + if (files == null) { + throw new PayloadException("list files returned null for file " + payloadFile); + } + for (File file : files) { + if (!file.isFile()) { + continue; + } + if (file.getName().endsWith(".payload.yaml")) { + Payload p = mapper.readValue(file, Payload.class); + log.info( + "Loaded payload from file file='{}', headers='{}', data='{}'", + file.getAbsolutePath(), + p.headers, + new String(p.data, StandardCharsets.UTF_8)); + out.add(p); + } else { + byte[] payload = readAllBytes(file.toPath()); + out.add(new Payload(payload)); + } + } + } else { + byte[] payload = readAllBytes(f.toPath()); + out.add(new Payload(payload)); + } + checkPayloadLength(out); + return out; + } catch (IOException e) { + throw new PayloadException(e.getMessage()); + } + } } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java index b968b5ce7..1faaeb856 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java @@ -36,6 +36,7 @@ import io.openmessaging.benchmark.worker.commands.ConsumerAssignment; import io.openmessaging.benchmark.worker.commands.CountersStats; import io.openmessaging.benchmark.worker.commands.CumulativeLatencies; +import io.openmessaging.benchmark.worker.commands.Payload; import io.openmessaging.benchmark.worker.commands.PeriodStats; import io.openmessaging.benchmark.worker.commands.ProducerWorkAssignment; import io.openmessaging.benchmark.worker.commands.TopicsInfo; @@ -194,7 +195,7 @@ public void probeProducers() throws IOException { } private void submitProducersToExecutor( - List producers, KeyDistributor keyDistributor, List payloads) { + List producers, KeyDistributor keyDistributor, List payloads) { ThreadLocalRandom r = ThreadLocalRandom.current(); int payloadCount = payloads.size(); executor.submit( diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java index bf1191ef9..6c208f1d1 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/MessageProducer.java @@ -17,6 +17,7 @@ import io.openmessaging.benchmark.driver.BenchmarkProducer; import io.openmessaging.benchmark.utils.UniformRateLimiter; +import io.openmessaging.benchmark.worker.commands.Payload; import java.util.Optional; import java.util.function.Supplier; import org.slf4j.Logger; @@ -38,13 +39,13 @@ public class MessageProducer { this.stats = stats; } - public void sendMessage(BenchmarkProducer producer, Optional key, byte[] payload) { + public void sendMessage(BenchmarkProducer producer, Optional key, Payload payload) { final long intendedSendTime = rateLimiter.acquire(); uninterruptibleSleepNs(intendedSendTime); final long sendTime = nanoClock.get(); producer - .sendAsync(key, payload) - .thenRun(() -> success(payload.length, intendedSendTime, sendTime)) + .sendAsync(key, payload.data, payload.headers) + .thenRun(() -> success(payload.data.length, intendedSendTime, sendTime)) .exceptionally(this::failure); } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java index f7142cf9f..352903d60 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/WorkerHandler.java @@ -116,7 +116,7 @@ private void handleStartLoad(Context ctx) throws Exception { log.info( "Start load publish-rate: {} msg/s -- payload-size: {}", producerWorkAssignment.publishRate, - producerWorkAssignment.payloadData.get(0).length); + producerWorkAssignment.payloadData.get(0).data.length); localWorker.startLoad(producerWorkAssignment); } diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java new file mode 100644 index 000000000..bb0d1e008 --- /dev/null +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/Payload.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.benchmark.worker.commands; + + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.json.JsonMapper; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Map; +import javax.annotation.Nullable; + +public class Payload { + @JsonDeserialize(using = FlexibleByteArrayDeserializer.class) + public byte[] data; + + @Nullable public Map headers; + + public Payload() {} + + public Payload(byte[] data) { + this.data = data; + } +} + +class FlexibleByteArrayDeserializer extends JsonDeserializer { + @Override + public byte[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonToken token = p.currentToken(); + + if (token == JsonToken.VALUE_STRING) { + try { + return Base64.getDecoder().decode(p.getValueAsString()); + } catch (IllegalArgumentException e) { + // Not Base64, treat as UTF-8 string + } + return p.getValueAsString().getBytes(StandardCharsets.UTF_8); + } else if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) { + // Handle as JSON object/array - serialize it + JsonNode node = p.readValueAsTree(); + return new JsonMapper().writeValueAsBytes(node); + } + + throw new IOException("Cannot deserialize byte[] from " + token); + } +} diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java index 9506ef7cb..725104054 100644 --- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java +++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/commands/ProducerWorkAssignment.java @@ -19,7 +19,7 @@ public class ProducerWorkAssignment { - public List payloadData; + public List payloadData; public double publishRate; diff --git a/bin/generate_payloads.sh b/bin/generate_payloads.sh new file mode 100755 index 000000000..314ec523b --- /dev/null +++ b/bin/generate_payloads.sh @@ -0,0 +1,357 @@ +#!/usr/bin/env bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -euo pipefail + +# Generate benchmark payload files with controlled variability and headers +# +# Dependencies: jq + +show_help() { + cat << EOF +Usage: $(basename "$0") [OPTIONS] + +Generate benchmark payload files with controlled variability and headers. + +OPTIONS: + -o, --output-dir DIR Output directory for payload files (required) + -n, --num-files NUM Number of payload files to generate (required) + -s, --size BYTES Target size of each payload in bytes (required) + -H, --headers HEADERS Headers to add to each payload + Format: "header1=value1:percentage1,value2:percentage2;header2=value3:percentage3,value4:percentage4" + Example: "environment=production:70,staging:30;region=us-east:50,us-west:50" + Percentages for each header should sum to 100 + -e, --entropy INT Entropy/variability level (0-100, default: 50) + 0 = identical payloads + 100 = maximum variability + -h, --help Show this help message + +EXAMPLES: + # Generate 100 files with 1KB payloads and medium entropy + $(basename "$0") -o payloads -n 100 -s 1024 -e 50 + + # Generate with header distribution (70% production, 30% staging) + $(basename "$0") -o payloads -n 100 -s 1024 -H "environment=production:70,staging:30" -e 50 + + # Generate with multiple headers + $(basename "$0") -o payloads -n 100 -s 1024 -H "environment=production:70,staging:30;region=us-east:50,us-west:50" -e 50 + + # Generate with low variability (almost identical payloads) + $(basename "$0") -o payloads -n 50 -s 512 -e 10 + + # Generate with high variability + $(basename "$0") -o payloads -n 50 -s 512 -e 90 + +EOF +} + +# Default values +OUTPUT_DIR="" +NUM_FILES="" +SIZE="" +HEADERS="" +ENTROPY="50" + +# Parse arguments +while [[ $# -gt 0 ]]; do + case $1 in + -o|--output-dir) + OUTPUT_DIR="$2" + shift 2 + ;; + -n|--num-files) + NUM_FILES="$2" + shift 2 + ;; + -s|--size) + SIZE="$2" + shift 2 + ;; + -H|--headers) + HEADERS="$2" + shift 2 + ;; + -e|--entropy) + ENTROPY="$2" + shift 2 + ;; + -h|--help) + show_help + exit 0 + ;; + *) + echo "Error: Unknown option: $1" >&2 + show_help + exit 1 + ;; + esac +done + +# Validate required arguments +if [[ -z "$OUTPUT_DIR" ]] || [[ -z "$NUM_FILES" ]] || [[ -z "$SIZE" ]]; then + echo "Error: Missing required arguments" >&2 + show_help + exit 1 +fi + +# Validate numeric arguments +if ! [[ "$NUM_FILES" =~ ^[0-9]+$ ]] || [[ "$NUM_FILES" -lt 1 ]]; then + echo "Error: Number of files must be a positive integer" >&2 + exit 1 +fi + +if ! [[ "$SIZE" =~ ^[0-9]+$ ]] || [[ "$SIZE" -lt 1 ]]; then + echo "Error: Size must be a positive integer" >&2 + exit 1 +fi + +# Validate entropy +if ! [[ "$ENTROPY" =~ ^[0-9]+$ ]] || [[ "$ENTROPY" -lt 0 ]] || [[ "$ENTROPY" -gt 100 ]]; then + echo "Error: Entropy must be an integer between 0 and 100" >&2 + exit 1 +fi + +# Check dependencies +if ! command -v jq &> /dev/null; then + echo "Error: jq is required but not installed" >&2 + exit 1 +fi + +# Create output directory +mkdir -p "$OUTPUT_DIR" + +echo "Generating $NUM_FILES payload files in $OUTPUT_DIR" +echo "Target payload size: $SIZE bytes" +echo "Entropy: $ENTROPY" +if [[ -n "$HEADERS" ]]; then + echo "Headers: $HEADERS" +fi +echo + +# Parse headers into parallel arrays +# Format: "header1=value1:percentage1,value2:percentage2;header2=value3:percentage3,value4:percentage4" +declare -a HEADER_NAMES +declare -a HEADER_VALUE_SPECS + +if [[ -n "$HEADERS" ]]; then + IFS=';' read -ra HEADER_SPECS <<< "$HEADERS" + for header_spec in "${HEADER_SPECS[@]}"; do + IFS='=' read -ra HEADER_PARTS <<< "$header_spec" + header_name="${HEADER_PARTS[0]}" + values_spec="${HEADER_PARTS[1]}" + HEADER_NAMES+=("$header_name") + HEADER_VALUE_SPECS+=("$values_spec") + done +fi + +# Function to select weighted random value for a given header's value spec +select_header_value() { + local values_spec="$1" + + declare -a value_list + declare -a percentage_list + + IFS=',' read -ra PAIRS <<< "$values_spec" + for pair in "${PAIRS[@]}"; do + IFS=':' read -ra PARTS <<< "$pair" + value="${PARTS[0]}" + percentage="${PARTS[1]:-50}" + value_list+=("$value") + percentage_list+=("$percentage") + done + + local rand=$((RANDOM % 100)) + local cumulative=0 + + for i in "${!value_list[@]}"; do + cumulative=$((cumulative + percentage_list[i])) + + if [[ $rand -lt $cumulative ]]; then + echo "${value_list[$i]}" + return + fi + done + + # Fallback + echo "${value_list[0]}" +} + +# Function to generate random string +random_string() { + local length=$1 + cat /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c "$length" +} + +# Function to generate JSON payload +generate_json_payload() { + local target_size=$1 + local entropy=$2 # Now 0-100 + local file_index=$3 + + # Start with base fields + local timestamp + local id + + if [[ $entropy -gt 0 ]]; then + timestamp=$((1000000000 + RANDOM * 10000)) + else + timestamp=1234567890 + fi + + if [[ $entropy -gt 50 ]]; then + id=$(random_string 16) + else + local id_variation=$((entropy * 10)) + id_variation=$((id_variation > 0 ? id_variation : 1)) + id="fixed-id-$((RANDOM % id_variation))" + fi + + # Create base JSON + local json=$(jq -n \ + --argjson ts "$timestamp" \ + --arg id "$id" \ + '{timestamp: $ts, id: $id}') + + # Calculate current size + local current_size=$(echo -n "$json" | wc -c | tr -d ' ') + + if [[ $current_size -ge $target_size ]]; then + echo "$json" + return + fi + + local remaining_size=$((target_size - current_size)) + + # Determine number of fields based on entropy + local num_fields + if [[ $entropy -lt 30 ]]; then + num_fields=1 + elif [[ $entropy -lt 70 ]]; then + num_fields=$((1 + RANDOM % 3)) + else + num_fields=$((2 + RANDOM % 4)) + fi + + # Calculate chars per field + local chars_per_field=$((remaining_size / (num_fields + 1))) + + # Add data fields + for ((i=0; i 1 ? string_length : 1)) + fi + + local field_value=$(random_string "$string_length") + json=$(echo "$json" | jq --arg key "$field_name" --arg val "$field_value" '. + {($key): $val}') + done + + # Fine-tune size with padding + current_size=$(echo -n "$json" | wc -c | tr -d ' ') + if [[ $current_size -lt $target_size ]]; then + local padding_size=$((target_size - current_size - 15)) + if [[ $padding_size -gt 0 ]]; then + local padding=$(random_string "$padding_size") + json=$(echo "$json" | jq --arg pad "$padding" '. + {_padding: $pad}') + fi + fi + + echo "$json" +} + +# Track sizes for statistics +declare -a SIZES + +# Generate payload files +for ((i=0; i "$filepath" + else + jq -n \ + --argjson data "$payload" \ + '{value: $data}' > "$filepath" + fi + + # Calculate actual size + actual_size=$(echo -n "$payload" | wc -c | tr -d ' ') + SIZES+=("$actual_size") + + # Progress update + if [[ $((i + 1)) -eq 1 ]] || [[ $(((i + 1) % 100)) -eq 0 ]] || [[ $((i + 1)) -eq $NUM_FILES ]]; then + echo "Generated $((i + 1))/$NUM_FILES files... (last file data size: $actual_size bytes)" + fi +done + +# Calculate statistics +total=0 +min_size=${SIZES[0]} +max_size=${SIZES[0]} + +for size in "${SIZES[@]}"; do + total=$((total + size)) + if [[ $size -lt $min_size ]]; then + min_size=$size + fi + if [[ $size -gt $max_size ]]; then + max_size=$size + fi +done + +avg_size=$(echo "scale=1; $total / $NUM_FILES" | bc) +size_variance=$((max_size - min_size)) +variance_percent=$(echo "scale=1; ($size_variance / $avg_size) * 100" | bc) + +echo +echo "Generation complete!" +echo "Average payload size: $avg_size bytes" +echo "Min payload size: $min_size bytes" +echo "Max payload size: $max_size bytes" +echo "Size variance: $size_variance bytes ($variance_percent%)" \ No newline at end of file diff --git a/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java b/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java index 1662cf1c6..1681cec71 100644 --- a/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java +++ b/driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkProducer.java @@ -14,6 +14,7 @@ package io.openmessaging.benchmark.driver; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -27,4 +28,18 @@ public interface BenchmarkProducer extends AutoCloseable { * @return a future that will be triggered when the message is successfully published */ CompletableFuture sendAsync(Optional key, byte[] payload); + + /** + * Same as sendAsync but can add headers to the message when supported by the driver. When not + * supported, the default implementation ignores the headers. + * + * @param key the key associated with this message + * @param payload the message payload + * @param headers the message headers + * @return a future that will be triggered when the message is successfully published + */ + default CompletableFuture sendAsync( + Optional key, byte[] payload, Map headers) { + return sendAsync(key, payload); + } } diff --git a/driver-kafka/kafka-compression-lz4.yaml b/driver-kafka/kafka-compression-lz4.yaml index 8fdccc982..60923d3d9 100644 --- a/driver-kafka/kafka-compression-lz4.yaml +++ b/driver-kafka/kafka-compression-lz4.yaml @@ -23,7 +23,7 @@ topicConfig: | min.insync.replicas=2 commonConfig: | - bootstrap.servers=localhost:9092 + bootstrap.servers=localhost:19092 producerConfig: | acks=all diff --git a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java index 6c62d9fbc..0cb162bd8 100644 --- a/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java +++ b/driver-kafka/src/main/java/io/openmessaging/benchmark/driver/kafka/KafkaBenchmarkProducer.java @@ -28,6 +28,8 @@ import io.openmessaging.benchmark.driver.BenchmarkProducer; +import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.producer.Producer; @@ -44,9 +46,12 @@ public KafkaBenchmarkProducer(Producer producer, String topic) { } @Override - public CompletableFuture sendAsync(Optional key, byte[] payload) { + public CompletableFuture sendAsync( + Optional key, byte[] payload, Map headers) { ProducerRecord record = new ProducerRecord<>(topic, key.orElse(null), payload); - + if (headers != null) { + headers.forEach((k, v) -> record.headers().add(k, v.getBytes(StandardCharsets.UTF_8))); + } CompletableFuture future = new CompletableFuture<>(); producer.send( @@ -62,6 +67,11 @@ public CompletableFuture sendAsync(Optional key, byte[] payload) { return future; } + @Override + public CompletableFuture sendAsync(Optional key, byte[] payload) { + return sendAsync(key, payload, null); + } + @Override public void close() throws Exception { producer.close(); diff --git a/payload/multiple/base b/payload/multiple/base new file mode 100644 index 000000000..b95490766 --- /dev/null +++ b/payload/multiple/base @@ -0,0 +1 @@ +fewhifewhufewhiufhew \ No newline at end of file diff --git a/payload/multiple/base64_with_headers.payload.yaml b/payload/multiple/base64_with_headers.payload.yaml new file mode 100644 index 000000000..359137f77 --- /dev/null +++ b/payload/multiple/base64_with_headers.payload.yaml @@ -0,0 +1,4 @@ +# Pass in a base64-encoded if you want JSON data along with custom headers. +data: eyJiaW0iOiJiYXIiLCAiYmFtIjoiYm9tIn0K +headers: + cool: beans \ No newline at end of file diff --git a/payload/multiple/base64nonjson_with_headers.payload.yaml b/payload/multiple/base64nonjson_with_headers.payload.yaml new file mode 100644 index 000000000..943afec51 --- /dev/null +++ b/payload/multiple/base64nonjson_with_headers.payload.yaml @@ -0,0 +1,4 @@ +# Pass in a base64-encoded if you want to send non-JSON data along with custom headers. +data: b2JzY3VyZV9zdHJpbmcK +headers: + cool: beans \ No newline at end of file diff --git a/payload/multiple/simple.payload.yaml b/payload/multiple/simple.payload.yaml new file mode 100644 index 000000000..4a33ba9af --- /dev/null +++ b/payload/multiple/simple.payload.yaml @@ -0,0 +1 @@ +data: "fooz_baz" \ No newline at end of file diff --git a/payload/multiple/with_headers.payload.yaml b/payload/multiple/with_headers.payload.yaml new file mode 100644 index 000000000..865b4b92e --- /dev/null +++ b/payload/multiple/with_headers.payload.yaml @@ -0,0 +1,4 @@ +data: "foo_bar" +headers: # Add your headers + header1: "value1" + header2: "value2" \ No newline at end of file diff --git a/payload/multiple/with_headers_and_json.payload.yaml b/payload/multiple/with_headers_and_json.payload.yaml new file mode 100644 index 000000000..d0210c972 --- /dev/null +++ b/payload/multiple/with_headers_and_json.payload.yaml @@ -0,0 +1,5 @@ +data: # You can send in structured data as well + fooz: baz +headers: + header1: "value1" + header2: "value2" \ No newline at end of file diff --git a/workloads/multi-payloads.yaml b/workloads/multi-payloads.yaml new file mode 100644 index 000000000..5a0860229 --- /dev/null +++ b/workloads/multi-payloads.yaml @@ -0,0 +1,29 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +name: Simple Workload 1 producer on 1 topic + +topics: 1 +partitionsPerTopic: 10 +messageSize: 16 +payloadFile: "payload/multiple" +subscriptionsPerTopic: 1 +producersPerTopic: 1 +consumerPerSubscription: 1 +producerRate: 10000 +consumerBacklogSizeGB: 0 +testDurationMinutes: 5 + +