16
16
*/
17
17
package org .apache .kafka .storage .internals .log ;
18
18
19
+ import org .apache .kafka .common .Uuid ;
20
+ import org .apache .kafka .common .config .TopicConfig ;
19
21
import org .apache .kafka .common .record .FileRecords ;
20
22
import org .apache .kafka .common .utils .Time ;
23
+ import org .apache .kafka .coordinator .transaction .TransactionLogConfig ;
24
+ import org .apache .kafka .server .config .ServerLogConfigs ;
25
+ import org .apache .kafka .server .log .remote .storage .RemoteLogManager ;
26
+ import org .apache .kafka .server .util .Scheduler ;
27
+ import org .apache .kafka .storage .log .metrics .BrokerTopicStats ;
21
28
22
29
import java .io .File ;
23
30
import java .io .IOException ;
31
+ import java .util .Optional ;
32
+ import java .util .Properties ;
33
+ import java .util .concurrent .ConcurrentHashMap ;
34
+ import java .util .concurrent .ConcurrentMap ;
24
35
25
36
public class LogTestUtils {
26
37
public static LogSegment createSegment (long offset , File logDir , int indexIntervalBytes , Time time ) throws IOException {
@@ -33,4 +44,179 @@ public static LogSegment createSegment(long offset, File logDir, int indexInterv
33
44
// Create and return the LogSegment instance
34
45
return new LogSegment (ms , idx , timeIdx , txnIndex , offset , indexIntervalBytes , 0 , time );
35
46
}
47
+
48
+ public static UnifiedLog createLog (File dir ,
49
+ LogConfig config ,
50
+ BrokerTopicStats brokerTopicStats ,
51
+ Scheduler scheduler ,
52
+ Time time ) throws IOException {
53
+ return createLog (
54
+ dir ,
55
+ config ,
56
+ brokerTopicStats ,
57
+ scheduler ,
58
+ time ,
59
+ 0L , // logStartOffset
60
+ 0L , // recoveryPoint
61
+ 5 * 60 * 1000 , // maxTransactionTimeoutMs
62
+ new ProducerStateManagerConfig (TransactionLogConfig .PRODUCER_ID_EXPIRATION_MS_DEFAULT , false ),
63
+ TransactionLogConfig .PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT , true , // lastShutdownClean
64
+ Optional .empty (), // topicId
65
+ new ConcurrentHashMap <>(), // numRemainingSegments
66
+ false , // remoteStorageSystemEnable
67
+ Optional .empty (), // remoteLogManager
68
+ LogOffsetsListener .NO_OP_OFFSETS_LISTENER
69
+ );
70
+ }
71
+
72
+ public static UnifiedLog createLog (File dir ,
73
+ LogConfig config ,
74
+ BrokerTopicStats brokerTopicStats ,
75
+ Scheduler scheduler ,
76
+ Time time ,
77
+ long logStartOffset ,
78
+ long recoveryPoint ,
79
+ int maxTransactionTimeoutMs ,
80
+ ProducerStateManagerConfig producerStateManagerConfig ,
81
+ int producerIdExpirationCheckIntervalMs ,
82
+ boolean lastShutdownClean ,
83
+ Optional <Uuid > topicId ,
84
+ ConcurrentMap <String , Integer > numRemainingSegments ,
85
+ boolean remoteStorageSystemEnable ,
86
+ Optional <RemoteLogManager > remoteLogManager ,
87
+ LogOffsetsListener logOffsetsListener ) throws IOException {
88
+ return UnifiedLog .create (
89
+ dir ,
90
+ config ,
91
+ logStartOffset ,
92
+ recoveryPoint ,
93
+ scheduler ,
94
+ brokerTopicStats ,
95
+ time ,
96
+ maxTransactionTimeoutMs ,
97
+ producerStateManagerConfig ,
98
+ producerIdExpirationCheckIntervalMs ,
99
+ new LogDirFailureChannel (10 ),
100
+ lastShutdownClean ,
101
+ topicId , // 直接傳入 Java Optional
102
+ numRemainingSegments ,
103
+ remoteStorageSystemEnable ,
104
+ logOffsetsListener
105
+ );
106
+ }
107
+
108
+ public static class LogConfigBuilder {
109
+ private long segmentMs = LogConfig .DEFAULT_SEGMENT_MS ;
110
+ private int segmentBytes = LogConfig .DEFAULT_SEGMENT_BYTES ;
111
+ private long retentionMs = LogConfig .DEFAULT_RETENTION_MS ;
112
+ private long localRetentionMs = LogConfig .DEFAULT_LOCAL_RETENTION_MS ;
113
+ private long retentionBytes = ServerLogConfigs .LOG_RETENTION_BYTES_DEFAULT ;
114
+ private long localRetentionBytes = LogConfig .DEFAULT_LOCAL_RETENTION_BYTES ;
115
+ private long segmentJitterMs = LogConfig .DEFAULT_SEGMENT_JITTER_MS ;
116
+ private String cleanupPolicy = ServerLogConfigs .LOG_CLEANUP_POLICY_DEFAULT ;
117
+ private int maxMessageBytes = ServerLogConfigs .MAX_MESSAGE_BYTES_DEFAULT ;
118
+ private int indexIntervalBytes = ServerLogConfigs .LOG_INDEX_INTERVAL_BYTES_DEFAULT ;
119
+ private int segmentIndexBytes = ServerLogConfigs .LOG_INDEX_SIZE_MAX_BYTES_DEFAULT ;
120
+ private long fileDeleteDelayMs = ServerLogConfigs .LOG_DELETE_DELAY_MS_DEFAULT ;
121
+ private boolean remoteLogStorageEnable = LogConfig .DEFAULT_REMOTE_STORAGE_ENABLE ;
122
+ private boolean remoteLogCopyDisable = false ;
123
+ private boolean remoteLogDeleteOnDisable = false ;
124
+
125
+ // 2. 為每個參數建立一個 "with" 方法,用於設定值並回傳 builder 本身 (fluent interface)
126
+ public LogConfigBuilder withSegmentMs (long segmentMs ) {
127
+ this .segmentMs = segmentMs ;
128
+ return this ;
129
+ }
130
+
131
+ public LogConfigBuilder withSegmentBytes (int segmentBytes ) {
132
+ this .segmentBytes = segmentBytes ;
133
+ return this ;
134
+ }
135
+
136
+ public LogConfigBuilder withRetentionMs (long retentionMs ) {
137
+ this .retentionMs = retentionMs ;
138
+ return this ;
139
+ }
140
+
141
+ public LogConfigBuilder withLocalRetentionMs (long localRetentionMs ) {
142
+ this .localRetentionMs = localRetentionMs ;
143
+ return this ;
144
+ }
145
+
146
+ public LogConfigBuilder withRetentionBytes (long retentionBytes ) {
147
+ this .retentionBytes = retentionBytes ;
148
+ return this ;
149
+ }
150
+
151
+ public LogConfigBuilder withLocalRetentionBytes (long localRetentionBytes ) {
152
+ this .localRetentionBytes = localRetentionBytes ;
153
+ return this ;
154
+ }
155
+
156
+ public LogConfigBuilder withSegmentJitterMs (long segmentJitterMs ) {
157
+ this .segmentJitterMs = segmentJitterMs ;
158
+ return this ;
159
+ }
160
+
161
+ public LogConfigBuilder withCleanupPolicy (String cleanupPolicy ) {
162
+ this .cleanupPolicy = cleanupPolicy ;
163
+ return this ;
164
+ }
165
+
166
+ public LogConfigBuilder withMaxMessageBytes (int maxMessageBytes ) {
167
+ this .maxMessageBytes = maxMessageBytes ;
168
+ return this ;
169
+ }
170
+
171
+ public LogConfigBuilder withIndexIntervalBytes (int indexIntervalBytes ) {
172
+ this .indexIntervalBytes = indexIntervalBytes ;
173
+ return this ;
174
+ }
175
+
176
+ public LogConfigBuilder withSegmentIndexBytes (int segmentIndexBytes ) {
177
+ this .segmentIndexBytes = segmentIndexBytes ;
178
+ return this ;
179
+ }
180
+
181
+ public LogConfigBuilder withFileDeleteDelayMs (long fileDeleteDelayMs ) {
182
+ this .fileDeleteDelayMs = fileDeleteDelayMs ;
183
+ return this ;
184
+ }
185
+
186
+ public LogConfigBuilder withRemoteLogStorageEnable (boolean remoteLogStorageEnable ) {
187
+ this .remoteLogStorageEnable = remoteLogStorageEnable ;
188
+ return this ;
189
+ }
190
+
191
+ public LogConfigBuilder withRemoteLogCopyDisable (boolean remoteLogCopyDisable ) {
192
+ this .remoteLogCopyDisable = remoteLogCopyDisable ;
193
+ return this ;
194
+ }
195
+
196
+ public LogConfigBuilder withRemoteLogDeleteOnDisable (boolean remoteLogDeleteOnDisable ) {
197
+ this .remoteLogDeleteOnDisable = remoteLogDeleteOnDisable ;
198
+ return this ;
199
+ }
200
+
201
+ // 3. 建立一個 build() 方法,它使用 builder 中設定的值來建立最終的 LogConfig 物件
202
+ public LogConfig build () {
203
+ Properties logProps = new Properties ();
204
+ logProps .put (TopicConfig .SEGMENT_MS_CONFIG , String .valueOf (segmentMs ));
205
+ logProps .put (LogConfig .INTERNAL_SEGMENT_BYTES_CONFIG , String .valueOf (segmentBytes ));
206
+ logProps .put (TopicConfig .RETENTION_MS_CONFIG , String .valueOf (retentionMs ));
207
+ logProps .put (TopicConfig .LOCAL_LOG_RETENTION_MS_CONFIG , String .valueOf (localRetentionMs ));
208
+ logProps .put (TopicConfig .RETENTION_BYTES_CONFIG , String .valueOf (retentionBytes ));
209
+ logProps .put (TopicConfig .LOCAL_LOG_RETENTION_BYTES_CONFIG , String .valueOf (localRetentionBytes ));
210
+ logProps .put (TopicConfig .SEGMENT_JITTER_MS_CONFIG , String .valueOf (segmentJitterMs ));
211
+ logProps .put (TopicConfig .CLEANUP_POLICY_CONFIG , cleanupPolicy );
212
+ logProps .put (TopicConfig .MAX_MESSAGE_BYTES_CONFIG , String .valueOf (maxMessageBytes ));
213
+ logProps .put (TopicConfig .INDEX_INTERVAL_BYTES_CONFIG , String .valueOf (indexIntervalBytes ));
214
+ logProps .put (TopicConfig .SEGMENT_INDEX_BYTES_CONFIG , String .valueOf (segmentIndexBytes ));
215
+ logProps .put (TopicConfig .FILE_DELETE_DELAY_MS_CONFIG , String .valueOf (fileDeleteDelayMs ));
216
+ logProps .put (TopicConfig .REMOTE_LOG_STORAGE_ENABLE_CONFIG , String .valueOf (remoteLogStorageEnable ));
217
+ logProps .put (TopicConfig .REMOTE_LOG_COPY_DISABLE_CONFIG , String .valueOf (remoteLogCopyDisable ));
218
+ logProps .put (TopicConfig .REMOTE_LOG_DELETE_ON_DISABLE_CONFIG , String .valueOf (remoteLogDeleteOnDisable ));
219
+ return new LogConfig (logProps );
220
+ }
221
+ }
36
222
}
0 commit comments