Skip to content

Commit 127e3ca

Browse files
committed
Clean up producer code
1 parent 817d57a commit 127e3ca

File tree

1 file changed

+1
-136
lines changed

1 file changed

+1
-136
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 1 addition & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ class StreamProducer implements Producer {
5656
private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {};
5757
private final long id;
5858
private final MessageAccumulator accumulator;
59-
// private final DynamicBatch<Object> dynamicBatch;
60-
private final Clock clock;
6159
private final ToLongFunction<Message> accumulatorPublishSequenceFunction;
6260
// FIXME investigate a more optimized data structure to handle pending messages
6361
private final ConcurrentMap<Long, AccumulatedEntity> unconfirmedMessages;
@@ -119,41 +117,14 @@ class StreamProducer implements Producer {
119117
return publishingSequence.getAndIncrement();
120118
}
121119
};
122-
this.clock = environment.clock();
123120

124121
if (subEntrySize <= 1) {
125-
// this.accumulator =
126-
// new SimpleMessageAccumulator(
127-
// batchSize,
128-
// environment.codec(),
129-
// client.maxFrameSize(),
130-
// accumulatorPublishSequenceFunction,
131-
// filterValueExtractor,
132-
// this.environment.clock(),
133-
// stream,
134-
// this.environment.observationCollector(),
135-
// this);
136122
if (filterValueExtractor == null) {
137123
delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK;
138124
} else {
139125
delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK;
140126
}
141127
} else {
142-
// this.accumulator =
143-
// new SubEntryMessageAccumulator(
144-
// subEntrySize,
145-
// batchSize,
146-
// compression == Compression.NONE
147-
// ? null
148-
// : environment.compressionCodecFactory().get(compression),
149-
// environment.codec(),
150-
// this.environment.byteBufAllocator(),
151-
// client.maxFrameSize(),
152-
// accumulatorPublishSequenceFunction,
153-
// this.environment.clock(),
154-
// stream,
155-
// environment.observationCollector(),
156-
// this);
157128
delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK;
158129
}
159130

@@ -217,88 +188,6 @@ public int fragmentLength(Object entity) {
217188
environment.observationCollector(),
218189
this);
219190

220-
/*
221-
if (subEntrySize <= 1) {
222-
this.dynamicBatch =
223-
new DynamicBatch<>(
224-
items -> {
225-
client.publishInternal(
226-
this.publishVersion,
227-
this.publisherId,
228-
items,
229-
this.writeCallback,
230-
this.publishSequenceFunction);
231-
},
232-
batchSize);
233-
} else {
234-
CompressionCodec compressionCodec =
235-
compression == Compression.NONE
236-
? null
237-
: environment.compressionCodecFactory().get(compression);
238-
byte compressionCode =
239-
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
240-
this.dynamicBatch =
241-
new DynamicBatch<>(
242-
items -> {
243-
List<Object> subBatches = new ArrayList<>();
244-
int count = 0;
245-
ProducerUtils.Batch batch =
246-
new ProducerUtils.Batch(
247-
Client.EncodedMessageBatch.create(
248-
this.environment.byteBufAllocator(),
249-
compressionCode,
250-
compressionCodec,
251-
subEntrySize),
252-
new ProducerUtils.CompositeConfirmationCallback(
253-
new ArrayList<>(subEntrySize)));
254-
AccumulatedEntity lastMessageInBatch = null;
255-
for (Object msg : items) {
256-
AccumulatedEntity message = (AccumulatedEntity) msg;
257-
this.observationCollector.published(
258-
message.observationContext(), message.confirmationCallback().message());
259-
lastMessageInBatch = message;
260-
batch.add(
261-
(Codec.EncodedMessage) message.encodedEntity(),
262-
message.confirmationCallback());
263-
count++;
264-
if (count == subEntrySize) {
265-
batch.time = lastMessageInBatch.time();
266-
batch.publishingId = lastMessageInBatch.publishingId();
267-
batch.encodedMessageBatch.close();
268-
subBatches.add(batch);
269-
lastMessageInBatch = null;
270-
batch =
271-
new ProducerUtils.Batch(
272-
Client.EncodedMessageBatch.create(
273-
this.environment.byteBufAllocator(),
274-
compressionCode,
275-
compressionCodec,
276-
subEntrySize),
277-
new ProducerUtils.CompositeConfirmationCallback(
278-
new ArrayList<>(subEntrySize)));
279-
count = 0;
280-
}
281-
}
282-
283-
if (!batch.isEmpty() && count < subEntrySize) {
284-
batch.time = lastMessageInBatch.time();
285-
batch.publishingId = lastMessageInBatch.publishingId();
286-
batch.encodedMessageBatch.close();
287-
subBatches.add(batch);
288-
}
289-
290-
client.publishInternal(
291-
this.publishVersion,
292-
this.publisherId,
293-
subBatches,
294-
this.writeCallback,
295-
this.publishSequenceFunction);
296-
297-
},
298-
batchSize * subEntrySize);
299-
}
300-
*/
301-
302191
if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) {
303192
AtomicReference<Runnable> taskReference = new AtomicReference<>();
304193
Runnable task =
@@ -388,7 +277,7 @@ private Runnable confirmTimeoutTask(Duration confirmTimeout) {
388277
error(unconfirmedEntry.getKey(), Constants.CODE_PUBLISH_CONFIRM_TIMEOUT);
389278
count++;
390279
} else {
391-
// everything else is after, so we can stop
280+
// everything else is after, we can stop
392281
break;
393282
}
394283
}
@@ -571,30 +460,6 @@ private void cancelConfirmTimeoutTask() {
571460
}
572461
}
573462

574-
/*
575-
private void publishBatch(boolean stateCheck) {
576-
if ((!stateCheck || canSend()) && !accumulator.isEmpty()) {
577-
List<Object> messages = new ArrayList<>(this.batchSize);
578-
int batchCount = 0;
579-
while (batchCount != this.batchSize) {
580-
AccumulatedEntity accMessage = accumulator.get();
581-
if (accMessage == null) {
582-
break;
583-
}
584-
messages.add(accMessage);
585-
batchCount++;
586-
}
587-
client.publishInternal(
588-
this.publishVersion,
589-
this.publisherId,
590-
messages,
591-
this.writeCallback,
592-
this.publishSequenceFunction);
593-
}
594-
}
595-
596-
*/
597-
598463
void publishInternal(List<Object> messages) {
599464
client.publishInternal(
600465
this.publishVersion,

0 commit comments

Comments
 (0)