1: <?php
2:
3: namespace React\Stream;
4:
5: use Evenement\EventEmitter;
6: use React\EventLoop\LoopInterface;
7: use InvalidArgumentException;
8:
/**
 * Duplex (readable and writable) stream backed by a PHP stream resource.
 *
 * Reading is driven by the event loop: whenever the loop reports the resource
 * as readable, handleData() reads a chunk and emits it as a "data" event.
 * Writing is delegated to a separate writable buffer stream whose "error",
 * "drain" and "close" events are forwarded to / acted upon by this object.
 *
 * Events emitted by this class: "data", "end", "error", "drain", "close".
 */
class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
{
    /**
     * Maximum number of bytes to read per handleData() call.
     *
     * When set to null, handleData() passes -1 to stream_get_contents(),
     * i.e. it reads everything that is currently available.
     *
     * @var int|null
     */
    public $bufferSize = 65536;

    /** @var resource the underlying stream resource */
    public $stream;

    /** @var bool whether the readable side is still open */
    protected $readable = true;

    /** @var bool whether the writable side is still open */
    protected $writable = true;

    /** @var bool true between end() and the final close() */
    protected $closing = false;

    /** @var LoopInterface event loop used to watch the resource for readability */
    protected $loop;

    /** @var WritableStreamInterface stream that performs the actual writes */
    protected $buffer;

    /**
     * @param resource                     $stream stream resource opened for reading and writing
     * @param LoopInterface                $loop   event loop that will watch the resource
     * @param WritableStreamInterface|null $buffer optional writable side; defaults to a
     *                                             WritableResourceStream on the same resource
     *
     * @throws InvalidArgumentException if $stream is not a stream resource or its reported
     *                                  mode does not contain "+"
     * @throws \RuntimeException        if the resource cannot be switched to non-blocking mode
     */
    public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
    {
        if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
            throw new InvalidArgumentException('First parameter must be a valid stream resource');
        }

        // A duplex stream needs read AND write access, i.e. an fopen() mode containing "+".
        // Resources that report no mode (or an empty one) are accepted as-is.
        $meta = stream_get_meta_data($stream);
        if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], '+') === false) {
            throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
        }

        // Reads happen inside event loop callbacks and must never block the loop.
        if (stream_set_blocking($stream, 0) !== true) {
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
        }

        // Disable PHP's own read buffering where possible so unread bytes do not
        // linger in PHP's internal buffer. Skipped when the function is missing
        // or for pipes on legacy PHP (see isLegacyPipe()).
        if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
            stream_set_read_buffer($stream, 0);
        }

        if ($buffer === null) {
            $buffer = new WritableResourceStream($stream, $loop);
        }

        $this->stream = $stream;
        $this->loop = $loop;
        $this->buffer = $buffer;

        // PHP 5.3 closures cannot capture $this, hence the $that alias.
        $that = $this;

        $this->buffer->on('error', function ($error) use ($that) {
            $that->emit('error', array($error));
        });

        // Once the write buffer is closed the whole duplex stream is closed.
        $this->buffer->on('close', array($this, 'close'));

        $this->buffer->on('drain', function () use ($that) {
            $that->emit('drain');
        });

        $this->resume();
    }

    /**
     * @return bool true while the readable side has not been ended or closed
     */
    public function isReadable()
    {
        return $this->readable;
    }

    /**
     * @return bool true while the writable side has not been ended or closed
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Stops watching the resource for incoming data.
     */
    public function pause()
    {
        $this->loop->removeReadStream($this->stream);
    }

    /**
     * Starts (or restarts) watching the resource for incoming data.
     *
     * Does nothing once the readable side has been closed.
     */
    public function resume()
    {
        if ($this->readable) {
            $this->loop->addReadStream($this->stream, array($this, 'handleData'));
        }
    }

    /**
     * Queues data on the write buffer.
     *
     * @param mixed $data data handed to the write buffer unchanged
     *
     * @return bool false if this stream is no longer writable, otherwise whatever
     *              the buffer's write() returns
     */
    public function write($data)
    {
        if (!$this->writable) {
            return false;
        }

        return $this->buffer->write($data);
    }

    /**
     * Closes the stream immediately.
     *
     * Emits "close", detaches the resource from the loop, closes the write
     * buffer, drops all listeners and finally closes the resource itself.
     */
    public function close()
    {
        // Already closed. This guard also makes the call a no-op when it is
        // re-entered through the buffer's "close" listener registered in the
        // constructor (assuming the buffer emits "close" from its close()).
        if (!$this->writable && !$this->closing) {
            return;
        }

        $this->closing = false;

        $this->readable = false;
        $this->writable = false;

        $this->emit('close');
        $this->loop->removeStream($this->stream);
        $this->buffer->close();
        $this->removeAllListeners();

        $this->handleClose();
    }

    /**
     * Ends the stream gracefully.
     *
     * Reading stops right away; the optional final $data is passed to the
     * buffer's end(). The actual close() is triggered by the buffer's "close"
     * event (wired up in the constructor).
     *
     * @param mixed|null $data optional last chunk to write before ending
     */
    public function end($data = null)
    {
        if (!$this->writable) {
            return;
        }

        $this->closing = true;

        $this->readable = false;
        $this->writable = false;
        $this->pause();

        $this->buffer->end($data);
    }

    /**
     * Pipes all data read from this stream into $dest.
     *
     * @param WritableStreamInterface $dest    destination stream
     * @param array                   $options options forwarded to Util::pipe()
     *
     * @return mixed the return value of Util::pipe()
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * Event loop callback: reads one chunk from the readable resource.
     *
     * Emits "data" for a non-empty chunk. Emits "error" and closes the stream
     * if PHP raised an error while reading. Emits "end" and closes the stream
     * only when an empty read coincides with end-of-file.
     *
     * Public only because it is registered as a callback with the loop.
     *
     * @param resource $stream the resource the loop reported as readable
     */
    public function handleData($stream)
    {
        // Convert any PHP warning/notice raised by the read into an exception
        // object so it can be reported through the "error" event.
        $error = null;
        set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
            $error = new \ErrorException(
                $errstr,
                0,
                $errno,
                $errfile,
                $errline
            );
        });

        $data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize);

        restore_error_handler();

        if ($error !== null) {
            $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
            $this->close();
            return;
        }

        if ($data !== '') {
            $this->emit('data', array($data));
        } elseif (feof($stream)) {
            // An empty read only means "end" when the stream really is at EOF.
            // Non-blocking streams may yield an empty chunk without being
            // finished; such reads are ignored instead of closing the stream.
            $this->emit('end');
            $this->close();
        }
    }

    /**
     * Closes the underlying resource if it is still open.
     */
    public function handleClose()
    {
        if (is_resource($this->stream)) {
            fclose($this->stream);
        }
    }

    /**
     * @return WritableStreamInterface the stream that handles the writable side
     */
    public function getBuffer()
    {
        return $this->buffer;
    }

    /**
     * Tells whether $resource is a STDIO-type stream on PHP older than 5.4.
     *
     * The constructor uses this to skip stream_set_read_buffer() for such
     * resources.
     *
     * @param resource $resource stream resource to inspect
     *
     * @return bool true only on PHP < 5.4 when the stream's "stream_type" meta
     *              entry is "STDIO"
     */
    private function isLegacyPipe($resource)
    {
        if (PHP_VERSION_ID < 50400) {
            $meta = stream_get_meta_data($resource);

            if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                return true;
            }
        }
        return false;
    }
}
226: