Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • BufferedSink
  • CompositeStream
  • DuplexResourceStream
  • ReadableResourceStream
  • ReadableStream
  • ThroughStream
  • Util
  • WritableResourceStream
  • WritableStream

Interfaces

  • DuplexStreamInterface
  • ReadableStreamInterface
  • WritableStreamInterface
  • Overview
  • Namespace
  • Class
  • Tree
 1: <?php
 2: 
 3: namespace React\Stream;
 4: 
 5: use Evenement\EventEmitter;
 6: 
 7: class CompositeStream extends EventEmitter implements DuplexStreamInterface
 8: {
 9:     protected $readable;
10:     protected $writable;
11:     protected $pipeSource;
12:     protected $closed = false;
13: 
14:     public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
15:     {
16:         $this->readable = $readable;
17:         $this->writable = $writable;
18: 
19:         Util::forwardEvents($this->readable, $this, array('data', 'end', 'error'));
20:         Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe'));
21: 
22:         $this->readable->on('close', array($this, 'close'));
23:         $this->writable->on('close', array($this, 'close'));
24: 
25:         $this->on('pipe', array($this, 'handlePipeEvent'));
26:     }
27: 
28:     public function handlePipeEvent($source)
29:     {
30:         $this->pipeSource = $source;
31:     }
32: 
33:     public function isReadable()
34:     {
35:         return $this->readable->isReadable();
36:     }
37: 
38:     public function pause()
39:     {
40:         if ($this->pipeSource) {
41:             $this->pipeSource->pause();
42:         }
43: 
44:         $this->readable->pause();
45:     }
46: 
47:     public function resume()
48:     {
49:         if (!$this->writable->isWritable()) {
50:             return;
51:         }
52: 
53:         if ($this->pipeSource) {
54:             $this->pipeSource->resume();
55:         }
56: 
57:         $this->readable->resume();
58:     }
59: 
60:     public function pipe(WritableStreamInterface $dest, array $options = array())
61:     {
62:         return Util::pipe($this, $dest, $options);
63:     }
64: 
65:     public function isWritable()
66:     {
67:         return $this->writable->isWritable();
68:     }
69: 
70:     public function write($data)
71:     {
72:         return $this->writable->write($data);
73:     }
74: 
75:     public function end($data = null)
76:     {
77:         $this->readable->pause();
78:         $this->writable->end($data);
79:     }
80: 
81:     public function close()
82:     {
83:         if ($this->closed) {
84:             return;
85:         }
86: 
87:         $this->closed = true;
88:         $this->pipeSource = null;
89: 
90:         $this->readable->close();
91:         $this->writable->close();
92: 
93:         $this->emit('close');
94:     }
95: }
96: 
Ratchet API documentation generated by ApiGen 2.8.0