Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • ExtEventLoop
  • Factory
  • LibEventLoop
  • LibEvLoop
  • StreamSelectLoop

Interfaces

  • LoopInterface
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: 
  3: namespace React\EventLoop;
  4: 
  5: use Event;
  6: use EventBase;
  7: use EventConfig as EventBaseConfig;
  8: use React\EventLoop\Tick\FutureTickQueue;
  9: use React\EventLoop\Tick\NextTickQueue;
 10: use React\EventLoop\Timer\Timer;
 11: use React\EventLoop\Timer\TimerInterface;
 12: use SplObjectStorage;
 13: 
 14: /**
 15:  * An ext-event based event-loop.
 16:  */
 17: class ExtEventLoop implements LoopInterface
 18: {
 19:     private $eventBase;
 20:     private $nextTickQueue;
 21:     private $futureTickQueue;
 22:     private $timerCallback;
 23:     private $timerEvents;
 24:     private $streamCallback;
 25:     private $streamEvents = [];
 26:     private $streamFlags = [];
 27:     private $readListeners = [];
 28:     private $writeListeners = [];
 29:     private $running;
 30: 
 31:     public function __construct(EventBaseConfig $config = null)
 32:     {
 33:         $this->eventBase = new EventBase($config);
 34:         $this->nextTickQueue = new NextTickQueue($this);
 35:         $this->futureTickQueue = new FutureTickQueue($this);
 36:         $this->timerEvents = new SplObjectStorage();
 37: 
 38:         $this->createTimerCallback();
 39:         $this->createStreamCallback();
 40:     }
 41: 
 42:     /**
 43:      * {@inheritdoc}
 44:      */
 45:     public function addReadStream($stream, callable $listener)
 46:     {
 47:         $key = (int) $stream;
 48: 
 49:         if (!isset($this->readListeners[$key])) {
 50:             $this->readListeners[$key] = $listener;
 51:             $this->subscribeStreamEvent($stream, Event::READ);
 52:         }
 53:     }
 54: 
 55:     /**
 56:      * {@inheritdoc}
 57:      */
 58:     public function addWriteStream($stream, callable $listener)
 59:     {
 60:         $key = (int) $stream;
 61: 
 62:         if (!isset($this->writeListeners[$key])) {
 63:             $this->writeListeners[$key] = $listener;
 64:             $this->subscribeStreamEvent($stream, Event::WRITE);
 65:         }
 66:     }
 67: 
 68:     /**
 69:      * {@inheritdoc}
 70:      */
 71:     public function removeReadStream($stream)
 72:     {
 73:         $key = (int) $stream;
 74: 
 75:         if (isset($this->readListeners[$key])) {
 76:             unset($this->readListeners[$key]);
 77:             $this->unsubscribeStreamEvent($stream, Event::READ);
 78:         }
 79:     }
 80: 
 81:     /**
 82:      * {@inheritdoc}
 83:      */
 84:     public function removeWriteStream($stream)
 85:     {
 86:         $key = (int) $stream;
 87: 
 88:         if (isset($this->writeListeners[$key])) {
 89:             unset($this->writeListeners[$key]);
 90:             $this->unsubscribeStreamEvent($stream, Event::WRITE);
 91:         }
 92:     }
 93: 
 94:     /**
 95:      * {@inheritdoc}
 96:      */
 97:     public function removeStream($stream)
 98:     {
 99:         $key = (int) $stream;
100: 
101:         if (isset($this->streamEvents[$key])) {
102:             $this->streamEvents[$key]->free();
103: 
104:             unset(
105:                 $this->streamFlags[$key],
106:                 $this->streamEvents[$key],
107:                 $this->readListeners[$key],
108:                 $this->writeListeners[$key]
109:             );
110:         }
111:     }
112: 
113:     /**
114:      * {@inheritdoc}
115:      */
116:     public function addTimer($interval, callable $callback)
117:     {
118:         $timer = new Timer($this, $interval, $callback, false);
119: 
120:         $this->scheduleTimer($timer);
121: 
122:         return $timer;
123:     }
124: 
125:     /**
126:      * {@inheritdoc}
127:      */
128:     public function addPeriodicTimer($interval, callable $callback)
129:     {
130:         $timer = new Timer($this, $interval, $callback, true);
131: 
132:         $this->scheduleTimer($timer);
133: 
134:         return $timer;
135:     }
136: 
137:     /**
138:      * {@inheritdoc}
139:      */
140:     public function cancelTimer(TimerInterface $timer)
141:     {
142:         if ($this->isTimerActive($timer)) {
143:             $this->timerEvents[$timer]->free();
144:             $this->timerEvents->detach($timer);
145:         }
146:     }
147: 
148:     /**
149:      * {@inheritdoc}
150:      */
151:     public function isTimerActive(TimerInterface $timer)
152:     {
153:         return $this->timerEvents->contains($timer);
154:     }
155: 
156:     /**
157:      * {@inheritdoc}
158:      */
159:     public function nextTick(callable $listener)
160:     {
161:         $this->nextTickQueue->add($listener);
162:     }
163: 
164:     /**
165:      * {@inheritdoc}
166:      */
167:     public function futureTick(callable $listener)
168:     {
169:         $this->futureTickQueue->add($listener);
170:     }
171: 
172:     /**
173:      * {@inheritdoc}
174:      */
175:     public function tick()
176:     {
177:         $this->nextTickQueue->tick();
178: 
179:         $this->futureTickQueue->tick();
180: 
181:         // @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
182:         @$this->eventBase->loop(EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK);
183:     }
184: 
185:     /**
186:      * {@inheritdoc}
187:      */
188:     public function run()
189:     {
190:         $this->running = true;
191: 
192:         while ($this->running) {
193:             $this->nextTickQueue->tick();
194: 
195:             $this->futureTickQueue->tick();
196: 
197:             $flags = EventBase::LOOP_ONCE;
198:             if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
199:                 $flags |= EventBase::LOOP_NONBLOCK;
200:             } elseif (!$this->streamEvents && !$this->timerEvents->count()) {
201:                 break;
202:             }
203: 
204:             $this->eventBase->loop($flags);
205:         }
206:     }
207: 
208:     /**
209:      * {@inheritdoc}
210:      */
211:     public function stop()
212:     {
213:         $this->running = false;
214:     }
215: 
216:     /**
217:      * Schedule a timer for execution.
218:      *
219:      * @param TimerInterface $timer
220:      */
221:     private function scheduleTimer(TimerInterface $timer)
222:     {
223:         $flags = Event::TIMEOUT;
224: 
225:         if ($timer->isPeriodic()) {
226:             $flags |= Event::PERSIST;
227:         }
228: 
229:         $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
230:         $this->timerEvents[$timer] = $event;
231: 
232:         $event->add($timer->getInterval());
233:     }
234: 
235:     /**
236:      * Create a new ext-event Event object, or update the existing one.
237:      *
238:      * @param resource $stream
239:      * @param integer  $flag   Event::READ or Event::WRITE
240:      */
241:     private function subscribeStreamEvent($stream, $flag)
242:     {
243:         $key = (int) $stream;
244: 
245:         if (isset($this->streamEvents[$key])) {
246:             $event = $this->streamEvents[$key];
247:             $flags = ($this->streamFlags[$key] |= $flag);
248: 
249:             $event->del();
250:             $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
251:         } else {
252:             $event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
253: 
254:             $this->streamEvents[$key] = $event;
255:             $this->streamFlags[$key] = $flag;
256:         }
257: 
258:         $event->add();
259:     }
260: 
261:     /**
262:      * Update the ext-event Event object for this stream to stop listening to
263:      * the given event type, or remove it entirely if it's no longer needed.
264:      *
265:      * @param resource $stream
266:      * @param integer  $flag   Event::READ or Event::WRITE
267:      */
268:     private function unsubscribeStreamEvent($stream, $flag)
269:     {
270:         $key = (int) $stream;
271: 
272:         $flags = $this->streamFlags[$key] &= ~$flag;
273: 
274:         if (0 === $flags) {
275:             $this->removeStream($stream);
276: 
277:             return;
278:         }
279: 
280:         $event = $this->streamEvents[$key];
281: 
282:         $event->del();
283:         $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
284:         $event->add();
285:     }
286: 
287:     /**
288:      * Create a callback used as the target of timer events.
289:      *
290:      * A reference is kept to the callback for the lifetime of the loop
291:      * to prevent "Cannot destroy active lambda function" fatal error from
292:      * the event extension.
293:      */
294:     private function createTimerCallback()
295:     {
296:         $this->timerCallback = function ($_, $__, $timer) {
297:             call_user_func($timer->getCallback(), $timer);
298: 
299:             if (!$timer->isPeriodic() && $this->isTimerActive($timer)) {
300:                 $this->cancelTimer($timer);
301:             }
302:         };
303:     }
304: 
305:     /**
306:      * Create a callback used as the target of stream events.
307:      *
308:      * A reference is kept to the callback for the lifetime of the loop
309:      * to prevent "Cannot destroy active lambda function" fatal error from
310:      * the event extension.
311:      */
312:     private function createStreamCallback()
313:     {
314:         $this->streamCallback = function ($stream, $flags) {
315:             $key = (int) $stream;
316: 
317:             if (Event::READ === (Event::READ & $flags) && isset($this->readListeners[$key])) {
318:                 call_user_func($this->readListeners[$key], $stream, $this);
319:             }
320: 
321:             if (Event::WRITE === (Event::WRITE & $flags) && isset($this->writeListeners[$key])) {
322:                 call_user_func($this->writeListeners[$key], $stream, $this);
323:             }
324:         };
325:     }
326: }
327: 
Ratchet API documentation generated by ApiGen 2.8.0