Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • BufferedSink
  • CompositeStream
  • DuplexResourceStream
  • ReadableResourceStream
  • ReadableStream
  • ThroughStream
  • Util
  • WritableResourceStream
  • WritableStream

Interfaces

  • DuplexStreamInterface
  • ReadableStreamInterface
  • WritableStreamInterface
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: 
  3: namespace React\Stream;
  4: 
  5: use Evenement\EventEmitter;
  6: use React\EventLoop\LoopInterface;
  7: 
  8: class WritableResourceStream extends EventEmitter implements WritableStreamInterface
  9: {
 10:     public $stream;
 11:     public $softLimit = 65536;
 12: 
 13:     private $listening = false;
 14:     private $writable = true;
 15:     private $closed = false;
 16:     private $loop;
 17:     private $data = '';
 18: 
 19:     public function __construct($stream, LoopInterface $loop)
 20:     {
 21:         if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
 22:             throw new \InvalidArgumentException('First parameter must be a valid stream resource');
 23:         }
 24: 
 25:         $meta = stream_get_meta_data($stream);
 26:         if (isset($meta['mode']) && str_replace(array('b', 't'), '', $meta['mode']) === 'r') {
 27:             throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
 28:         }
 29: 
 30:         // this class relies on non-blocking I/O in order to not interrupt the event loop
 31:         // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
 32:         if (stream_set_blocking($stream, 0) !== true) {
 33:             throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
 34:         }
 35: 
 36:         $this->stream = $stream;
 37:         $this->loop = $loop;
 38:     }
 39: 
 40:     public function isWritable()
 41:     {
 42:         return $this->writable;
 43:     }
 44: 
 45:     public function write($data)
 46:     {
 47:         if (!$this->writable) {
 48:             return false;
 49:         }
 50: 
 51:         $this->data .= $data;
 52: 
 53:         if (!$this->listening && $this->data !== '') {
 54:             $this->listening = true;
 55: 
 56:             $this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
 57:         }
 58: 
 59:         return !isset($this->data[$this->softLimit - 1]);
 60:     }
 61: 
 62:     public function end($data = null)
 63:     {
 64:         if (null !== $data) {
 65:             $this->write($data);
 66:         }
 67: 
 68:         $this->writable = false;
 69: 
 70:         // close immediately if buffer is already empty
 71:         // otherwise wait for buffer to flush first
 72:         if ($this->data === '') {
 73:             $this->close();
 74:         }
 75:     }
 76: 
 77:     public function close()
 78:     {
 79:         if ($this->closed) {
 80:             return;
 81:         }
 82: 
 83:         if ($this->listening) {
 84:             $this->listening = false;
 85:             $this->loop->removeWriteStream($this->stream);
 86:         }
 87: 
 88:         $this->closed = true;
 89:         $this->writable = false;
 90:         $this->data = '';
 91: 
 92:         $this->emit('close', array($this));
 93:         $this->removeAllListeners();
 94:     }
 95: 
 96:     /** @internal */
 97:     public function handleWrite()
 98:     {
 99:         $error = null;
100:         set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
101:             $error = array(
102:                 'message' => $errstr,
103:                 'number' => $errno,
104:                 'file' => $errfile,
105:                 'line' => $errline
106:             );
107:         });
108: 
109:         $sent = fwrite($this->stream, $this->data);
110: 
111:         restore_error_handler();
112: 
113:         // Only report errors if *nothing* could be sent.
114:         // Any hard (permanent) error will fail to send any data at all.
115:         // Sending excessive amounts of data will only flush *some* data and then
116:         // report a temporary error (EAGAIN) which we do not raise here in order
117:         // to keep the stream open for further tries to write.
118:         // Should this turn out to be a permanent error later, it will eventually
119:         // send *nothing* and we can detect this.
120:         if ($sent === 0 || $sent === false) {
121:             if ($error === null) {
122:                 $error = new \RuntimeException('Send failed');
123:             } else {
124:                 $error = new \ErrorException(
125:                     $error['message'],
126:                     0,
127:                     $error['number'],
128:                     $error['file'],
129:                     $error['line']
130:                 );
131:             }
132: 
133:             $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error->getMessage(), 0, $error)));
134:             $this->close();
135: 
136:             return;
137:         }
138: 
139:         $exceeded = isset($this->data[$this->softLimit - 1]);
140:         $this->data = (string) substr($this->data, $sent);
141: 
142:         // buffer has been above limit and is now below limit
143:         if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
144:             $this->emit('drain');
145:         }
146: 
147:         // buffer is now completely empty => stop trying to write
148:         if ($this->data === '') {
149:             // stop waiting for resource to be writable
150:             if ($this->listening) {
151:                 $this->loop->removeWriteStream($this->stream);
152:                 $this->listening = false;
153:             }
154: 
155:             // buffer is end()ing and now completely empty => close buffer
156:             if (!$this->writable) {
157:                 $this->close();
158:             }
159:         }
160:     }
161: }
162: 
Ratchet API documentation generated by ApiGen 2.8.0