1: <?php
2:
3: namespace React\EventLoop;
4:
5: use Event;
6: use EventBase;
7: use React\EventLoop\Tick\FutureTickQueue;
8: use React\EventLoop\Tick\NextTickQueue;
9: use React\EventLoop\Timer\Timer;
10: use React\EventLoop\Timer\TimerInterface;
11: use SplObjectStorage;
12:
13: 14: 15:
/**
 * Event loop implementation built on the procedural `libevent` extension
 * (event_base_new(), event_new(), event_add(), ...).
 *
 * Streams are tracked by the integer value of their resource. Each stream
 * owns a single libevent event whose EV_READ / EV_WRITE flags are adjusted
 * as listeners come and go. Each timer owns one libevent timer event, kept
 * in an SplObjectStorage keyed by the timer object.
 */
class LibEventLoop implements LoopInterface
{
    /** Factor applied to a timer's interval before it is passed to event_add(). */
    const MICROSECONDS_PER_SECOND = 1000000;

    /** @var resource Event base returned by event_base_new(). */
    private $eventBase;

    /** @var NextTickQueue Callbacks registered through nextTick(). */
    private $nextTickQueue;

    /** @var FutureTickQueue Callbacks registered through futureTick(). */
    private $futureTickQueue;

    /** @var callable Shared callback installed on every timer event. */
    private $timerCallback;

    /** @var SplObjectStorage Maps each scheduled timer to its libevent event. */
    private $timerEvents;

    /** @var callable Shared callback installed on every stream event. */
    private $streamCallback;

    /** @var array Stream events, indexed by (int) $stream. */
    private $streamEvents = [];

    /** @var array Currently subscribed EV_READ / EV_WRITE bits, indexed by (int) $stream. */
    private $streamFlags = [];

    /** @var array Read listeners, indexed by (int) $stream. */
    private $readListeners = [];

    /** @var array Write listeners, indexed by (int) $stream. */
    private $writeListeners = [];

    /** @var bool|null True while run() should keep iterating; reset by stop(). */
    private $running;

    /**
     * Creates the event base, both tick queues, the timer storage and the
     * two shared callbacks used for timer and stream events.
     */
    public function __construct()
    {
        $this->eventBase = event_base_new();
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timerEvents = new SplObjectStorage();

        $this->createTimerCallback();
        $this->createStreamCallback();
    }

    /**
     * Registers a read listener for a stream.
     *
     * Does nothing when the stream already has a read listener; the first
     * registered listener stays in place.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Invoked with ($stream, $loop).
     */
    public function addReadStream($stream, callable $listener)
    {
        $id = (int) $stream;

        if (isset($this->readListeners[$id])) {
            return;
        }

        $this->readListeners[$id] = $listener;
        $this->subscribeStreamEvent($stream, EV_READ);
    }

    /**
     * Registers a write listener for a stream.
     *
     * Does nothing when the stream already has a write listener; the first
     * registered listener stays in place.
     *
     * @param resource $stream   Stream to watch.
     * @param callable $listener Invoked with ($stream, $loop).
     */
    public function addWriteStream($stream, callable $listener)
    {
        $id = (int) $stream;

        if (isset($this->writeListeners[$id])) {
            return;
        }

        $this->writeListeners[$id] = $listener;
        $this->subscribeStreamEvent($stream, EV_WRITE);
    }

    /**
     * Drops the read listener of a stream, if one is registered.
     *
     * @param resource $stream Stream to stop watching for EV_READ.
     */
    public function removeReadStream($stream)
    {
        $id = (int) $stream;

        if (!isset($this->readListeners[$id])) {
            return;
        }

        unset($this->readListeners[$id]);
        $this->unsubscribeStreamEvent($stream, EV_READ);
    }

    /**
     * Drops the write listener of a stream, if one is registered.
     *
     * @param resource $stream Stream to stop watching for EV_WRITE.
     */
    public function removeWriteStream($stream)
    {
        $id = (int) $stream;

        if (!isset($this->writeListeners[$id])) {
            return;
        }

        unset($this->writeListeners[$id]);
        $this->unsubscribeStreamEvent($stream, EV_WRITE);
    }

    /**
     * Removes every trace of a stream: its libevent event is deleted and
     * freed, and its flags and both listeners are forgotten.
     *
     * Streams without a registered event are ignored.
     *
     * @param resource $stream Stream to remove.
     */
    public function removeStream($stream)
    {
        $id = (int) $stream;

        if (!isset($this->streamEvents[$id])) {
            return;
        }

        $streamEvent = $this->streamEvents[$id];

        event_del($streamEvent);
        event_free($streamEvent);

        unset($this->streamFlags[$id]);
        unset($this->streamEvents[$id]);
        unset($this->readListeners[$id]);
        unset($this->writeListeners[$id]);
    }

    /**
     * Creates and schedules a non-periodic timer.
     *
     * @param int|float $interval Passed to the Timer; multiplied by
     *                            MICROSECONDS_PER_SECOND when scheduled.
     * @param callable  $callback Invoked with the timer instance.
     *
     * @return Timer The scheduled timer.
     */
    public function addTimer($interval, callable $callback)
    {
        return $this->startTimer($interval, $callback, false);
    }

    /**
     * Creates and schedules a periodic timer.
     *
     * @param int|float $interval Passed to the Timer; multiplied by
     *                            MICROSECONDS_PER_SECOND when scheduled.
     * @param callable  $callback Invoked with the timer instance.
     *
     * @return Timer The scheduled timer.
     */
    public function addPeriodicTimer($interval, callable $callback)
    {
        return $this->startTimer($interval, $callback, true);
    }

    /**
     * Cancels a timer: deletes and frees its libevent event and detaches it
     * from the timer storage. Timers that are not active are ignored.
     *
     * @param TimerInterface $timer Timer to cancel.
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (!$this->isTimerActive($timer)) {
            return;
        }

        $timerEvent = $this->timerEvents[$timer];

        event_del($timerEvent);
        event_free($timerEvent);

        $this->timerEvents->detach($timer);
    }

    /**
     * Tells whether a timer is currently held in the timer storage.
     *
     * @param TimerInterface $timer Timer to look up.
     *
     * @return bool
     */
    public function isTimerActive(TimerInterface $timer)
    {
        return $this->timerEvents->contains($timer);
    }

    /**
     * Adds a callback to the next-tick queue.
     *
     * @param callable $listener Callback to enqueue.
     */
    public function nextTick(callable $listener)
    {
        $this->nextTickQueue->add($listener);
    }

    /**
     * Adds a callback to the future-tick queue.
     *
     * @param callable $listener Callback to enqueue.
     */
    public function futureTick(callable $listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Performs one loop iteration: ticks the next-tick queue, then the
     * future-tick queue, then runs the event base once with
     * EVLOOP_ONCE | EVLOOP_NONBLOCK.
     */
    public function tick()
    {
        $this->nextTickQueue->tick();
        $this->futureTickQueue->tick();

        event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
    }

    /**
     * Runs the loop until stop() is called or nothing is left to wait for.
     *
     * Every iteration ticks both queues first. When the loop was stopped or
     * either queue still holds callbacks, the event base is run with
     * EVLOOP_NONBLOCK added. Otherwise the loop ends as soon as there are
     * neither stream events nor timer events.
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->nextTickQueue->tick();
            $this->futureTickQueue->tick();

            // Evaluated after the queues ran: callbacks may have stopped the
            // loop or enqueued further callbacks.
            $mustNotBlock = !$this->running
                || !$this->nextTickQueue->isEmpty()
                || !$this->futureTickQueue->isEmpty();

            if ($mustNotBlock) {
                $loopFlags = EVLOOP_ONCE | EVLOOP_NONBLOCK;
            } elseif (!$this->streamEvents && !$this->timerEvents->count()) {
                // No streams and no timers registered: nothing to wait for.
                break;
            } else {
                $loopFlags = EVLOOP_ONCE;
            }

            event_base_loop($this->eventBase, $loopFlags);
        }
    }

    /**
     * Asks run() to finish by clearing the running flag.
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Shared implementation of addTimer() and addPeriodicTimer().
     *
     * @param int|float $interval Interval handed to the Timer.
     * @param callable  $callback Timer callback.
     * @param bool      $periodic Whether the Timer is created as periodic.
     *
     * @return Timer The scheduled timer.
     */
    private function startTimer($interval, callable $callback, $periodic)
    {
        $timer = new Timer($this, $interval, $callback, $periodic);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * Creates a libevent timer event for the timer, stores it, binds it to
     * the event base and arms it with the timer's interval.
     *
     * @param TimerInterface $timer Timer to schedule.
     */
    private function scheduleTimer(TimerInterface $timer)
    {
        $timerEvent = event_timer_new();
        $this->timerEvents[$timer] = $timerEvent;

        event_timer_set($timerEvent, $this->timerCallback, $timer);
        event_base_set($timerEvent, $this->eventBase);
        event_add($timerEvent, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
    }

    /**
     * Makes sure the stream's event listens for the given flag.
     *
     * A stream seen for the first time gets a fresh event bound to the event
     * base. An already known stream has the flag merged into its existing
     * flags and its event re-configured.
     *
     * @param resource $stream Stream to subscribe.
     * @param int      $flag   EV_READ or EV_WRITE.
     */
    private function subscribeStreamEvent($stream, $flag)
    {
        $id = (int) $stream;

        if (!isset($this->streamEvents[$id])) {
            $streamEvent = event_new();

            event_set($streamEvent, $stream, EV_PERSIST | $flag, $this->streamCallback);
            event_base_set($streamEvent, $this->eventBase);

            $this->streamEvents[$id] = $streamEvent;
            $this->streamFlags[$id] = $flag;

            event_add($streamEvent);

            return;
        }

        $streamEvent = $this->streamEvents[$id];

        $this->streamFlags[$id] |= $flag;
        $combinedFlags = $this->streamFlags[$id];

        // The event is deleted before being re-configured and added again.
        event_del($streamEvent);
        event_set($streamEvent, $stream, EV_PERSIST | $combinedFlags, $this->streamCallback);
        event_add($streamEvent);
    }

    /**
     * Stops the stream's event from listening for the given flag.
     *
     * When no flag remains the stream is removed altogether via
     * removeStream(); otherwise the event is re-configured with the
     * remaining flags.
     *
     * @param resource $stream Stream to unsubscribe.
     * @param int      $flag   EV_READ or EV_WRITE.
     */
    private function unsubscribeStreamEvent($stream, $flag)
    {
        $id = (int) $stream;

        $this->streamFlags[$id] &= ~$flag;
        $remainingFlags = $this->streamFlags[$id];

        if (0 === $remainingFlags) {
            $this->removeStream($stream);

            return;
        }

        $streamEvent = $this->streamEvents[$id];

        event_del($streamEvent);
        event_set($streamEvent, $stream, EV_PERSIST | $remainingFlags, $this->streamCallback);
        event_add($streamEvent);
    }

    /**
     * Builds the callback shared by all timer events.
     *
     * The callback runs the timer's own callback and then decides what to do
     * with the timer: nothing if it is no longer active, re-arm it if it is
     * periodic, cancel it otherwise.
     */
    private function createTimerCallback()
    {
        $this->timerCallback = function ($unusedFd, $unusedEvents, $timer) {
            call_user_func($timer->getCallback(), $timer);

            // The timer is no longer active after its callback ran (for
            // example because it was cancelled); nothing left to do.
            if (!$this->isTimerActive($timer)) {
                return;
            }

            // One-off timers are cleaned up once they have fired.
            if (!$timer->isPeriodic()) {
                $this->cancelTimer($timer);

                return;
            }

            // Periodic timers are armed again with the same interval.
            event_add(
                $this->timerEvents[$timer],
                $timer->getInterval() * self::MICROSECONDS_PER_SECOND
            );
        };
    }

    /**
     * Builds the callback shared by all stream events.
     *
     * The callback dispatches to the stream's read listener when EV_READ is
     * set and then to its write listener when EV_WRITE is set. Each listener
     * is looked up right before it is called, so a listener removed in the
     * meantime is skipped.
     */
    private function createStreamCallback()
    {
        $this->streamCallback = function ($stream, $flags) {
            $id = (int) $stream;

            $readable = EV_READ === ($flags & EV_READ);
            $writable = EV_WRITE === ($flags & EV_WRITE);

            if ($readable && isset($this->readListeners[$id])) {
                call_user_func($this->readListeners[$id], $stream, $this);
            }

            if ($writable && isset($this->writeListeners[$id])) {
                call_user_func($this->writeListeners[$id], $stream, $this);
            }
        };
    }
}
344: