Skip to content
Open
87 changes: 87 additions & 0 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiMessageType;
Expand All @@ -31,6 +32,12 @@
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ByteBufferChannel;
Expand Down Expand Up @@ -742,4 +749,84 @@ public static ApiVersionsResponse createApiVersionsResponse(
setZkMigrationEnabled(zkMigrationEnabled).
build();
}

public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
}

public static MemoryRecords singletonRecords(byte[] value, long timestamp) {
return singletonRecords(value, null, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE);
}

public static MemoryRecords singletonRecords(
byte[] value
) {
return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)),
RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE,
0,
RecordBatch.NO_PARTITION_LEADER_EPOCH
);
}

public static MemoryRecords singletonRecords(
byte[] value,
byte[] key,
Compression codec,
long timestamp,
byte magicValue
) {
return records(List.of(new SimpleRecord(timestamp, key, value)),
magicValue, codec,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE,
0,
RecordBatch.NO_PARTITION_LEADER_EPOCH
);
}

public static MemoryRecords singletonRecords(byte[] value, byte[] key, long timestamp) {
return singletonRecords(value, key, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE);
}

public static MemoryRecords records(List<SimpleRecord> records) {
return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
}

public static MemoryRecords records(List<SimpleRecord> records, long baseOffset) {
return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
}

public static MemoryRecords records(List<SimpleRecord> records, long baseOffset, int partitionLeaderEpoch) {
return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
}

public static MemoryRecords records(List<SimpleRecord> records, byte magicValue, Compression compression) {
return records(records, magicValue, compression, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
}

public static MemoryRecords records(List<SimpleRecord> records,
byte magicValue,
Compression compression,
long producerId,
short producerEpoch,
int sequence,
long baseOffset,
int partitionLeaderEpoch) {
ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, compression, TimestampType.CREATE_TIME, baseOffset,
System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch);
for (SimpleRecord record : records) {
builder.append(record);
}
return builder.build();
}
}
Loading
Loading