3slab/vdm-library-flysystem-transport-bundle
Install the Bundle
composer require 3slab/vdm-library-flysystem-transport-bundle
Configure the Transport
Add the transport to your config/packages/messenger.yaml:
framework:
    messenger:
        transports:
            flysystem_consumer:
                dsn: "vdm+flysystem://default.storage"
                retry_strategy:
                    max_retries: 0
First Use Case: Polling Files
Define a message handler (e.g., FileProcessHandler) to process files fetched from FlySystem. The transport automatically polls files from the configured FlySystem service (e.g., default.storage).
Define a Message Class
Create a message class (e.g., FileProcessMessage) to encapsulate file metadata (e.g., path, content, metadata).
class FileProcessMessage {
    public function __construct(
        // Public readonly properties so handlers can read the values directly
        public readonly string $filePath,
        public readonly string $content,
        public readonly array $metadata
    ) {}
}
Configure the Transport
Use the vdm+flysystem:// DSN format to link the transport to a FlySystem service:
transports:
    flysystem_consumer:
        dsn: "vdm+flysystem://my_custom_flysystem.storage"
        options:
            flysystem_executor: app.custom_flysystem_executor
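The DSN format above can be illustrated with a small, self-contained sketch. The function parseFlysystemDsn() is a hypothetical helper written for this example, not part of the bundle, and the character rule for service IDs is an assumption. It only shows how the service ID could be pulled out of a vdm+flysystem:// DSN and how a malformed value might be rejected.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the bundle): extracts the FlySystem
 * service ID from a "vdm+flysystem://<service-id>" DSN.
 */
function parseFlysystemDsn(string $dsn): string
{
    $prefix = 'vdm+flysystem://';

    if (!str_starts_with($dsn, $prefix)) {
        throw new InvalidArgumentException(sprintf('Unsupported DSN "%s".', $dsn));
    }

    $serviceId = substr($dsn, strlen($prefix));

    // Assumed rule: a service ID consists of letters, digits, dots and underscores.
    if (preg_match('/^[A-Za-z0-9_.]+$/', $serviceId) !== 1) {
        throw new InvalidArgumentException(sprintf('Missing or invalid service ID in DSN "%s".', $dsn));
    }

    return $serviceId;
}

// Prints the service ID taken from the DSN used in the configuration above.
echo parseFlysystemDsn('vdm+flysystem://my_custom_flysystem.storage'), PHP_EOL;
```

Running a check like this over the DSN during deployment is one way to catch a typo before the worker starts; the bundle's own validation may differ.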
Custom Executor for Advanced Logic
Extend DefaultFlysystemExecutor to customize behavior (e.g., recursive listing, file deletion after processing):
namespace App\Messenger\Executor;

use VdmLibrary\FlysystemTransportBundle\Executor\DefaultFlysystemExecutor;

class CustomFlysystemExecutor extends DefaultFlysystemExecutor {
    public function listFiles(string $directory): array {
        // Recursive listing; toArray() converts the DirectoryListing (Flysystem 2.x/3.x) to an array
        return $this->filesystem->listContents($directory, true)->toArray();
    }

    public function deleteProcessedFile(string $filePath): void {
        // Remove the file once it has been processed
        $this->filesystem->delete($filePath);
    }
}
Register the executor as a service in config/services.yaml:
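services:
    # The service ID matches the flysystem_executor option of the transport
    app.custom_flysystem_executor:
        class: App\Messenger\Executor\CustomFlysystemExecutor
        tags: ['messenger.executor']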
Handle Messages
Implement a handler for FileProcessMessage:
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class FileProcessHandler {
    public function __invoke(FileProcessMessage $message): void {
        // Process file (e.g., parse, transform, store in DB)
        $this->process($message->filePath, $message->content);
    }

    private function process(string $filePath, string $content): void {
        // Application-specific logic goes here
    }
}
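As an illustration of the "parse" step a handler might perform, here is a minimal sketch that turns raw file content into rows. The function parseCsvContent() is hypothetical, and it assumes comma-separated content with a header line; real files may need a different delimiter or a streaming parser.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical parsing step: turns raw CSV content into a list of rows
 * keyed by the header line. Assumes comma-separated values and a header row.
 *
 * @return list<array<string, string>>
 */
function parseCsvContent(string $content): array
{
    // Split on any newline style and drop blank lines.
    $lines = array_values(array_filter(
        preg_split('/\R/', $content),
        static fn (string $line): bool => trim($line) !== ''
    ));

    if ($lines === []) {
        return [];
    }

    $header = str_getcsv(array_shift($lines), ',', '"', '\\');
    $rows = [];

    foreach ($lines as $line) {
        $values = str_getcsv($line, ',', '"', '\\');

        // Skip rows whose column count does not match the header.
        if (count($values) === count($header)) {
            $rows[] = array_combine($header, $values);
        }
    }

    return $rows;
}

$rows = parseCsvContent("id,name\n1,alpha\n2,beta\n");
echo count($rows), PHP_EOL; // prints 2
```

A handler could call such a function on the message content and then hand the rows to whatever storage layer the application uses.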
Dispatch the Consumer
Run the messenger worker to start polling:
php bin/console messenger:consume flysystem_consumer -vv
Ensure the FlySystem adapter (e.g., local, s3, ftp) is properly configured in config/packages/flysystem.yaml.
Keep retries disabled (max_retries: 0).
Use FlySystem metadata (e.g., mimeType, timestamp) to enrich messages:
$file = $this->filesystem->read($filePath);
$message = new FileProcessMessage($filePath, $file, [
    'mime' => $this->filesystem->mimeType($filePath),
    'size' => $this->filesystem->fileSize($filePath),
]);
DSN Format Strictness
The DSN must be vdm+flysystem:// followed by the FlySystem service ID (e.g., default.storage).
An InvalidArgumentException is thrown if the DSN is malformed or the FlySystem service doesn't exist.
Retry Strategy
max_retries must be 0. A non-zero value will either throw an exception or fail silently.
File Locking
To keep concurrent workers from processing the same file, use fileExists() checks or external locks (e.g., Redis).
Executor Misconfiguration
If flysystem_executor is not set, the bundle falls back to DefaultFlysystemExecutor. Ensure your custom executor is properly tagged as a service:
tags: ['messenger.executor'] # Required for autowiring
File Deletion Timing
Call $this->filesystem->delete($filePath) explicitly in your executor or handler if cleanup is required.
Verify FlySystem Service
Check if the FlySystem service exists and is configured:
php bin/console debug:container my_flysystem_service
Log Transport Activity
Enable debug mode (APP_DEBUG=true) and inspect logs for transport polling:
php bin/console messenger:consume flysystem_consumer -vv
Look for entries like:
[2023-10-01 12:00:00] messenger.DEBUG: Polling files from "vdm+flysystem://default.storage" []
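Test Executor Logic
Unit test your custom executor to ensure it behaves as expected:
public function testListFiles(): void {
    // Stub the Flysystem 2.x/3.x filesystem so listContents() yields two entries
    $filesystem = $this->createMock(\League\Flysystem\FilesystemOperator::class);
    $filesystem->method('listContents')->willReturn(new \League\Flysystem\DirectoryListing([
        new \League\Flysystem\FileAttributes('test/a.csv'),
        new \League\Flysystem\FileAttributes('test/b.csv'),
    ]));
    $executor = new CustomFlysystemExecutor($filesystem);
    $files = $executor->listFiles('/test');
    $this->assertCount(2, $files); // Example assertion
}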
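Handle Large Directories
List non-recursively and filter entries early to keep each poll small:
// listContents() returns a DirectoryListing (Flysystem 2.x/3.x), so filter it before converting to an array
$files = $this->filesystem->listContents($directory, false)
    ->filter(fn ($file) => $file->isFile() && str_ends_with($file->path(), '.csv'))
    ->toArray();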
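Filtering alone does not bound the amount of work per poll. The sketch below shows one possible way to cap a batch, using a hypothetical selectBatch() helper that works on plain path strings. The name, the extension matching, and the limit parameter are assumptions for illustration, not bundle features.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: keeps only paths with the given extension and caps
 * the result at $limit entries, so one polling cycle handles a bounded batch.
 *
 * @param iterable<string> $paths
 * @return list<string>
 */
function selectBatch(iterable $paths, string $extension, int $limit): array
{
    $batch = [];

    foreach ($paths as $path) {
        if (count($batch) >= $limit) {
            break; // Stop early instead of walking the whole listing.
        }

        // Case-insensitive extension match.
        if (str_ends_with(strtolower($path), '.' . strtolower($extension))) {
            $batch[] = $path;
        }
    }

    return $batch;
}

$paths = ['in/a.csv', 'in/b.txt', 'in/c.CSV', 'in/d.csv'];
print_r(selectBatch($paths, 'csv', 2)); // in/a.csv and in/c.CSV
```

Because the helper accepts any iterable and stops once the batch is full, it could be fed paths from a lazily produced listing without reading every entry first.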
Custom Message Mapping
Override the default message mapping (e.g., to FileProcessMessage) by extending the bundle's FileMessageMapper:
namespace App\Messenger\Mapper;

use SplFileInfo; // Global class; must be imported inside a namespace
use VdmLibrary\FlysystemTransportBundle\Mapper\FileMessageMapper;

class CustomFileMessageMapper extends FileMessageMapper {
    protected function createMessageFromFile(SplFileInfo $file): FileProcessMessage {
        return new FileProcessMessage(
            $file->getPathname(),
            $this->filesystem->read($file->getPathname()),
            ['custom' => 'metadata']
        );
    }
}
Register it as a service and reference it in the transport options.
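Async Processing
Offload heavy processing to a queue (e.g., Symfony Messenger + Doctrine) by dispatching a secondary message:
use Symfony\Component\Messenger\MessageBusInterface;

#[AsMessageHandler]
class FileProcessHandler {
    // Inject the bus used to dispatch the secondary message
    public function __construct(private MessageBusInterface $messageBus) {}

    public function __invoke(FileProcessMessage $message): void {
        $this->messageBus->dispatch(new AsyncProcessMessage($message->filePath));
    }
}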
Dynamic Transport Configuration
Use environment variables or runtime logic to dynamically set the FlySystem service ID:
transports:
    flysystem_consumer:
        dsn: "%env(FLYSYSTEM_TRANSPORT_DSN)%" # e.g., "vdm+flysystem://%env(FLYSYSTEM_SERVICE)%"