1: <?php
2:
3: namespace React\Stream;
4:
5: use Evenement\EventEmitter;
6:
/**
 * Duplex stream assembled from one readable stream and one writable stream.
 *
 * Read-side calls (isReadable, pause, resume, pipe) are delegated to the
 * readable stream and write-side calls (isWritable, write, end) to the
 * writable stream. The events "data", "end" and "error" of the readable side
 * and "drain", "error" and "pipe" of the writable side are passed to
 * Util::forwardEvents() so that they are re-emitted on this object.
 *
 * Closing the composite closes both sides. A "close" event on either side
 * closes the composite, which emits exactly one "close" event of its own and
 * then drops every listener registered on it.
 */
class CompositeStream extends EventEmitter implements DuplexStreamInterface
{
    /** @var ReadableStreamInterface stream that read-side calls are delegated to */
    protected $readable;

    /** @var WritableStreamInterface stream that write-side calls are delegated to */
    protected $writable;

    /**
     * Argument of the most recent "pipe" event, or null when there was none
     * or the composite has been closed.
     *
     * @var mixed|null
     */
    protected $pipeSource;

    /** @var bool true once close() has run */
    protected $closed = false;

    /**
     * @param ReadableStreamInterface $readable stream providing the readable side
     * @param WritableStreamInterface $writable stream providing the writable side
     */
    public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
    {
        $this->readable = $readable;
        $this->writable = $writable;

        // When one side is already unusable, a later "close" event from it
        // cannot be relied on to tear the composite down. Close both sides
        // right away and skip the listener wiring, so the composite starts in
        // a consistent closed state instead of half-open.
        if (!$readable->isReadable() || !$writable->isWritable()) {
            $this->close();

            return;
        }

        Util::forwardEvents($this->readable, $this, array('data', 'end', 'error'));
        Util::forwardEvents($this->writable, $this, array('drain', 'error', 'pipe'));

        // Either side closing closes the whole composite; close() guards
        // against the re-entrant call triggered by closing the other side.
        $this->readable->on('close', array($this, 'close'));
        $this->writable->on('close', array($this, 'close'));

        $this->on('pipe', array($this, 'handlePipeEvent'));
    }

    /**
     * Listener for this stream's own "pipe" event: remembers the event
     * argument so that pause() and resume() can be relayed to it.
     *
     * @param mixed $source presumably the stream piping into the writable
     *                      side (TODO confirm against the "pipe" emitter);
     *                      must offer pause() and resume()
     * @return void
     */
    public function handlePipeEvent($source)
    {
        $this->pipeSource = $source;
    }

    /**
     * @return bool whatever the readable side reports
     */
    public function isReadable()
    {
        return $this->readable->isReadable();
    }

    /**
     * Pauses the remembered pipe source (if any) and the readable side.
     *
     * @return void
     */
    public function pause()
    {
        if ($this->pipeSource) {
            $this->pipeSource->pause();
        }

        $this->readable->pause();
    }

    /**
     * Resumes the remembered pipe source (if any) and the readable side.
     *
     * Does nothing while the writable side reports that it is not writable.
     *
     * @return void
     */
    public function resume()
    {
        if (!$this->writable->isWritable()) {
            return;
        }

        if ($this->pipeSource) {
            $this->pipeSource->resume();
        }

        $this->readable->resume();
    }

    /**
     * Pipes this stream into $dest by delegating to Util::pipe().
     *
     * @param WritableStreamInterface $dest    destination stream
     * @param array                   $options passed through to Util::pipe() unchanged
     * @return mixed the return value of Util::pipe()
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        return Util::pipe($this, $dest, $options);
    }

    /**
     * @return bool whatever the writable side reports
     */
    public function isWritable()
    {
        return $this->writable->isWritable();
    }

    /**
     * Writes $data to the writable side.
     *
     * @param mixed $data payload handed to the writable side unchanged
     * @return mixed the return value of the writable side's write()
     */
    public function write($data)
    {
        return $this->writable->write($data);
    }

    /**
     * Pauses the readable side, then ends the writable side.
     *
     * @param mixed|null $data optional final payload handed to the writable side's end()
     * @return void
     */
    public function end($data = null)
    {
        $this->readable->pause();
        $this->writable->end($data);
    }

    /**
     * Closes both sides and emits "close" once; later calls are no-ops.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        // Set the flag first: closing a side fires its "close" event, which
        // calls back into this method.
        $this->closed = true;
        $this->pipeSource = null;

        $this->readable->close();
        $this->writable->close();

        $this->emit('close');

        // "close" is the last event of this stream. Dropping the listeners
        // releases the registered callbacks and keeps a late "pipe" event
        // from repopulating $pipeSource after it was cleared above.
        $this->removeAllListeners();
    }
}
96: