1: <?php
2: namespace Ratchet\WebSocket;
3: use Ratchet\ComponentInterface;
4: use Ratchet\ConnectionInterface;
5: use Ratchet\MessageComponentInterface as DataComponentInterface;
6: use Ratchet\Http\HttpServerInterface;
7: use Ratchet\Http\CloseResponseTrait;
8: use Psr\Http\Message\RequestInterface;
9: use Ratchet\RFC6455\Messaging\MessageInterface;
10: use Ratchet\RFC6455\Messaging\FrameInterface;
11: use Ratchet\RFC6455\Messaging\Frame;
12: use Ratchet\RFC6455\Messaging\MessageBuffer;
13: use Ratchet\RFC6455\Messaging\CloseFrameChecker;
14: use Ratchet\RFC6455\Handshake\ServerNegotiator;
15: use Ratchet\RFC6455\Handshake\RequestVerifier;
16: use React\EventLoop\LoopInterface;
17: use GuzzleHttp\Psr7 as gPsr;
18:
19: 20: 21: 22: 23: 24:
/**
 * HTTP server component that performs the WebSocket handshake on incoming
 * requests and, once a connection has been upgraded (status 101), feeds the
 * raw bytes it receives through a per-connection MessageBuffer so the wrapped
 * application component is handed complete messages.
 *
 * The wrapped component must implement either
 * \Ratchet\WebSocket\MessageComponentInterface (it is given MessageInterface
 * objects) or \Ratchet\MessageComponentInterface (it is given the message
 * payload only).
 */
class WsServer implements HttpServerInterface {
    use CloseResponseTrait;

    /**
     * Application component that connection events and messages are forwarded to
     * @var \Ratchet\ComponentInterface
     */
    private $delegate;

    /**
     * Raw connection => ConnContext (the WsConnection wrapper and its MessageBuffer)
     * @var \SplObjectStorage
     */
    protected $connections;

    /**
     * Shared checker handed to every MessageBuffer created in onOpen()
     * @var \Ratchet\RFC6455\Messaging\CloseFrameChecker
     */
    private $closeFrameChecker;

    /**
     * Produces the handshake response for each incoming request
     * @var \Ratchet\RFC6455\Handshake\ServerNegotiator
     */
    private $handshakeNegotiator;

    /**
     * Callable returning one pre-built \UnderflowException; passed to each MessageBuffer
     * @var \Closure
     */
    private $ueFlowFactory;

    /**
     * Invoked with (FrameInterface, WsConnection) for every PONG frame; a no-op until enableKeepAlive() is called
     * @var \Closure
     */
    private $pongReceiver;

    /**
     * Invoked with (ConnectionInterface, MessageInterface) to hand a complete message to the delegate
     * @var \Closure
     */
    private $msgCb;

    /**
     * @param \Ratchet\WebSocket\MessageComponentInterface|\Ratchet\MessageComponentInterface $component The application receiving WebSocket traffic
     * @throws \UnexpectedValueException If $component implements neither supported message interface
     * @throws \DomainException If the byte length of the literal "✓" is not 3 (source/runtime encoding problem)
     */
    public function __construct(ComponentInterface $component) {
        $takesMessageObjects = $component instanceof MessageComponentInterface;

        if (!$takesMessageObjects && !($component instanceof DataComponentInterface)) {
            throw new \UnexpectedValueException('Expected instance of \Ratchet\WebSocket\MessageComponentInterface or \Ratchet\MessageComponentInterface');
        }

        if (strlen('✓') !== 3) {
            throw new \DomainException('Bad encoding, length of unicode character ✓ should be 3. Ensure charset UTF-8 and check ini val mbstring.func_autoload');
        }

        // Pick the delivery strategy once, so no type check is needed per message
        if ($takesMessageObjects) {
            $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
                $this->delegate->onMessage($conn, $msg);
            };
        } else {
            $this->msgCb = function(ConnectionInterface $conn, MessageInterface $msg) {
                $this->delegate->onMessage($conn, $msg->getPayload());
            };
        }

        $this->delegate    = $component;
        $this->connections = new \SplObjectStorage();

        $this->closeFrameChecker = new CloseFrameChecker();

        $negotiator = new ServerNegotiator(new RequestVerifier());
        $negotiator->setStrictSubProtocolCheck(true);
        $this->handshakeNegotiator = $negotiator;

        if ($component instanceof WsServerInterface) {
            $negotiator->setSupportedSubProtocols($component->getSubProtocols());
        }

        $this->pongReceiver = function() {};

        // A single exception instance is created up front and returned on every call
        $sharedUnderflow = new \UnderflowException();
        $this->ueFlowFactory = function() use ($sharedUnderflow) {
            return $sharedUnderflow;
        };
    }

    /**
     * Run the WebSocket handshake for a new HTTP connection.
     *
     * The handshake response is always written to the connection. If its
     * status is anything other than 101 the connection is closed; otherwise
     * the connection is wrapped, registered and announced to the delegate.
     *
     * @param \Ratchet\ConnectionInterface            $conn
     * @param \Psr\Http\Message\RequestInterface|null $request The HTTP request that opened the connection
     * @return mixed Whatever $conn->close() or the delegate's onOpen() returns
     * @throws \UnexpectedValueException If $request is null
     */
    public function onOpen(ConnectionInterface $conn, RequestInterface $request = null) {
        if ($request === null) {
            throw new \UnexpectedValueException('$request can not be null');
        }

        $conn->httpRequest = $request;

        $wsState = new \stdClass();
        $wsState->closing = false;
        $conn->WebSocket = $wsState;

        $negotiated = $this->handshakeNegotiator->handshake($request);
        $response   = $negotiated->withHeader('X-Powered-By', \Ratchet\VERSION);

        $conn->send(gPsr\str($response));

        if ($response->getStatusCode() !== 101) {
            return $conn->close();
        }

        $wsConn = new WsConnection($conn);

        $deliverMessage = function(MessageInterface $msg) use ($wsConn) {
            call_user_func($this->msgCb, $wsConn, $msg);
        };
        $deliverControl = function(FrameInterface $frame) use ($wsConn) {
            $this->onControlFrame($frame, $wsConn);
        };

        $buffer = new MessageBuffer(
            $this->closeFrameChecker,
            $deliverMessage,
            $deliverControl,
            true, // NOTE(review): presumably the "expect masked frames" flag - confirm against MessageBuffer
            $this->ueFlowFactory
        );

        $this->connections->attach($conn, new ConnContext($wsConn, $buffer));

        return $this->delegate->onOpen($wsConn);
    }

    /**
     * Feed raw data from a connection into that connection's MessageBuffer.
     * Data arriving on a connection flagged as closing is ignored.
     *
     * @param \Ratchet\ConnectionInterface $from
     * @param string                       $msg Raw data received on the connection
     */
    public function onMessage(ConnectionInterface $from, $msg) {
        if (!$from->WebSocket->closing) {
            $this->connections[$from]->buffer->onData($msg);
        }
    }

    /**
     * Forget a connection and notify the delegate, passing it the WsConnection
     * wrapper. Connections that were never registered are ignored.
     *
     * @param \Ratchet\ConnectionInterface $conn
     */
    public function onClose(ConnectionInterface $conn) {
        if (!$this->connections->contains($conn)) {
            return;
        }

        $context = $this->connections[$conn];
        // Detach before notifying so the delegate never sees a half-removed connection
        $this->connections->detach($conn);

        $this->delegate->onClose($context->connection);
    }

    /**
     * Route an error to the delegate for registered connections; close the
     * connection outright when it is not registered here.
     *
     * @param \Ratchet\ConnectionInterface $conn
     * @param \Exception                   $e
     */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        if (!$this->connections->contains($conn)) {
            $conn->close();

            return;
        }

        $this->delegate->onError($this->connections[$conn]->connection, $e);
    }

    /**
     * React to a control frame: CLOSE closes the connection with the received
     * frame, PING is answered with a PONG carrying the same payload, PONG is
     * handed to the pong receiver. Any other opcode is ignored.
     *
     * @param \Ratchet\RFC6455\Messaging\FrameInterface $frame
     * @param \Ratchet\WebSocket\WsConnection           $conn
     */
    public function onControlFrame(FrameInterface $frame, WsConnection $conn) {
        $opCode = $frame->getOpCode();

        // Loose comparison on purpose: it is what a switch statement performs
        if ($opCode == Frame::OP_CLOSE) {
            $conn->close($frame);
        } elseif ($opCode == Frame::OP_PING) {
            $conn->send(new Frame($frame->getPayload(), true, Frame::OP_PONG));
        } elseif ($opCode == Frame::OP_PONG) {
            call_user_func($this->pongReceiver, $frame, $conn);
        }
    }

    /**
     * Forward the strict sub-protocol check setting to the handshake negotiator
     * (the constructor turns it on by default).
     *
     * @param bool $enable
     */
    public function setStrictSubProtocolCheck($enable) {
        $this->handshakeNegotiator->setStrictSubProtocolCheck($enable);
    }

    /**
     * Ping every registered connection on a periodic timer.
     *
     * On each tick, connections that were pinged on the previous tick and have
     * not replied with a PONG carrying that ping's payload are closed; then a
     * new ping with a fresh payload is sent to all registered connections.
     *
     * @param \React\EventLoop\LoopInterface $loop
     * @param int                            $interval Seconds between ticks; cast to int before scheduling
     */
    public function enableKeepAlive(LoopInterface $loop, $interval = 30) {
        $makePing = function() {
            return new Frame(uniqid(), true, Frame::OP_PING);
        };

        $lastPing     = $makePing();
        $awaitingPong = new \SplObjectStorage();
        $emptySet     = new \SplObjectStorage();

        // $lastPing is captured by reference so both closures always see the newest ping
        $this->pongReceiver = function(FrameInterface $frame, $wsConn) use ($awaitingPong, &$lastPing) {
            if ($frame->getPayload() !== $lastPing->getPayload()) {
                return;
            }

            $awaitingPong->detach($wsConn);
        };

        $tick = function() use ($awaitingPong, &$lastPing, $emptySet, $makePing) {
            // Whoever is still waiting from the previous round never answered
            foreach ($awaitingPong as $unresponsive) {
                $unresponsive->close();
            }
            // Keeping only what is also in an empty set empties the storage
            $awaitingPong->removeAllExcept($emptySet);

            $lastPing = $makePing();

            foreach ($this->connections as $rawConn) {
                $wsConn = $this->connections->offsetGet($rawConn)->connection;

                $wsConn->send($lastPing);
                $awaitingPong->attach($wsConn);
            }
        };

        $loop->addPeriodicTimer((int)$interval, $tick);
    }
}
226: