1: <?php
2:
3: namespace React\Stream;
4:
5: use Evenement\EventEmitter;
6: use React\EventLoop\LoopInterface;
7: use InvalidArgumentException;
8:
/**
 * Readable stream that wraps a PHP stream resource and reads from it whenever
 * the event loop reports the resource as readable.
 *
 * Events emitted by this class:
 * - `data`  with the chunk that was read (see handleData()),
 * - `end`   when an empty read coincides with end-of-file on the resource,
 * - `error` with a RuntimeException when reading raised a PHP error,
 * - `close` exactly once, when close() is called for the first time.
 */
class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
{
    /**
     * Maximum number of bytes to read from the resource per read event.
     *
     * The value is passed as the length argument to stream_get_contents();
     * `null` is translated to -1 (no explicit length limit).
     *
     * @var int|null
     */
    public $bufferSize = 65536;

    /**
     * The underlying stream resource this instance reads from.
     *
     * @var resource
     */
    public $stream;

    /** @var bool Set to true by the first call to close(). */
    private $closed = false;

    /** @var LoopInterface Event loop used to watch the resource for readability. */
    private $loop;

    /** @var bool Whether the read listener is currently registered with the loop. */
    private $listening = false;

    /**
     * Validates the resource, switches it to non-blocking mode and starts reading.
     *
     * @param resource      $stream Stream resource opened in a readable mode.
     * @param LoopInterface $loop   Event loop that will invoke handleData().
     *
     * @throws InvalidArgumentException If $stream is not a stream resource or is not readable.
     * @throws \RuntimeException        If the resource cannot be set to non-blocking mode.
     */
    public function __construct($stream, LoopInterface $loop)
    {
        if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
        }

        // A readable fopen() mode contains "r" or "+". Both strpos() calls return
        // false only when neither character is present, which is the single case
        // in which the two results can be identical.
        $meta = stream_get_meta_data($stream);
        if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], 'r') === strpos($meta['mode'], '+')) {
            throw new InvalidArgumentException('Given stream resource is not opened in read mode');
        }

        // Reads are performed from event-loop callbacks, so the resource is
        // switched to non-blocking mode.
        if (stream_set_blocking($stream, false) !== true) {
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
        }

        // Disable PHP's internal read buffering where the function exists and the
        // resource is not a legacy STDIO pipe (see isLegacyPipe()).
        if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
            stream_set_read_buffer($stream, 0);
        }

        $this->stream = $stream;
        $this->loop = $loop;

        $this->resume();
    }

    /**
     * Reports whether this stream has not been closed yet.
     *
     * @return bool
     */
    public function isReadable()
    {
        return !$this->closed;
    }

    /**
     * Stops watching the resource for incoming data.
     *
     * Does nothing when the read listener is not currently registered, so
     * repeated calls and calls after close() never reach the loop.
     *
     * @return void
     */
    public function pause()
    {
        if ($this->listening) {
            $this->loop->removeReadStream($this->stream);
            $this->listening = false;
        }
    }

    /**
     * Starts (or restarts) watching the resource for incoming data.
     *
     * Does nothing when the stream is closed or the read listener is already
     * registered, so the listener is never added to the loop twice.
     *
     * @return void
     */
    public function resume()
    {
        if (!$this->listening && !$this->closed) {
            $this->loop->addReadStream($this->stream, array($this, 'handleData'));
            $this->listening = true;
        }
    }

    /**
     * Pipes this stream into the given destination by delegating to Util::pipe().
     *
     * @param WritableStreamInterface $dest    Destination stream.
     * @param array                   $options Options forwarded to Util::pipe().
     *
     * @return mixed Whatever Util::pipe() returns.
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Closes the stream: emits `close`, detaches the resource from the loop,
     * removes all event listeners and closes the underlying resource.
     *
     * Subsequent calls are no-ops.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        $this->closed = true;

        $this->emit('close');
        $this->loop->removeStream($this->stream);
        // The loop no longer watches the resource, so keep the flag in sync.
        $this->listening = false;
        $this->removeAllListeners();

        $this->handleClose();
    }

    /**
     * Loop callback: reads the next chunk from the resource and emits events.
     *
     * Registered with the loop by resume(); public only so that it can be used
     * as a callable.
     *
     * @return void
     */
    public function handleData()
    {
        // Capture any PHP error raised while reading so that it can be reported
        // through the `error` event instead of the global error handler.
        $error = null;
        set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
            $error = new \ErrorException(
                $errstr,
                0,
                $errno,
                $errfile,
                $errline
            );
        });

        $data = stream_get_contents($this->stream, $this->bufferSize === null ? -1 : $this->bufferSize);

        restore_error_handler();

        if ($error !== null) {
            $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
            $this->close();
            return;
        }

        if ($data !== '') {
            $this->emit('data', array($data));
        } elseif (feof($this->stream)) {
            // An empty read only means "end of stream" when the resource itself
            // reports EOF; otherwise keep listening for the next read event.
            $this->emit('end');
            $this->close();
        }
    }

    /**
     * Closes the underlying resource if it is still open.
     *
     * Called from close(); public only for historical reasons of this class.
     *
     * @return void
     */
    public function handleClose()
    {
        if (is_resource($this->stream)) {
            fclose($this->stream);
        }
    }

    /**
     * Reports whether the resource is an STDIO stream on PHP older than 5.4.
     *
     * The constructor skips stream_set_read_buffer() for such resources.
     * NOTE(review): presumably this works around a legacy PHP issue with
     * unbuffered reads on pipes - confirm before removing.
     *
     * @param resource $resource Stream resource to inspect.
     *
     * @return bool
     */
    private function isLegacyPipe($resource)
    {
        if (PHP_VERSION_ID < 50400) {
            $meta = stream_get_meta_data($resource);

            if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                return true;
            }
        }

        return false;
    }
}
173: