16
16
*/
17
17
package org .apache .kafka .common .test ;
18
18
19
- import org .apache .kafka .common .compress .Compression ;
20
19
import org .apache .kafka .common .errors .TimeoutException ;
21
- import org .apache .kafka .common .record .DefaultRecordBatch ;
22
- import org .apache .kafka .common .record .MemoryRecords ;
23
- import org .apache .kafka .common .record .MemoryRecordsBuilder ;
24
- import org .apache .kafka .common .record .RecordBatch ;
25
- import org .apache .kafka .common .record .SimpleRecord ;
26
- import org .apache .kafka .common .record .TimestampType ;
27
20
import org .apache .kafka .common .utils .Exit ;
28
21
import org .apache .kafka .common .utils .Utils ;
29
22
32
25
33
26
import java .io .File ;
34
27
import java .io .IOException ;
35
- import java .nio .ByteBuffer ;
36
28
import java .nio .file .Files ;
37
- import java .util .List ;
38
29
import java .util .Random ;
39
30
import java .util .function .Supplier ;
40
31
@@ -57,7 +48,6 @@ class TestUtils {
57
48
58
49
private static final long DEFAULT_POLL_INTERVAL_MS = 100 ;
59
50
private static final long DEFAULT_MAX_WAIT_MS = 15_000 ;
60
- private static final Random RANDOM = new Random ();
61
51
62
52
/**
63
53
* Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
@@ -155,97 +145,4 @@ public static void waitForCondition(final Supplier<Boolean> testCondition,
155
145
String conditionDetails ) throws InterruptedException {
156
146
waitForCondition (testCondition , maxWaitMs , () -> conditionDetails );
157
147
}
158
-
159
- public static File randomPartitionLogDir (File parentDir ) {
160
- int attempts = 1000 ;
161
- while (attempts > 0 ) {
162
- File f = new File (parentDir , "kafka-" + RANDOM .nextInt (1000000 ));
163
- if (f .mkdir ()) {
164
- f .deleteOnExit ();
165
- return f ;
166
- }
167
- attempts --;
168
- }
169
- throw new RuntimeException ("Failed to create directory after 1000 attempts" );
170
- }
171
-
172
- public static MemoryRecords singletonRecords (byte [] value , byte [] key ) {
173
- return singletonRecords (value , key , Compression .NONE , RecordBatch .NO_TIMESTAMP , RecordBatch .CURRENT_MAGIC_VALUE );
174
- }
175
-
176
- public static MemoryRecords singletonRecords (byte [] value , long timestamp ) {
177
- return singletonRecords (value , null , Compression .NONE , timestamp , RecordBatch .CURRENT_MAGIC_VALUE );
178
- }
179
-
180
- public static MemoryRecords singletonRecords (
181
- byte [] value
182
- ) {
183
- return records (List .of (new SimpleRecord (RecordBatch .NO_TIMESTAMP , null , value )),
184
- RecordBatch .CURRENT_MAGIC_VALUE ,
185
- Compression .NONE ,
186
- RecordBatch .NO_PRODUCER_ID ,
187
- RecordBatch .NO_PRODUCER_EPOCH ,
188
- RecordBatch .NO_SEQUENCE ,
189
- 0 ,
190
- RecordBatch .NO_PARTITION_LEADER_EPOCH
191
- );
192
- }
193
-
194
- public static MemoryRecords singletonRecords (
195
- byte [] value ,
196
- byte [] key ,
197
- Compression codec ,
198
- long timestamp ,
199
- byte magicValue
200
- ) {
201
- return records (List .of (new SimpleRecord (timestamp , key , value )),
202
- magicValue , codec ,
203
- RecordBatch .NO_PRODUCER_ID ,
204
- RecordBatch .NO_PRODUCER_EPOCH ,
205
- RecordBatch .NO_SEQUENCE ,
206
- 0 ,
207
- RecordBatch .NO_PARTITION_LEADER_EPOCH
208
- );
209
- }
210
-
211
- public static MemoryRecords singletonRecords (byte [] value , byte [] key , long timestamp ) {
212
- return singletonRecords (value , key , Compression .NONE , timestamp , RecordBatch .CURRENT_MAGIC_VALUE );
213
- }
214
-
215
- public static MemoryRecords records (List <SimpleRecord > records ) {
216
- return records (records , RecordBatch .CURRENT_MAGIC_VALUE , Compression .NONE , RecordBatch .NO_PRODUCER_ID ,
217
- RecordBatch .NO_PRODUCER_EPOCH , RecordBatch .NO_SEQUENCE , 0L , RecordBatch .NO_PARTITION_LEADER_EPOCH );
218
- }
219
-
220
- public static MemoryRecords records (List <SimpleRecord > records , long baseOffset ) {
221
- return records (records , RecordBatch .CURRENT_MAGIC_VALUE , Compression .NONE , RecordBatch .NO_PRODUCER_ID ,
222
- RecordBatch .NO_PRODUCER_EPOCH , RecordBatch .NO_SEQUENCE , baseOffset , RecordBatch .NO_PARTITION_LEADER_EPOCH );
223
- }
224
-
225
- public static MemoryRecords records (List <SimpleRecord > records , long baseOffset , int partitionLeaderEpoch ) {
226
- return records (records , RecordBatch .CURRENT_MAGIC_VALUE , Compression .NONE , RecordBatch .NO_PRODUCER_ID ,
227
- RecordBatch .NO_PRODUCER_EPOCH , RecordBatch .NO_SEQUENCE , baseOffset , partitionLeaderEpoch );
228
- }
229
-
230
- public static MemoryRecords records (List <SimpleRecord > records , byte magicValue , Compression compression ) {
231
- return records (records , magicValue , compression , RecordBatch .NO_PRODUCER_ID ,
232
- RecordBatch .NO_PRODUCER_EPOCH , RecordBatch .NO_SEQUENCE , 0L , RecordBatch .NO_PARTITION_LEADER_EPOCH );
233
- }
234
-
235
- public static MemoryRecords records (List <SimpleRecord > records ,
236
- byte magicValue ,
237
- Compression compression ,
238
- long producerId ,
239
- short producerEpoch ,
240
- int sequence ,
241
- long baseOffset ,
242
- int partitionLeaderEpoch ) {
243
- ByteBuffer buf = ByteBuffer .allocate (DefaultRecordBatch .sizeInBytes (records ));
244
- MemoryRecordsBuilder builder = MemoryRecords .builder (buf , magicValue , compression , TimestampType .CREATE_TIME , baseOffset ,
245
- System .currentTimeMillis (), producerId , producerEpoch , sequence , false , partitionLeaderEpoch );
246
- for (SimpleRecord record : records ) {
247
- builder .append (record );
248
- }
249
- return builder .build ();
250
- }
251
148
}
0 commit comments