1: <?php
2: namespace Ratchet\Wamp;
3: use Ratchet\ConnectionInterface;
4: use Ratchet\WebSocket\WsServerInterface;
5:
6: class TopicManager implements WsServerInterface, WampServerInterface {
7: 8: 9:
10: protected $app;
11:
12: 13: 14:
15: protected $topicLookup = array();
16:
17: public function __construct(WampServerInterface $app) {
18: $this->app = $app;
19: }
20:
21: 22: 23:
24: public function onOpen(ConnectionInterface $conn) {
25: $conn->WAMP->subscriptions = new \SplObjectStorage;
26: $this->app->onOpen($conn);
27: }
28:
29: 30: 31:
32: public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
33: $this->app->onCall($conn, $id, $this->getTopic($topic), $params);
34: }
35:
36: 37: 38:
39: public function onSubscribe(ConnectionInterface $conn, $topic) {
40: $topicObj = $this->getTopic($topic);
41:
42: if ($conn->WAMP->subscriptions->contains($topicObj)) {
43: return;
44: }
45:
46: $this->topicLookup[$topic]->add($conn);
47: $conn->WAMP->subscriptions->attach($topicObj);
48: $this->app->onSubscribe($conn, $topicObj);
49: }
50:
51: 52: 53:
54: public function onUnsubscribe(ConnectionInterface $conn, $topic) {
55: $topicObj = $this->getTopic($topic);
56:
57: if (!$conn->WAMP->subscriptions->contains($topicObj)) {
58: return;
59: }
60:
61: $this->cleanTopic($topicObj, $conn);
62:
63: $this->app->onUnsubscribe($conn, $topicObj);
64: }
65:
66: 67: 68:
69: public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
70: $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible);
71: }
72:
73: 74: 75:
76: public function onClose(ConnectionInterface $conn) {
77: $this->app->onClose($conn);
78:
79: foreach ($this->topicLookup as $topic) {
80: $this->cleanTopic($topic, $conn);
81: }
82: }
83:
84: 85: 86:
87: public function onError(ConnectionInterface $conn, \Exception $e) {
88: $this->app->onError($conn, $e);
89: }
90:
91: 92: 93:
94: public function getSubProtocols() {
95: if ($this->app instanceof WsServerInterface) {
96: return $this->app->getSubProtocols();
97: }
98:
99: return array();
100: }
101:
102: 103: 104: 105:
106: protected function getTopic($topic) {
107: if (!array_key_exists($topic, $this->topicLookup)) {
108: $this->topicLookup[$topic] = new Topic($topic);
109: }
110:
111: return $this->topicLookup[$topic];
112: }
113:
114: protected function cleanTopic(Topic $topic, ConnectionInterface $conn) {
115: if ($conn->WAMP->subscriptions->contains($topic)) {
116: $conn->WAMP->subscriptions->detach($topic);
117: }
118:
119: $this->topicLookup[$topic->getId()]->remove($conn);
120:
121: if ($topic->autoDelete && 0 === $topic->count()) {
122: unset($this->topicLookup[$topic->getId()]);
123: }
124: }
125: }
126: