1: <?php
2:
3: namespace React\Stream;
4:
5: use React\Promise\Deferred;
6: use React\Promise\PromisorInterface;
7:
/**
 * Writable stream that keeps every written chunk in memory and exposes the
 * collected data through a promise.
 *
 * - Each chunk passed to write() is appended to an internal string buffer
 *   and forwarded to Deferred::progress().
 * - Closing the sink resolves the deferred with the whole buffer.
 * - An "error" event on the sink rejects the deferred with the emitted value.
 *
 * NOTE(review): the `$closed` flag read below is not declared in this class;
 * presumably it is inherited from WritableStream - confirm.
 */
class BufferedSink extends WritableStream implements PromisorInterface
{
    /** @var string Concatenation of all chunks accepted by write() so far. */
    private $buffer = '';

    /** @var Deferred Settled by close() (resolve) or by an "error" event (reject). */
    private $deferred;

    /**
     * Creates the deferred and subscribes the sink to its own "pipe" and
     * "error" events.
     */
    public function __construct()
    {
        $this->deferred = new Deferred();

        // Event name => handler method on this object, registered in this order.
        $handlers = array(
            'pipe'  => 'handlePipeEvent',
            'error' => 'handleErrorEvent',
        );

        foreach ($handlers as $event => $method) {
            $this->on($event, array($this, $method));
        }
    }

    /**
     * Listener for the sink's own "pipe" event.
     *
     * Forwards "error" events from the source to this sink and closes the
     * sink as soon as the source emits "close".
     *
     * @param mixed $source Object handed over with the "pipe" event; it must
     *                      offer on() (presumably the piping stream - confirm
     *                      against the emitter of "pipe").
     */
    public function handlePipeEvent($source)
    {
        $forwarded = array('error');
        Util::forwardEvents($source, $this, $forwarded);

        $closeSink = array($this, 'close');
        $source->on('close', $closeSink);
    }

    /**
     * Listener for the sink's own "error" event: rejects the deferred.
     *
     * @param mixed $e Value emitted with the "error" event, used as the
     *                 rejection reason.
     */
    public function handleErrorEvent($e)
    {
        $this->deferred->reject($e);
    }

    /**
     * Appends a chunk to the buffer and reports it as progress.
     *
     * @param string $data Chunk to append.
     *
     * @return bool false when the sink is already closed (the chunk is
     *              dropped), true when the chunk was buffered.
     */
    public function write($data)
    {
        if (!$this->closed) {
            $this->buffer .= $data;
            $this->deferred->progress($data);

            return true;
        }

        return false;
    }

    /**
     * Closes the sink and resolves the deferred with everything buffered.
     *
     * Does nothing when the sink is already closed, so the parent close()
     * and the resolve call are reached at most once through this path.
     */
    public function close()
    {
        if (!$this->closed) {
            parent::close();
            $this->deferred->resolve($this->buffer);
        }
    }

    /**
     * Returns the promise belonging to the internal deferred.
     *
     * @return mixed Whatever Deferred::promise() returns.
     */
    public function promise()
    {
        $promise = $this->deferred->promise();

        return $promise;
    }

    /**
     * Convenience factory: pipes a readable stream into a fresh sink and
     * hands back that sink's promise.
     *
     * Uses late static binding, so calling this on a subclass instantiates
     * the subclass.
     *
     * @param ReadableStreamInterface $stream Stream whose data is collected.
     *
     * @return mixed The new sink's promise.
     */
    public static function createPromise(ReadableStreamInterface $stream)
    {
        $collector = new static();
        $stream->pipe($collector);

        $result = $collector->promise();

        return $result;
    }
}
67: