Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • ExtEventLoop
  • Factory
  • LibEventLoop
  • LibEvLoop
  • StreamSelectLoop

Interfaces

  • LoopInterface
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: 
  3: namespace React\EventLoop;
  4: 
  5: use React\EventLoop\Tick\FutureTickQueue;
  6: use React\EventLoop\Tick\NextTickQueue;
  7: use React\EventLoop\Timer\Timer;
  8: use React\EventLoop\Timer\TimerInterface;
  9: use React\EventLoop\Timer\Timers;
 10: 
 11: /**
 12:  * A stream_select() based event-loop.
 13:  */
 14: class StreamSelectLoop implements LoopInterface
 15: {
 16:     const MICROSECONDS_PER_SECOND = 1000000;
 17: 
 18:     private $nextTickQueue;
 19:     private $futureTickQueue;
 20:     private $timers;
 21:     private $readStreams = [];
 22:     private $readListeners = [];
 23:     private $writeStreams = [];
 24:     private $writeListeners = [];
 25:     private $running;
 26: 
 27:     public function __construct()
 28:     {
 29:         $this->nextTickQueue = new NextTickQueue($this);
 30:         $this->futureTickQueue = new FutureTickQueue($this);
 31:         $this->timers = new Timers();
 32:     }
 33: 
 34:     /**
 35:      * {@inheritdoc}
 36:      */
 37:     public function addReadStream($stream, callable $listener)
 38:     {
 39:         $key = (int) $stream;
 40: 
 41:         if (!isset($this->readStreams[$key])) {
 42:             $this->readStreams[$key] = $stream;
 43:             $this->readListeners[$key] = $listener;
 44:         }
 45:     }
 46: 
 47:     /**
 48:      * {@inheritdoc}
 49:      */
 50:     public function addWriteStream($stream, callable $listener)
 51:     {
 52:         $key = (int) $stream;
 53: 
 54:         if (!isset($this->writeStreams[$key])) {
 55:             $this->writeStreams[$key] = $stream;
 56:             $this->writeListeners[$key] = $listener;
 57:         }
 58:     }
 59: 
 60:     /**
 61:      * {@inheritdoc}
 62:      */
 63:     public function removeReadStream($stream)
 64:     {
 65:         $key = (int) $stream;
 66: 
 67:         unset(
 68:             $this->readStreams[$key],
 69:             $this->readListeners[$key]
 70:         );
 71:     }
 72: 
 73:     /**
 74:      * {@inheritdoc}
 75:      */
 76:     public function removeWriteStream($stream)
 77:     {
 78:         $key = (int) $stream;
 79: 
 80:         unset(
 81:             $this->writeStreams[$key],
 82:             $this->writeListeners[$key]
 83:         );
 84:     }
 85: 
 86:     /**
 87:      * {@inheritdoc}
 88:      */
 89:     public function removeStream($stream)
 90:     {
 91:         $this->removeReadStream($stream);
 92:         $this->removeWriteStream($stream);
 93:     }
 94: 
 95:     /**
 96:      * {@inheritdoc}
 97:      */
 98:     public function addTimer($interval, callable $callback)
 99:     {
100:         $timer = new Timer($this, $interval, $callback, false);
101: 
102:         $this->timers->add($timer);
103: 
104:         return $timer;
105:     }
106: 
107:     /**
108:      * {@inheritdoc}
109:      */
110:     public function addPeriodicTimer($interval, callable $callback)
111:     {
112:         $timer = new Timer($this, $interval, $callback, true);
113: 
114:         $this->timers->add($timer);
115: 
116:         return $timer;
117:     }
118: 
119:     /**
120:      * {@inheritdoc}
121:      */
122:     public function cancelTimer(TimerInterface $timer)
123:     {
124:         $this->timers->cancel($timer);
125:     }
126: 
127:     /**
128:      * {@inheritdoc}
129:      */
130:     public function isTimerActive(TimerInterface $timer)
131:     {
132:         return $this->timers->contains($timer);
133:     }
134: 
135:     /**
136:      * {@inheritdoc}
137:      */
138:     public function nextTick(callable $listener)
139:     {
140:         $this->nextTickQueue->add($listener);
141:     }
142: 
143:     /**
144:      * {@inheritdoc}
145:      */
146:     public function futureTick(callable $listener)
147:     {
148:         $this->futureTickQueue->add($listener);
149:     }
150: 
151:     /**
152:      * {@inheritdoc}
153:      */
154:     public function tick()
155:     {
156:         $this->nextTickQueue->tick();
157: 
158:         $this->futureTickQueue->tick();
159: 
160:         $this->timers->tick();
161: 
162:         $this->waitForStreamActivity(0);
163:     }
164: 
165:     /**
166:      * {@inheritdoc}
167:      */
168:     public function run()
169:     {
170:         $this->running = true;
171: 
172:         while ($this->running) {
173:             $this->nextTickQueue->tick();
174: 
175:             $this->futureTickQueue->tick();
176: 
177:             $this->timers->tick();
178: 
179:             // Next-tick or future-tick queues have pending callbacks ...
180:             if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
181:                 $timeout = 0;
182: 
183:             // There is a pending timer, only block until it is due ...
184:             } elseif ($scheduledAt = $this->timers->getFirst()) {
185:                 $timeout = $scheduledAt - $this->timers->getTime();
186:                 if ($timeout < 0) {
187:                     $timeout = 0;
188:                 } else {
189:                     /*
190:                      * round() needed to correct float error:
191:                      * https://github.com/reactphp/event-loop/issues/48
192:                      */
193:                     $timeout = round($timeout * self::MICROSECONDS_PER_SECOND);
194:                 }
195: 
196:             // The only possible event is stream activity, so wait forever ...
197:             } elseif ($this->readStreams || $this->writeStreams) {
198:                 $timeout = null;
199: 
200:             // There's nothing left to do ...
201:             } else {
202:                 break;
203:             }
204: 
205:             $this->waitForStreamActivity($timeout);
206:         }
207:     }
208: 
209:     /**
210:      * {@inheritdoc}
211:      */
212:     public function stop()
213:     {
214:         $this->running = false;
215:     }
216: 
217:     /**
218:      * Wait/check for stream activity, or until the next timer is due.
219:      */
220:     private function waitForStreamActivity($timeout)
221:     {
222:         $read  = $this->readStreams;
223:         $write = $this->writeStreams;
224: 
225:         $available = $this->streamSelect($read, $write, $timeout);
226:         if (false === $available) {
227:             // if a system call has been interrupted,
228:             // we cannot rely on it's outcome
229:             return;
230:         }
231: 
232:         foreach ($read as $stream) {
233:             $key = (int) $stream;
234: 
235:             if (isset($this->readListeners[$key])) {
236:                 call_user_func($this->readListeners[$key], $stream, $this);
237:             }
238:         }
239: 
240:         foreach ($write as $stream) {
241:             $key = (int) $stream;
242: 
243:             if (isset($this->writeListeners[$key])) {
244:                 call_user_func($this->writeListeners[$key], $stream, $this);
245:             }
246:         }
247:     }
248: 
249:     /**
250:      * Emulate a stream_select() implementation that does not break when passed
251:      * empty stream arrays.
252:      *
253:      * @param array        &$read   An array of read streams to select upon.
254:      * @param array        &$write  An array of write streams to select upon.
255:      * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
256:      *
257:      * @return integer|false The total number of streams that are ready for read/write.
258:      * Can return false if stream_select() is interrupted by a signal.
259:      */
260:     protected function streamSelect(array &$read, array &$write, $timeout)
261:     {
262:         if ($read || $write) {
263:             $except = null;
264: 
265:             // suppress warnings that occur, when stream_select is interrupted by a signal
266:             return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
267:         }
268: 
269:         $timeout && usleep($timeout);
270: 
271:         return 0;
272:     }
273: }
274: 
Ratchet API documentation generated by ApiGen 2.8.0