1: <?php
2:
3: namespace React\EventLoop;
4:
5: use Event;
6: use EventBase;
7: use EventConfig as EventBaseConfig;
8: use React\EventLoop\Tick\FutureTickQueue;
9: use React\EventLoop\Tick\NextTickQueue;
10: use React\EventLoop\Timer\Timer;
11: use React\EventLoop\Timer\TimerInterface;
12: use SplObjectStorage;
13:
14: 15: 16:
/**
 * Event loop implementation driven by an EventBase instance.
 *
 * Stream readiness and timers are both represented as Event objects
 * registered on the underlying base. The next-tick and future-tick queues
 * are drained before every pass over that base.
 */
class ExtEventLoop implements LoopInterface
{
    /** @var EventBase Base on which every Event of this loop is registered. */
    private $eventBase;

    /** @var NextTickQueue Queue filled by nextTick(). */
    private $nextTickQueue;

    /** @var FutureTickQueue Queue filled by futureTick(). */
    private $futureTickQueue;

    /** @var callable Shared callback handed to every timer Event. */
    private $timerCallback;

    /** @var SplObjectStorage Maps each scheduled timer to its Event. */
    private $timerEvents;

    /** @var callable Shared callback handed to every stream Event. */
    private $streamCallback;

    /** @var array Stream Event objects, keyed by the stream cast to int. */
    private $streamEvents = [];

    /** @var array Currently subscribed Event::READ / Event::WRITE bits per stream key. */
    private $streamFlags = [];

    /** @var array Read listeners, keyed by the stream cast to int. */
    private $readListeners = [];

    /** @var array Write listeners, keyed by the stream cast to int. */
    private $writeListeners = [];

    /** @var bool|null True while run() should keep iterating; cleared by stop(). */
    private $running;

    /**
     * Builds the event base, both tick queues, the timer storage and the
     * two shared Event callbacks.
     *
     * @param EventBaseConfig|null $config Passed straight to the EventBase constructor.
     */
    public function __construct(EventBaseConfig $config = null)
    {
        $this->eventBase = new EventBase($config);
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timerEvents = new SplObjectStorage();

        $this->createTimerCallback();
        $this->createStreamCallback();
    }

    /**
     * Registers a listener invoked when the stream reports Event::READ.
     *
     * A stream that already has a read listener is left untouched.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Invoked with the stream and this loop.
     */
    public function addReadStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (isset($this->readListeners[$key])) {
            return;
        }

        $this->readListeners[$key] = $listener;
        $this->subscribeStreamEvent($stream, Event::READ);
    }

    /**
     * Registers a listener invoked when the stream reports Event::WRITE.
     *
     * A stream that already has a write listener is left untouched.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Invoked with the stream and this loop.
     */
    public function addWriteStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (isset($this->writeListeners[$key])) {
            return;
        }

        $this->writeListeners[$key] = $listener;
        $this->subscribeStreamEvent($stream, Event::WRITE);
    }

    /**
     * Drops the read listener of a stream, if one is registered.
     *
     * @param resource $stream Stream to stop watching for reads.
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->readListeners[$key])) {
            return;
        }

        unset($this->readListeners[$key]);
        $this->unsubscribeStreamEvent($stream, Event::READ);
    }

    /**
     * Drops the write listener of a stream, if one is registered.
     *
     * @param resource $stream Stream to stop watching for writes.
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->writeListeners[$key])) {
            return;
        }

        unset($this->writeListeners[$key]);
        $this->unsubscribeStreamEvent($stream, Event::WRITE);
    }

    /**
     * Frees the stream's Event and forgets its flags and both listeners.
     *
     * Does nothing when no Event is stored for the stream.
     *
     * @param resource $stream Stream to remove entirely.
     */
    public function removeStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        $this->streamEvents[$key]->free();

        unset($this->streamFlags[$key]);
        unset($this->streamEvents[$key]);
        unset($this->readListeners[$key]);
        unset($this->writeListeners[$key]);
    }

    /**
     * Creates and schedules a non-periodic timer.
     *
     * @param int|float $interval Interval handed to the Timer and to Event::add().
     * @param callable  $callback Invoked with the timer when it fires.
     *
     * @return Timer The newly scheduled timer.
     */
    public function addTimer($interval, callable $callback)
    {
        $oneShot = new Timer($this, $interval, $callback, false);
        $this->scheduleTimer($oneShot);

        return $oneShot;
    }

    /**
     * Creates and schedules a periodic timer.
     *
     * @param int|float $interval Interval handed to the Timer and to Event::add().
     * @param callable  $callback Invoked with the timer each time it fires.
     *
     * @return Timer The newly scheduled timer.
     */
    public function addPeriodicTimer($interval, callable $callback)
    {
        $repeating = new Timer($this, $interval, $callback, true);
        $this->scheduleTimer($repeating);

        return $repeating;
    }

    /**
     * Frees the Event of an active timer and detaches the timer.
     *
     * Timers that are not active are ignored.
     *
     * @param TimerInterface $timer Timer to cancel.
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (!$this->isTimerActive($timer)) {
            return;
        }

        $this->timerEvents[$timer]->free();
        $this->timerEvents->detach($timer);
    }

    /**
     * Tells whether the timer is currently held in the timer storage.
     *
     * @param TimerInterface $timer Timer to look up.
     *
     * @return bool
     */
    public function isTimerActive(TimerInterface $timer)
    {
        $active = $this->timerEvents->contains($timer);

        return $active;
    }

    /**
     * Adds a listener to the next-tick queue.
     *
     * @param callable $listener Listener to enqueue.
     */
    public function nextTick(callable $listener)
    {
        $this->nextTickQueue->add($listener);
    }

    /**
     * Adds a listener to the future-tick queue.
     *
     * @param callable $listener Listener to enqueue.
     */
    public function futureTick(callable $listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Performs one non-blocking iteration: ticks both queues, then makes a
     * single LOOP_ONCE | LOOP_NONBLOCK pass over the event base.
     */
    public function tick()
    {
        $this->nextTickQueue->tick();
        $this->futureTickQueue->tick();

        $singleNonBlockingPass = EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK;

        // The error-control operator silences any PHP error raised by this call.
        @$this->eventBase->loop($singleNonBlockingPass);
    }

    /**
     * Iterates until stop() is called or until nothing is left to wait for.
     *
     * Each iteration ticks both queues and then makes one pass over the
     * event base. The pass is non-blocking when the loop was stopped or a
     * tick queue still holds work. Otherwise, when neither stream events
     * nor timers exist, the method returns.
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->nextTickQueue->tick();
            $this->futureTickQueue->tick();

            // Blocking is only allowed while still running with both queues empty.
            $mayBlock = $this->running
                && $this->nextTickQueue->isEmpty()
                && $this->futureTickQueue->isEmpty();

            if (!$mayBlock) {
                $flags = EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK;
            } else {
                // Nothing registered at all: leave instead of waiting on the base.
                if (!($this->streamEvents || $this->timerEvents->count())) {
                    break;
                }

                $flags = EventBase::LOOP_ONCE;
            }

            $this->eventBase->loop($flags);
        }
    }

    /**
     * Clears the running flag so that run() ends its while loop.
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Creates a timeout Event for the timer, stores it and arms it with the
     * timer's interval. Periodic timers additionally get Event::PERSIST.
     *
     * @param TimerInterface $timer Timer to schedule.
     */
    private function scheduleTimer(TimerInterface $timer)
    {
        $flags = $timer->isPeriodic()
            ? (Event::TIMEOUT | Event::PERSIST)
            : Event::TIMEOUT;

        $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);

        // Stored before being armed, exactly one Event per timer.
        $this->timerEvents[$timer] = $event;

        $event->add($timer->getInterval());
    }

    /**
     * Makes the stream's Event listen for the given flag.
     *
     * A new persistent Event is created for a stream seen for the first
     * time. An existing Event is deleted from the base, reconfigured with
     * the merged flags and added again.
     *
     * @param resource $stream Stream to subscribe.
     * @param int      $flag   Event::READ or Event::WRITE.
     */
    private function subscribeStreamEvent($stream, $flag)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            $event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);

            $this->streamEvents[$key] = $event;
            $this->streamFlags[$key] = $flag;
        } else {
            $this->streamFlags[$key] |= $flag;
            $merged = $this->streamFlags[$key];

            $event = $this->streamEvents[$key];
            $event->del();
            $event->set($this->eventBase, $stream, Event::PERSIST | $merged, $this->streamCallback);
        }

        $event->add();
    }

    /**
     * Stops the stream's Event from listening for the given flag.
     *
     * When no flag remains the stream is removed entirely. Otherwise the
     * Event is deleted from the base, reconfigured with the remaining
     * flags and added again.
     *
     * @param resource $stream Stream to unsubscribe.
     * @param int      $flag   Event::READ or Event::WRITE.
     */
    private function unsubscribeStreamEvent($stream, $flag)
    {
        $key = (int) $stream;

        $this->streamFlags[$key] &= ~$flag;
        $remaining = $this->streamFlags[$key];

        if (0 !== $remaining) {
            $event = $this->streamEvents[$key];

            $event->del();
            $event->set($this->eventBase, $stream, Event::PERSIST | $remaining, $this->streamCallback);
            $event->add();

            return;
        }

        $this->removeStream($stream);
    }

    /**
     * Builds the closure shared by all timer Events.
     *
     * The closure runs the timer's callback and afterwards cancels the
     * timer when it is non-periodic and still active.
     */
    private function createTimerCallback()
    {
        $this->timerCallback = function ($unusedFd, $unusedWhat, $timer) {
            call_user_func($timer->getCallback(), $timer);

            // Periodic timers stay scheduled; already-cancelled ones need no work.
            if ($timer->isPeriodic() || !$this->isTimerActive($timer)) {
                return;
            }

            $this->cancelTimer($timer);
        };
    }

    /**
     * Builds the closure shared by all stream Events.
     *
     * The closure dispatches to the read listener and then to the write
     * listener, according to the flags reported for the stream. Listener
     * presence is re-checked right before each call.
     */
    private function createStreamCallback()
    {
        $this->streamCallback = function ($stream, $flags) {
            $key = (int) $stream;

            $readable = (Event::READ & $flags) === Event::READ;
            if ($readable && isset($this->readListeners[$key])) {
                call_user_func($this->readListeners[$key], $stream, $this);
            }

            $writable = (Event::WRITE & $flags) === Event::WRITE;
            if ($writable && isset($this->writeListeners[$key])) {
                call_user_func($this->writeListeners[$key], $stream, $this);
            }
        };
    }
}
327: