basis-company/nats
PHP NATS client with support for TLS and JWT auth, pub/sub and request/reply patterns, plus JetStream APIs (microservices, key-value storage). Configurable reconnect and retry delays. Install via Composer and connect with simple configuration.
Installation
Add the package via Composer:
composer require basis-company/nats
Ensure libsodium or sodium_compat is installed for NKeys functionality.
Basic Connection
Initialize a client with default or custom configuration:
use Basis\Nats\Client;
use Basis\Nats\Configuration;
$client = new Client(new Configuration(host: 'nats-service'));
$client->ping(); // Verify connection
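First Use Case: Pub/Sub
Subscribe to a subject, then publish a message to it:
// Subscribe before publishing: core NATS does not deliver messages sent before the subscription exists
$queue = $client->subscribe('test.subject');
$client->publish('test.subject', 'Hello NATS!');
$message = $queue->next();
echo $message->payload; // Output: "Hello NATS!"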
src/: Core classes like Client, Configuration, and Message.
tests/: Real-world examples and edge cases.
$queue = $client->subscribe('orders.created');
while ($message = $queue->next()) {
    processOrder($message->payload);
    $message->ack(); // Acknowledge receipt
}
// Sync (blocking)
$response = $client->dispatch('rpc.user', 'fetch:123');
// Async (non-blocking)
$client->request('rpc.user', 'fetch:123', function ($resp) {
    logResponse($resp);
});
$client->process(); // Process callbacks
$stream = $client->getApi()->getStream('orders');
$stream->create(); // Auto-configure with defaults
$stream->put('orders.new', json_encode(['id' => 123])); // PHP arrays use =>, not :
$consumer = $stream->getConsumer('processor');
$consumer->handle(function ($msg) {
    // Process message
    $msg->ack();
});
$service = $client->service('UserService', 'Handles user operations', '1.0');
$v1 = $service->addGroup('v1');
$v1->addEndpoint('create', CreateUserHandler::class);
$service->run();
$bucket = $client->getApi()->getBucket('config');
$bucket->put('theme', 'dark');
echo $bucket->get('theme'); // "dark"
try {
    $client->publish('critical', 'alert');
} catch (\Basis\Nats\Exception\NatsException $e) {
    logError($e);
}
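For transient failures, the try/catch above can be combined with a small retry loop. The following is only a sketch: withRetry() is a hypothetical helper, not part of basis-company/nats, and the constant 1ms pause is an assumption chosen for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of basis-company/nats): calls $operation and
 * retries it when it throws, sleeping a constant delay between attempts.
 *
 * @param callable(): mixed $operation the call to attempt, e.g. a publish
 * @param int               $attempts  total number of tries (at least 1)
 * @param float             $delay     pause between tries, in seconds
 */
function withRetry(callable $operation, int $attempts = 3, float $delay = 0.001): mixed
{
    if ($attempts < 1) {
        throw new \InvalidArgumentException('attempts must be at least 1');
    }

    for ($try = 1; ; $try++) {
        try {
            return $operation();
        } catch (\Exception $e) {
            if ($try >= $attempts) {
                throw $e; // Out of tries: surface the last failure
            }
            usleep((int) round($delay * 1_000_000)); // Seconds to microseconds
        }
    }
}
```

With the client from this document, it might be called as withRetry(fn () => $client->publish('critical', 'alert')); the last exception still reaches the caller once all attempts are used up.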
Reuse Client instances; avoid recreating for every request.
Call process() to handle callbacks in loops or long-running scripts.
Connection Timeouts
Use ping() periodically or implement heartbeat logic:
if (!$client->ping()) reconnect();
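The one-line check above can be expanded into a bounded recovery loop. This is a minimal sketch, assuming the application supplies its own recovery routine: ensureConnected() is a hypothetical helper, not a library function, and both callbacks are placeholders.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of basis-company/nats): runs a health check
 * and, while it fails, calls a recovery callback up to $maxAttempts times.
 *
 * @param callable(): bool $ping      returns true when the connection is healthy
 * @param callable(): void $reconnect attempts to re-establish the connection
 */
function ensureConnected(callable $ping, callable $reconnect, int $maxAttempts = 3): bool
{
    for ($attempt = 0; $attempt <= $maxAttempts; $attempt++) {
        try {
            if ($ping()) {
                return true; // Connection is healthy
            }
        } catch (\Exception $e) {
            // Treat an exception from the health check as a failed ping
        }

        if ($attempt < $maxAttempts) {
            $reconnect();
        }
    }

    return false; // Still unhealthy after all recovery attempts
}
```

In a long-running worker it could be invoked between batches as ensureConnected(fn () => $client->ping(), $reconnect), where $reconnect is whatever recovery routine the application uses.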
Message Ordering
WORK_QUEUE retention may reorder messages.
Use LAST_MESSAGE or SAMPLES retention for strict ordering.
Consumer Leaks
Payload Size Limits
NKey/JWT Expiry
$config = new Configuration(verbose: true);
Run nats-server -DV for debug output.
fetch() vs next():
fetch(): Non-blocking (returns null if empty).
next(): Blocks until a message arrives (use with timeout).
Custom Middleware
Wrap Client::publish() or Client::subscribe() to add logic:
$client->subscribe('audit.*', function ($msg) {
    auditLog($msg->subject, $msg->payload);
});
JetStream Hooks
Extend the Stream or Consumer classes to add pre/post-processing:
class CustomStream extends \Basis\Nats\Stream\Stream {
    public function put($subject, $data) {
        $this->logPut($subject, $data); // Custom logic (logPut is a hypothetical helper you define)
        return parent::put($subject, $data); // Preserve the parent's return value
    }
}
Performance Tuning
$consumer->setBatching(100)->setIterations(5);
$config = new Configuration(verbose: false);
TLS Quirks
Ensure tlsCaFile points to the CA bundle.
Verify tlsCertFile and tlsKeyFile paths are correct.
The default retry delay is 0.001 (1ms) in CONSTANT mode.
Some API calls (e.g., getStream()) return null if the stream doesn’t exist.
Wildcards (*) in subject filters match whole subject tokens, not regex patterns.
Use Payload for Headers:
$payload = new \Basis\Nats\Message\Payload('data', [
    'X-Custom-Header' => 'value'
]);
$client->publish('subject', $payload);
Use $stream->info() to track message counts and lag.
// In a Laravel service provider
$this->app->singleton(Client::class, function () {
    return new Client(new Configuration(host: config('nats.host')));
});