Skip to content

Commit a6b1d06

Browse files
authored
Add MultiLockHandler and MultiSemaphore helpers (#13)
1 parent 4a83c0d commit a6b1d06

File tree

8 files changed

+538
-1
lines changed

8 files changed

+538
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ be given to not break the BC and to provide a nice upgrade path.
1717
* Add support for missing scheme in DSN
1818
* Switch from Travis to GitHub Action
1919
* Add some internal tooling (php-cs-fixer, phpstan, phpunit, Makefile)
20+
* Add MultiLockHandler and MultiSemaphore helpers
2021

2122
---
2223

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,48 @@ $kv->delete('tests/session/a-lock');
7878
$session->destroy($sessionId);
7979
```
8080

81+
### How to use MultiLockHandler?
82+
83+
```php
84+
$resources = ['resource1', 'resource2'];
85+
86+
$multiLockHandler = new MultiLockHandler($resources, 60, new Session(), new KV(), 'my/lock/');
87+
88+
if ($multiLockHandler->lock()) {
89+
try {
90+
echo "Do you jobs here....";
91+
} finally {
92+
$multiLockHandler->release();
93+
}
94+
}
95+
```
96+
97+
98+
### How to use MultiSemaphore?
99+
100+
```php
101+
$resources = [
102+
new Resource('resource1', 2, 7),
103+
new Resource('resource2', 3, 6),
104+
new Resource('resource3', 1, 1),
105+
];
106+
107+
$semaphore = new MultiSemaphore($resources, 60, new Session(), new KV(), 'my/semaphore');
108+
109+
if ($semaphore->acquire()) {
110+
try {
111+
echo "Do you jobs here....";
112+
} finally {
113+
$semaphore->release();
114+
}
115+
}
116+
```
117+
81118
## Some utilities
82119

83120
* `Consul\Helper\LockHandler`: Simple class that implement a distributed lock
121+
* `Consul\Helper\MultiLockHandler`: Simple class that implements a distributed lock for many resources
122+
* `Consul\Helper\MultiSemaphore`: Simple class that implements a distributed semaphore for many resources
84123

85124
## Run the test suite
86125

src/ConsulResponse.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@ public function getStatusCode(): int
3030
return $this->status;
3131
}
3232

33-
public function json(): ?array
33+
public function json()
3434
{
3535
return json_decode($this->body, true, 512, \JSON_THROW_ON_ERROR);
3636
}
37+
38+
public function isSuccessful(): bool
39+
{
40+
return $this->status >= 200 && $this->status < 300;
41+
}
3742
}

src/Helper/MultiLockHandler.php

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
namespace Consul\Helper;
4+
5+
use Consul\Services\KV;
6+
use Consul\Services\Session;
7+
use Exception;
8+
9+
class MultiLockHandler
10+
{
11+
private array $resources;
12+
private int $ttl;
13+
private Session $session;
14+
private KV $kv;
15+
private string $sessionId;
16+
private string $lockPath;
17+
18+
public function __construct(array $resources, int $ttl, Session $session, KV $kv, string $lockPath)
19+
{
20+
$this->resources = $resources;
21+
$this->ttl = $ttl;
22+
$this->session = $session;
23+
$this->kv = $kv;
24+
$this->lockPath = $lockPath;
25+
}
26+
27+
public function lock(): bool
28+
{
29+
// Start a session
30+
$this->sessionId = $this->session->create(['LockDelay' => 0, 'TTL' => "{$this->ttl}s"])->json()['ID'];
31+
32+
$result = true;
33+
$lockedResources = [];
34+
35+
try {
36+
foreach ($this->resources as $resource) {
37+
// Lock a key / value with the current session
38+
$lockAcquired = $this->kv->put($this->lockPath.$resource, '', ['acquire' => $this->sessionId])->json();
39+
40+
if (false === $lockAcquired) {
41+
$result = false;
42+
43+
break;
44+
}
45+
46+
$lockedResources[] = $resource;
47+
}
48+
} catch (Exception $e) {
49+
$result = false;
50+
} finally {
51+
if (!$result) {
52+
$this->releaseResources($lockedResources);
53+
}
54+
}
55+
56+
return $result;
57+
}
58+
59+
public function release(): void
60+
{
61+
$this->releaseResources($this->resources);
62+
}
63+
64+
public function renew(): bool
65+
{
66+
return $this->session->renew($this->sessionId)->isSuccessful();
67+
}
68+
69+
public function getResources(): array
70+
{
71+
return $this->resources;
72+
}
73+
74+
private function releaseResources(array $resources): void
75+
{
76+
foreach ($resources as $resource) {
77+
$this->kv->delete($this->lockPath.$resource);
78+
}
79+
80+
$this->session->destroy($this->sessionId);
81+
}
82+
}

src/Helper/MultiSemaphore.php

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
<?php
2+
3+
namespace Consul\Helper;
4+
5+
use Consul\Helper\MultiSemaphore\Resource;
6+
use Consul\Services\KV;
7+
use Consul\Services\Session;
8+
use RuntimeException;
9+
10+
class MultiSemaphore
11+
{
12+
private Session $session;
13+
private KV $kv;
14+
private ?string $sessionId = null;
15+
private array $resources;
16+
private string $keyPrefix;
17+
private int $ttl;
18+
private string $metaDataKey = '.semaphore';
19+
20+
public function __construct(array $resources, int $ttl, Session $session, KV $kv, string $keyPrefix)
21+
{
22+
$this->resources = $resources;
23+
$this->ttl = $ttl;
24+
$this->session = $session;
25+
$this->kv = $kv;
26+
$this->keyPrefix = trim($keyPrefix, '/');
27+
}
28+
29+
public function getResources(): array
30+
{
31+
return $this->resources;
32+
}
33+
34+
public function acquire(): bool
35+
{
36+
if (null !== $this->sessionId) {
37+
throw new RuntimeException('Resources are acquired already');
38+
}
39+
40+
// Start a session
41+
$session = $this->session->create(['Name' => 'semaphore', 'LockDelay' => 0, 'TTL' => "{$this->ttl}s"])->json();
42+
$this->sessionId = $session['ID'];
43+
44+
$result = false;
45+
46+
try {
47+
$result = $this->acquireResources();
48+
} finally {
49+
if (!$result) {
50+
$this->release();
51+
}
52+
}
53+
54+
return $result;
55+
}
56+
57+
public function renew(): bool
58+
{
59+
return $this->session->renew($this->sessionId)->isSuccessful();
60+
}
61+
62+
public function release(): void
63+
{
64+
if ($this->sessionId) {
65+
foreach ($this->resources as $resource) {
66+
$this->kv->delete($this->getResourceKey($resource, $this->sessionId));
67+
}
68+
69+
$this->session->destroy($this->sessionId);
70+
$this->sessionId = null;
71+
}
72+
}
73+
74+
private function acquireResources(): bool
75+
{
76+
$result = true;
77+
78+
foreach ($this->resources as $resource) {
79+
if (false === $this->kv->put($this->getResourceKey($resource, $this->sessionId), '', ['acquire' => $this->sessionId])->json()) {
80+
$result = false;
81+
} else {
82+
$semaphoreMetaDataValue = [
83+
'limit' => $resource->getLimit(),
84+
'sessions' => [],
85+
];
86+
87+
// get actual metadata
88+
$semaphoreDataItems = $this->kv->get($this->getResourceKeyPrefix($resource), ['recurse' => true])->json();
89+
foreach ($semaphoreDataItems as $key => $item) {
90+
if ($item['Key'] == $this->getResourceKey($resource, $this->metaDataKey)) {
91+
$semaphoreMetaDataActual = $item;
92+
$semaphoreMetaDataActual['Value'] = json_decode(base64_decode($semaphoreMetaDataActual['Value']), true);
93+
unset($semaphoreDataItems[$key]);
94+
95+
break;
96+
}
97+
}
98+
99+
// build new metadata
100+
if (isset($semaphoreMetaDataActual)) {
101+
foreach ($semaphoreDataItems as $item) {
102+
if (isset($item['Session'])) {
103+
if (isset($semaphoreMetaDataActual['Value']['sessions'][$item['Session']])) {
104+
$semaphoreMetaDataValue['sessions'][$item['Session']] = $semaphoreMetaDataActual['Value']['sessions'][$item['Session']];
105+
}
106+
} else {
107+
$this->kv->delete($item['Key']);
108+
}
109+
}
110+
}
111+
112+
$resource->setAcquired(
113+
min($resource->getAcquire(), ($semaphoreMetaDataValue['limit'] - array_sum($semaphoreMetaDataValue['sessions'])))
114+
);
115+
116+
// add new element to metadata and save it
117+
if ($resource->getAcquired() > 0) {
118+
$semaphoreMetaDataValue['sessions'][$this->sessionId] = $resource->getAcquired();
119+
$result = $this->kv->put(
120+
$this->getResourceKey($resource, $this->metaDataKey),
121+
$semaphoreMetaDataValue,
122+
['cas' => isset($semaphoreMetaDataActual) ? $semaphoreMetaDataActual['ModifyIndex'] : 0]
123+
)->json();
124+
} else {
125+
$result = false;
126+
}
127+
}
128+
129+
if (!$result) {
130+
break;
131+
}
132+
}
133+
134+
return $result;
135+
}
136+
137+
private function getResourceKeyPrefix(Resource $resource): string
138+
{
139+
return $this->keyPrefix.'/'.$resource->getName();
140+
}
141+
142+
private function getResourceKey(Resource $resource, string $name): string
143+
{
144+
return $this->getResourceKeyPrefix($resource).'/'.$name;
145+
}
146+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace Consul\Helper\MultiSemaphore;
4+
5+
class Resource
6+
{
7+
private string $name;
8+
private int $acquire;
9+
private int $acquired;
10+
private int $limit;
11+
12+
public function __construct(string $name, int $acquire, int $limit)
13+
{
14+
$this->name = $name;
15+
$this->acquire = $acquire;
16+
$this->acquired = 0;
17+
$this->limit = $limit;
18+
}
19+
20+
public function getName(): string
21+
{
22+
return $this->name;
23+
}
24+
25+
public function getAcquire(): int
26+
{
27+
return $this->acquire;
28+
}
29+
30+
public function getAcquired(): int
31+
{
32+
return $this->acquired;
33+
}
34+
35+
public function getLimit(): int
36+
{
37+
return $this->limit;
38+
}
39+
40+
public function setAcquired(int $acquired): void
41+
{
42+
$this->acquired = $acquired;
43+
}
44+
}

0 commit comments

Comments
 (0)