Overview

Namespaces

  • Evenement
  • None
  • PHP
  • Psr
    • Http
      • Message
  • Ratchet
    • Http
    • RFC6455
      • Handshake
      • Messaging
    • Server
    • Session
      • Serialize
      • Storage
        • Proxy
    • Wamp
    • WebSocket
  • React
    • EventLoop
      • Tick
      • Timer
    • Socket
    • Stream
  • Symfony
    • Component
      • HttpFoundation
        • Session
          • Attribute
          • Flash
          • Storage
            • Handler
            • Proxy
      • Routing
        • Annotation
        • Exception
        • Generator
          • Dumper
        • Loader
          • DependencyInjection
        • Matcher
          • Dumper
        • Tests
          • Annotation
          • Fixtures
            • AnnotatedClasses
            • OtherAnnotatedClasses
          • Generator
            • Dumper
          • Loader
          • Matcher
            • Dumper

Classes

  • BufferedSink
  • CompositeStream
  • DuplexResourceStream
  • ReadableResourceStream
  • ReadableStream
  • ThroughStream
  • Util
  • WritableResourceStream
  • WritableStream

Interfaces

  • DuplexStreamInterface
  • ReadableStreamInterface
  • WritableStreamInterface
  • Overview
  • Namespace
  • Class
  • Tree
 1: <?php
 2: 
 3: namespace React\Stream;
 4: 
 5: class Util
 6: {
 7:     /**
 8:      * Pipes all the data from the given $source into the $dest
 9:      *
10:      * @param ReadableStreamInterface $source
11:      * @param WritableStreamInterface $dest
12:      * @param array $options
13:      * @return WritableStreamInterface $dest stream as-is
14:      * @see ReadableStreamInterface::pipe() for more details
15:      */
16:     public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
17:     {
18:         // source not readable => NO-OP
19:         if (!$source->isReadable()) {
20:             return $dest;
21:         }
22: 
23:         // destination not writable => just pause() source
24:         if (!$dest->isWritable()) {
25:             $source->pause();
26: 
27:             return $dest;
28:         }
29: 
30:         $dest->emit('pipe', array($source));
31: 
32:         // forward all source data events as $dest->write()
33:         $source->on('data', $dataer = function ($data) use ($source, $dest) {
34:             $feedMore = $dest->write($data);
35: 
36:             if (false === $feedMore) {
37:                 $source->pause();
38:             }
39:         });
40:         $dest->on('close', function () use ($source, $dataer) {
41:             $source->removeListener('data', $dataer);
42:             $source->pause();
43:         });
44: 
45:         // forward destination drain as $source->resume()
46:         $dest->on('drain', $drainer = function () use ($source) {
47:             $source->resume();
48:         });
49:         $source->on('close', function () use ($dest, $drainer) {
50:             $dest->removeListener('drain', $drainer);
51:         });
52: 
53:         // forward end event from source as $dest->end()
54:         $end = isset($options['end']) ? $options['end'] : true;
55:         if ($end) {
56:             $source->on('end', $ender = function () use ($dest) {
57:                 $dest->end();
58:             });
59:             $dest->on('close', function () use ($source, $ender) {
60:                 $source->removeListener('end', $ender);
61:             });
62:         }
63: 
64:         return $dest;
65:     }
66: 
67:     public static function forwardEvents($source, $target, array $events)
68:     {
69:         foreach ($events as $event) {
70:             $source->on($event, function () use ($event, $target) {
71:                 $target->emit($event, func_get_args());
72:             });
73:         }
74:     }
75: }
76: 
Ratchet API documentation generated by ApiGen 2.8.0