Skip to content
This repository was archived by the owner on Jun 4, 2024. It is now read-only.

Commit 148206b

Browse files
Fix delay when a job is not pushed to disk (#3)
1 parent a996a8a commit 148206b

File tree

2 files changed

+39
-11
lines changed

2 files changed

+39
-11
lines changed

src/SqsDiskQueue.php

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,24 @@ public function __construct(
9292
*/
9393
public function pushRaw($payload, $queue = null, array $options = [], $delay = 0)
9494
{
95-
$payloadLength = strlen($payload);
95+
$message = [
96+
'QueueUrl' => $this->getQueue($queue),
97+
'MessageBody' => $payload,
98+
];
9699

97-
if ($payloadLength >= self::MAX_SQS_LENGTH || Arr::get($this->diskOptions, 'always_store')) {
100+
if (strlen($payload) >= self::MAX_SQS_LENGTH || Arr::get($this->diskOptions, 'always_store')) {
98101
$uuid = json_decode($payload)->uuid;
99102
$filepath = Arr::get($this->diskOptions, 'prefix', '') . "/{$uuid}.json";
100103
$this->resolveDisk()->put($filepath, $payload);
101104

102-
return $this->sqs->sendMessage([
103-
'QueueUrl' => $this->getQueue($queue),
104-
'MessageBody' => json_encode(['pointer' => $filepath]),
105-
'DelaySeconds' => $this->secondsUntil($delay),
106-
])->get('MessageId');
105+
$message['MessageBody'] = json_encode(['pointer' => $filepath]);
107106
}
108107

109-
return parent::pushRaw($payload, $queue, $options);
108+
if ($delay) {
109+
$message['DelaySeconds'] = $this->secondsUntil($delay);
110+
}
111+
112+
return $this->sqs->sendMessage($message)->get('MessageId');
110113
}
111114

112115
/**

tests/SqsDiskQueueTest.php

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ public function testItPushesLargePayloadsToADisk()
8282
->with([
8383
'QueueUrl' => '/default',
8484
'MessageBody' => $this->mockedPointerPayload,
85-
'DelaySeconds' => 0,
8685
])
8786
->once()
8887
->andReturnSelf();
@@ -120,7 +119,6 @@ public function testItAlwaysPushesPayloadsToADiskIfAlwaysPushIsEnabled()
120119
->with([
121120
'QueueUrl' => '/default',
122121
'MessageBody' => $this->mockedPointerPayload,
123-
'DelaySeconds' => 0,
124122
])
125123
->once()
126124
->andReturnSelf();
@@ -140,7 +138,7 @@ public function testItAlwaysPushesPayloadsToADiskIfAlwaysPushIsEnabled()
140138
$sqsDiskQueue->pushRaw($this->mockedPayload);
141139
}
142140

143-
public function testItDelaysAJob()
141+
public function testItCanDelayAJobWhenPushedToDisk()
144142
{
145143
$this->mockedFilesystemAdapter->shouldReceive('disk')
146144
->with('s3')
@@ -175,6 +173,33 @@ public function testItDelaysAJob()
175173
$sqsDiskQueue->later(10, 'foo');
176174
}
177175

176+
public function testItCanDelayAJobWhenNotPushedToDisk()
177+
{
178+
$this->mockedFilesystemAdapter->shouldReceive('disk')
179+
->never();
180+
181+
$this->mockedSqsClient->shouldReceive('sendMessage')
182+
->with(Mockery::on(function ($arguments) {
183+
return $arguments['DelaySeconds'] === 10;
184+
}))
185+
->once()
186+
->andReturnSelf();
187+
188+
$this->mockedSqsClient->shouldReceive('get')
189+
->once();
190+
191+
$diskOptions = [
192+
'always_store' => false,
193+
'cleanup' => true,
194+
'disk' => 's3',
195+
'prefix' => 'prefix',
196+
];
197+
198+
$sqsDiskQueue = new SqsDiskQueue($this->mockedSqsClient, 'default', $diskOptions);
199+
$sqsDiskQueue->setContainer($this->mockedContainer);
200+
$sqsDiskQueue->later(10, 'foo');
201+
}
202+
178203
public function testItCreatesANewSqsDiskJobWhenPopped()
179204
{
180205
$this->mockedSqsClient->shouldReceive('receiveMessage')

0 commit comments

Comments
 (0)