amphp/sql
Async SQL library for PHP built on Amp. Provides non-blocking database connections, query execution, and result handling with a consistent API, enabling high-concurrency apps without blocking I/O. Supports common drivers and integrates cleanly with event-loop workflows.
Installation
composer require amphp/sql
Add to your composer.json under require-dev if testing only.
First Use Case: Query Execution
use Amp\Sql\Connection;
use Amp\Sql\Statement;
// Assume `$connection` is an Amp-compatible SQL connection (e.g., from `amphp/pdo` or `amphp/mysql`).
$query = $connection->prepare('SELECT * FROM users WHERE id = :id');
$query->bindValue(':id', 1);
$result = $query->execute();
$rows = $result->fetchAll();
Key Interfaces to Know
Connection: Manages connections, transactions, and prepared statements.Statement: Represents a prepared SQL statement with binding and execution methods.Result: Handles query results (e.g., fetch(), fetchAll()).Where to Look First
Amp\Sql\ConnectionInterface for core methods.Amp\Sql\StatementInterface for query execution patterns.Leverage Amp’s coroutines for non-blocking SQL operations:
use Amp\Sql\Connection;
use Amp\Sql\Result;
async function fetchUserData(Connection $db, int $userId): array {
$stmt = $db->prepare('SELECT * FROM users WHERE id = :id');
$stmt->bindValue(':id', $userId);
$result = $stmt->execute();
return $result->fetchAll(); // Non-blocking fetch
}
Wrap operations in a transaction for atomicity:
async function transferFunds(Connection $db, int $fromId, int $toId, float $amount) {
$db->beginTransaction();
try {
$db->execute('UPDATE accounts SET balance = balance - :amount WHERE id = :fromId', [
':amount' => $amount,
':fromId' => $fromId,
]);
$db->execute('UPDATE accounts SET balance = balance + :amount WHERE id = :toId', [
':amount' => $amount,
':toId' => $toId,
]);
$db->commit();
} catch (\Throwable $e) {
$db->rollBack();
throw $e;
}
}
Reuse connections efficiently (e.g., with amphp/pdo or custom pools):
$pool = new Amp\Sql\ConnectionPool([
new Amp\Sql\Connection('mysql:host=localhost;dbname=test'),
new Amp\Sql\Connection('mysql:host=localhost;dbname=test'),
]);
async function queryWithPool(Amp\Sql\ConnectionPool $pool, string $sql) {
$connection = $pool->get();
try {
$result = $connection->execute($sql);
return $result->fetchAll();
} finally {
$pool->release($connection);
}
}
Use amphp/sql alongside Laravel’s Query Builder for async operations:
use Amp\Sql\Connection;
use Illuminate\Support\Facades\DB;
async function asyncLaravelQuery(Connection $ampDb) {
$laravelConnection = DB::connection()->getPdo();
$ampConnection = new Amp\Sql\Connection($laravelConnection); // Hypothetical bridge
$query = $ampConnection->prepare(DB::raw('SELECT * FROM users'));
return $query->execute()->fetchAll();
}
Note: Requires a custom bridge (e.g., amphp/pdo or a wrapper).
Wrap queries in try-catch for async errors:
async function safeQuery(Connection $db) {
try {
$result = $db->execute('SELECT * FROM nonexistent_table');
return $result->fetchAll();
} catch (Amp\Sql\Exception\QueryException $e) {
// Log or retry
throw new \RuntimeException('Query failed: ' . $e->getMessage());
}
}
Blocking Calls
amphp/sql with synchronous Laravel DB calls (e.g., DB::select()). Use async alternatives or coroutines.await or coroutines for async operations.Connection Leaks
finally blocks or context managers (e.g., Amp\ByteStream\Resource).Driver Compatibility
amphp/pdo or amphp/mysql.amphp-specific drivers (e.g., amphp/mysql over PDO).Transaction Isolation
Result Consumption
fetchAll() loads all rows into memory. For large datasets, use fetch() in a loop.async function streamResults(Connection $db) {
$result = $db->execute('SELECT * FROM large_table');
while ($row = $result->fetch()) {
yield $row; // Process row-by-row
}
}
Enable Logging
Use Amp\Sql\LoggerInterface to log queries:
$db->setLogger(new class implements Amp\Sql\LoggerInterface {
public function log(string $query, array $params, float $time): void {
error_log("Query: $query | Time: $time ms");
}
});
Query Timeouts Set timeouts on connections:
$db = new Amp\Sql\Connection('mysql:host=localhost', [
'timeout' => 5.0, // 5 seconds
]);
Connection Errors
Handle Amp\Sql\Exception\ConnectionException for network issues.
Custom Drivers
Implement Amp\Sql\DriverInterface for unsupported databases.
Query Builders
Extend Amp\Sql\Statement to add fluent query methods:
class AsyncQueryBuilder {
public function where(Connection $db, string $column, $value): Statement {
return $db->prepare("SELECT * FROM table WHERE $column = :value")
->bindValue(':value', $value);
}
}
Result Transformers
Decorate Amp\Sql\Result to auto-map rows to objects:
class UserResult implements \IteratorAggregate {
private $result;
public function __construct(Amp\Sql\Result $result) {
$this->result = $result;
}
public function getIterator(): \Traversable {
foreach ($this->result->fetchAll() as $row) {
yield new User($row['id'], $row['name']);
}
}
}
DSN Format
Use Amp-compatible DSN strings (e.g., mysql:host=localhost;dbname=test).
Avoid PDO-specific options unless wrapped.
Character Encoding Explicitly set encoding in connections:
$db = new Amp\Sql\Connection('mysql:host=localhost', [
'charset' => 'utf8mb4',
]);
Async Context Ensure all SQL calls run within an Amp event loop:
Amp\run(function () {
$db = new Amp\Sql\Connection('mysql:host=localhost');
$result = $db->execute('SELECT 1');
echo $result->fetchColumn(); // Runs async
});
How can I help you explore Laravel packages today?