18
18
19
19
import org .apache .kafka .common .Uuid ;
20
20
import org .apache .kafka .common .config .TopicConfig ;
21
+ import org .apache .kafka .common .record .ControlRecordType ;
22
+ import org .apache .kafka .common .record .EndTransactionMarker ;
21
23
import org .apache .kafka .common .record .FileRecords ;
24
+ import org .apache .kafka .common .record .MemoryRecords ;
22
25
import org .apache .kafka .common .utils .Time ;
23
- import org .apache .kafka .coordinator . transaction . TransactionLogConfig ;
26
+ import org .apache .kafka .server . common . RequestLocal ;
24
27
import org .apache .kafka .server .config .ServerLogConfigs ;
25
28
import org .apache .kafka .server .util .Scheduler ;
26
29
import org .apache .kafka .storage .log .metrics .BrokerTopicStats ;
29
32
import java .io .IOException ;
30
33
import java .util .Optional ;
31
34
import java .util .Properties ;
32
- import java .util .concurrent .ConcurrentHashMap ;
33
35
import java .util .concurrent .ConcurrentMap ;
34
36
35
37
public class LogTestUtils {
@@ -44,27 +46,27 @@ public static LogSegment createSegment(long offset, File logDir, int indexInterv
44
46
return new LogSegment (ms , idx , timeIdx , txnIndex , offset , indexIntervalBytes , 0 , time );
45
47
}
46
48
47
- public static UnifiedLog createLog ( File dir ,
48
- LogConfig config ,
49
- BrokerTopicStats brokerTopicStats ,
50
- Scheduler scheduler ,
51
- Time time ) throws IOException {
52
- return createLog (
53
- dir ,
54
- config ,
55
- brokerTopicStats ,
56
- scheduler ,
57
- time ,
58
- 0L , // logStartOffset
59
- 0L , // recoveryPoint
60
- 5 * 60 * 1000 , // maxTransactionTimeoutMs
61
- new ProducerStateManagerConfig ( TransactionLogConfig . PRODUCER_ID_EXPIRATION_MS_DEFAULT , false ) ,
62
- TransactionLogConfig . PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT , true , // lastShutdownClean
63
- Optional . empty (), // topicId
64
- new ConcurrentHashMap <>(), // numRemainingSegments
65
- false , // remoteStorageSystemEnable
66
- LogOffsetsListener . NO_OP_OFFSETS_LISTENER
67
- );
49
+ public static LogAppendInfo appendEndTxnMarkerAsLeader ( UnifiedLog log ,
50
+ long producerId ,
51
+ short producerEpoch ,
52
+ ControlRecordType controlType ,
53
+ long timestamp ,
54
+ int coordinatorEpoch ,
55
+ int leaderEpoch ) {
56
+ MemoryRecords records = endTxnRecords ( controlType , producerId , producerEpoch , 0L , coordinatorEpoch , leaderEpoch , timestamp );
57
+
58
+ return log . appendAsLeader ( records , leaderEpoch , AppendOrigin . COORDINATOR , RequestLocal . noCaching (), VerificationGuard . SENTINEL );
59
+ }
60
+
61
+ public static MemoryRecords endTxnRecords ( ControlRecordType controlRecordType ,
62
+ long producerId ,
63
+ short epoch ,
64
+ long offset ,
65
+ int coordinatorEpoch ,
66
+ int partitionLeaderEpoch ,
67
+ long timestamp ) {
68
+ EndTransactionMarker marker = new EndTransactionMarker ( controlRecordType , coordinatorEpoch );
69
+ return MemoryRecords . withEndTransactionMarker ( offset , timestamp , partitionLeaderEpoch , producerId , epoch , marker );
68
70
}
69
71
70
72
@ SuppressWarnings ("ParameterNumber" )
@@ -195,7 +197,6 @@ public LogConfigBuilder withRemoteLogDeleteOnDisable(boolean remoteLogDeleteOnDi
195
197
return this ;
196
198
}
197
199
198
- // 3. 建立一個 build() 方法,它使用 builder 中設定的值來建立最終的 LogConfig 物件
199
200
public LogConfig build () {
200
201
Properties logProps = new Properties ();
201
202
logProps .put (TopicConfig .SEGMENT_MS_CONFIG , String .valueOf (segmentMs ));
0 commit comments