Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • ExtEventLoop
  • Factory
  • LibEventLoop
  • LibEvLoop
  • StreamSelectLoop

Interfaces

  • LoopInterface
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: 
  3: namespace React\EventLoop;
  4: 
  5: use Event;
  6: use EventBase;
  7: use React\EventLoop\Tick\FutureTickQueue;
  8: use React\EventLoop\Tick\NextTickQueue;
  9: use React\EventLoop\Timer\Timer;
 10: use React\EventLoop\Timer\TimerInterface;
 11: use SplObjectStorage;
 12: 
 13: /**
 14:  * An ext-libevent based event-loop.
 15:  */
 16: class LibEventLoop implements LoopInterface
 17: {
 18:     const MICROSECONDS_PER_SECOND = 1000000;
 19: 
 20:     private $eventBase;
 21:     private $nextTickQueue;
 22:     private $futureTickQueue;
 23:     private $timerCallback;
 24:     private $timerEvents;
 25:     private $streamCallback;
 26:     private $streamEvents = [];
 27:     private $streamFlags = [];
 28:     private $readListeners = [];
 29:     private $writeListeners = [];
 30:     private $running;
 31: 
 32:     public function __construct()
 33:     {
 34:         $this->eventBase = event_base_new();
 35:         $this->nextTickQueue = new NextTickQueue($this);
 36:         $this->futureTickQueue = new FutureTickQueue($this);
 37:         $this->timerEvents = new SplObjectStorage();
 38: 
 39:         $this->createTimerCallback();
 40:         $this->createStreamCallback();
 41:     }
 42: 
 43:     /**
 44:      * {@inheritdoc}
 45:      */
 46:     public function addReadStream($stream, callable $listener)
 47:     {
 48:         $key = (int) $stream;
 49: 
 50:         if (!isset($this->readListeners[$key])) {
 51:             $this->readListeners[$key] = $listener;
 52:             $this->subscribeStreamEvent($stream, EV_READ);
 53:         }
 54:     }
 55: 
 56:     /**
 57:      * {@inheritdoc}
 58:      */
 59:     public function addWriteStream($stream, callable $listener)
 60:     {
 61:         $key = (int) $stream;
 62: 
 63:         if (!isset($this->writeListeners[$key])) {
 64:             $this->writeListeners[$key] = $listener;
 65:             $this->subscribeStreamEvent($stream, EV_WRITE);
 66:         }
 67:     }
 68: 
 69:     /**
 70:      * {@inheritdoc}
 71:      */
 72:     public function removeReadStream($stream)
 73:     {
 74:         $key = (int) $stream;
 75: 
 76:         if (isset($this->readListeners[$key])) {
 77:             unset($this->readListeners[$key]);
 78:             $this->unsubscribeStreamEvent($stream, EV_READ);
 79:         }
 80:     }
 81: 
 82:     /**
 83:      * {@inheritdoc}
 84:      */
 85:     public function removeWriteStream($stream)
 86:     {
 87:         $key = (int) $stream;
 88: 
 89:         if (isset($this->writeListeners[$key])) {
 90:             unset($this->writeListeners[$key]);
 91:             $this->unsubscribeStreamEvent($stream, EV_WRITE);
 92:         }
 93:     }
 94: 
 95:     /**
 96:      * {@inheritdoc}
 97:      */
 98:     public function removeStream($stream)
 99:     {
100:         $key = (int) $stream;
101: 
102:         if (isset($this->streamEvents[$key])) {
103:             $event = $this->streamEvents[$key];
104: 
105:             event_del($event);
106:             event_free($event);
107: 
108:             unset(
109:                 $this->streamFlags[$key],
110:                 $this->streamEvents[$key],
111:                 $this->readListeners[$key],
112:                 $this->writeListeners[$key]
113:             );
114:         }
115:     }
116: 
117:     /**
118:      * {@inheritdoc}
119:      */
120:     public function addTimer($interval, callable $callback)
121:     {
122:         $timer = new Timer($this, $interval, $callback, false);
123: 
124:         $this->scheduleTimer($timer);
125: 
126:         return $timer;
127:     }
128: 
129:     /**
130:      * {@inheritdoc}
131:      */
132:     public function addPeriodicTimer($interval, callable $callback)
133:     {
134:         $timer = new Timer($this, $interval, $callback, true);
135: 
136:         $this->scheduleTimer($timer);
137: 
138:         return $timer;
139:     }
140: 
141:     /**
142:      * {@inheritdoc}
143:      */
144:     public function cancelTimer(TimerInterface $timer)
145:     {
146:         if ($this->isTimerActive($timer)) {
147:             $event = $this->timerEvents[$timer];
148: 
149:             event_del($event);
150:             event_free($event);
151: 
152:             $this->timerEvents->detach($timer);
153:         }
154:     }
155: 
156:     /**
157:      * {@inheritdoc}
158:      */
159:     public function isTimerActive(TimerInterface $timer)
160:     {
161:         return $this->timerEvents->contains($timer);
162:     }
163: 
164:     /**
165:      * {@inheritdoc}
166:      */
167:     public function nextTick(callable $listener)
168:     {
169:         $this->nextTickQueue->add($listener);
170:     }
171: 
172:     /**
173:      * {@inheritdoc}
174:      */
175:     public function futureTick(callable $listener)
176:     {
177:         $this->futureTickQueue->add($listener);
178:     }
179: 
180:     /**
181:      * {@inheritdoc}
182:      */
183:     public function tick()
184:     {
185:         $this->nextTickQueue->tick();
186: 
187:         $this->futureTickQueue->tick();
188: 
189:         event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
190:     }
191: 
192:     /**
193:      * {@inheritdoc}
194:      */
195:     public function run()
196:     {
197:         $this->running = true;
198: 
199:         while ($this->running) {
200:             $this->nextTickQueue->tick();
201: 
202:             $this->futureTickQueue->tick();
203: 
204:             $flags = EVLOOP_ONCE;
205:             if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
206:                 $flags |= EVLOOP_NONBLOCK;
207:             } elseif (!$this->streamEvents && !$this->timerEvents->count()) {
208:                 break;
209:             }
210: 
211:             event_base_loop($this->eventBase, $flags);
212:         }
213:     }
214: 
215:     /**
216:      * {@inheritdoc}
217:      */
218:     public function stop()
219:     {
220:         $this->running = false;
221:     }
222: 
223:     /**
224:      * Schedule a timer for execution.
225:      *
226:      * @param TimerInterface $timer
227:      */
228:     private function scheduleTimer(TimerInterface $timer)
229:     {
230:         $this->timerEvents[$timer] = $event = event_timer_new();
231: 
232:         event_timer_set($event, $this->timerCallback, $timer);
233:         event_base_set($event, $this->eventBase);
234:         event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
235:     }
236: 
237:     /**
238:      * Create a new ext-libevent event resource, or update the existing one.
239:      *
240:      * @param resource $stream
241:      * @param integer  $flag   EV_READ or EV_WRITE
242:      */
243:     private function subscribeStreamEvent($stream, $flag)
244:     {
245:         $key = (int) $stream;
246: 
247:         if (isset($this->streamEvents[$key])) {
248:             $event = $this->streamEvents[$key];
249:             $flags = $this->streamFlags[$key] |= $flag;
250: 
251:             event_del($event);
252:             event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
253:         } else {
254:             $event = event_new();
255: 
256:             event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback);
257:             event_base_set($event, $this->eventBase);
258: 
259:             $this->streamEvents[$key] = $event;
260:             $this->streamFlags[$key] = $flag;
261:         }
262: 
263:         event_add($event);
264:     }
265: 
266:     /**
267:      * Update the ext-libevent event resource for this stream to stop listening to
268:      * the given event type, or remove it entirely if it's no longer needed.
269:      *
270:      * @param resource $stream
271:      * @param integer  $flag   EV_READ or EV_WRITE
272:      */
273:     private function unsubscribeStreamEvent($stream, $flag)
274:     {
275:         $key = (int) $stream;
276: 
277:         $flags = $this->streamFlags[$key] &= ~$flag;
278: 
279:         if (0 === $flags) {
280:             $this->removeStream($stream);
281: 
282:             return;
283:         }
284: 
285:         $event = $this->streamEvents[$key];
286: 
287:         event_del($event);
288:         event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
289:         event_add($event);
290:     }
291: 
292:     /**
293:      * Create a callback used as the target of timer events.
294:      *
295:      * A reference is kept to the callback for the lifetime of the loop
296:      * to prevent "Cannot destroy active lambda function" fatal error from
297:      * the event extension.
298:      */
299:     private function createTimerCallback()
300:     {
301:         $this->timerCallback = function ($_, $__, $timer) {
302:             call_user_func($timer->getCallback(), $timer);
303: 
304:             // Timer already cancelled ...
305:             if (!$this->isTimerActive($timer)) {
306:                 return;
307: 
308:             // Reschedule periodic timers ...
309:             } elseif ($timer->isPeriodic()) {
310:                 event_add(
311:                     $this->timerEvents[$timer],
312:                     $timer->getInterval() * self::MICROSECONDS_PER_SECOND
313:                 );
314: 
315:             // Clean-up one shot timers ...
316:             } else {
317:                 $this->cancelTimer($timer);
318:             }
319:         };
320:     }
321: 
322:     /**
323:      * Create a callback used as the target of stream events.
324:      *
325:      * A reference is kept to the callback for the lifetime of the loop
326:      * to prevent "Cannot destroy active lambda function" fatal error from
327:      * the event extension.
328:      */
329:     private function createStreamCallback()
330:     {
331:         $this->streamCallback = function ($stream, $flags) {
332:             $key = (int) $stream;
333: 
334:             if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) {
335:                 call_user_func($this->readListeners[$key], $stream, $this);
336:             }
337: 
338:             if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) {
339:                 call_user_func($this->writeListeners[$key], $stream, $this);
340:             }
341:         };
342:     }
343: }
344: 
Ratchet API documentation generated by ApiGen 2.8.0