Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified agent/build/agent.phar
Binary file not shown.
2 changes: 1 addition & 1 deletion agent/build/signature.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
859B5CB714BE47686CD645444022C0336586C814B1C783F48E8AD843750845E011E19E53D7CAC9C92382E1242D4A83933DF9A0686B5A9A6E0330D69100C8A047
EEFAD35214A42F62A817D01F5805400DCA2E15A50631BEE1AFF51DED19103C51460A44D5D4F496521EA85EA3D1893CEF0311CEE55B13229DEFC770936B5783A5
30 changes: 14 additions & 16 deletions agent/src/Ingest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Ingest
*/
private array $concurrentRequests = [];

private ?TimerInterface $sendBufferAfterDelayTimer = null;
private ?TimerInterface $digestDelayTimer = null;

private StreamBuffer|NullBuffer $buffer;

Expand Down Expand Up @@ -59,28 +59,20 @@ public function __construct(
public function write(string $payload): void
{
if ($this->buffer->isNotEmpty() && $this->buffer->willExceedThresholdWith($payload)) {
if ($this->sendBufferAfterDelayTimer !== null) {
$this->loop->cancelTimer($this->sendBufferAfterDelayTimer);

$this->sendBufferAfterDelayTimer = null;
}
$this->cancelDigestDelay();

$this->digest();
}

$this->buffer->write($payload);

if ($this->buffer->reachedThreshold()) {
if ($this->sendBufferAfterDelayTimer !== null) {
$this->loop->cancelTimer($this->sendBufferAfterDelayTimer);

$this->sendBufferAfterDelayTimer = null;
}
$this->cancelDigestDelay();

$this->digest();
} elseif ($this->buffer->isNotEmpty()) {
$this->sendBufferAfterDelayTimer ??= $this->loop->addTimer($this->maxBufferDurationInSeconds, function () {
$this->sendBufferAfterDelayTimer = null;
$this->digestDelayTimer ??= $this->loop->addTimer($this->maxBufferDurationInSeconds, function () {
$this->digestDelayTimer = null;

$this->digest();
});
Expand All @@ -92,9 +84,7 @@ public function write(string $payload): void
*/
public function forceDigest(): PromiseInterface
{
if ($this->sendBufferAfterDelayTimer !== null) {
$this->loop->cancelTimer($this->sendBufferAfterDelayTimer);
}
$this->cancelDigestDelay();

if ($this->buffer->isNotEmpty()) {
$this->digest();
Expand Down Expand Up @@ -233,4 +223,12 @@ private function parseResponse(ResponseInterface $response): array
$refreshIn,
];
}

private function cancelDigestDelay(): void
{
if ($this->digestDelayTimer !== null) {
$this->loop->cancelTimer($this->digestDelayTimer);
$this->digestDelayTimer = null;
}
}
}