abc/job-worker-bundle
Symfony bundle for processing jobs from AbcJobServerBundle via php-enqueue. Define ProcessorInterface handlers tagged per job name and provide job routes (queue/replyTo) via RouteProviderInterface. Experimental; includes demo and basic config options.
Install the Bundle
composer require abc/job-worker-bundle
Enable the bundle in config/bundles.php:
return [
// ...
Abc\JobWorkerBundle\AbcJobWorkerBundle::class => ['all' => true],
];
Configure the Bundle
Add the required configuration in config/packages/abc_job_worker.yaml:
abc_job_worker:
server_baseUrl: "http://your-job-server/api/"
default_queue: "default"
default_replyTo: "reply"
Set Up Transport Layer
Ensure php-enqueue is configured to match AbcJobServerBundle (e.g., RabbitMQ, Redis, or Doctrine). Example for Redis:
enqueue:
client: "redis://localhost"
First Job Processor
Create a processor implementing ProcessorInterface:
namespace App\Job;
use Abc\JobWorkerBundle\Processor\Context;
use Abc\JobWorkerBundle\Processor\ProcessorInterface;
class SayHelloProcessor implements ProcessorInterface
{
public function process(?string $input, Context $context): void
{
echo "Hello, " . $input . "!\n";
}
}
Register the Processor
Tag the service in config/services.yaml:
services:
App\Job\SayHelloProcessor:
tags:
- { name: 'abc.job.processor', jobName: 'say_hello' }
Run the Worker Start the worker via CLI:
php bin/console abc:job:worker
Job Dispatch
Dispatch jobs from your application to AbcJobServerBundle (e.g., via HTTP API or SDK). Example:
$client = new \GuzzleHttp\Client();
$response = $client->post('http://your-job-server/api/jobs', [
'json' => [
'jobName' => 'say_hello',
'input' => 'World',
],
]);
Worker Consumption
The worker (abc:job:worker) polls the configured transport (e.g., Redis queue) for jobs. It routes each job to the corresponding processor based on jobName.
Context Utilization
Leverage the Context object in processors for:
getJobId(), getReplyTo()).reply(string $output)):
$context->reply("Processed: " . $input);
Error Handling
Throw exceptions in processors to mark jobs as failed. Use Context::fail(string $message) for custom failure messages.
Dependency Injection Inject services into processors via constructor:
class EmailProcessor implements ProcessorInterface
{
private $mailer;
public function __construct(\Swift_Mailer $mailer)
{
$this->mailer = $mailer;
}
public function process(?string $email, Context $context): void
{
$this->mailer->send(...);
}
}
Dynamic Queues
Override the default queue per job by adding queueName to the job payload:
# config/packages/abc_job_worker.yaml
abc_job_worker:
queues:
high_priority: "high_priority_queue"
Then dispatch with:
{ "jobName": "say_hello", "input": "World", "queueName": "high_priority" }
Retry Logic
Configure retries in AbcJobServerBundle (e.g., max retries, delay). The worker respects these settings automatically.
Monitoring Log job processing in processors:
public function process(?string $input, Context $context): void
{
\Monolog\Logger::getInstance()->info("Processing job", ['input' => $input]);
}
Transport Mismatch
AbcJobServerBundle must use the same transport as configured in php-enqueue.enqueue.client in config/packages/enqueue.yaml matches the server’s transport (e.g., redis://localhost for Redis).JobName Case Sensitivity
jobName in processor tags and job payloads must match exactly (including case).final class JobNames {
public const SAY_HELLO = 'say_hello';
}
Worker Stuck on Startup
php bin/console debug:container Abc\JobWorkerBundle
Ensure the abc.job.processor tag is correctly applied.Context Not Persisted
Context may not reach the server if the transport connection drops.Context operations in a try-catch and log failures:
try {
$context->reply($output);
} catch (\Throwable $e) {
\Monolog\Logger::getInstance()->error("Reply failed", ['error' => $e->getMessage()]);
}
Log Job Payloads Add a debug processor to inspect incoming jobs:
class DebugProcessor implements ProcessorInterface
{
public function process(?string $input, Context $context): void
{
\Monolog\Logger::getInstance()->debug("Job received", [
'input' => $input,
'context' => $context->toArray(),
]);
}
}
Test Locally with In-Memory Transport
Use enqueue:amqp with localhost for testing:
enqueue:
client: "amqp://guest:guest@localhost:5672/%2f"
Check Worker Status Run the worker in foreground mode for real-time logs:
php bin/console abc:job:worker --no-daemon
Custom Transport Adapter
Extend Abc\JobWorkerBundle\Transport\TransportInterface to support unsupported transports (e.g., AWS SQS).
Middleware for Jobs Create a middleware stack for processors (e.g., logging, validation):
namespace App\Job\Middleware;
use Abc\JobWorkerBundle\Processor\Context;
use Closure;
class LoggingMiddleware
{
public function __invoke(Closure $next, ?string $input, Context $context)
{
\Monolog\Logger::getInstance()->info("Processing job", ['input' => $input]);
return $next($input, $context);
}
}
Register it in services.yaml:
services:
App\Job\Middleware\LoggingMiddleware:
tags:
- { name: 'abc.job.middleware' }
Batch Processing
Use abc:job:worker --batch-size=10 to process jobs in batches (experimental; check bundle docs for updates).
Custom Job Serialization
Override serialization in Abc\JobWorkerBundle\Serializer\SerializerInterface for non-JSON payloads (e.g., XML).
How can I help you explore Laravel packages today?