1: <?php
2:
3: namespace React\EventLoop;
4:
5: use React\EventLoop\Tick\FutureTickQueue;
6: use React\EventLoop\Tick\NextTickQueue;
7: use React\EventLoop\Timer\Timer;
8: use React\EventLoop\Timer\TimerInterface;
9: use React\EventLoop\Timer\Timers;
10:
11: 12: 13:
/**
 * Event loop implementation built on top of stream_select().
 *
 * Every iteration drains the next-tick queue, the future-tick queue and the
 * timers, and then waits for activity on the registered read/write streams.
 */
class StreamSelectLoop implements LoopInterface
{
    /** Number of microseconds in one second; used to convert timer delays. */
    const MICROSECONDS_PER_SECOND = 1000000;

    /** @var NextTickQueue Callbacks registered through nextTick(). */
    private $nextTickQueue;

    /** @var FutureTickQueue Callbacks registered through futureTick(). */
    private $futureTickQueue;

    /** @var Timers Registry of one-off and periodic timers. */
    private $timers;

    /** @var resource[] Streams watched for readability, keyed by (int) $stream. */
    private $readStreams = [];

    /** @var callable[] Read listeners, keyed by (int) $stream. */
    private $readListeners = [];

    /** @var resource[] Streams watched for writability, keyed by (int) $stream. */
    private $writeStreams = [];

    /** @var callable[] Write listeners, keyed by (int) $stream. */
    private $writeListeners = [];

    /** @var bool|null True while run() is looping; reset to false by stop(). */
    private $running;

    /**
     * Creates the tick queues and the timer registry used by this loop.
     */
    public function __construct()
    {
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timers = new Timers();
    }

