@@ -30,7 +30,7 @@ class Ingest
3030 */
3131 private array $ concurrentRequests = [];
3232
33- private ?TimerInterface $ sendBufferAfterDelayTimer = null ;
33+ private ?TimerInterface $ digestDelayTimer = null ;
3434
3535 private StreamBuffer |NullBuffer $ buffer ;
3636
@@ -59,28 +59,20 @@ public function __construct(
5959 public function write (string $ payload ): void
6060 {
6161 if ($ this ->buffer ->isNotEmpty () && $ this ->buffer ->willExceedThresholdWith ($ payload )) {
62- if ($ this ->sendBufferAfterDelayTimer !== null ) {
63- $ this ->loop ->cancelTimer ($ this ->sendBufferAfterDelayTimer );
64-
65- $ this ->sendBufferAfterDelayTimer = null ;
66- }
62+ $ this ->cancelDigestDelay ();
6763
6864 $ this ->digest ();
6965 }
7066
7167 $ this ->buffer ->write ($ payload );
7268
7369 if ($ this ->buffer ->reachedThreshold ()) {
74- if ($ this ->sendBufferAfterDelayTimer !== null ) {
75- $ this ->loop ->cancelTimer ($ this ->sendBufferAfterDelayTimer );
76-
77- $ this ->sendBufferAfterDelayTimer = null ;
78- }
70+ $ this ->cancelDigestDelay ();
7971
8072 $ this ->digest ();
8173 } elseif ($ this ->buffer ->isNotEmpty ()) {
82- $ this ->sendBufferAfterDelayTimer ??= $ this ->loop ->addTimer ($ this ->maxBufferDurationInSeconds , function () {
83- $ this ->sendBufferAfterDelayTimer = null ;
74+ $ this ->digestDelayTimer ??= $ this ->loop ->addTimer ($ this ->maxBufferDurationInSeconds , function () {
75+ $ this ->digestDelayTimer = null ;
8476
8577 $ this ->digest ();
8678 });
@@ -92,9 +84,7 @@ public function write(string $payload): void
9284 */
9385 public function forceDigest (): PromiseInterface
9486 {
95- if ($ this ->sendBufferAfterDelayTimer !== null ) {
96- $ this ->loop ->cancelTimer ($ this ->sendBufferAfterDelayTimer );
97- }
87+ $ this ->cancelDigestDelay ();
9888
9989 if ($ this ->buffer ->isNotEmpty ()) {
10090 $ this ->digest ();
@@ -233,4 +223,12 @@ private function parseResponse(ResponseInterface $response): array
233223 $ refreshIn ,
234224 ];
235225 }
226+
227+ private function cancelDigestDelay (): void
228+ {
229+ if ($ this ->digestDelayTimer !== null ) {
230+ $ this ->loop ->cancelTimer ($ this ->digestDelayTimer );
231+ $ this ->digestDelayTimer = null ;
232+ }
233+ }
236234}
0 commit comments