Skip to content

Commit 618e0b6

Browse files
Ingest records immediately when next payload will exceed buffer limit (#336)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 9a8a5d7 commit 618e0b6

File tree

6 files changed

+87
-3
lines changed

6 files changed

+87
-3
lines changed

agent/build/agent.phar

717 Bytes
Binary file not shown.

agent/build/signature.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0389423268858E42BBF5CE5338A2515B812A14A30FE5E79C4B19A7C3D3C45426C1061A5C34CA67300409BA190E583F1D5BFC39B7965E255675DBEA180A75A5C2
1+
859B5CB714BE47686CD645444022C0336586C814B1C783F48E8AD843750845E011E19E53D7CAC9C92382E1242D4A83933DF9A0686B5A9A6E0330D69100C8A047

agent/src/Ingest.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ public function __construct(
5858

5959
public function write(string $payload): void
6060
{
61+
if ($this->buffer->isNotEmpty() && $this->buffer->willExceedThresholdWith($payload)) {
62+
if ($this->sendBufferAfterDelayTimer !== null) {
63+
$this->loop->cancelTimer($this->sendBufferAfterDelayTimer);
64+
65+
$this->sendBufferAfterDelayTimer = null;
66+
}
67+
68+
$this->digest();
69+
}
70+
6171
$this->buffer->write($payload);
6272

6373
if ($this->buffer->reachedThreshold()) {

agent/src/NullBuffer.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ public function reachedThreshold(): bool
1414
return false;
1515
}
1616

17+
public function willExceedThresholdWith(string $payload): bool
18+
{
19+
return false;
20+
}
21+
1722
/**
1823
* @return non-empty-string
1924
*/

agent/src/StreamBuffer.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ public function reachedThreshold(): bool
3131
return strlen($this->buffer) >= $this->threshold;
3232
}
3333

34+
public function willExceedThresholdWith(string $payload): bool
35+
{
36+
// -2 to account for the removal of the `[` and `]` characters when
37+
// appending to the stream.
38+
return (strlen($this->buffer) + (strlen($payload) - 2)) > $this->threshold;
39+
}
40+
3441
/**
3542
* @return non-empty-string
3643
*/

agent/tests/Feature/IngestTest.php

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ public function test_it_ingests_payloads_under_the_threshold_after_10_seconds():
703703
$ingestDetailsBrowser->assertPending([]);
704704
}
705705
706-
public function test_it_ingests_payloads_before_10_seconds_if_the_buffer_exceeds_the_threshold(): void
706+
public function test_it_ingests_payloads_before_10_seconds_if_the_buffer_reaches_the_threshold(): void
707707
{
708708
$loop = new LoopFake(runForSeconds: 11);
709709
$server = new TcpServerFake;
@@ -724,7 +724,7 @@ public function test_it_ingests_payloads_before_10_seconds_if_the_buffer_exceeds
724724
ingestBrowser: $ingestBrowser,
725725
loop: $loop,
726726
server: $server,
727-
maxBufferLength: 63,
727+
maxBufferLength: 62,
728728
);
729729
730730
$this->assertNull($e, $e?->getMessage() ?? '');
@@ -803,6 +803,68 @@ public function test_it_ingests_immediately_when_buffer_is_empty_and_a_payload_o
803803
$ingestDetailsBrowser->assertPending([]);
804804
}
805805
806+
public function test_it_ingests_immediately_when_incoming_payload_will_put_buffer_over_the_threshold(): void
807+
{
808+
$loop = new LoopFake(runForSeconds: 14);
809+
$server = new TcpServerFake;
810+
$ingestDetailsBrowser = new BrowserFake([
811+
Response::jwt(),
812+
]);
813+
$ingestBrowser = new BrowserFake([
814+
Response::ingested(),
815+
Response::ingested(),
816+
]);
817+
$loop->addTimer(0, $server->pendingConnection([['t' => 'request']]));
818+
$loop->addTimer(1, $server->pendingConnection([['t' => 'request']]));
819+
$loop->addTimer(2, $server->pendingConnection([['t' => 'request']]));
820+
$loop->addTimer(3, $server->pendingConnection([['t' => 'request']]));
821+
822+
[$output, $e] = $this->runAgent(
823+
via: 'source',
824+
ingestDetailsBrowser: $ingestDetailsBrowser,
825+
ingestBrowser: $ingestBrowser,
826+
loop: $loop,
827+
server: $server,
828+
maxBufferLength: 61,
829+
);
830+
831+
$this->assertNull($e, $e?->getMessage() ?? '');
832+
$this->assertLogMatches(<<<'OUTPUT'
833+
{date} {info} Authentication successful {duration}
834+
{date} {info} Ingest successful {duration}
835+
{date} {info} Ingest successful {duration}
836+
OUTPUT, $output);
837+
$ingestBrowser->assertSent([
838+
Request::ingest([
839+
['t' => 'request'],
840+
['t' => 'request'],
841+
['t' => 'request'],
842+
]),
843+
Request::ingest([
844+
['t' => 'request'],
845+
]),
846+
]);
847+
$ingestBrowser->assertProcessing([]);
848+
$ingestBrowser->assertPending([]);
849+
$loop->assertRun([
850+
new Timer(interval: 0, runAt: 0, scheduledAt: 0, scheduledBy: $this->functionName()),
851+
new Timer(interval: 1, runAt: 1, scheduledAt: 0, scheduledBy: $this->functionName()),
852+
new Timer(interval: 2, runAt: 2, scheduledAt: 0, scheduledBy: $this->functionName()),
853+
new Timer(interval: 3, runAt: 3, scheduledAt: 0, scheduledBy: $this->functionName()),
854+
new Timer(interval: 10, runAt: 13, scheduledAt: 3, scheduledBy: 'Laravel\NightwatchAgent\Ingest::write'),
855+
]);
856+
$loop->assertCanceled([
857+
new Timer(interval: 10, canceledAt: 3, scheduledAt: 0, scheduledBy: 'Laravel\NightwatchAgent\Ingest::write'),
858+
]);
859+
$loop->assertPending([
860+
new Timer(interval: 3_600, runAt: 3_600, scheduledAt: 0, scheduledBy: 'Laravel\NightwatchAgent\IngestDetailsRepository::scheduleRefreshIn'),
861+
]);
862+
$ingestDetailsBrowser->assertSent([
863+
Request::json('/api/agent-auth'),
864+
]);
865+
$ingestDetailsBrowser->assertPending([]);
866+
}
867+
806868
public function test_it_stops_ingesting_data_when_exceeding_quota_during_request(): void
807869
{
808870
$loop = new LoopFake(runForSeconds: 60);

0 commit comments

Comments
 (0)