Commit f70b0f2
committed
Fix PubsubUnboundedSink not following 1000 messages per batch limit
This commit fixes the issue where PubsubUnboundedSink's WriterFn was not
properly enforcing the message count limit per publish batch. The
publishBatchSize parameter was being passed to the constructor but not
stored or used in the processElement method.
Changes:
- Add publishBatchSize field to WriterFn class
- Store publishBatchSize in both WriterFn constructors
- Add message count check in processElement alongside existing byte size check
- Update both PubsubSinkDynamicDestinations and PubsubSink to pass
publishBatchSize when creating WriterFn instances
The fix ensures that batches are split when they reach either the message
count limit or the byte size limit, preventing Pubsub from rejecting
batches that exceed the 1000 messages per request limit.
Fixes #368851 parent c883631 commit f70b0f2
File tree
1 file changed
+12
-5
lines changed- sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub
1 file changed
+12
-5
lines changedLines changed: 12 additions & 5 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
248 | 248 | | |
249 | 249 | | |
250 | 250 | | |
| 251 | + | |
251 | 252 | | |
252 | 253 | | |
253 | 254 | | |
| |||
270 | 271 | | |
271 | 272 | | |
272 | 273 | | |
| 274 | + | |
273 | 275 | | |
274 | 276 | | |
275 | 277 | | |
| |||
279 | 281 | | |
280 | 282 | | |
281 | 283 | | |
| 284 | + | |
282 | 285 | | |
283 | 286 | | |
284 | 287 | | |
285 | 288 | | |
286 | 289 | | |
287 | 290 | | |
| 291 | + | |
288 | 292 | | |
289 | 293 | | |
290 | 294 | | |
| |||
354 | 358 | | |
355 | 359 | | |
356 | 360 | | |
357 | | - | |
| 361 | + | |
| 362 | + | |
358 | 363 | | |
359 | 364 | | |
360 | | - | |
361 | | - | |
362 | | - | |
363 | | - | |
| 365 | + | |
| 366 | + | |
| 367 | + | |
| 368 | + | |
364 | 369 | | |
365 | 370 | | |
366 | 371 | | |
| |||
659 | 664 | | |
660 | 665 | | |
661 | 666 | | |
| 667 | + | |
662 | 668 | | |
663 | 669 | | |
664 | 670 | | |
| |||
711 | 717 | | |
712 | 718 | | |
713 | 719 | | |
| 720 | + | |
714 | 721 | | |
715 | 722 | | |
716 | 723 | | |
| |||
0 commit comments