    /**
     * Registers a listener that is invoked when the stream becomes readable.
     *
     * If the stream already has a read listener, the call is ignored and the
     * existing listener is kept.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Invoked as $listener($stream, $loop).
     */
    public function addReadStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->readStreams[$key])) {
            $this->readStreams[$key] = $stream;
            $this->readListeners[$key] = $listener;
        }
    }

    /**
     * Registers a listener that is invoked when the stream becomes writable.
     *
     * If the stream already has a write listener, the call is ignored and the
     * existing listener is kept.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Invoked as $listener($stream, $loop).
     */
    public function addWriteStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->writeStreams[$key])) {
            $this->writeStreams[$key] = $stream;
            $this->writeListeners[$key] = $listener;
        }
    }

    /**
     * Stops watching the stream for readability and drops its read listener.
     *
     * @param resource $stream Stream to stop watching.
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        unset(
            $this->readStreams[$key],
            $this->readListeners[$key]
        );
    }

    /**
     * Stops watching the stream for writability and drops its write listener.
     *
     * @param resource $stream Stream to stop watching.
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        unset(
            $this->writeStreams[$key],
            $this->writeListeners[$key]
        );
    }

    /**
     * Removes both the read and the write registration of the stream.
     *
     * @param resource $stream Stream to stop watching.
     */
    public function removeStream($stream)
    {
        $this->removeReadStream($stream);
        $this->removeWriteStream($stream);
    }

    /**
     * Creates a non-periodic timer, registers it and returns it.
     *
     * @param int|float $interval Delay handed to the Timer (used as seconds by run()).
     * @param callable  $callback Callback handed to the Timer.
     *
     * @return TimerInterface The newly registered timer.
     */
    public function addTimer($interval, callable $callback)
    {
        $timer = new Timer($this, $interval, $callback, false);

        $this->timers->add($timer);

        return $timer;
    }

    /**
     * Creates a periodic timer, registers it and returns it.
     *
     * @param int|float $interval Interval handed to the Timer (used as seconds by run()).
     * @param callable  $callback Callback handed to the Timer.
     *
     * @return TimerInterface The newly registered timer.
     */
    public function addPeriodicTimer($interval, callable $callback)
    {
        $timer = new Timer($this, $interval, $callback, true);

        $this->timers->add($timer);

        return $timer;
    }

    /**
     * Cancels the given timer in the timer registry.
     *
     * @param TimerInterface $timer Timer to cancel.
     */
    public function cancelTimer(TimerInterface $timer)
    {
        $this->timers->cancel($timer);
    }

    /**
     * Tells whether the timer registry still contains the given timer.
     *
     * @param TimerInterface $timer Timer to look up.
     *
     * @return bool Result of the registry lookup.
     */
    public function isTimerActive(TimerInterface $timer)
    {
        return $this->timers->contains($timer);
    }

    /**
     * Adds a callback to the next-tick queue.
     *
     * @param callable $listener Callback to enqueue.
     */
    public function nextTick(callable $listener)
    {
        $this->nextTickQueue->add($listener);
    }

    /**
     * Adds a callback to the future-tick queue.
     *
     * @param callable $listener Callback to enqueue.
     */
    public function futureTick(callable $listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Performs a single loop iteration without blocking on stream activity.
     */
    public function tick()
    {
        $this->nextTickQueue->tick();

        $this->futureTickQueue->tick();

        $this->timers->tick();

        // A zero timeout turns the stream wait into a plain poll.
        $this->waitForStreamActivity(0);
    }

    /**
     * Runs the loop until stop() is called or there is nothing left to wait
     * for (no queued ticks, no timers and no registered streams).
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->nextTickQueue->tick();

            $this->futureTickQueue->tick();

            $this->timers->tick();

            if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
                // The loop was stopped or tick callbacks are still queued:
                // poll the streams once, but do not block.
                $timeout = 0;
            } elseif ($scheduledAt = $this->timers->getFirst()) {
                // A timer is pending. The difference to the registry's current
                // time is treated as seconds until that timer is due.
                $timeout = $scheduledAt - $this->timers->getTime();
                if ($timeout < 0) {
                    $timeout = 0;
                } else {
                    // Convert seconds to whole microseconds. The result must be
                    // an integer that fits into PHP_INT_MAX: on 32-bit builds
                    // that is only ~35 minutes, and an out-of-range float would
                    // otherwise be handed to stream_select()/usleep(). Waking up
                    // early is harmless because the delay is recomputed on the
                    // next iteration.
                    $timeout = round($timeout * self::MICROSECONDS_PER_SECOND);
                    $timeout = $timeout >= PHP_INT_MAX ? PHP_INT_MAX : (int) $timeout;
                }
            } elseif ($this->readStreams || $this->writeStreams) {
                // Only streams are left: block until one of them is ready.
                $timeout = null;
            } else {
                // Nothing left to wait for.
                break;
            }

            $this->waitForStreamActivity($timeout);
        }
    }

    /**
     * Asks run() to leave its loop; takes effect the next time the running
     * flag is checked.
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Waits for stream activity and invokes the listeners of ready streams.
     *
     * @param int|null $timeout Maximum wait in microseconds, 0 to poll, or
     *                          null to wait without a time limit.
     */
    private function waitForStreamActivity($timeout)
    {
        // Work on copies: streamSelect() reduces the arrays to the ready streams.
        $read = $this->readStreams;
        $write = $this->writeStreams;

        $available = $this->streamSelect($read, $write, $timeout);
        if (false === $available) {
            // stream_select() failed (for example because the system call was
            // interrupted by a signal); nothing can be dispatched this round.
            return;
        }

        foreach ($read as $stream) {
            $key = (int) $stream;

            // A listener invoked earlier in this round may have removed the stream.
            if (isset($this->readListeners[$key])) {
                call_user_func($this->readListeners[$key], $stream, $this);
            }
        }

        foreach ($write as $stream) {
            $key = (int) $stream;

            // A listener invoked earlier in this round may have removed the stream.
            if (isset($this->writeListeners[$key])) {
                call_user_func($this->writeListeners[$key], $stream, $this);
            }
        }
    }

    /**
     * Calls stream_select() when there is at least one stream to watch, and
     * otherwise emulates the timeout by sleeping.
     *
     * @param array    &$read   Streams to check for readability; reduced by
     *                          stream_select() to the ready ones.
     * @param array    &$write  Streams to check for writability; reduced by
     *                          stream_select() to the ready ones.
     * @param int|null $timeout Timeout in microseconds, or null to wait
     *                          without a time limit.
     *
     * @return int|false Return value of stream_select(), or 0 when no streams
     *                   were given.
     */
    protected function streamSelect(array &$read, array &$write, $timeout)
    {
        if ($read || $write) {
            $except = null;

            // Warnings are suppressed on purpose; a false return value is
            // handled by the caller. A null $timeout is passed as null seconds
            // (no time limit), anything else as 0 seconds plus $timeout
            // microseconds.
            return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
        }

        // stream_select() is not called with two empty arrays; sleep instead
        // when a non-zero timeout was requested.
        if ($timeout) {
            usleep($timeout);
        }

        return 0;
    }
}
274: