Skip to content

Commit f4ff4eb

Browse files
committed
Merge pull request #4 from clue/ref
Refactor to ease overwriting socket operations
2 parents d67650c + fd12ccd commit f4ff4eb

File tree

2 files changed

+77
-32
lines changed

2 files changed

+77
-32
lines changed

Datagram/Buffer.php

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88

99
class Buffer extends EventEmitter
1010
{
11-
private $loop;
12-
private $socket;
11+
protected $loop;
12+
protected $socket;
13+
1314
private $listening = false;
1415
private $outgoing = array();
1516
private $writable = true;
@@ -29,32 +30,27 @@ public function send($data, $remoteAddress = null)
2930
$this->outgoing []= array($data, $remoteAddress);
3031

3132
if (!$this->listening) {
32-
$this->loop->addWriteStream($this->socket, array($this, 'handleWrite'));
33+
$this->handleResume();
3334
$this->listening = true;
3435
}
3536
}
3637

37-
public function handleWrite()
38+
public function onWritable()
3839
{
3940
list($data, $remoteAddress) = array_shift($this->outgoing);
4041

41-
if ($remoteAddress === null) {
42-
// do not use fwrite() as it obeys the stream buffer size and
43-
// packets are not to be split at 8kb
44-
$ret = @stream_socket_sendto($this->socket, $data);
45-
} else {
46-
$ret = @stream_socket_sendto($this->socket, $data, 0, $remoteAddress);
42+
try {
43+
$this->handleWrite($data, $remoteAddress);
4744
}
48-
49-
if ($ret < 0) {
50-
$error = error_get_last();
51-
$message = 'Unable to send packet: ' . trim($error['message']);
52-
$this->emit('error', array(new Exception($message)));
45+
catch (Exception $e) {
46+
$this->emit('error', array($e, $this));
5347
}
5448

5549
if (!$this->outgoing) {
56-
$this->loop->removeWriteStream($this->socket);
57-
$this->listening = false;
50+
if ($this->listening) {
51+
$this->handlePause();
52+
$this->listening = false;
53+
}
5854

5955
if (!$this->writable) {
6056
$this->close();
@@ -71,7 +67,7 @@ public function close()
7167
$this->emit('close', array($this));
7268

7369
if ($this->listening) {
74-
$this->loop->removeWriteStream($this->socket);
70+
$this->handlePause();
7571
$this->listening = false;
7672
}
7773

@@ -89,8 +85,34 @@ public function end()
8985

9086
$this->writable = false;
9187

92-
if (!$this->listening) {
88+
if (!$this->outgoing) {
9389
$this->close();
9490
}
9591
}
92+
93+
protected function handlePause()
94+
{
95+
$this->loop->removeWriteStream($this->socket);
96+
}
97+
98+
protected function handleResume()
99+
{
100+
$this->loop->addWriteStream($this->socket, array($this, 'onWritable'));
101+
}
102+
103+
protected function handleWrite($data, $remoteAddress)
104+
{
105+
if ($remoteAddress === null) {
106+
// do not use fwrite() as it obeys the stream buffer size and
107+
// packets are not to be split at 8kb
108+
$ret = @stream_socket_sendto($this->socket, $data);
109+
} else {
110+
$ret = @stream_socket_sendto($this->socket, $data, 0, $remoteAddress);
111+
}
112+
113+
if ($ret < 0) {
114+
$error = error_get_last();
115+
throw new Exception('Unable to send packet: ' . trim($error['message']));
116+
}
117+
}
96118
}

Datagram/Socket.php

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,16 @@ class Socket extends EventEmitter implements SocketInterface
1414

1515
public $bufferSize = 65536;
1616

17-
public function __construct(LoopInterface $loop, $socket)
17+
public function __construct(LoopInterface $loop, $socket, Buffer $buffer = null)
1818
{
1919
$this->loop = $loop;
2020
$this->socket = $socket;
2121

22-
$this->buffer = new Buffer($loop, $socket);
22+
if ($buffer === null) {
23+
$buffer = new Buffer($loop, $socket);
24+
}
25+
$this->buffer = $buffer;
26+
2327
$that = $this;
2428
$this->buffer->on('error', function ($error) use ($that) {
2529
$that->emit('error', array($error, $that));
@@ -65,21 +69,18 @@ public function resume()
6569
}
6670
}
6771

68-
public function onReceive($message)
72+
public function onReceive()
6973
{
70-
$data = stream_socket_recvfrom($this->socket, $this->bufferSize, 0, $peer);
71-
72-
if ($data === false) {
73-
// receiving data failed => remote side rejected one of our packets
74-
// due to the nature of UDP, there's no way to tell which one exactly
75-
// $peer is not filled either
76-
74+
try {
75+
$data = $this->handleReceive($peer);
76+
}
77+
catch (Exception $e) {
7778
// emit error message and local socket
78-
$this->emit('error', array(new \Exception('Invalid message'), $this));
79+
$this->emit('error', array($e, $this));
7980
return;
8081
}
8182

82-
$this->emit('message', array($data, $this->sanitizeAddress($peer), $this));
83+
$this->emit('message', array($data, $peer, $this));
8384
}
8485

8586
public function close()
@@ -91,7 +92,7 @@ public function close()
9192
$this->emit('close', array($this));
9293
$this->pause();
9394

94-
fclose($this->socket);
95+
$this->handleClose();
9596
$this->socket = false;
9697
$this->buffer->close();
9798

@@ -118,4 +119,26 @@ private function sanitizeAddress($address)
118119
}
119120
return $address;
120121
}
122+
123+
protected function handleReceive(&$peerAddress)
124+
{
125+
$data = stream_socket_recvfrom($this->socket, $this->bufferSize, 0, $peerAddress);
126+
127+
if ($data === false) {
128+
// receiving data failed => remote side rejected one of our packets
129+
// due to the nature of UDP, there's no way to tell which one exactly
130+
// $peer is not filled either
131+
132+
throw new \Exception('Invalid message');
133+
}
134+
135+
$peerAddress = $this->sanitizeAddress($peerAddress);
136+
137+
return $data;
138+
}
139+
140+
protected function handleClose()
141+
{
142+
fclose($this->socket);
143+
}
121144
}

0 commit comments

Comments
 (0)