diff --git a/src/Buffer.php b/src/Buffer.php index 614edb3..af4b84e 100644 --- a/src/Buffer.php +++ b/src/Buffer.php @@ -21,6 +21,12 @@ public function __construct($stream, LoopInterface $loop) throw new \InvalidArgumentException('First parameter must be a valid stream resource'); } + // this class relies on non-blocking I/O in order to not interrupt the event loop + // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918 + if (stream_set_blocking($stream, 0) !== true) { + throw new \RuntimeException('Unable to set stream resource to non-blocking mode'); + } + $this->stream = $stream; $this->loop = $loop; } diff --git a/src/Stream.php b/src/Stream.php index a23a4a8..02ded13 100644 --- a/src/Stream.php +++ b/src/Stream.php @@ -35,12 +35,15 @@ class Stream extends EventEmitter implements DuplexStreamInterface public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null) { - $this->stream = $stream; - if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") { + if (!is_resource($stream) || get_resource_type($stream) !== "stream") { throw new InvalidArgumentException('First parameter must be a valid stream resource'); } - stream_set_blocking($this->stream, 0); + // this class relies on non-blocking I/O in order to not interrupt the event loop + // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918 + if (stream_set_blocking($stream, 0) !== true) { + throw new \RuntimeException('Unable to set stream resource to non-blocking mode'); + } // Use unbuffered read operations on the underlying stream resource. // Reading chunks from the stream may otherwise leave unread bytes in @@ -49,13 +52,14 @@ public function __construct($stream, LoopInterface $loop, WritableStreamInterfac // This does not affect the default event loop implementation (level // triggered), so we can ignore platforms not supporting this (HHVM). if (function_exists('stream_set_read_buffer')) { - stream_set_read_buffer($this->stream, 0); + stream_set_read_buffer($stream, 0); } if ($buffer === null) { $buffer = new Buffer($stream, $loop); } + $this->stream = $stream; $this->loop = $loop; $this->buffer = $buffer; diff --git a/tests/BufferTest.php b/tests/BufferTest.php index 284a842..5c19669 100644 --- a/tests/BufferTest.php +++ b/tests/BufferTest.php @@ -20,13 +20,29 @@ public function testConstructor() /** * @covers React\Stream\Buffer::__construct - * @expectedException InvalidArgumentException */ public function testConstructorThrowsIfNotAValidStreamResource() { $stream = null; $loop = $this->createLoopMock(); + $this->setExpectedException('InvalidArgumentException'); + new Buffer($stream, $loop); + } + + /** + * @covers React\Stream\Buffer::__construct + */ + public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking() + { + if (!in_array('blocking', stream_get_wrappers())) { + stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); + } + + $stream = fopen('blocking://test', 'r+'); + $loop = $this->createLoopMock(); + + $this->setExpectedException('RuntimeException'); new Buffer($stream, $loop); } diff --git a/tests/EnforceBlockingWrapper.php b/tests/EnforceBlockingWrapper.php new file mode 100644 index 0000000..1cc15ab --- /dev/null +++ b/tests/EnforceBlockingWrapper.php @@ -0,0 +1,30 @@ +createLoopMock(); + $this->setExpectedException('InvalidArgumentException'); + new Stream('breakme', $loop); + } + + /** + * @covers React\Stream\Stream::__construct + */ + public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking() + { + if (!in_array('blocking', stream_get_wrappers())) { + stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper'); + } + + $stream = fopen('blocking://test', 'r+'); $loop = $this->createLoopMock(); - $conn = new Stream('breakme', $loop); + + $this->setExpectedException('RuntimeException'); + new Stream($stream, $loop); } /**