php-standard-library/channel
A lightweight PHP standard library component that provides a channel abstraction for passing messages between producers and consumers. Useful for simple concurrency patterns, pipelines, and event-style communication with a minimal, dependency-free API.
Install the Package
composer require php-standard-library/channel
Ensure your composer.json includes it under require or require-dev if testing.
Basic Channel Setup Create an unbuffered channel for synchronous communication:
use PhpStandardLibrary\Channel\Channel;
$channel = new Channel();
First Use Case: Async Task Coordination Replace a synchronous service call with a channel to decouple components:
// In ServiceA (producer)
$channel = new Channel();
$channel->send('process_order', ['order_id' => 123]);
// In ServiceB (consumer, runs in a separate process/fiber)
$data = $channel->receive();
processOrder($data['order_id']);
Laravel Integration (Manual)
Register a channel instance in AppServiceProvider:
public function register()
{
$this->app->singleton('orderChannel', fn() => new Channel());
}
Inject it into classes via constructor:
public function __construct(private Channel $orderChannel) {}
Producer-Consumer Pattern
// Producer (e.g., HTTP request handler)
$channel = app('orderChannel');
$channel->send('order_created', ['user_id' => 1, 'items' => [...]]);
// Consumer (e.g., background process)
while (true) {
$payload = $channel->receive(); // Blocks until data arrives
handleOrder($payload['user_id'], $payload['items']);
}
Buffered Channels for Rate Limiting Limit inbound requests to a service:
$bufferedChannel = new BufferedChannel(5); // Max 5 pending messages
$bufferedChannel->send('task', $data); // Blocks if buffer is full
Selective Receive (Non-Blocking)
Wait on multiple channels (requires pcntl or fibers):
$channel1 = new Channel();
$channel2 = new Channel();
$selected = Channel::select([
$channel1,
$channel2,
]);
if ($selected === $channel1) {
$data = $channel1->receive();
}
Laravel Job Integration Use channels to trigger jobs without queue overhead:
// In a controller
$channel = app('jobTriggerChannel');
$channel->send('send_email', ['user_id' => 1]);
// In a JobConsumer (runs in a separate process)
while (true) {
$jobData = $channel->receive();
SendEmailJob::dispatch($jobData['user_id']);
}
With Swoole/RoadRunner: Use channels for inter-worker communication within the same process group. Example (Swoole):
$channel = new Channel();
Swoole\Coroutine::create(function() use ($channel) {
$channel->send('data', 'from_coroutine');
});
With ReactPHP: Bridge channels to ReactPHP’s event loop:
$loop = React\EventLoop\Factory::create();
$channel = new Channel();
$loop->addTimer(0, function() use ($channel, $loop) {
$data = $channel->receive();
$loop->stop();
});
With Laravel Queues: Use channels for local coordination and queues for distributed tasks:
// Channel for internal sync
$channel = new Channel();
$channel->send('sync_data', $data);
// Queue for external persistence
dispatch(new SyncDataJob($data));
Channel Factories: Centralize channel creation in a service:
class ChannelFactory
{
public function make(string $name, int $bufferSize = 0): Channel
{
return $bufferSize > 0
? new BufferedChannel($bufferSize)
: new Channel();
}
}
Deadlocks
UnbufferedChannel for critical paths or implement timeouts:
if (!$channel->sendNonBlocking('data', $timeoutMs)) {
log('Channel full, retrying...');
}
Message Loss
$channel = new Channel();
$redis = Redis::connection();
$channel->send('critical', $data);
$redis->rpush('channel_backlog:critical', json_encode($data));
Thread Safety
pcntl or pthreads can introduce race conditions.Buffer Size Misconfiguration
memory_get_usage() and adjust dynamically:
$bufferSize = min(100, memory_get_peak_usage(true) / 1024); // ~100KB buffer
Laravel Service Container Conflicts
AppServiceProvider:
$this->app->bind('channels.order', fn() => new BufferedChannel(5));
Logging Channel States Add logging to track sends/receives:
$channel = new Channel();
$channel->onSend(function($data) {
Log::debug('Channel sent', ['data' => $data]);
});
Timeouts for Blocking Calls Prevent indefinite hangs:
$data = $channel->receive(1000); // Timeout after 1s
if ($data === null) {
Log::warning('Channel receive timed out');
}
Monitoring Buffer Usage Track buffered messages to avoid overflows:
if ($channel instanceof BufferedChannel) {
$usage = $channel->bufferUsage();
Log::info('Channel buffer usage', ['usage' => $usage]);
}
Process Isolation Run consumers in separate processes to avoid GIL contention:
php artisan process:start --channel=orders
Custom Channel Types
Extend Channel for domain-specific logic:
class OrderChannel extends Channel
{
public function sendOrder(Order $order)
{
$this->send('order', $order->toArray());
}
}
Middleware for Channels Add validation/transformations:
$channel = new Channel();
$channel->onSend(function($data) {
if (!isset($data['user_id'])) {
throw new \InvalidArgumentException('Missing user_id');
}
});
Channel Routers Route messages to multiple consumers:
class ChannelRouter
{
public function __construct(private array $channels) {}
public function route(string $type, $data)
{
foreach ($this->channels[$type] ?? [] as $channel) {
$channel->send($data);
}
}
}
Integration with Laravel Events Bridge channels to Laravel’s event system:
$channel = new Channel();
event(new OrderCreated($order));
// Consumer
event(new ChannelEventListener($channel));
Queue vs. Channel Confusion
Artisan Command Integration Run consumers as Artisan commands:
class ProcessOrdersCommand extends Command
{
protected $signature = 'orders:process';
protected $channel;
public function __construct()
{
parent::__construct();
$this->channel = app('orderChannel');
}
public function handle()
{
while (true) {
$order = $this->channel->receive();
$this->processOrder($order);
}
}
}
Testing Channels Mock channels in tests to avoid blocking:
$mockChannel = Mockery::mock(Channel::class);
How can I help you explore Laravel packages today?