enqueue/stomp
Enqueue STOMP transport: a Queue Interop implementation for sending and consuming messages over the STOMP protocol. Includes docs and community support resources; MIT licensed.
Install the Package
composer require enqueue/stomp
Composer installs stomp-php/stomp-php automatically, since it is a required dependency.
Configure the DSN
Define a DSN (Data Source Name) in your Laravel .env:
QUEUE_CONNECTION=stomp
STOMP_DSN="stomp://user:pass@host:port/vhost"
Example with TLS (assuming the transport's ssl_on option, since the DSN scheme itself stays stomp):
STOMP_DSN="stomp://user:pass@host:61613/vhost?ssl_on=1"
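To sanity-check a DSN before handing it to the transport, it can help to split it into its parts. The sketch below uses PHP's built-in parse_url(); the helper name describeStompDsn is hypothetical and not part of enqueue/stomp, which does its own DSN parsing internally.

```php
<?php
// Hypothetical helper: splits a STOMP DSN into its components using parse_url().
// Illustration only; it does not replace the parsing done by enqueue/stomp.
function describeStompDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException("Malformed DSN: {$dsn}");
    }

    return [
        'scheme'   => $parts['scheme'],
        'host'     => $parts['host'],
        'port'     => $parts['port'] ?? 61613, // 61613 is the conventional STOMP port
        'login'    => $parts['user'] ?? null,
        'password' => $parts['pass'] ?? null,
        'vhost'    => isset($parts['path']) ? ltrim($parts['path'], '/') : null,
    ];
}

$config = describeStompDsn('stomp://user:pass@localhost:61613/vhost');
```

Printing $config with var_dump() before deploying makes a misplaced vhost or a missing port easy to spot.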
First Use Case: Send a Job
use Enqueue\Stomp\StompConnectionFactory;

// The factory accepts the DSN directly and builds a Queue Interop context.
$connectionFactory = new StompConnectionFactory(env('STOMP_DSN'));
$context = $connectionFactory->createContext();

// Queues and messages are created through the context.
$queue = $context->createQueue('queue_name');
$message = $context->createMessage('Hello STOMP!');
$context->createProducer()->send($queue, $message);
Consume Messages
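use Enqueue\Stomp\StompConnectionFactory;

// Create a context from the DSN, then a consumer bound to the queue.
$context = (new StompConnectionFactory(env('STOMP_DSN')))->createContext();
$consumer = $context->createConsumer($context->createQueue('queue_name'));

// receive() blocks until a message arrives; pass a timeout in ms to limit the wait.
$message = $consumer->receive();
echo $message->getBody();
$consumer->acknowledge($message);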
config/enqueue.php (Laravel config)
vendor/enqueue/stomp/src (Core logic)
Producer-Consumer Pattern
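// $context is the Interop\Queue\Context returned by StompConnectionFactory::createContext().
$queue = $context->createQueue('laravel.jobs');
$context->createProducer()->send($queue, $context->createMessage(json_encode($data)));

$consumer = $context->createConsumer($queue);
$message = $consumer->receive();
$data = json_decode($message->getBody(), true);
// Process $data
$consumer->acknowledge($message); // Acknowledge success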
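Since the pattern above moves job data as JSON, a pair of small helpers can make encoding failures and malformed bodies visible instead of silently producing false or null. This is a minimal sketch; encodeJobPayload and decodeJobPayload are hypothetical names, not part of the package.

```php
<?php
// Hypothetical helpers for turning job data into a message body and back.
function encodeJobPayload(array $data): string
{
    // JSON_THROW_ON_ERROR turns silent encoding failures into JsonException.
    return json_encode($data, JSON_THROW_ON_ERROR);
}

function decodeJobPayload(string $body): array
{
    $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data)) {
        throw new UnexpectedValueException('Job payload must decode to an array.');
    }

    return $data;
}

$body = encodeJobPayload(['job' => 'SendEmail', 'attempts' => 1]);
$data = decodeJobPayload($body);
```

A consumer can catch JsonException around decodeJobPayload() and reject the message rather than acknowledge a payload it could not read.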
Laravel Integration
// config/queue.php
'connections' => [
    'stomp' => [
        'driver' => 'stomp',
        'dsn' => env('STOMP_DSN'),
        'options' => [
            'prefetch_count' => 10, // Optimize performance
            'heartbeat' => 30000, // 30s heartbeat
        ],
    ],
],
dispatch(new MyJob())->onConnection('stomp');
Error Handling
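Wrap message processing in try-catch so that a failure rejects the message:
// $consumer is an Interop\Queue\Consumer created with $context->createConsumer($queue).
$message = $consumer->receive();
try {
    // Process
    $consumer->acknowledge($message);
} catch (\Exception $e) {
    // reject($message, true) requeues; without it the broker discards or dead-letters.
    $consumer->reject($message);
}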
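The acknowledge-or-reject decision can also be isolated from the transport so it is easy to test. The sketch below is an assumption-laden illustration: handleMessage is a hypothetical helper, and the strings 'ack' and 'reject' stand in for whatever acknowledgment calls the consumer actually exposes.

```php
<?php
// Hypothetical helper: runs a handler and reports which acknowledgment to send.
// 'ack' and 'reject' are plain strings here; a real consumer would map them to
// its own acknowledge/reject calls.
function handleMessage(string $body, callable $handler): string
{
    try {
        $handler($body);

        return 'ack';
    } catch (Throwable $e) {
        // Log and reject so the broker can requeue or dead-letter the message.
        error_log('Message failed: ' . $e->getMessage());

        return 'reject';
    }
}

$parseJson = function (string $body): void {
    json_decode($body, true, 512, JSON_THROW_ON_ERROR);
};

$ok = handleMessage('{"id":1}', $parseJson);
$failed = handleMessage('not json', $parseJson);
```

Catching Throwable rather than Exception also covers PHP errors such as TypeError, so a broken handler does not leave the message unacknowledged.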
Batch Processing
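// $consumer is an Interop\Queue\Consumer; receiveNoWait() returns null when nothing is waiting.
$batch = [];
while (count($batch) < 10 && null !== ($message = $consumer->receiveNoWait())) { // 10 is an arbitrary batch size
    $batch[] = $message;
}
foreach ($batch as $message) {
    // Process each message
    $consumer->acknowledge($message);
}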
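The batching logic itself does not depend on the broker: collect messages until the batch is full or the source runs dry. A minimal sketch, assuming a callable that returns the next message or null; collectBatch and the in-memory queue are hypothetical stand-ins.

```php
<?php
// Hypothetical helper: pulls up to $size messages from $receive, a callable that
// returns the next message or null when nothing is waiting.
function collectBatch(callable $receive, int $size): array
{
    $batch = [];
    while (count($batch) < $size) {
        $message = $receive();
        if ($message === null) {
            break; // source drained before the batch filled up
        }
        $batch[] = $message;
    }

    return $batch;
}

// An in-memory queue stands in for the broker in this sketch.
$pending = ['a', 'b', 'c', 'd', 'e'];
$receive = function () use (&$pending) {
    return array_shift($pending); // null once the array is empty
};

$first = collectBatch($receive, 3);
$second = collectBatch($receive, 3);
```

The second call returns a short batch, which is the case a consumer has to handle when the queue empties mid-batch.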
andrewmy/rabbitmq-management-api (included) for STOMP metrics.
Connection Timeouts
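// Assumes the connection_timeout factory option (in seconds), set when the factory is constructed.
$connectionFactory = new StompConnectionFactory(['dsn' => env('STOMP_DSN'), 'connection_timeout' => 5]);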
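When a connection attempt fails or times out, retrying with a growing delay avoids hammering the broker. The sketch below computes an exponential backoff in milliseconds; reconnectDelayMs and its default values are hypothetical and would wrap whatever reconnect logic the application uses.

```php
<?php
// Hypothetical helper: exponential backoff for reconnect attempts, in milliseconds.
function reconnectDelayMs(int $attempt, int $baseMs = 100, int $maxMs = 5000): int
{
    // Attempt 0 waits $baseMs; each further attempt doubles the wait up to $maxMs.
    return (int) min($maxMs, $baseMs * (2 ** $attempt));
}

$delays = array_map('reconnectDelayMs', [0, 1, 2, 3, 10]);
```

Passing the result to usleep($delay * 1000) between attempts yields the pause, since usleep() takes microseconds.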
Message Ordering
x-max-priority) if ordering is critical.
Memory Leaks
$message->ack() or $message->reject().
setAutoAck(false) in consumers to manually control acknowledgments.
SSL/TLS Issues
StompException. Configure CA certs:
$connectionFactory->setSslContext([
    'local_cert' => '/path/to/cert.pem',
    'local_pk' => '/path/to/key.pem',
    'passphrase' => 'password',
]);
Queue Naming
$connectionFactory->setLogger(new \Monolog\Logger('stomp'));
if (!$connection->isConnected()) {
    $connection->reconnect();
}
$message->getHeaders(); // Inspect metadata
$message->getBody(); // Inspect payload
Custom Middleware
$producer->addMiddleware(new class implements \Enqueue\Client\Middleware {
    public function handle($message, callable $next) {
        // Pre-process
        $result = $next($message);
        // Post-process
        return $result;
    }
});
Serialization
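// $context is the Interop\Queue\Context created by StompConnectionFactory::createContext().
$message = $context->createMessage(
    serialize($data),
    [], // application properties
    ['content-type' => 'application/x-php-serialized'] // transport headers
);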
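On the consuming side, PHP-serialized bodies deserve care, because unserialize() can instantiate arbitrary classes from untrusted input. A minimal sketch of a defensive decoder follows; decodeSerializedBody is a hypothetical helper and assumes the payload is always an array.

```php
<?php
// Hypothetical helper: decodes a PHP-serialized body without instantiating objects.
function decodeSerializedBody(string $body): array
{
    // allowed_classes => false turns any serialized object into __PHP_Incomplete_Class.
    // The @ suppresses the notice/warning PHP emits for malformed input.
    $data = @unserialize($body, ['allowed_classes' => false]);
    if (!is_array($data)) {
        throw new UnexpectedValueException('Body is not a serialized array.');
    }

    return $data;
}

$body = serialize(['user_id' => 42, 'tags' => ['a', 'b']]);
$data = decodeSerializedBody($body);
```

If producers and consumers are not both PHP, JSON is usually the safer interchange format.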
Dynamic Queue Routing
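// $context is an Interop\Queue\Context; fall back to 'default' when the header is absent.
$queueName = $message->getHeader('queue_name') ?? 'default';
$context->createProducer()->send($context->createQueue($queueName), $message);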
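Routing on a header value means the queue name comes from message data, so it is worth restricting it to known queues. The sketch below shows one way to do that; resolveQueueName and the allow-list are hypothetical and not part of the package.

```php
<?php
// Hypothetical helper: picks a queue name from message headers, falling back to
// 'default' when the header is missing or names a queue that is not allowed.
function resolveQueueName(array $headers, array $allowed, string $fallback = 'default'): string
{
    $requested = $headers['queue_name'] ?? null;

    return is_string($requested) && in_array($requested, $allowed, true)
        ? $requested
        : $fallback;
}

$allowed = ['emails', 'reports'];
$known = resolveQueueName(['queue_name' => 'emails'], $allowed);
$unknown = resolveQueueName(['queue_name' => 'unknown'], $allowed);
$missing = resolveQueueName([], $allowed);
```

The strict in_array() check keeps a header such as 0 or true from matching a queue name by loose comparison.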
Performance Tuning
$consumer->setPrefetchCount(100); // $consumer is the StompConsumer returned by $context->createConsumer($queue)
$consumer->consumeAsync('queue_name', $callback);