Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import io.openmessaging.benchmark.utils.PaddingDecimalFormat;
import io.openmessaging.benchmark.utils.RandomGenerator;
import io.openmessaging.benchmark.utils.Timer;
import io.openmessaging.benchmark.utils.payload.FilePayloadReader;
import io.openmessaging.benchmark.utils.payload.PayloadReader;
import io.openmessaging.benchmark.worker.Worker;
import io.openmessaging.benchmark.worker.commands.ConsumerAssignment;
import io.openmessaging.benchmark.worker.commands.CountersStats;
import io.openmessaging.benchmark.worker.commands.CumulativeLatencies;
import io.openmessaging.benchmark.worker.commands.Payload;
import io.openmessaging.benchmark.worker.commands.PeriodStats;
import io.openmessaging.benchmark.worker.commands.ProducerWorkAssignment;
import io.openmessaging.benchmark.worker.commands.TopicSubscription;
Expand Down Expand Up @@ -95,8 +95,6 @@ public TestResult run() throws Exception {
});
}

final PayloadReader payloadReader = new FilePayloadReader(workload.messageSize);

ProducerWorkAssignment producerWorkAssignment = new ProducerWorkAssignment();
producerWorkAssignment.keyDistributorType = workload.keyDistributor;
producerWorkAssignment.publishRate = targetPublishRate;
Expand All @@ -113,10 +111,11 @@ public TestResult run() throws Exception {
r.nextBytes(randArray);
byte[] zerodArray = new byte[zerodBytes];
byte[] combined = ArrayUtils.addAll(randArray, zerodArray);
producerWorkAssignment.payloadData.add(combined);
producerWorkAssignment.payloadData.add(new Payload(combined));
}
} else {
producerWorkAssignment.payloadData.add(payloadReader.load(workload.payloadFile));
final PayloadReader payloadReader = new PayloadReader(workload.messageSize);
producerWorkAssignment.payloadData = payloadReader.load(workload.payloadFile);
}

worker.startLoad(producerWorkAssignment);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,7 +26,94 @@
*/
package io.openmessaging.benchmark.utils.payload;

public interface PayloadReader {
import static java.nio.file.Files.readAllBytes;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.openmessaging.benchmark.worker.commands.Payload;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PayloadReader {
private static final Logger log = LoggerFactory.getLogger(PayloadReader.class);
public final int expectedLength;
private static final ObjectMapper mapper =
new ObjectMapper(
new YAMLFactory().configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false))
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

static {
mapper.enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE);
}

public PayloadReader(int expectedLength) {
this.expectedLength = expectedLength;
}

private void checkPayloadLength(List<Payload> payload) {
if (payload.size() == 1 && payload.get(0).data.length != expectedLength) {
throw new PayloadException(
MessageFormat.format(
"payload length mismatch. Actual payload size is: {0}, but expected: {1} ",
payload.get(0).data.length, expectedLength));
}
int total = 0;
for (Payload p : payload) {
total += p.data.length;
}
int avg = total / payload.size();
int expect10p = (int) (expectedLength * 0.1);
if (expectedLength - expect10p < avg || expectedLength + expect10p > avg) {
log.warn(
"Average payload length {} differs from expected length {} by over 10% "
+ "this means that throughput target maybe incorrect",
avg, expectedLength);
}
}

byte[] load(String resourceName);
public List<Payload> load(String payloadFile) {
List<Payload> out = new ArrayList<>();
try {
File f = new File(payloadFile);
if (Files.isDirectory(f.toPath())) {
File[] files = f.listFiles();
if (files == null) {
throw new PayloadException("list files returned null for file " + payloadFile);
}
for (File file : files) {
if (!file.isFile()) {
continue;
}
if (file.getName().endsWith(".payload.yaml")) {
Payload p = mapper.readValue(file, Payload.class);
log.info(
"Loaded payload from file file='{}', headers='{}', data='{}'",
file.getAbsolutePath(),
p.headers,
new String(p.data, StandardCharsets.UTF_8));
out.add(p);
} else {
byte[] payload = readAllBytes(file.toPath());
out.add(new Payload(payload));
}
}
} else {
byte[] payload = readAllBytes(f.toPath());
out.add(new Payload(payload));
}
checkPayloadLength(out);
return out;
} catch (IOException e) {
throw new PayloadException(e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.openmessaging.benchmark.worker.commands.ConsumerAssignment;
import io.openmessaging.benchmark.worker.commands.CountersStats;
import io.openmessaging.benchmark.worker.commands.CumulativeLatencies;
import io.openmessaging.benchmark.worker.commands.Payload;
import io.openmessaging.benchmark.worker.commands.PeriodStats;
import io.openmessaging.benchmark.worker.commands.ProducerWorkAssignment;
import io.openmessaging.benchmark.worker.commands.TopicsInfo;
Expand Down Expand Up @@ -194,7 +195,7 @@ public void probeProducers() throws IOException {
}

private void submitProducersToExecutor(
List<BenchmarkProducer> producers, KeyDistributor keyDistributor, List<byte[]> payloads) {
List<BenchmarkProducer> producers, KeyDistributor keyDistributor, List<Payload> payloads) {
ThreadLocalRandom r = ThreadLocalRandom.current();
int payloadCount = payloads.size();
executor.submit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.openmessaging.benchmark.driver.BenchmarkProducer;
import io.openmessaging.benchmark.utils.UniformRateLimiter;
import io.openmessaging.benchmark.worker.commands.Payload;
import java.util.Optional;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -38,13 +39,13 @@ public class MessageProducer {
this.stats = stats;
}

public void sendMessage(BenchmarkProducer producer, Optional<String> key, byte[] payload) {
public void sendMessage(BenchmarkProducer producer, Optional<String> key, Payload payload) {
final long intendedSendTime = rateLimiter.acquire();
uninterruptibleSleepNs(intendedSendTime);
final long sendTime = nanoClock.get();
producer
.sendAsync(key, payload)
.thenRun(() -> success(payload.length, intendedSendTime, sendTime))
.sendAsync(key, payload.data, payload.headers)
.thenRun(() -> success(payload.data.length, intendedSendTime, sendTime))
.exceptionally(this::failure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void handleStartLoad(Context ctx) throws Exception {
log.info(
"Start load publish-rate: {} msg/s -- payload-size: {}",
producerWorkAssignment.publishRate,
producerWorkAssignment.payloadData.get(0).length);
producerWorkAssignment.payloadData.get(0).data.length);

localWorker.startLoad(producerWorkAssignment);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.benchmark.worker.commands;


import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.json.JsonMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import javax.annotation.Nullable;

public class Payload {
@JsonDeserialize(using = FlexibleByteArrayDeserializer.class)
public byte[] data;

@Nullable public Map<String, String> headers;

public Payload() {}

public Payload(byte[] data) {
this.data = data;
}
}

class FlexibleByteArrayDeserializer extends JsonDeserializer<byte[]> {
@Override
public byte[] deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
JsonToken token = p.currentToken();

if (token == JsonToken.VALUE_STRING) {
try {
return Base64.getDecoder().decode(p.getValueAsString());
} catch (IllegalArgumentException e) {
// Not Base64, treat as UTF-8 string
}
return p.getValueAsString().getBytes(StandardCharsets.UTF_8);
} else if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
// Handle as JSON object/array - serialize it
JsonNode node = p.readValueAsTree();
return new JsonMapper().writeValueAsBytes(node);
}

throw new IOException("Cannot deserialize byte[] from " + token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class ProducerWorkAssignment {

public List<byte[]> payloadData;
public List<Payload> payloadData;

public double publishRate;

Expand Down
Loading