Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • BufferedSink
  • CompositeStream
  • DuplexResourceStream
  • ReadableResourceStream
  • ReadableStream
  • ThroughStream
  • Util
  • WritableResourceStream
  • WritableStream

Interfaces

  • DuplexStreamInterface
  • ReadableStreamInterface
  • WritableStreamInterface
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: 
  3: namespace React\Stream;
  4: 
  5: use Evenement\EventEmitter;
  6: use React\EventLoop\LoopInterface;
  7: use InvalidArgumentException;
  8: 
  9: /**
 10:  * Duplex (readable and writable) stream backed by a PHP stream resource.
 11:  *
 12:  * Incoming data is read whenever the event loop reports the resource as
 13:  * readable and is re-emitted as "data" events; outgoing data is handed to a
 14:  * WritableStreamInterface buffer. The buffer's "error" and "drain" events are
 15:  * re-emitted on this stream and the buffer's "close" event closes this stream.
 16:  */
 17: class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
 18: {
 19:     /**
 20:      * Controls the maximum buffer size in bytes to read at once from the stream.
 21:      *
 22:      * This can be a positive number which means that up to X bytes will be read
 23:      * at once from the underlying stream resource. Note that the actual number
 24:      * of bytes read may be lower if the stream resource has less than X bytes
 25:      * currently available.
 26:      *
 27:      * This can be `null` which means read everything available from the
 28:      * underlying stream resource.
 29:      * This should read until the stream resource is not readable anymore
 30:      * (i.e. underlying buffer drained), note that this does not necessarily
 31:      * mean it reached EOF.
 32:      *
 33:      * @var int|null
 34:      */
 35:     public $bufferSize = 65536;
 36: 
 37:     /**
 38:      * The underlying stream resource.
 39:      *
 40:      * @var resource
 41:      */
 42:     public $stream;
 43: 
 44:     /**
 45:      * Whether the readable side is still open.
 46:      *
 47:      * @var bool
 48:      */
 49:     protected $readable = true;
 50: 
 51:     /**
 52:      * Whether the writable side is still open.
 53:      *
 54:      * @var bool
 55:      */
 56:     protected $writable = true;
 57: 
 58:     /**
 59:      * Set by end() so that the close() call that follows is not skipped even
 60:      * though $writable is already false at that point.
 61:      *
 62:      * @var bool
 63:      */
 64:     protected $closing = false;
 65: 
 66:     /**
 67:      * Event loop used to watch the resource for readability.
 68:      *
 69:      * @var LoopInterface
 70:      */
 71:     protected $loop;
 72: 
 73:     /**
 74:      * Stream that all writes are delegated to.
 75:      *
 76:      * @var WritableStreamInterface
 77:      */
 78:     protected $buffer;
 79: 
 80:     /**
 81:      * Wraps the given stream resource and starts reading from it.
 82:      *
 83:      * The resource is switched to non-blocking mode and, where supported,
 84:      * to unbuffered reads.
 85:      *
 86:      * @param resource                     $stream resource opened for reading and writing
 87:      * @param LoopInterface                $loop   event loop used to watch the resource
 88:      * @param WritableStreamInterface|null $buffer write buffer; when omitted, a
 89:      *                                             WritableResourceStream for the
 90:      *                                             same resource is created
 91:      *
 92:      * @throws InvalidArgumentException if $stream is not a stream resource or
 93:      *                                  is not opened in read and write mode
 94:      * @throws \RuntimeException        if non-blocking mode cannot be enabled
 95:      */
 96:     public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
 97:     {
 98:         if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
 99:             throw new InvalidArgumentException('First parameter must be a valid stream resource');
100:         }
101: 
102:         // ensure resource is opened for reading and writing (fopen mode must contain "+")
103:         $meta = stream_get_meta_data($stream);
104:         if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], '+') === false) {
105:             throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
106:         }
107: 
108:         // this class relies on non-blocking I/O in order to not interrupt the event loop
109:         // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
110:         if (stream_set_blocking($stream, 0) !== true) {
111:             throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
112:         }
113: 
114:         // Use unbuffered read operations on the underlying stream resource.
115:         // Reading chunks from the stream may otherwise leave unread bytes in
116:         // PHP's stream buffers which some event loop implementations do not
117:         // trigger events on (edge triggered).
118:         // This does not affect the default event loop implementation (level
119:         // triggered), so we can ignore platforms not supporting this (HHVM).
120:         // Pipe streams (such as STDIN) do not seem to require this and legacy
121:         // PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this.
122:         if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
123:             stream_set_read_buffer($stream, 0);
124:         }
125: 
126:         if ($buffer === null) {
127:             $buffer = new WritableResourceStream($stream, $loop);
128:         }
129: 
130:         $this->stream = $stream;
131:         $this->loop = $loop;
132:         $this->buffer = $buffer;
133: 
134:         // closures cannot use $this before PHP 5.4, so capture it explicitly
135:         $that = $this;
136: 
137:         $this->buffer->on('error', function ($error) use ($that) {
138:             $that->emit('error', array($error));
139:         });
140: 
141:         $this->buffer->on('close', array($this, 'close'));
142: 
143:         $this->buffer->on('drain', function () use ($that) {
144:             $that->emit('drain');
145:         });
146: 
147:         $this->resume();
148:     }
149: 
150:     /**
151:      * @return bool whether the readable side is still open
152:      */
153:     public function isReadable()
154:     {
155:         return $this->readable;
156:     }
157: 
158:     /**
159:      * @return bool whether the writable side is still open
160:      */
161:     public function isWritable()
162:     {
163:         return $this->writable;
164:     }
165: 
166:     /**
167:      * Stops watching the resource for incoming data until resume() is called.
168:      */
169:     public function pause()
170:     {
171:         $this->loop->removeReadStream($this->stream);
172:     }
173: 
174:     /**
175:      * Starts watching the resource for incoming data.
176:      *
177:      * Does nothing once the readable side has been closed.
178:      */
179:     public function resume()
180:     {
181:         if ($this->readable) {
182:             $this->loop->addReadStream($this->stream, array($this, 'handleData'));
183:         }
184:     }
185: 
186:     /**
187:      * Passes the given data on to the write buffer.
188:      *
189:      * @param mixed $data
190:      * @return mixed false if this stream is no longer writable, otherwise the
191:      *               return value of the buffer's write()
192:      */
193:     public function write($data)
194:     {
195:         if (!$this->writable) {
196:             return false;
197:         }
198: 
199:         return $this->buffer->write($data);
200:     }
201: 
202:     /**
203:      * Closes both sides of the stream.
204:      *
205:      * Emits "close", detaches the resource from the loop, closes the write
206:      * buffer, removes all listeners and finally closes the resource itself.
207:      * Calling this on an already closed stream does nothing.
208:      */
209:     public function close()
210:     {
211:         if (!$this->writable && !$this->closing) {
212:             return;
213:         }
214: 
215:         $this->closing = false;
216: 
217:         $this->readable = false;
218:         $this->writable = false;
219: 
220:         $this->emit('close');
221:         $this->loop->removeStream($this->stream);
222:         $this->buffer->close();
223:         $this->removeAllListeners();
224: 
225:         $this->handleClose();
226:     }
227: 
228:     /**
229:      * Stops reading and ends the write buffer, handing it $data (if any).
230:      *
231:      * This stream itself is closed once the buffer emits its "close" event.
232:      * Does nothing if the stream is no longer writable.
233:      *
234:      * @param mixed|null $data passed through to the buffer's end()
235:      */
236:     public function end($data = null)
237:     {
238:         if (!$this->writable) {
239:             return;
240:         }
241: 
242:         $this->closing = true;
243: 
244:         $this->readable = false;
245:         $this->writable = false;
246:         $this->pause();
247: 
248:         $this->buffer->end($data);
249:     }
250: 
251:     /**
252:      * Pipes this stream into $dest by delegating to Util::pipe().
253:      *
254:      * @param WritableStreamInterface $dest
255:      * @param array                   $options passed through to Util::pipe()
256:      * @return mixed the return value of Util::pipe()
257:      */
258:     public function pipe(WritableStreamInterface $dest, array $options = array())
259:     {
260:         return Util::pipe($this, $dest, $options);
261:     }
262: 
263:     /**
264:      * Read listener registered with the loop by resume().
265:      *
266:      * Reads up to $bufferSize bytes (everything available if it is null) and
267:      * emits them as a "data" event. Warnings raised while reading, or a false
268:      * return value, are reported as an "error" event followed by close(). An
269:      * empty read closes the stream (after emitting "end") only if the resource
270:      * is at EOF; an empty read on a resource that is still open is ignored.
271:      *
272:      * @param resource $stream the resource reported as readable
273:      */
274:     public function handleData($stream)
275:     {
276:         $error = null;
277:         set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
278:             $error = new \ErrorException(
279:                 $errstr,
280:                 0,
281:                 $errno,
282:                 $errfile,
283:                 $errline
284:             );
285:         });
286: 
287:         $data = stream_get_contents($stream, $this->bufferSize === null ? -1 : $this->bufferSize);
288: 
289:         restore_error_handler();
290: 
291:         if ($error !== null) {
292:             $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
293:             $this->close();
294:             return;
295:         }
296: 
297:         // stream_get_contents() signals failure with false; never forward
298:         // that to "data" listeners as if it were payload
299:         if ($data === false) {
300:             $this->emit('error', array(new \RuntimeException('Unable to read from stream')));
301:             $this->close();
302:             return;
303:         }
304: 
305:         if ($data !== '') {
306:             $this->emit('data', array($data));
307:         } elseif (feof($stream)) {
308:             // no data read and resource reports EOF => we reached the end and close the stream
309:             $this->emit('end');
310:             $this->close();
311:         }
312:     }
313: 
314:     /**
315:      * Closes the underlying resource if it is still open.
316:      */
317:     public function handleClose()
318:     {
319:         if (is_resource($this->stream)) {
320:             fclose($this->stream);
321:         }
322:     }
323: 
324:     /**
325:      * @return WritableStreamInterface
326:      */
327:     public function getBuffer()
328:     {
329:         return $this->buffer;
330:     }
331: 
332:     /**
333:      * Returns whether this is a pipe resource in a legacy environment
334:      *
335:      * @param resource $resource
336:      * @return bool
337:      *
338:      * @codeCoverageIgnore
339:      */
340:     private function isLegacyPipe($resource)
341:     {
342:         if (PHP_VERSION_ID < 50400) {
343:             $meta = stream_get_meta_data($resource);
344: 
345:             if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
346:                 return true;
347:             }
348:         }
349:         return false;
350:     }
351: }
352: 
Ratchet API documentation generated by ApiGen 2.8.0