Skip to content
This repository was archived by the owner on Sep 6, 2025. It is now read-only.

Commit db948f8

Browse files
authored
Merge pull request #7 from hellofresh/issue/microseconds
Issue/microseconds
2 parents c8c29f2 + d2705d9 commit db948f8

File tree

9 files changed

+326
-23
lines changed

9 files changed

+326
-23
lines changed

src/Domain/DomainMessage.php

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,21 @@ final class DomainMessage
2828
private $version;
2929

3030
/**
31-
* @param string $id
31+
* @param AggregateIdInterface|string $id
3232
* @param $version
3333
* @param mixed $payload
3434
* @param \DateTimeImmutable $recordedOn
3535
*/
3636
public function __construct($id, $version, $payload, \DateTimeImmutable $recordedOn)
3737
{
38-
$this->id = $id;
38+
$this->id = (string) $id;
3939
$this->payload = $payload;
40-
$this->recordedOn = $recordedOn;
40+
$this->recordedOn = $recordedOn->setTimezone(new \DateTimeZone('UTC'));
4141
$this->version = $version;
4242
}
4343

4444
/**
45-
* @return AggregateIdInterface
45+
* @return string
4646
*/
4747
public function getId()
4848
{
@@ -58,15 +58,15 @@ public function getPayload()
5858
}
5959

6060
/**
61-
* {@inheritDoc}
61+
* @inheritDoc
6262
*/
6363
public function getRecordedOn()
6464
{
6565
return $this->recordedOn;
6666
}
6767

6868
/**
69-
* {@inheritDoc}
69+
* @inheritDoc
7070
*/
7171
public function getType()
7272
{
@@ -81,7 +81,9 @@ public function getType()
8181
*/
8282
public static function recordNow($id, $version, DomainEventInterface $payload)
8383
{
84-
return new DomainMessage($id, $version, $payload, new \DateTimeImmutable('now', new \DateTimeZone('UTC')));
84+
$recordedOn = \DateTimeImmutable::createFromFormat('U.u', sprintf('%.6F', microtime(true)));
85+
86+
return new DomainMessage($id, $version, $payload, $recordedOn);
8587
}
8688

8789
/**

src/EventStore/Adapter/RedisAdapter.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public function countEventsFor(StreamName $streamName, AggregateIdInterface $agg
5959
return count($this->redis->lrange($this->getNamespaceKey($streamName, $aggregateId), 0, -1));
6060
}
6161

62-
private function getNamespaceKey(StreamName $streamName, AggregateIdInterface $aggregateId)
62+
private function getNamespaceKey(StreamName $streamName, $aggregateId)
6363
{
64-
return "events:{$streamName}:{$aggregateId}";
64+
return sprintf('events:%s:%s', $streamName, $aggregateId);
6565
}
6666
}

src/EventStore/Snapshot/Adapter/RedisSnapshotAdapter.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public function byId(AggregateIdInterface $id)
4343
'json'
4444
);
4545

46-
if (null === $metadata) {
46+
if (!is_array($metadata)) {
4747
return null;
4848
}
4949

@@ -54,11 +54,14 @@ public function byId(AggregateIdInterface $id)
5454
'json'
5555
);
5656

57+
$createdAt = \DateTimeImmutable::createFromFormat('U.u', $metadata['created_at']);
58+
$createdAt->setTimezone(new \DateTimeZone('UTC'));
59+
5760
return new Snapshot(
5861
$aggregate->getAggregateRootId(),
5962
$aggregate,
6063
$metadata['version'],
61-
new \DateTimeImmutable("@" . $metadata['created_at'])
64+
$createdAt
6265
);
6366
}
6467

@@ -69,7 +72,7 @@ public function save(Snapshot $snapshot)
6972
{
7073
$data = [
7174
'version' => $snapshot->getVersion(),
72-
'created_at' => $snapshot->getCreatedAt()->getTimestamp(),
75+
'created_at' => $snapshot->getCreatedAt()->format('U.u'),
7376
'snapshot' => [
7477
'type' => $snapshot->getType(),
7578
'payload' => $this->serializer->serialize($snapshot->getAggregate(), 'json')
@@ -89,14 +92,12 @@ public function save(Snapshot $snapshot)
8992
*/
9093
public function has(AggregateIdInterface $id, $version)
9194
{
92-
$data = $this->redis->hget(static::KEY_NAMESPACE, (string)$id);
93-
94-
if (!$data) {
95+
if (!$this->redis->hexists(static::KEY_NAMESPACE, (string)$id)) {
9596
return false;
9697
}
9798

9899
$snapshot = $this->serializer->deserialize(
99-
$data,
100+
$this->redis->hget(static::KEY_NAMESPACE, (string)$id),
100101
'array',
101102
'json'
102103
);

src/EventStore/Snapshot/Snapshot.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public function __construct(
4343
$this->aggregateId = $aggregateId;
4444
$this->aggregate = $aggregate;
4545
$this->version = $version;
46-
$this->createdAt = $createdAt;
46+
$this->createdAt = $createdAt->setTimezone(new \DateTimeZone('UTC'));
4747
}
4848

4949
/**
@@ -55,7 +55,9 @@ public function __construct(
5555
*/
5656
public static function take(AggregateIdInterface $aggregateId, AggregateRootInterface $aggregate, $version)
5757
{
58-
return new static($aggregateId, $aggregate, $version, new \DateTimeImmutable());
58+
$dateTime = \DateTimeImmutable::createFromFormat('U.u', sprintf('%.6F', microtime(true)));
59+
60+
return new static($aggregateId, $aggregate, $version, $dateTime);
5961
}
6062

6163
/**

tests/Domain/DomainMessageTest.php

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@ public function itShouldCreateAUuidFromConstructor(
2424
\DateTimeImmutable $date
2525
) {
2626
$message = new DomainMessage($aggregateId, $version, $payload, $date);
27+
2728
$this->assertInstanceOf(DomainMessage::class, $message);
28-
$this->assertEquals($aggregateId, $message->getId());
29-
$this->assertEquals($version, $message->getVersion());
30-
$this->assertEquals($payload, $message->getPayload());
29+
$this->assertSame((string) $aggregateId, $message->getId());
30+
$this->assertSame($version, $message->getVersion());
31+
$this->assertSame($payload, $message->getPayload());
3132
$this->assertEquals($date, $message->getRecordedOn());
33+
$this->assertEquals(new \DateTimeZone('UTC'), $message->getRecordedOn()->getTimezone());
3234
}
3335

3436
/**
@@ -46,15 +48,25 @@ public function itShouldCreateAUuidFromNamedConstructor(
4648
\DateTimeImmutable $date
4749
) {
4850
$message = DomainMessage::recordNow($aggregateId, $version, $payload);
51+
4952
$this->assertInstanceOf(DomainMessage::class, $message);
53+
54+
$this->assertNotEmpty((int)$message->getRecordedOn()->format('u'), 'Expected microseconds to be set');
55+
$this->assertEquals(new \DateTimeZone('UTC'), $message->getRecordedOn()->getTimezone());
5056
}
5157

5258
public function messageProvider()
5359
{
5460
return [
5561
[AggregateId::generate(), 1, new SomethingHappened(), new \DateTimeImmutable()],
5662
[AggregateId::generate(), 100, new SomethingHappened(), new \DateTimeImmutable()],
57-
[AggregateId::generate(), 9999999, new SomethingHappened(), new \DateTimeImmutable()]
63+
[AggregateId::generate(), 9999999, new SomethingHappened(), new \DateTimeImmutable()],
64+
[
65+
AggregateId::generate(),
66+
9999999,
67+
new SomethingHappened(),
68+
\DateTimeImmutable::createFromFormat('U.u', sprintf('%.6F', microtime(true)))
69+
]
5870
];
5971
}
6072
}

tests/EventStore/EventStoreTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace HelloFresh\Tests\Engine\EventStore;
44

55
use HelloFresh\Engine\Domain\AggregateId;
6+
use HelloFresh\Engine\Domain\AggregateIdInterface;
67
use HelloFresh\Engine\Domain\DomainMessage;
78
use HelloFresh\Engine\Domain\EventStream;
89
use HelloFresh\Engine\Domain\StreamName;
@@ -79,7 +80,7 @@ public function idDataProvider()
7980
{
8081
return [
8182
'Simple String' => [
82-
'Yolntbyaac', //You only live nine times because you are a cat
83+
'Yolntbyaac', // You only live nine times because you are a cat
8384
],
8485
'Identitiy' => [
8586
AggregateId::generate(),
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
<?php
2+
3+
namespace HelloFresh\Tests\Engine\EventStore\Snapshot\Adapter;
4+
5+
use HelloFresh\Engine\Domain\AggregateId;
6+
use HelloFresh\Engine\EventStore\Snapshot\Adapter\RedisSnapshotAdapter;
7+
use HelloFresh\Engine\EventStore\Snapshot\Snapshot;
8+
use HelloFresh\Engine\Serializer\SerializerInterface;
9+
use HelloFresh\Tests\Engine\Mock\AggregateRoot;
10+
use HelloFresh\Tests\Engine\Mock\PredisClient;
11+
use PHPUnit\Framework\TestCase;
12+
use Predis\ClientInterface;
13+
14+
class RedisSnapshotAdapterTest extends TestCase
15+
{
16+
/**
17+
* @var ClientInterface|\Prophecy\Prophecy\ObjectProphecy
18+
*/
19+
private $client;
20+
/**
21+
* @var SerializerInterface|\Prophecy\Prophecy\ObjectProphecy
22+
*/
23+
private $serializer;
24+
25+
protected function setUp()
26+
{
27+
$this->client = $this->prophesize(PredisClient::class);
28+
$this->serializer = $this->prophesize(SerializerInterface::class);
29+
}
30+
31+
/**
32+
* @test
33+
*/
34+
public function itCanSaveASnapshot()
35+
{
36+
$id = AggregateId::generate();
37+
$aggregate = AggregateRoot::create($id, 'test');
38+
39+
$snapshot = Snapshot::take($id, $aggregate, '10');
40+
41+
$expectedSerializedAggregate = sprintf('["serialized": "%s"]', spl_object_hash($snapshot));
42+
$expectedStorageArray = [
43+
'version' => '10',
44+
'created_at' => $snapshot->getCreatedAt()->format('U.u'),
45+
'snapshot' => [
46+
'type' => AggregateRoot::class,
47+
'payload' => $expectedSerializedAggregate,
48+
]
49+
];
50+
$expectedStoredData = '["version etc..."]';
51+
52+
$this->serializer->serialize($aggregate, 'json')
53+
->willReturn($expectedSerializedAggregate)
54+
->shouldBeCalledTimes(1);
55+
$this->serializer->serialize($expectedStorageArray, 'json')
56+
->willReturn($expectedStoredData)
57+
->shouldBeCalledTimes(1);
58+
59+
$this->client->hset(RedisSnapshotAdapter::KEY_NAMESPACE, (string)$id, $expectedStoredData)
60+
->shouldBeCalledTimes(1);
61+
62+
$adapter = $this->createAdapter();
63+
$adapter->save($snapshot);
64+
}
65+
66+
/**
67+
* @test
68+
*/
69+
public function aSnapshotCanBeRetrievedById()
70+
{
71+
$id = AggregateId::generate();
72+
73+
$expectedAggregate = AggregateRoot::create($id, 'testing');
74+
75+
$snapshotMetadata = [
76+
'version' => '15',
77+
'created_at' => '1468847497.332610',
78+
'snapshot' => [
79+
'type' => AggregateRoot::class,
80+
'payload' => 'aggregate_data',
81+
]
82+
];
83+
84+
$this->mockRedisHasAndGetData($id, $snapshotMetadata);
85+
86+
$this->serializer->deserialize('aggregate_data', AggregateRoot::class, 'json')
87+
->willReturn($expectedAggregate);
88+
89+
$adapter = $this->createAdapter();
90+
$result = $adapter->byId($id);
91+
92+
$this->assertInstanceOf(Snapshot::class, $result);
93+
$this->assertSame($id, $result->getAggregateId());
94+
$this->assertSame($expectedAggregate, $result->getAggregate());
95+
$this->assertSame('15', $result->getVersion());
96+
$this->assertSame('1468847497.332610', $result->getCreatedAt()->format('U.u'));
97+
$this->assertEquals(new \DateTimeZone('UTC'), $result->getCreatedAt()->getTimezone());
98+
}
99+
100+
/**
101+
* @test
102+
*/
103+
public function aSnapshotCanNotBeRetrievedWhenTheIdIsUnknown()
104+
{
105+
$id = AggregateId::generate();
106+
107+
$this->client->hexists(RedisSnapshotAdapter::KEY_NAMESPACE, (string)$id)
108+
->willReturn(false)
109+
->shouldBeCalledTimes(1);
110+
111+
$adapter = $this->createAdapter();
112+
$result = $adapter->byId($id);
113+
114+
$this->assertNull($result);
115+
}
116+
117+
/**
118+
* @test
119+
*/
120+
public function itIndicatedIfASnapshotOfAggregateWithVersionExists()
121+
{
122+
$id = AggregateId::generate();
123+
$expectedDeserializedRedisData = ['version' => 20];
124+
125+
$this->mockRedisHasAndGetData($id, $expectedDeserializedRedisData);
126+
127+
$adapter = $this->createAdapter();
128+
$result = $adapter->has($id, 20);
129+
130+
$this->assertTrue($result);
131+
}
132+
133+
/**
134+
* @test
135+
*/
136+
public function itIndicatedThatASnapshotOfAggregateIsUnknown()
137+
{
138+
$id = AggregateId::generate();
139+
140+
$this->client->hexists(RedisSnapshotAdapter::KEY_NAMESPACE, (string)$id)
141+
->willReturn(false)
142+
->shouldBeCalledTimes(1);
143+
144+
$adapter = $this->createAdapter();
145+
$result = $adapter->has($id, 15);
146+
147+
$this->assertFalse($result);
148+
}
149+
150+
/**
151+
* @test
152+
*/
153+
public function itIndicatedThatASnapshotOfAggregateIsUnknownWhenTheVersionIsIncorrect()
154+
{
155+
$id = AggregateId::generate();
156+
157+
$this->mockRedisHasAndGetData($id, 20);
158+
159+
$adapter = $this->createAdapter();
160+
$result = $adapter->has($id, 15);
161+
162+
$this->assertFalse($result);
163+
}
164+
165+
166+
/**
167+
* @return RedisSnapshotAdapter
168+
*/
169+
protected function createAdapter()
170+
{
171+
$adapter = new RedisSnapshotAdapter($this->client->reveal(), $this->serializer->reveal());
172+
return $adapter;
173+
}
174+
175+
/**
176+
* @param $id
177+
* @param $expectedDeserializedRedisData
178+
*/
179+
protected function mockRedisHasAndGetData($id, $expectedDeserializedRedisData)
180+
{
181+
$this->client->hexists(RedisSnapshotAdapter::KEY_NAMESPACE, (string)$id)
182+
->willReturn(true)
183+
->shouldBeCalledTimes(1);
184+
185+
$this->client->hget(RedisSnapshotAdapter::KEY_NAMESPACE, (string)$id)
186+
->willReturn('redis_data')
187+
->shouldBeCalledTimes(1);
188+
189+
$this->serializer->deserialize('redis_data', 'array', 'json')
190+
->willReturn($expectedDeserializedRedisData);
191+
}
192+
}

0 commit comments

Comments
 (0)