Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/Buffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public function __construct($stream, LoopInterface $loop)
throw new \InvalidArgumentException('First parameter must be a valid stream resource');
}

// this class relies on non-blocking I/O in order to not interrupt the event loop
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
if (stream_set_blocking($stream, 0) !== true) {
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
}

$this->stream = $stream;
$this->loop = $loop;
}
Expand Down
12 changes: 8 additions & 4 deletions src/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ class Stream extends EventEmitter implements DuplexStreamInterface

public function __construct($stream, LoopInterface $loop, WritableStreamInterface $buffer = null)
{
$this->stream = $stream;
if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") {
if (!is_resource($stream) || get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
}

stream_set_blocking($this->stream, 0);
// this class relies on non-blocking I/O in order to not interrupt the event loop
// e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
if (stream_set_blocking($stream, 0) !== true) {
throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
}

// Use unbuffered read operations on the underlying stream resource.
// Reading chunks from the stream may otherwise leave unread bytes in
Expand All @@ -49,13 +52,14 @@ public function __construct($stream, LoopInterface $loop, WritableStreamInterfac
// This does not affect the default event loop implementation (level
// triggered), so we can ignore platforms not supporting this (HHVM).
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($this->stream, 0);
stream_set_read_buffer($stream, 0);
}

if ($buffer === null) {
$buffer = new Buffer($stream, $loop);
}

$this->stream = $stream;
$this->loop = $loop;
$this->buffer = $buffer;

Expand Down
18 changes: 17 additions & 1 deletion tests/BufferTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,29 @@ public function testConstructor()

/**
* @covers React\Stream\Buffer::__construct
* @expectedException InvalidArgumentException
*/
public function testConstructorThrowsIfNotAValidStreamResource()
{
$stream = null;
$loop = $this->createLoopMock();

$this->setExpectedException('InvalidArgumentException');
new Buffer($stream, $loop);
}

/**
* @covers React\Stream\Buffer::__construct
*/
public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking()
{
if (!in_array('blocking', stream_get_wrappers())) {
stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper');
}

$stream = fopen('blocking://test', 'r+');
$loop = $this->createLoopMock();

$this->setExpectedException('RuntimeException');
new Buffer($stream, $loop);
}

Expand Down
30 changes: 30 additions & 0 deletions tests/EnforceBlockingWrapper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace React\Tests\Stream;

/**
* Used to test dummy stream resources that do not support setting non-blocking mode
*
* @link http://php.net/manual/de/class.streamwrapper.php
*/
class EnforceBlockingWrapper
{
public function stream_open($path, $mode, $options, &$opened_path)
{
return true;
}

public function stream_cast($cast_as)
{
return false;
}

public function stream_set_option($option, $arg1, $arg2)
{
if ($option === STREAM_OPTION_BLOCKING) {
return false;
}

return true;
}
}
19 changes: 18 additions & 1 deletion tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,26 @@ public function testConstructor()
*/
public function testConstructorThrowsExceptionOnInvalidStream()
{
$loop = $this->createLoopMock();

$this->setExpectedException('InvalidArgumentException');
new Stream('breakme', $loop);
}

/**
* @covers React\Stream\Stream::__construct
*/
public function testConstructorThrowsExceptionIfStreamDoesNotSupportNonBlocking()
{
if (!in_array('blocking', stream_get_wrappers())) {
stream_wrapper_register('blocking', 'React\Tests\Stream\EnforceBlockingWrapper');
}

$stream = fopen('blocking://test', 'r+');
$loop = $this->createLoopMock();
$conn = new Stream('breakme', $loop);

$this->setExpectedException('RuntimeException');
new Stream($stream, $loop);
}

/**
Expand Down