php-standard-library/channel
Async channels for PHP: lightweight, standard-library-style primitives to pass values between coroutines or threads. Provides buffered/unbuffered channels, send/receive operations, closing semantics, and helpers for coordinating producers/consumers and building pipelines.
Installation
composer require php-standard-library/channel
If you use a monorepo or custom package management, add the package to composer.json manually instead.
Basic Channel Creation
use PhpStandardLibrary\Channel;
$channel = new Channel(); // Unbuffered by default
First Use Case: Simple Producer-Consumer
// Producer (runs in a background process or queue job)
$channel->send('Hello, Channel!');
// Consumer (runs elsewhere)
$message = $channel->receive();
echo $message; // Outputs: "Hello, Channel!"
Where to Look First
README.md for basic examples.
src/Channel.php for core logic.
tests/ for edge cases (e.g., ChannelTest.php).
$channel = new Channel(); // No buffer
A buffered channel holds up to N messages without blocking.
$channel = new Channel(5); // Buffer size = 5
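To make the difference between a zero-size and a sized buffer concrete, here is a minimal, self-contained sketch of bounded-buffer behaviour. BoundedBuffer, trySend() and tryReceive() are hypothetical names invented for this illustration and are not part of the package; a real channel would suspend the caller at the points where this sketch returns false or null. It assumes PHP 8.0 or later.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration only: not the package's Channel class.
final class BoundedBuffer
{
    private SplQueue $items;

    public function __construct(private int $capacity = 0)
    {
        $this->items = new SplQueue();
    }

    // Returns false where a real channel would block the sender.
    public function trySend(mixed $value): bool
    {
        if (count($this->items) >= $this->capacity) {
            return false;
        }
        $this->items->enqueue($value);
        return true;
    }

    // Returns null where a real channel would block the receiver.
    public function tryReceive(): mixed
    {
        return $this->items->isEmpty() ? null : $this->items->dequeue();
    }
}

$buffered = new BoundedBuffer(2);
var_dump($buffered->trySend('a')); // bool(true)
var_dump($buffered->trySend('b')); // bool(true)
var_dump($buffered->trySend('c')); // bool(false): buffer is full
var_dump($buffered->tryReceive()); // string(1) "a"
```

With a capacity of 0, every trySend() in this sketch fails, which mirrors the idea that an unbuffered send cannot complete until a receiver is ready to take the value.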
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use PhpStandardLibrary\Channel;

class SendMessageJob implements ShouldQueue
{
    use Queueable;

    public function handle()
    {
        // Resolve the shared channel from the container and publish to it
        $channel = app(Channel::class);
        $channel->send('Async message');
    }
}
$channel = app(Channel::class);
$message = $channel->receive(); // Blocks until message arrives
$channel = new Channel();
$timeout = 2.0; // Seconds
if ($channel->select([$channel], $timeout)) {
    $message = $channel->receive();
    // Handle message
} else {
    // Timeout or other channel ready
}
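The timeout branch above follows a general deadline pattern. The sketch below shows that pattern on a plain SplQueue, polling until a deadline passes. receiveWithTimeout() is a hypothetical helper, not a method of this package, and a real channel would presumably suspend the caller rather than poll.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: polls a queue until a value arrives or the deadline passes.
function receiveWithTimeout(SplQueue $queue, float $timeoutSeconds): mixed
{
    $deadline = microtime(true) + $timeoutSeconds;

    do {
        if (!$queue->isEmpty()) {
            return $queue->dequeue();
        }
        usleep(1000); // Sleep 1 ms between checks to avoid a busy loop.
    } while (microtime(true) < $deadline);

    return null; // Timed out.
}

$queue = new SplQueue();
$queue->enqueue('ready');

var_dump(receiveWithTimeout($queue, 0.05)); // string(5) "ready"
var_dump(receiveWithTimeout($queue, 0.05)); // NULL, after roughly 50 ms
```

Returning null on timeout keeps the sketch short, but it makes a timeout indistinguishable from a legitimately sent null value; a dedicated exception or result object avoids that ambiguity.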
$channel->close(); // Prevents further sends/receives
if ($channel->isClosed()) {
    // Cleanup logic
}
// app/Providers/AppServiceProvider.php, inside register()
$this->app->singleton(Channel::class, function ($app) {
    return new Channel(10); // Shared instance with a buffer of 10
});
// Service A
$channel = app('order.processed');
$channel->send($orderId);
// Service B (listens in a separate process)
$channel = app('order.processed');
$orderId = $channel->receive();
// Process order...
$mockChannel = Mockery::mock(Channel::class);
$mockChannel->shouldReceive('receive')->once()->andReturn('test');
Use select() for timeouts.
$channel = new Channel(1); // Small buffer to prevent starvation
if ($channel->isFull()) {
    throw new \RuntimeException('Channel buffer full');
}
Implement __serialize()/__unserialize() on complex objects, or use json_encode()/json_decode() explicitly.
$channel->send(json_encode($complexObject));
$data = $channel->receive();
$object = json_decode($data, true); // Associative array, not the original object
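Because json_decode($data, true) yields an associative array rather than the original object, the receiving side has to rebuild the object itself. A minimal sketch of that round trip follows, assuming a hypothetical OrderPlaced value object that is not part of this package. It uses readonly properties, so it needs PHP 8.1 or later.

```php
<?php
declare(strict_types=1);

// Hypothetical value object used only for this example.
final class OrderPlaced implements JsonSerializable
{
    public function __construct(
        public readonly int $orderId,
        public readonly string $currency,
    ) {
    }

    public function jsonSerialize(): array
    {
        return ['orderId' => $this->orderId, 'currency' => $this->currency];
    }

    public static function fromArray(array $data): self
    {
        return new self((int) $data['orderId'], (string) $data['currency']);
    }
}

// Sending side: encode to a string before handing it to the channel.
$payload = json_encode(new OrderPlaced(42, 'EUR'), JSON_THROW_ON_ERROR);

// Receiving side: decode to an array, then rebuild the object.
$decoded = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
$order = OrderPlaced::fromArray($decoded);

echo $payload, PHP_EOL;        // {"orderId":42,"currency":"EUR"}
echo $order->orderId, PHP_EOL; // 42
```

JSON_THROW_ON_ERROR turns malformed payloads into a JsonException instead of a silent null or false, which is easier to handle on the consumer side.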
\Log::debug('Channel buffer size:', ['size' => $channel->bufferSize()]);
$message = $channel->receive(5.0); // 5-second timeout
if ($message === null) {
    \Log::warning('Channel receive timed out');
}
if ($channel->isClosed()) {
    throw new \RuntimeException('Channel is closed');
}
Extend Channel for domain-specific logic:
class OrderedChannel extends Channel
{
    public function sendPrioritized($message, int $priority)
    {
        // Custom logic
    }
}
$channel = new Channel();
$channel->onSend(function ($message) {
    \Log::info('Message sent:', ['message' => $message]);
});
$channel = new FileChannel('/tmp/channel.dat');
Channels created with the default constructor (new Channel()) have a buffer size of 0. Ensure this matches your use case.
Large buffers count against memory_limit. Monitor with:
$bufferSize = $channel->bufferSize();
if ($bufferSize > 1000) {
    \Log::warning('Large channel buffer detected');
}