1: <?php
2:
3: namespace React\Stream;
4:
5: use Evenement\EventEmitter;
6: use React\EventLoop\LoopInterface;
7:
/**
 * Buffered writer for a PHP stream resource, flushed from an event loop.
 *
 * Data handed to write() is appended to an in-memory buffer. While that buffer
 * is non-empty, handleWrite() is registered on the loop as the write listener
 * for the resource and flushes as much of the buffer as fwrite() accepts.
 *
 * Events emitted:
 *  - "drain": a flush brought the buffer from at/above $softLimit to below it.
 *  - "error": a flush could not write any bytes; receives a \RuntimeException
 *             whose previous exception carries the underlying cause.
 *  - "close": the stream was closed; receives this instance. All listeners are
 *             removed right after this event.
 */
class WritableResourceStream extends EventEmitter implements WritableStreamInterface
{
    /** @var resource the stream resource given to the constructor */
    public $stream;

    /** @var int buffered byte count at which write() starts returning false */
    public $softLimit = 65536;

    /** @var bool whether handleWrite() is currently registered on the loop */
    private $listening = false;

    /** @var bool false once end() or close() has been called */
    private $writable = true;

    /** @var bool true once close() has run */
    private $closed = false;

    /** @var LoopInterface loop used to (un)register the write listener */
    private $loop;

    /** @var string bytes accepted by write() but not yet written to the resource */
    private $data = '';

    /**
     * @param resource      $stream stream resource opened in a writable mode
     * @param LoopInterface $loop   event loop that will trigger the flushes
     *
     * @throws \InvalidArgumentException if $stream is not a stream resource or
     *                                   was opened read-only
     * @throws \RuntimeException         if the stream cannot be made non-blocking
     */
    public function __construct($stream, LoopInterface $loop)
    {
        if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
            throw new \InvalidArgumentException('First parameter must be a valid stream resource');
        }

        // A writable fopen() mode contains at least one of "w", "a", "x", "c"
        // or "+". Looking for those characters (rather than stripping "b"/"t"
        // and comparing against "r") also rejects read-only modes that carry
        // other flags such as "re" or "rn". A missing or empty mode is accepted.
        $meta = stream_get_meta_data($stream);
        $mode = isset($meta['mode']) ? $meta['mode'] : '';
        if ($mode !== '' && strtr($mode, 'waxc+', '.....') === $mode) {
            throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
        }

        // Flushing happens inside a loop callback, so the resource is switched
        // to non-blocking mode; refuse streams where that is not possible.
        if (stream_set_blocking($stream, false) !== true) {
            throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
        }

        $this->stream = $stream;
        $this->loop = $loop;
    }

    /**
     * @return bool true until end() or close() has been called
     */
    public function isWritable()
    {
        return $this->writable;
    }

    /**
     * Appends $data to the buffer and makes sure a flush is scheduled.
     *
     * @param mixed $data value appended to the buffer via string concatenation
     *
     * @return bool false if the stream is no longer writable or the buffer now
     *              holds at least $softLimit bytes; true otherwise
     */
    public function write($data)
    {
        if (!$this->writable) {
            return false;
        }

        $this->data .= $data;

        // Register the write listener lazily and only once per non-empty buffer.
        if (!$this->listening && $this->data !== '') {
            $this->listening = true;

            $this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
        }

        // isset() on the offset is a cheap "length >= softLimit" check.
        return !isset($this->data[$this->softLimit - 1]);
    }

    /**
     * Optionally writes a final chunk, then stops accepting further writes.
     *
     * If nothing is buffered the stream is closed right away; otherwise
     * handleWrite() closes it once the buffer has been flushed.
     *
     * @param mixed|null $data optional final chunk passed to write()
     *
     * @return void
     */
    public function end($data = null)
    {
        if (null !== $data) {
            $this->write($data);
        }

        $this->writable = false;

        if ($this->data === '') {
            $this->close();
        }
    }

    /**
     * Closes the stream immediately, discarding any buffered data.
     *
     * Unregisters the write listener if needed, emits "close" with this
     * instance and then removes all listeners. Calling it again is a no-op.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        if ($this->listening) {
            $this->listening = false;
            $this->loop->removeWriteStream($this->stream);
        }

        $this->closed = true;
        $this->writable = false;
        $this->data = '';

        $this->emit('close', array($this));
        $this->removeAllListeners();
    }

    /**
     * Flushes as much of the buffer as fwrite() accepts.
     *
     * Registered by write() as the loop's write listener, which is why it is
     * public.
     *
     * @return void
     */
    public function handleWrite()
    {
        // Capture any warning raised by fwrite() so it can be attached to the
        // "error" event instead of being reported through the global handler.
        $error = null;
        set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
            $error = array(
                'message' => $errstr,
                'number' => $errno,
                'file' => $errfile,
                'line' => $errline
            );
        });

        $sent = fwrite($this->stream, $this->data);

        restore_error_handler();

        // Only a flush that wrote nothing at all is treated as a failure.
        // A partial write is not an error: the unwritten remainder simply
        // stays buffered for the next invocation.
        if ($sent === 0 || $sent === false) {
            if ($error === null) {
                $error = new \RuntimeException('Send failed');
            } else {
                $error = new \ErrorException(
                    $error['message'],
                    0,
                    $error['number'],
                    $error['file'],
                    $error['line']
                );
            }

            $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error->getMessage(), 0, $error)));
            $this->close();

            return;
        }

        // Remember whether the buffer was at/above the soft limit before
        // dropping the bytes that were just written.
        $exceeded = isset($this->data[$this->softLimit - 1]);
        $this->data = (string) substr($this->data, $sent);

        // Emit "drain" only on the transition from at/above the limit to below it.
        if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
            $this->emit('drain');
        }

        if ($this->data === '') {
            // Nothing left to flush: stop listening until write() buffers more.
            if ($this->listening) {
                $this->loop->removeWriteStream($this->stream);
                $this->listening = false;
            }

            // end() was called while data was pending; finish closing now.
            if (!$this->writable) {
                $this->close();
            }
        }
    }
}
162: