1: <?php
2:
3: namespace React\Stream;
4:
/**
 * Duplex stream that hands every chunk written to it to the listeners of its
 * readable side, after passing the chunk through filter().
 *
 * Subclasses can override filter() to transform the data in flight; the
 * default implementation passes data through untouched.
 */
class ThroughStream extends CompositeStream
{
    /**
     * @var bool True between pause() and the next resume().
     */
    private $paused = false;

    /**
     * @var bool True when write() returned false because the stream was
     *           paused, meaning a "drain" event is owed once it is resumed.
     */
    private $drain = false;

    /**
     * Creates the readable and writable halves and hands them to the parent.
     */
    public function __construct()
    {
        $readable = new ReadableStream();
        $writable = new WritableStream();

        parent::__construct($readable, $writable);
    }

    /**
     * Transformation hook applied to every chunk before it is emitted.
     *
     * @param mixed $data Chunk passed to write() or end().
     *
     * @return mixed The chunk to emit as "data"; unchanged by default.
     */
    public function filter($data)
    {
        return $data;
    }

    /**
     * Pauses the stream; subsequent write() calls report backpressure.
     */
    public function pause()
    {
        parent::pause();
        $this->paused = true;
    }

    /**
     * Resumes the stream and, if a write() was refused while paused, emits
     * "drain" so the producer knows it may continue writing.
     */
    public function resume()
    {
        parent::resume();
        $this->paused = false;

        if (!$this->drain) {
            return;
        }

        $this->drain = false;

        // Nobody can write to an ended/closed stream, so only signal "drain"
        // while the writable side still accepts data.
        if ($this->writable->isWritable()) {
            // NOTE(review): relies on emit() being available on this stream
            // through the parent class - confirm against CompositeStream.
            $this->emit('drain');
        }
    }

    /**
     * Filters a chunk and emits it as "data" on the readable side.
     *
     * @param mixed $data Chunk to pass through.
     *
     * @return bool False when the stream is not writable (before or after the
     *              chunk was emitted) or is currently paused; true otherwise.
     *              A false caused by pausing is followed by a "drain" event
     *              on resume().
     */
    public function write($data)
    {
        if (!$this->writable->isWritable()) {
            return false;
        }

        $this->readable->emit('data', array($this->filter($data)));

        // A "data" listener may have ended or closed the stream in the meantime.
        if (!$this->writable->isWritable()) {
            return false;
        }

        if ($this->paused) {
            // Remember that the producer was told to stop so that resume()
            // can tell it to start again.
            $this->drain = true;

            return false;
        }

        return true;
    }

    /**
     * Optionally passes a final chunk through, then emits "end" on the
     * readable side and ends the writable side.
     *
     * Does nothing when the stream is no longer writable.
     *
     * @param mixed|null $data Optional final chunk; null means "no data".
     *
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->writable->isWritable()) {
            return;
        }

        if (null !== $data) {
            $this->readable->emit('data', array($this->filter($data)));

            // A "data" listener may have ended or closed the stream while
            // handling the final chunk; do not signal "end" a second time.
            if (!$this->writable->isWritable()) {
                return;
            }
        }

        // No further writes will be accepted, so no "drain" is owed anymore.
        $this->drain = false;

        $this->readable->emit('end');

        $this->writable->end();
    }
}
60: