Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • BufferedSink
  • CompositeStream
  • DuplexResourceStream
  • ReadableResourceStream
  • ReadableStream
  • ThroughStream
  • Util
  • WritableResourceStream
  • WritableStream

Interfaces

  • DuplexStreamInterface
  • ReadableStreamInterface
  • WritableStreamInterface
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: 
  3: namespace React\Stream;
  4: 
  5: use Evenement\EventEmitter;
  6: use React\EventLoop\LoopInterface;
  7: use InvalidArgumentException;
  8: 
  9: class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
 10: {
 11:     /**
 12:      * Controls the maximum buffer size in bytes to read at once from the stream.
 13:      *
 14:      * This value SHOULD NOT be changed unless you know what you're doing.
 15:      *
 16:      * This can be a positive number which means that up to X bytes will be read
 17:      * at once from the underlying stream resource. Note that the actual number
 18:      * of bytes read may be lower if the stream resource has less than X bytes
 19:      * currently available.
 20:      *
 21:      * This can be `null` which means read everything available from the
 22:      * underlying stream resource.
 23:      * This should read until the stream resource is not readable anymore
 24:      * (i.e. underlying buffer drained), note that this does not neccessarily
 25:      * mean it reached EOF.
 26:      *
 27:      * @var int|null
 28:      */
 29:     public $bufferSize = 65536;
 30: 
 31:     /**
 32:      * @var resource
 33:      */
 34:     public $stream;
 35: 
 36:     private $closed = false;
 37:     private $loop;
 38: 
 39:     public function __construct($stream, LoopInterface $loop)
 40:     {
 41:         if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
 42:              throw new InvalidArgumentException('First parameter must be a valid stream resource');
 43:         }
 44: 
 45:         // ensure resource is opened for reading (fopen mode must contain "r" or "+")
 46:         $meta = stream_get_meta_data($stream);
 47:         if (isset($meta['mode']) && $meta['mode'] !== '' && strpos($meta['mode'], 'r') === strpos($meta['mode'], '+')) {
 48:             throw new InvalidArgumentException('Given stream resource is not opened in read mode');
 49:         }
 50: 
 51:         // this class relies on non-blocking I/O in order to not interrupt the event loop
 52:         // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
 53:         if (stream_set_blocking($stream, 0) !== true) {
 54:             throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
 55:         }
 56: 
 57:         // Use unbuffered read operations on the underlying stream resource.
 58:         // Reading chunks from the stream may otherwise leave unread bytes in
 59:         // PHP's stream buffers which some event loop implementations do not
 60:         // trigger events on (edge triggered).
 61:         // This does not affect the default event loop implementation (level
 62:         // triggered), so we can ignore platforms not supporting this (HHVM).
 63:         // Pipe streams (such as STDIN) do not seem to require this and legacy
 64:         // PHP < 5.4 causes SEGFAULTs on unbuffered pipe streams, so skip this.
 65:         if (function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
 66:             stream_set_read_buffer($stream, 0);
 67:         }
 68: 
 69:         $this->stream = $stream;
 70:         $this->loop = $loop;
 71: 
 72:         $this->resume();
 73:     }
 74: 
 75:     public function isReadable()
 76:     {
 77:         return !$this->closed;
 78:     }
 79: 
 80:     public function pause()
 81:     {
 82:         $this->loop->removeReadStream($this->stream);
 83:     }
 84: 
 85:     public function resume()
 86:     {
 87:         if (!$this->closed) {
 88:             $this->loop->addReadStream($this->stream, array($this, 'handleData'));
 89:         }
 90:     }
 91: 
 92:     public function pipe(WritableStreamInterface $dest, array $options = array())
 93:     {
 94:         return Util::pipe($this, $dest, $options);
 95:     }
 96: 
 97:     public function close()
 98:     {
 99:         if ($this->closed) {
100:             return;
101:         }
102: 
103:         $this->closed = true;
104: 
105:         $this->emit('close');
106:         $this->loop->removeStream($this->stream);
107:         $this->removeAllListeners();
108: 
109:         $this->handleClose();
110:     }
111: 
112:     /** @internal */
113:     public function handleData()
114:     {
115:         $error = null;
116:         set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
117:             $error = new \ErrorException(
118:                 $errstr,
119:                 0,
120:                 $errno,
121:                 $errfile,
122:                 $errline
123:             );
124:         });
125: 
126:         $data = stream_get_contents($this->stream, $this->bufferSize === null ? -1 : $this->bufferSize);
127: 
128:         restore_error_handler();
129: 
130:         if ($error !== null) {
131:             $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
132:             $this->close();
133:             return;
134:         }
135: 
136:         if ($data !== '') {
137:             $this->emit('data', array($data));
138:         } else{
139:             // no data read => we reached the end and close the stream
140:             $this->emit('end');
141:             $this->close();
142:         }
143:     }
144: 
145:     /** @internal */
146:     public function handleClose()
147:     {
148:         if (is_resource($this->stream)) {
149:             fclose($this->stream);
150:         }
151:     }
152: 
153:     /**
154:      * Returns whether this is a pipe resource in a legacy environment
155:      *
156:      * @param resource $resource
157:      * @return bool
158:      *
159:      * @codeCoverageIgnore
160:      */
161:     private function isLegacyPipe($resource)
162:     {
163:         if (PHP_VERSION_ID < 50400) {
164:             $meta = stream_get_meta_data($resource);
165: 
166:             if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
167:                 return true;
168:             }
169:         }
170:         return false;
171:     }
172: }
173: 
Ratchet API documentation generated by ApiGen 2.8.0