1: <?php
2:
3: namespace React\EventLoop;
4:
5: use libev\EventLoop;
6: use libev\IOEvent;
7: use libev\TimerEvent;
8: use React\EventLoop\Tick\FutureTickQueue;
9: use React\EventLoop\Tick\NextTickQueue;
10: use React\EventLoop\Timer\Timer;
11: use React\EventLoop\Timer\TimerInterface;
12: use SplObjectStorage;
13:
14: 15: 16: 17:
18: class LibEvLoop implements LoopInterface
19: {
20: private $loop;
21: private $nextTickQueue;
22: private $futureTickQueue;
23: private $timerEvents;
24: private $readEvents = [];
25: private $writeEvents = [];
26: private $running;
27:
28: public function __construct()
29: {
30: $this->loop = new EventLoop();
31: $this->nextTickQueue = new NextTickQueue($this);
32: $this->futureTickQueue = new FutureTickQueue($this);
33: $this->timerEvents = new SplObjectStorage();
34: }
35:
36: 37: 38:
39: public function addReadStream($stream, callable $listener)
40: {
41: if (isset($this->readEvents[(int) $stream])) {
42: return;
43: }
44:
45: $callback = function () use ($stream, $listener) {
46: call_user_func($listener, $stream, $this);
47: };
48:
49: $event = new IOEvent($callback, $stream, IOEvent::READ);
50: $this->loop->add($event);
51:
52: $this->readEvents[(int) $stream] = $event;
53: }
54:
55: 56: 57:
58: public function addWriteStream($stream, callable $listener)
59: {
60: if (isset($this->writeEvents[(int) $stream])) {
61: return;
62: }
63:
64: $callback = function () use ($stream, $listener) {
65: call_user_func($listener, $stream, $this);
66: };
67:
68: $event = new IOEvent($callback, $stream, IOEvent::WRITE);
69: $this->loop->add($event);
70:
71: $this->writeEvents[(int) $stream] = $event;
72: }
73:
74: 75: 76:
77: public function removeReadStream($stream)
78: {
79: $key = (int) $stream;
80:
81: if (isset($this->readEvents[$key])) {
82: $this->readEvents[$key]->stop();
83: unset($this->readEvents[$key]);
84: }
85: }
86:
87: 88: 89:
90: public function removeWriteStream($stream)
91: {
92: $key = (int) $stream;
93:
94: if (isset($this->writeEvents[$key])) {
95: $this->writeEvents[$key]->stop();
96: unset($this->writeEvents[$key]);
97: }
98: }
99:
100: 101: 102:
103: public function removeStream($stream)
104: {
105: $this->removeReadStream($stream);
106: $this->removeWriteStream($stream);
107: }
108:
109: 110: 111:
112: public function addTimer($interval, callable $callback)
113: {
114: $timer = new Timer($this, $interval, $callback, false);
115:
116: $callback = function () use ($timer) {
117: call_user_func($timer->getCallback(), $timer);
118:
119: if ($this->isTimerActive($timer)) {
120: $this->cancelTimer($timer);
121: }
122: };
123:
124: $event = new TimerEvent($callback, $timer->getInterval());
125: $this->timerEvents->attach($timer, $event);
126: $this->loop->add($event);
127:
128: return $timer;
129: }
130:
131: 132: 133:
134: public function addPeriodicTimer($interval, callable $callback)
135: {
136: $timer = new Timer($this, $interval, $callback, true);
137:
138: $callback = function () use ($timer) {
139: call_user_func($timer->getCallback(), $timer);
140: };
141:
142: $event = new TimerEvent($callback, $interval, $interval);
143: $this->timerEvents->attach($timer, $event);
144: $this->loop->add($event);
145:
146: return $timer;
147: }
148:
149: 150: 151:
152: public function cancelTimer(TimerInterface $timer)
153: {
154: if (isset($this->timerEvents[$timer])) {
155: $this->loop->remove($this->timerEvents[$timer]);
156: $this->timerEvents->detach($timer);
157: }
158: }
159:
160: 161: 162:
163: public function isTimerActive(TimerInterface $timer)
164: {
165: return $this->timerEvents->contains($timer);
166: }
167:
168: 169: 170:
171: public function nextTick(callable $listener)
172: {
173: $this->nextTickQueue->add($listener);
174: }
175:
176: 177: 178:
179: public function futureTick(callable $listener)
180: {
181: $this->futureTickQueue->add($listener);
182: }
183:
184: 185: 186:
187: public function tick()
188: {
189: $this->nextTickQueue->tick();
190:
191: $this->futureTickQueue->tick();
192:
193: $this->loop->run(EventLoop::RUN_ONCE | EventLoop::RUN_NOWAIT);
194: }
195:
196: 197: 198:
199: public function run()
200: {
201: $this->running = true;
202:
203: while ($this->running) {
204: $this->nextTickQueue->tick();
205:
206: $this->futureTickQueue->tick();
207:
208: $flags = EventLoop::RUN_ONCE;
209: if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
210: $flags |= EventLoop::RUN_NOWAIT;
211: } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) {
212: break;
213: }
214:
215: $this->loop->run($flags);
216: }
217: }
218:
219: 220: 221:
222: public function stop()
223: {
224: $this->running = false;
225: }
226: }
227: