1: <?php
2:
3: namespace React\Socket;
4:
5: use Evenement\EventEmitter;
6:
7: /**
8: * The `LimitingServer` decorator wraps a given `ServerInterface` and is responsible
9: * for limiting and keeping track of open connections to this server instance.
10: *
11: * Whenever the underlying server emits a `connection` event, it will check its
12: * limits and then either
13: * - keep track of this connection by adding it to the list of
14: * open connections and then forward the `connection` event
15: * - or reject (close) the connection when its limits are exceeded and will
16: * forward an `error` event instead.
17: *
18: * Whenever a connection closes, it will remove this connection from the list of
19: * open connections.
20: *
21: * ```php
22: * $server = new LimitingServer($server, 100);
23: * $server->on('connection', function (ConnectionInterface $connection) {
24: * $connection->write('hello there!' . PHP_EOL);
25: * …
26: * });
27: * ```
28: *
29: * See also the `ServerInterface` for more details.
30: *
31: * @see ServerInterface
32: * @see ConnectionInterface
33: */
class LimitingServer extends EventEmitter implements ServerInterface
{
    /** @var ConnectionInterface[] connections currently tracked as open */
    private $connections = array();

    /** @var ServerInterface the decorated (underlying) server */
    private $server;

    /** @var int|null maximum number of open connections, or null for no limit */
    private $limit;

    /** @var bool whether to pause the underlying server once the limit is reached */
    private $pauseOnLimit = false;

    /** @var bool true while this decorator has paused the server because the limit was reached */
    private $autoPaused = false;

    /** @var bool true while the consumer has paused the server via pause() */
    private $manuPaused = false;

    /**
     * Instantiates a new LimitingServer.
     *
     * You have to pass a maximum number of open connections to ensure
     * the server will automatically reject (close) connections once this limit
     * is exceeded. In this case, it will emit an `error` event to inform about
     * this and no `connection` event will be emitted.
     *
     * ```php
     * $server = new LimitingServer($server, 100);
     * $server->on('connection', function (ConnectionInterface $connection) {
     *     $connection->write('hello there!' . PHP_EOL);
     *     …
     * });
     * ```
     *
     * You MAY pass a `null` limit in order to put no limit on the number of
     * open connections and keep accepting new connection until you run out of
     * operating system resources (such as open file handles). This may be
     * useful if you do not want to take care of applying a limit but still want
     * to use the `getConnections()` method.
     *
     * You can optionally configure the server to pause accepting new
     * connections once the connection limit is reached. In this case, it will
     * pause the underlying server and no longer process any new connections at
     * all, thus also no longer closing any excessive connections.
     * The underlying operating system is responsible for keeping a backlog of
     * pending connections until its limit is reached, at which point it will
     * start rejecting further connections.
     * Once the server is below the connection limit, it will continue consuming
     * connections from the backlog and will process any outstanding data on
     * each connection.
     * This mode may be useful for some protocols that are designed to wait for
     * a response message (such as HTTP), but may be less useful for other
     * protocols that demand immediate responses (such as a "welcome" message in
     * an interactive chat).
     *
     * ```php
     * $server = new LimitingServer($server, 100, true);
     * $server->on('connection', function (ConnectionInterface $connection) {
     *     $connection->write('hello there!' . PHP_EOL);
     *     …
     * });
     * ```
     *
     * Note that `$pauseOnLimit` is ignored when `$connectionLimit` is `null`,
     * because there is no limit that could ever be reached.
     *
     * @param ServerInterface $server
     * @param int|null        $connectionLimit
     * @param bool            $pauseOnLimit
     */
    public function __construct(ServerInterface $server, $connectionLimit, $pauseOnLimit = false)
    {
        $this->server = $server;
        $this->limit = $connectionLimit;
        if ($connectionLimit !== null) {
            $this->pauseOnLimit = $pauseOnLimit;
        }

        $this->server->on('connection', array($this, 'handleConnection'));
        $this->server->on('error', array($this, 'handleError'));
    }

    /**
     * Returns an array with all currently active connections
     *
     * ```php
     * foreach ($server->getConnections() as $connection) {
     *     $connection->write('Hi!');
     * }
     * ```
     *
     * @return ConnectionInterface[]
     */
    public function getConnections()
    {
        return $this->connections;
    }

    /**
     * Returns the address of the underlying server.
     *
     * @return mixed whatever the wrapped server's `getAddress()` returns
     */
    public function getAddress()
    {
        return $this->server->getAddress();
    }

    /**
     * Manually pauses accepting new connections.
     *
     * The underlying server is only paused here if it has not already been
     * paused automatically because the connection limit was reached.
     *
     * @return void
     */
    public function pause()
    {
        if (!$this->manuPaused) {
            $this->manuPaused = true;

            if (!$this->autoPaused) {
                $this->server->pause();
            }
        }
    }

    /**
     * Reverts a previous manual pause().
     *
     * The underlying server is only resumed here if it is not currently
     * paused automatically because the connection limit was reached.
     *
     * @return void
     */
    public function resume()
    {
        if ($this->manuPaused) {
            $this->manuPaused = false;

            if (!$this->autoPaused) {
                $this->server->resume();
            }
        }
    }

    /**
     * Closes the underlying server.
     *
     * @return void
     */
    public function close()
    {
        $this->server->close();
    }

    /**
     * Handles a new connection from the underlying server.
     *
     * Rejects (closes) the connection and emits an `error` event when the
     * limit is already reached; otherwise tracks the connection, optionally
     * pauses the underlying server and forwards the `connection` event.
     *
     * @internal
     * @param ConnectionInterface $connection
     * @return void
     */
    public function handleConnection(ConnectionInterface $connection)
    {
        // close connection if limit exceeded
        if ($this->limit !== null && count($this->connections) >= $this->limit) {
            $this->handleError(new \OverflowException('Connection closed because server reached connection limit'));
            $connection->close();
            return;
        }

        $this->connections[] = $connection;

        // explicit $that capture keeps the closure independent of $this binding
        $that = $this;
        $connection->on('close', function () use ($that, $connection) {
            $that->handleDisconnection($connection);
        });

        // pause accepting new connections if limit exceeded
        if ($this->pauseOnLimit && !$this->autoPaused && count($this->connections) >= $this->limit) {
            $this->autoPaused = true;

            if (!$this->manuPaused) {
                $this->server->pause();
            }
        }

        $this->emit('connection', array($connection));
    }

    /**
     * Stops tracking a closed connection and resumes the underlying server
     * if it was auto-paused and is now below the limit again.
     *
     * @internal
     * @param ConnectionInterface $connection
     * @return void
     */
    public function handleDisconnection(ConnectionInterface $connection)
    {
        // Strict search compares by object identity. A loose search would
        // compare object properties recursively and could match a different
        // connection that merely has equal state.
        $key = array_search($connection, $this->connections, true);

        // Unknown connection (e.g. reported twice): nothing to remove. Without
        // this guard, `false` would be cast to array key 0 and an unrelated
        // connection would be dropped from the list.
        if ($key === false) {
            return;
        }

        unset($this->connections[$key]);

        // continue accepting new connection if below limit
        if ($this->autoPaused && count($this->connections) < $this->limit) {
            $this->autoPaused = false;

            if (!$this->manuPaused) {
                $this->server->resume();
            }
        }
    }

    /**
     * Forwards an error as an `error` event on this server.
     *
     * @internal
     * @param \Exception $error
     * @return void
     */
    public function handleError(\Exception $error)
    {
        $this->emit('error', array($error));
    }
}
202: