@@ -102,6 +102,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
102
102
103
103
private final AbortedTxnProcessor snapshotAbortedTxnProcessor ;
104
104
105
+ private final AbortedTxnProcessor .SnapshotType snapshotType ;
105
106
private final MaxReadPositionCallBack maxReadPositionCallBack ;
106
107
107
108
private static AbortedTxnProcessor createSnapshotProcessor (PersistentTopic topic ) {
@@ -110,13 +111,19 @@ private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic
110
111
: new SingleSnapshotAbortedTxnProcessorImpl (topic );
111
112
}
112
113
114
+ private static AbortedTxnProcessor .SnapshotType determineSnapshotType (PersistentTopic topic ) {
115
+ return topic .getBrokerService ().getPulsar ().getConfiguration ().isTransactionBufferSegmentedSnapshotEnabled ()
116
+ ? AbortedTxnProcessor .SnapshotType .Segment
117
+ : AbortedTxnProcessor .SnapshotType .Single ;
118
+ }
113
119
114
120
public TopicTransactionBuffer (PersistentTopic topic ) {
115
- this (topic , createSnapshotProcessor (topic ));
121
+ this (topic , createSnapshotProcessor (topic ), determineSnapshotType ( topic ) );
116
122
}
117
123
118
124
@ VisibleForTesting
119
- TopicTransactionBuffer (PersistentTopic topic , AbortedTxnProcessor snapshotAbortedTxnProcessor ) {
125
+ TopicTransactionBuffer (PersistentTopic topic , AbortedTxnProcessor snapshotAbortedTxnProcessor ,
126
+ AbortedTxnProcessor .SnapshotType snapshotType ) {
120
127
super (State .None );
121
128
this .topic = topic ;
122
129
this .timer = topic .getBrokerService ().getPulsar ().getTransactionTimer ();
@@ -126,6 +133,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
126
133
.getConfiguration ().getTransactionBufferSnapshotMinTimeInMillis ();
127
134
this .maxReadPosition = (PositionImpl ) topic .getManagedLedger ().getLastConfirmedEntry ();
128
135
this .snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor ;
136
+ this .snapshotType = snapshotType ;
129
137
this .maxReadPositionCallBack = topic .getMaxReadPositionCallBack ();
130
138
this .recover ();
131
139
}
0 commit comments