Skip to content

Commit d982d3f

Browse files
committed
adding websocket for pubsub
1 parent a9fbbfd commit d982d3f

File tree

2 files changed

+103
-1
lines changed

2 files changed

+103
-1
lines changed

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
"pdsinterop/solid-auth": "dev-master",
3333
"pdsinterop/solid-crud": "dev-master",
3434
"php-http/httplug": "^2.1",
35-
"phptal/phptal": "^1.4"
35+
"phptal/phptal": "^1.4",
36+
"cboden/ratchet": "^0.4",
37+
"textalk/websocket": "^1.4"
3638
},
3739
"require-dev": {
3840
"phpunit/phpunit": "*"

websocket.php

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
define("SOCKET_PORT", 8080);
4+
5+
use Ratchet\Server\IoServer;
6+
use Ratchet\Http\HttpServer;
7+
use Ratchet\WebSocket\WsServer;
8+
use Ratchet\MessageComponentInterface;
9+
use Ratchet\ConnectionInterface;
10+
require dirname( __FILE__ ) . '/vendor/autoload.php';
11+
12+
class Socket implements MessageComponentInterface {
13+
14+
public function __construct()
15+
{
16+
$this->clients = new \SplObjectStorage;
17+
$this->subscriptions = array();
18+
}
19+
20+
public function onOpen(ConnectionInterface $conn) {
21+
22+
// Store the new connection in $this->clients
23+
$this->clients->attach($conn);
24+
echo "New connection! ({$conn->resourceId})\n";
25+
}
26+
27+
public function onMessage(ConnectionInterface $from, $message) {
28+
$messageInfo = explode(" ", $message);
29+
$command = $messageInfo[0];
30+
$body = trim($messageInfo[1]);
31+
32+
switch ($command) {
33+
case "auth":
34+
case "dpop":
35+
// FIXME: we should check that the client is allowed to listen
36+
break;
37+
case "sub":
38+
echo "Client sub for $body\n";
39+
if (!isset($this->subscriptions[$body])) {
40+
$this->subscriptions[$body] = array();
41+
}
42+
$this->subscriptions[$body][] = $from;
43+
$from->send("ack $body");
44+
break;
45+
case "pub":
46+
echo "Client pub for $body\n";
47+
if (isset($this->subscriptions[$body])) {
48+
foreach ( $this->subscriptions[$body] as $client ) {
49+
$client->send("pub $body");
50+
}
51+
}
52+
break;
53+
default:
54+
echo "Client $from->resourceId said $message\n";
55+
foreach ( $this->clients as $client ) {
56+
57+
if ( $from->resourceId == $client->resourceId ) {
58+
continue;
59+
}
60+
61+
$client->send("Client $from->resourceId said $message\n");
62+
}
63+
break;
64+
}
65+
}
66+
67+
public function onClose(ConnectionInterface $conn) {
68+
echo "Client $conn->resourceId left\n";
69+
foreach ($this->subscriptions as $url => $subscribers) {
70+
foreach ($subscribers as $key => $client) {
71+
if ($client->resourceId == $conn->resourceId) {
72+
echo "Removing subscription for $url\n";
73+
unset($subscribers[$url][$key]);
74+
}
75+
}
76+
}
77+
}
78+
79+
public function onError(ConnectionInterface $conn, \Exception $e) {
80+
foreach ($this->subscriptions as $url => $subscribers) {
81+
foreach ($subscribers as $key => $client) {
82+
if ($client->resourceId == $conn->resourceId) {
83+
echo "Removing subscription for $url\n";
84+
unset($subscribers[$url][$key]);
85+
}
86+
}
87+
}
88+
}
89+
}
90+
91+
$server = IoServer::factory(
92+
new HttpServer(
93+
new WsServer(
94+
new Socket()
95+
)
96+
),
97+
SOCKET_PORT
98+
);
99+
100+
$server->run();

0 commit comments

Comments
 (0)