1: <?php
2:
3: namespace React\Stream;
4:
5: class Util
6: {
7: 8: 9: 10: 11: 12: 13: 14: 15:
16: public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
17: {
18:
19: if (!$source->isReadable()) {
20: return $dest;
21: }
22:
23:
24: if (!$dest->isWritable()) {
25: $source->pause();
26:
27: return $dest;
28: }
29:
30: $dest->emit('pipe', array($source));
31:
32:
33: $source->on('data', $dataer = function ($data) use ($source, $dest) {
34: $feedMore = $dest->write($data);
35:
36: if (false === $feedMore) {
37: $source->pause();
38: }
39: });
40: $dest->on('close', function () use ($source, $dataer) {
41: $source->removeListener('data', $dataer);
42: $source->pause();
43: });
44:
45:
46: $dest->on('drain', $drainer = function () use ($source) {
47: $source->resume();
48: });
49: $source->on('close', function () use ($dest, $drainer) {
50: $dest->removeListener('drain', $drainer);
51: });
52:
53:
54: $end = isset($options['end']) ? $options['end'] : true;
55: if ($end) {
56: $source->on('end', $ender = function () use ($dest) {
57: $dest->end();
58: });
59: $dest->on('close', function () use ($source, $ender) {
60: $source->removeListener('end', $ender);
61: });
62: }
63:
64: return $dest;
65: }
66:
67: public static function forwardEvents($source, $target, array $events)
68: {
69: foreach ($events as $event) {
70: $source->on($event, function () use ($event, $target) {
71: $target->emit($event, func_get_args());
72: });
73: }
74: }
75: }
76: