answear/messenger-heartbeat-bundle
Install the Bundle:
composer require answear/messenger-heartbeat-bundle
Confirm the bundle is registered in config/bundles.php.
Enable Heartbeat for Workers:
Add the --keepalive flag to your existing messenger:consume and messenger:failed:retry commands:
php bin/console messenger:consume async -vv --keepalive
php bin/console messenger:failed:retry all --keepalive
Verify Integration:
Run a worker with -vv and look for heartbeat log lines such as:
[heartbeat] Sending keepalive signal...
Scenario: Your application uses Symfony Messenger with AMQP transport (e.g., RabbitMQ) for processing long-running background jobs (e.g., video encoding, data migration). You notice intermittent connection drops causing message loss or retries.
Solution:
Run the affected worker with --keepalive:
php bin/console messenger:consume workers:video-encoding --keepalive
Standard Worker Configuration:
Use --keepalive for workers handling long-running or critical tasks:
# For async processing
php bin/console messenger:consume async --keepalive --limit=10
# For failed message retries
php bin/console messenger:failed:retry all --keepalive
Environment-Specific Flags:
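# In a container entrypoint or deployment script
EXTRA_FLAGS=""
if [ "$ENVIRONMENT" = "production" ]; then
    EXTRA_FLAGS="--keepalive"
fi
# $EXTRA_FLAGS is left unquoted on purpose so an empty value adds no argument
php bin/console messenger:consume async $EXTRA_FLAGS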
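The environment check above can also be wrapped in a small function, which makes the flag logic easy to test without starting a worker. This is a minimal sketch: `consume_command` is a hypothetical helper name, and the default transport name `async` is taken from the examples in this guide.

```shell
# Hypothetical helper: prints the consume command for a given environment.
# $1 = environment name, $2 = transport name (defaults to "async").
consume_command() {
    env_name="$1"
    transport="${2:-async}"
    cmd="php bin/console messenger:consume $transport"
    # Only production workers get the heartbeat flag in this sketch.
    if [ "$env_name" = "production" ]; then
        cmd="$cmd --keepalive"
    fi
    printf '%s\n' "$cmd"
}
```

A deployment script could then run `eval "$(consume_command "$ENVIRONMENT")"`, or simply print the command for review before rollout.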
Selective Heartbeat Activation:
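// In a custom command class (sketch: runMessengerCommand() is a hypothetical helper you would implement)
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

protected function configure(): void
{
    $this->addOption('keepalive', null, InputOption::VALUE_NONE, 'Enable heartbeat for this worker');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
    if ($input->getOption('keepalive')) {
        // Custom logic to enable heartbeat for this worker only
    }

    // Hypothetical helper: delegate to your own code that runs the consumer
    return $this->runMessengerCommand($input, $output);
}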
Heartbeat + Retry Logic:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                # keepalive: true  # Not directly configurable; use the --keepalive CLI flag
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
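To see what the retry settings above mean in practice, the delays can be worked out by hand: each retry waits `delay` milliseconds multiplied by `multiplier` once per previous retry. The sketch below reproduces that arithmetic; `retry_delays` is a hypothetical helper, and it ignores `max_delay` and any jitter your Symfony version may apply.

```shell
# Hypothetical helper: prints the nominal delay (in ms) before each retry.
# $1 = initial delay in ms, $2 = multiplier, $3 = max_retries.
retry_delays() {
    delay="$1"
    multiplier="$2"
    max_retries="$3"
    i=0
    while [ "$i" -lt "$max_retries" ]; do
        printf '%s\n' "$delay"
        # The next retry waits multiplier times longer than this one.
        delay=$((delay * multiplier))
        i=$((i + 1))
    done
}
```

With the values from the configuration, `retry_delays 1000 2 3` prints 1000, 2000, and 4000, so a message that keeps failing is retried for roughly seven seconds in total. A handler that runs longer than that on each attempt relies on the heartbeat, not the retry delay, to keep its connection alive.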
Monitoring Integration:
// In a custom listener or subscriber
// Illustrative only: MessageSentEvent, getBus(), getTransport(), and getConnectionName()
// are placeholder names rather than confirmed Symfony Messenger APIs; adapt them to your own events.
public function onMessageSent(MessageSentEvent $event): void
{
    if (method_exists($event->getBus(), 'getTransport')) {
        $transport = $event->getBus()->getTransport();
        if ($transport instanceof AmqpTransport) {
            $this->logger->info('Heartbeat active for transport', [
                'transport' => get_class($transport),
                'connection' => $transport->getConnection()->getConnectionName(),
            ]);
        }
    }
}
Graceful Shutdown:
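// In a custom worker bootstrap script
pcntl_async_signals(true); // deliver signals without declare(ticks=1)
$shouldStop = false;
pcntl_signal(SIGTERM, function () use (&$shouldStop): void {
    error_log('Shutting down worker gracefully...');
    // Only set a flag here; the worker loop should check it after the
    // current message finishes instead of exiting mid-message
    $shouldStop = true;
});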
AMQP Transport Configuration:
Ensure messenger.yaml is configured for AMQP:
framework:
    messenger:
        transports:
            async:
                dsn: 'amqp://user:pass@rabbitmq:5672/%2f/messages'
                # keepalive: true  # shown for clarity only; the --keepalive CLI flag enables the heartbeat
Custom Transport Extensions:
// Sketch only: the parent class and the messenger_heartbeat.interval parameter are
// unverified; check the bundle source before relying on them.
use Answear\MessengerHeartbeatBundle\DependencyInjection\Configuration;
use Symfony\Component\DependencyInjection\ContainerBuilder;

class CustomHeartbeatExtension extends Configuration
{
    public function load(array $configs, ContainerBuilder $container)
    {
        parent::load($configs, $container);
        // Add custom heartbeat interval or other options
        $container->setParameter('messenger_heartbeat.interval', 30);
    }
}
Docker/Kubernetes Deployments:
# kubernetes/deployment.yaml
livenessProbe:
  exec:
    command:
      - sh
      - -c
      # Note: --limit=1 makes the probe consume a real message and wait until one arrives
      - "php bin/console messenger:consume async --keepalive --limit=1 | grep -q 'Heartbeat active'"
  initialDelaySeconds: 30
  periodSeconds: 10
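Symfony Version Mismatch:
If installation fails on version constraints, align Symfony and PHP with the versions the bundle expects:
composer require symfony/*:^7.2 php:^8.2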
Heartbeat Not Triggering:
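If the --keepalive flag isn't being recognized, check the following:
The bundle is enabled in config/bundles.php.
The flag is spelled correctly (--keepalive, not --heartbeat).
You are running the messenger:consume or messenger:failed:retry commands (not custom scripts).
Message Duplication: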
Make sure prefetch_count is set appropriately in your transport options:
options:
    prefetch_count: 10 # Adjust based on your workload
Global Signals Conflicts:
# Override the heartbeat service
services:
    Answear\MessengerHeartbeatBundle\Heartbeat\Heartbeat:
        arguments:
            $useGlobalSignals: false
Performance Overhead:
Monitor worker CPU and memory usage with htop or Prometheus.
Enable Verbose Logging:
Run the worker with -vv to see heartbeat activity:
php bin/console messenger:consume async --keepalive -vv
[heartbeat] Last activity: 10 seconds ago
[heartbeat] Sending keepalive signal...
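When the verbose output is captured to a file or pipe, a quick count of heartbeat lines shows whether the heartbeat fired at all during a run. This is a rough sketch that assumes the `[heartbeat]` prefix shown in the sample output above; `count_heartbeat_lines` is a hypothetical helper name.

```shell
# Hypothetical helper: counts lines on stdin that contain the literal
# "[heartbeat]" tag (assumes the log format shown in the sample output).
count_heartbeat_lines() {
    # -F matches the brackets literally; -c prints the number of matching lines.
    # grep exits non-zero when nothing matches, so "|| true" keeps the status clean.
    grep -c -F '[heartbeat]' || true
}
```

For example, `php bin/console messenger:consume async --keepalive -vv --time-limit=60 2>&1 | count_heartbeat_lines` runs a worker for a minute and prints how many heartbeat lines it logged. A result of 0 suggests the flag is not taking effect.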
Check AMQP Connection:
Use rabbitmqctl or the RabbitMQ management UI to verify connection status:
rabbitmqctl list_connections name state timeout # timeout = negotiated heartbeat interval in seconds
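To spot connections that negotiated no heartbeat, the connection list can be filtered on its timeout column. The sketch below assumes the list is requested as `rabbitmqctl -q list_connections name state timeout`, which prints tab-separated columns; `connections_without_heartbeat` is a hypothetical helper name.

```shell
# Hypothetical helper: reads tab-separated "name state timeout" rows on stdin
# and prints the name of every connection whose heartbeat timeout is 0.
connections_without_heartbeat() {
    # Header or informational lines never have "0" in the third column,
    # so they are skipped without special handling.
    awk -F '\t' 'NF == 3 && $3 == "0" { print $1 }'
}
```

Piping the rabbitmqctl output through `connections_without_heartbeat` lists the connections to investigate. An empty result means every listed connection has a non-zero heartbeat interval.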
Test Network Interruptions:
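Simulate packet loss and confirm the worker recovers:
# Linux (using tc)
sudo tc qdisc add dev eth0 root netem loss 5% # 5% packet loss
# Re-run worker and check for recovery
php bin/console messenger:consume async --keepalive
# Remove the packet-loss rule when done
sudo tc qdisc del dev eth0 root netem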
CLI Flag Precedence:
The --keepalive CLI flag takes priority over any transport-level keepalive options in messenger.yaml.
Bundle Auto-Registration: