2222import org .apache .iotdb .commons .pipe .event .EnrichedEvent ;
2323import org .apache .iotdb .db .pipe .event .common .tablet .PipeInsertNodeTabletInsertionEvent ;
2424import org .apache .iotdb .db .pipe .event .common .tablet .PipeRawTabletInsertionEvent ;
25+ import org .apache .iotdb .db .pipe .resource .memory .PipeMemoryWeightUtil ;
2526import org .apache .iotdb .db .pipe .sink .payload .evolvable .request .PipeTransferTabletBatchReqV2 ;
2627import org .apache .iotdb .db .queryengine .plan .planner .plan .node .write .InsertNode ;
27- import org .apache .iotdb .db .storageengine . dataregion . wal . exception . WALPipeException ;
28+ import org .apache .iotdb .db .queryengine . plan . planner . plan . node . write . RelationalInsertTabletNode ;
2829import org .apache .iotdb .pipe .api .event .dml .insertion .TabletInsertionEvent ;
2930
3031import org .apache .tsfile .utils .Pair ;
3132import org .apache .tsfile .utils .PublicBAOS ;
33+ import org .apache .tsfile .utils .RamUsageEstimator ;
3234import org .apache .tsfile .utils .ReadWriteIOUtils ;
35+ import org .apache .tsfile .write .record .Tablet ;
3336
3437import java .io .DataOutputStream ;
3538import java .io .IOException ;
3841import java .util .HashMap ;
3942import java .util .List ;
4043import java .util .Map ;
41- import java .util .Objects ;
44+ import java .util .concurrent . atomic . AtomicLong ;
4245
4346public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
4447
@@ -51,13 +54,13 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
5154 private final List <String > insertNodeDataBases = new ArrayList <>();
5255 private final List <String > tabletDataBases = new ArrayList <>();
5356
57+ // database -> tableName -> Pair<size, tablets to batch>
58+ private final Map <String , Map <String , Pair <Integer , List <Tablet >>>> tableModelTabletMap =
59+ new HashMap <>();
60+
5461 // Used to rate limit when transferring data
5562 private final Map <Pair <String , Long >, Long > pipe2BytesAccumulated = new HashMap <>();
5663
57- PipeTabletEventPlainBatch (final int maxDelayInMs , final long requestMaxBatchSizeInBytes ) {
58- super (maxDelayInMs , requestMaxBatchSizeInBytes , null );
59- }
60-
6164 PipeTabletEventPlainBatch (
6265 final int maxDelayInMs ,
6366 final long requestMaxBatchSizeInBytes ,
@@ -66,9 +69,8 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
6669 }
6770
6871 @ Override
69- protected boolean constructBatch (final TabletInsertionEvent event )
70- throws WALPipeException , IOException {
71- final int bufferSize = buildTabletInsertionBuffer (event );
72+ protected boolean constructBatch (final TabletInsertionEvent event ) throws IOException {
73+ final long bufferSize = buildTabletInsertionBuffer (event );
7274 totalBufferSize += bufferSize ;
7375 pipe2BytesAccumulated .compute (
7476 new Pair <>(
@@ -89,11 +91,45 @@ public synchronized void onSuccess() {
8991 binaryDataBases .clear ();
9092 insertNodeDataBases .clear ();
9193 tabletDataBases .clear ();
94+ tableModelTabletMap .clear ();
9295
9396 pipe2BytesAccumulated .clear ();
9497 }
9598
9699 public PipeTransferTabletBatchReqV2 toTPipeTransferReq () throws IOException {
100+ for (final Map .Entry <String , Map <String , Pair <Integer , List <Tablet >>>> insertTablets :
101+ tableModelTabletMap .entrySet ()) {
102+ final String databaseName = insertTablets .getKey ();
103+ for (final Map .Entry <String , Pair <Integer , List <Tablet >>> tabletEntry :
104+ insertTablets .getValue ().entrySet ()) {
105+ final List <Tablet > batchTablets = new ArrayList <>();
106+ for (final Tablet tablet : tabletEntry .getValue ().getRight ()) {
107+ boolean success = false ;
108+ for (final Tablet batchTablet : batchTablets ) {
109+ if (batchTablet .append (tablet , tabletEntry .getValue ().getLeft ())) {
110+ success = true ;
111+ break ;
112+ }
113+ }
114+ if (!success ) {
115+ batchTablets .add (tablet );
116+ }
117+ }
118+ for (final Tablet batchTablet : batchTablets ) {
119+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS ();
120+ final DataOutputStream outputStream = new DataOutputStream (byteArrayOutputStream )) {
121+ batchTablet .serialize (outputStream );
122+ ReadWriteIOUtils .write (true , outputStream );
123+ tabletBuffers .add (
124+ ByteBuffer .wrap (byteArrayOutputStream .getBuf (), 0 , byteArrayOutputStream .size ()));
125+ }
126+ tabletDataBases .add (databaseName );
127+ }
128+ }
129+ }
130+
131+ tableModelTabletMap .clear ();
132+
97133 return PipeTransferTabletBatchReqV2 .toTPipeTransferReq (
98134 binaryBuffers ,
99135 insertNodeBuffers ,
@@ -111,57 +147,71 @@ public Map<Pair<String, Long>, Long> getPipe2BytesAccumulated() {
111147 return pipe2BytesAccumulated ;
112148 }
113149
114- private int buildTabletInsertionBuffer (final TabletInsertionEvent event )
115- throws IOException , WALPipeException {
116- int databaseEstimateSize = 0 ;
150+ private long buildTabletInsertionBuffer (final TabletInsertionEvent event ) throws IOException {
151+ long estimateSize = 0 ;
117152 final ByteBuffer buffer ;
118153 if (event instanceof PipeInsertNodeTabletInsertionEvent ) {
119154 final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
120155 (PipeInsertNodeTabletInsertionEvent ) event ;
121- // Read the bytebuffer from the wal file and transfer it directly without serializing or
122- // deserializing if possible
123156 final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent .getInsertNode ();
124- if (Objects .isNull (insertNode )) {
125- buffer = pipeInsertNodeTabletInsertionEvent .getByteBuffer ();
126- binaryBuffers .add (buffer );
127- if (pipeInsertNodeTabletInsertionEvent .isTableModelEvent ()) {
128- databaseEstimateSize =
129- pipeInsertNodeTabletInsertionEvent .getTableModelDatabaseName ().length ();
130- binaryDataBases .add (pipeInsertNodeTabletInsertionEvent .getTableModelDatabaseName ());
131- } else {
132- databaseEstimateSize = 4 ;
133- binaryDataBases .add (TREE_MODEL_DATABASE_PLACEHOLDER );
134- }
135- } else {
157+ if (!(insertNode instanceof RelationalInsertTabletNode )) {
136158 buffer = insertNode .serializeToByteBuffer ();
137159 insertNodeBuffers .add (buffer );
138160 if (pipeInsertNodeTabletInsertionEvent .isTableModelEvent ()) {
139- databaseEstimateSize =
140- pipeInsertNodeTabletInsertionEvent .getTableModelDatabaseName ().length ();
161+ estimateSize =
162+ RamUsageEstimator .sizeOf (
163+ pipeInsertNodeTabletInsertionEvent .getTableModelDatabaseName ());
141164 insertNodeDataBases .add (pipeInsertNodeTabletInsertionEvent .getTableModelDatabaseName ());
142165 } else {
143- databaseEstimateSize = 4 ;
166+ estimateSize = 4 ;
144167 insertNodeDataBases .add (TREE_MODEL_DATABASE_PLACEHOLDER );
145168 }
169+ estimateSize += buffer .limit ();
170+ } else {
171+ for (final Tablet tablet :
172+ ((PipeInsertNodeTabletInsertionEvent ) event ).convertToTablets ()) {
173+ estimateSize +=
174+ constructTabletBatch (
175+ tablet , pipeInsertNodeTabletInsertionEvent .getTableModelDatabaseName ());
176+ }
146177 }
147178 } else {
148179 final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
149180 (PipeRawTabletInsertionEvent ) event ;
150- try (final PublicBAOS byteArrayOutputStream = new PublicBAOS ();
151- final DataOutputStream outputStream = new DataOutputStream (byteArrayOutputStream )) {
152- pipeRawTabletInsertionEvent .convertToTablet ().serialize (outputStream );
153- ReadWriteIOUtils .write (pipeRawTabletInsertionEvent .isAligned (), outputStream );
154- buffer = ByteBuffer .wrap (byteArrayOutputStream .getBuf (), 0 , byteArrayOutputStream .size ());
155- }
156- tabletBuffers .add (buffer );
157181 if (pipeRawTabletInsertionEvent .isTableModelEvent ()) {
158- databaseEstimateSize = pipeRawTabletInsertionEvent .getTableModelDatabaseName ().length ();
159- tabletDataBases .add (pipeRawTabletInsertionEvent .getTableModelDatabaseName ());
182+ estimateSize =
183+ constructTabletBatch (
184+ pipeRawTabletInsertionEvent .convertToTablet (),
185+ pipeRawTabletInsertionEvent .getTableModelDatabaseName ());
160186 } else {
161- databaseEstimateSize = 4 ;
187+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS ();
188+ final DataOutputStream outputStream = new DataOutputStream (byteArrayOutputStream )) {
189+ pipeRawTabletInsertionEvent .convertToTablet ().serialize (outputStream );
190+ ReadWriteIOUtils .write (pipeRawTabletInsertionEvent .isAligned (), outputStream );
191+ buffer = ByteBuffer .wrap (byteArrayOutputStream .getBuf (), 0 , byteArrayOutputStream .size ());
192+ }
193+ estimateSize = 4 + buffer .limit ();
194+ tabletBuffers .add (buffer );
162195 tabletDataBases .add (TREE_MODEL_DATABASE_PLACEHOLDER );
163196 }
164197 }
165- return buffer .limit () + databaseEstimateSize ;
198+
199+ return estimateSize ;
200+ }
201+
202+ private long constructTabletBatch (final Tablet tablet , final String databaseName ) {
203+ final AtomicLong size = new AtomicLong (0 );
204+ final Pair <Integer , List <Tablet >> currentBatch =
205+ tableModelTabletMap
206+ .computeIfAbsent (
207+ databaseName ,
208+ k -> {
209+ size .addAndGet (RamUsageEstimator .sizeOf (databaseName ));
210+ return new HashMap <>();
211+ })
212+ .computeIfAbsent (tablet .getTableName (), k -> new Pair <>(0 , new ArrayList <>()));
213+ currentBatch .setLeft (currentBatch .getLeft () + tablet .getRowSize ());
214+ currentBatch .getRight ().add (tablet );
215+ return PipeMemoryWeightUtil .calculateTabletSizeInBytes (tablet ) + 4 ;
166216 }
167217}
0 commit comments