phpdot / mongodb
Resilient MongoDB client with fluent CRUD builders, Document object, exception translation, and query logging.
Requires
- php: >=8.3
- ext-mongodb: ^2.1
- mongodb/mongodb: ^2.1
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.94
- phpstan/phpstan: ^2.0
- phpstan/phpstan-strict-rules: ^2.0
- phpunit/phpunit: ^11.0
README
Resilient MongoDB client for PHP 8.3+. Fluent CRUD builders, typed Document object, auto-reconnect, exception translation, and query logging. Wraps mongodb/mongodb v2.x.
Table of Contents
- Install
- Quick Start
- Architecture
- Connection
- Database
- Collection
- Document
- Cursor
- Filter Builder
- Query Logging
- GridFS
- Exception Handling
- Escape Hatches
- API Reference
- License
Install
composer require phpdot/mongodb
| Requirement | Version |
|---|---|
| PHP | >= 8.3 |
| ext-mongodb | >= 2.1 |
| mongodb/mongodb | >= 2.1 |
Quick Start
use PHPdot\MongoDB\Connection\Connection; use PHPdot\MongoDB\Connection\ConnectionConfig; use PHPdot\MongoDB\Database\Database; $connection = new Connection(new ConnectionConfig( hosts: 'localhost', database: 'myapp', )); $connection->connect(); $db = new Database($connection); $users = $db->collection('users'); // Insert $users->insertOne(['name' => 'Omar', 'email' => 'omar@example.com']); // Find $user = $users->findOne(['email' => 'omar@example.com']); echo $user->name; // 'Omar' // Fluent query $active = $users->find() ->filter(['status' => 'active']) ->sort(['created_at' => -1]) ->limit(10) ->execute(); foreach ($active as $doc) { echo $doc->name; }
Architecture
ConnectionConfig (readonly DTO)
↓
Connection (resilient, exponential backoff, auto-reconnect)
↓
Database (collection access, transactions, admin commands)
↓
Collection (entry point for all operations)
├── find() → FindQuery (fluent) → Cursor of Documents
├── updateOne() → UpdateQuery (fluent) → UpdateResult
├── updateMany() → UpdateQuery (fluent) → UpdateResult
├── deleteOne() → DeleteQuery (fluent) → DeleteResult
├── deleteMany() → DeleteQuery (fluent) → DeleteResult
├── findOne() → Document|null
├── insertOne() → InsertOneResult
├── insertMany() → InsertManyResult
├── aggregate() → Cursor of Documents
├── bulkWrite() → BulkWriteResult
└── ...30+ methods
Every operation pipeline:
Developer call
→ runWithReconnect (retry on connection loss)
→ exception translation (typed errors with context)
→ query logging (operation, filter, duration, slow flag)
→ Document wrapping (BSON → PHP types)
→ return to developer
Package structure:
src/
├── Connection/
│ ├── ConnectionConfig.php # Immutable config DTO (17 parameters)
│ └── Connection.php # Connect, reconnect, ping, backoff
├── Database/
│ └── Database.php # Collection access, transactions, admin
├── Collection/
│ ├── Collection.php # 34 public methods — all CRUD + indexes + aggregation
│ ├── FindQuery.php # Fluent find builder (16 chainable methods)
│ ├── UpdateQuery.php # Fluent update builder (10 chainable methods)
│ └── DeleteQuery.php # Fluent delete builder (7 chainable methods)
├── Document/
│ ├── Document.php # Typed wrapper — BSON→PHP, ArrayAccess, JSON
│ └── Cursor.php # Iterator yielding Documents
├── Filter/
│ └── Filter.php # Fluent filter builder (22 operators)
├── Logging/
│ ├── QueryLogger.php # Ring buffer logger with slow tracking
│ └── QueryLog.php # Immutable log entry DTO
├── GridFS/
│ └── Bucket.php # File storage wrapper
└── Exception/
├── MongoException.php # Base (extends RuntimeException)
├── ConnectionException.php # + getHost()
├── AuthenticationException.php
├── QueryException.php # + getOperation(), getCollection()
├── WriteException.php # + getOperation(), getCollection()
├── DuplicateKeyException.php # + getDuplicateKey()
├── ValidationException.php
├── BulkWriteException.php # + getPartialResult()
└── TimeoutException.php # + getOperation(), getCollection()
Connection
ConnectionConfig
Immutable readonly DTO. All 17 parameters have sensible defaults.
$config = new ConnectionConfig( hosts: 'localhost', // string or list of strings port: 27017, // default port (ignored if host includes port) username: '', // auth username password: '', // auth password database: 'myapp', // default database deployment: 'single', // 'single', 'replicaSet', 'sharded' replicaSet: '', // replica set name timeoutMs: 1000, // connection timeout readPreference: 'primary', // read preference mode writeConcern: 'majority', // string or int readConcern: 'local', // read concern level maxStalenessSeconds: -1, // -1 = no limit tags: [], // tag sets for read preference retryWrites: true, // retryable writes retryReads: true, // retryable reads maxRetries: 3, // reconnection retry attempts options: [], // additional MongoDB URI options );
Multiple hosts:
$config = new ConnectionConfig( hosts: ['mongo1.example.com', 'mongo2.example.com:27018'], database: 'myapp', ); // URI: mongodb://mongo1.example.com:27017,mongo2.example.com:27018
With authentication:
$config = new ConnectionConfig( hosts: 'mongo.example.com', username: 'admin', password: 'p@ss w0rd!', // auto URL-encoded database: 'myapp', ); // URI: mongodb://admin:p%40ss%20w0rd%21@mongo.example.com:27017
Connection Lifecycle
$connection = new Connection($config); $connection->connect(); // connect with exponential backoff retries $connection->isConnected(); // local flag check (no server round-trip) $connection->ping(); // actual server ping $connection->ensureConnected(); // throws ConnectionException if not connected $connection->reconnect(); // close + connect $connection->close(); // disconnect $connection->getClient(); // MongoDB\Client $connection->getDatabase(); // MongoDB\Database (uses config database) $connection->getConfig(); // ConnectionConfig
Topologies
// Single server new ConnectionConfig(hosts: 'localhost', database: 'myapp'); // Replica set new ConnectionConfig( hosts: ['rs1.example.com', 'rs2.example.com', 'rs3.example.com'], deployment: 'replicaSet', replicaSet: 'rs0', readPreference: 'secondaryPreferred', ); // Sharded cluster new ConnectionConfig( hosts: ['mongos1.example.com', 'mongos2.example.com'], deployment: 'sharded', );
Resilience
Every collection operation runs through runWithReconnect:
- Execute the operation
- On
ConnectionExceptionor connection error code: reconnect with exponential backoff (100ms, 200ms, 400ms...), refresh the collection reference, retry once - On other errors: translate to typed exception
Recognized connection error codes: HostUnreachable (6), HostNotFound (7), NetworkTimeout (89), SocketException (9001), NotWritablePrimary (10107), InterruptedAtShutdown (11600), InterruptedDueToReplStateChange (11602), NotPrimaryNoSecondaryOk (13435), NotPrimaryOrSecondary (13436).
Database
Collection Access
$db = new Database($connection); $db = new Database($connection, $logger); // with query logging $users = $db->collection('users'); // Collection instance
Transactions
Transactions require a replica set. The callback receives the Database and a Session.
$result = $db->transaction(function (Database $db, Session $session) { $db->collection('accounts')->updateOne() ->filter(['_id' => $from]) ->update(['$inc' => ['balance' => -100]]) ->session($session) ->execute(); $db->collection('accounts')->updateOne() ->filter(['_id' => $to]) ->update(['$inc' => ['balance' => 100]]) ->session($session) ->execute(); return 'transferred'; }); // $result === 'transferred'
If the callback throws, the transaction is automatically aborted and the exception re-thrown.
// Transaction options $db->transaction($callback, [ 'readConcern' => new ReadConcern('snapshot'), 'writeConcern' => new WriteConcern('majority'), ]);
Database Commands
$result = $db->command(['ping' => 1]); // ['ok' => 1] $result = $db->command(['serverStatus' => 1]);
Collection Management
$db->createCollection('users'); $db->createCollection('validated', [ 'validator' => [ '$jsonSchema' => [ 'bsonType' => 'object', 'required' => ['name', 'email'], ], ], ]); $db->dropCollection('users'); $db->renameCollection('old_name', 'new_name'); $collections = $db->listCollections(); // [['name' => 'users', 'type' => 'collection', 'options' => []], ...] $db->raw(); // MongoDB\Database escape hatch
Collection
Insert
// Single document $result = $users->insertOne(['name' => 'Omar', 'email' => 'omar@example.com']); $result->getInsertedId(); // ObjectId $result->getInsertedCount(); // 1 // Custom ID $users->insertOne(['_id' => new ObjectId(), 'name' => 'Omar']); // Multiple documents $result = $users->insertMany([ ['name' => 'Alice', 'age' => 25], ['name' => 'Bob', 'age' => 30], ['name' => 'Charlie', 'age' => 35], ]); $result->getInsertedCount(); // 3 $result->getInsertedIds(); // [ObjectId, ObjectId, ObjectId]
Find (Quick Access)
// Find one — returns Document or null $user = $users->findOne(['email' => 'omar@example.com']); $user = $users->findOne(); // first document in collection // With options $user = $users->findOne( ['name' => 'Omar'], ['projection' => ['name' => 1, 'email' => 1, '_id' => 0]], );
Find (Fluent Builder)
find() returns a FindQuery with 16 chainable methods. Call execute(), first(), or count() to run.
// Full example $cursor = $users->find() ->filter(['status' => 'active']) // array filter ->projection(['name' => 1, 'email' => 1]) // fields to return ->sort(['created_at' => -1, 'name' => 1]) // sort order ->limit(10) // max documents ->skip(20) // pagination offset ->hint('status_1_created_at_-1') // index hint (string or array) ->collation(['locale' => 'en', 'strength' => 2]) ->maxTimeMS(5000) // timeout ->batchSize(100) // cursor batch size ->allowDiskUse() // large sorts ->comment('admin dashboard query') // profiler comment ->session($session) // transaction session ->option('noCursorTimeout', true) // any additional option ->execute(); // → Cursor of Documents // Iterate results foreach ($cursor as $doc) { echo $doc->name; } // Shortcuts $first = $users->find()->filter(['status' => 'active'])->first(); // Document|null $count = $users->find()->filter(['status' => 'active'])->count(); // int $plan = $users->find()->filter(['status' => 'active'])->explain(); // array // Filter builder callback $users->find() ->where(fn(Filter $f) => $f ->eq('status', 'active') ->gte('age', 18) ->in('tags', ['vip', 'premium']) ->or( Filter::new()->eq('role', 'admin'), Filter::new()->gt('score', 90), ) ) ->sort(['score' => -1]) ->limit(20) ->execute(); // Debug compiled state $query = $users->find()->filter(['status' => 'active'])->sort(['name' => 1]); $query->getFilter(); // ['status' => 'active'] $query->getOptions(); // ['sort' => ['name' => 1]]
Update (Fluent Builder)
updateOne() and updateMany() return an UpdateQuery.
// Update one $result = $users->updateOne() ->filter(['email' => 'omar@example.com']) ->update(['$set' => ['name' => 'Omar H.'], '$inc' => ['logins' => 1]]) ->execute(); $result->getMatchedCount(); // 1 $result->getModifiedCount(); // 1 // Update with where callback $users->updateOne() ->where(fn(Filter $f) => $f->eq('status', 'inactive')->lt('last_login', $threshold)) ->update(['$set' => ['archived' => true]]) ->execute(); // Upsert (insert if not found) $users->updateOne() ->filter(['email' => 'new@example.com']) ->update(['$set' => ['name' => 'New User']]) ->upsert() ->execute(); // Update many $result = $users->updateMany() ->filter(['status' => 'trial']) ->update(['$set' => ['status' => 'expired']]) ->execute(); // Array filters for positional operators $users->updateOne() ->filter(['_id' => $id]) ->update(['$set' => ['items.$[elem].qty' => 0]]) ->arrayFilters([['elem.status' => 'inactive']]) ->execute(); // Options $users->updateOne() ->filter(['_id' => $id]) ->update(['$set' => ['name' => 'Omar']]) ->hint('email_1') ->collation(['locale' => 'en']) ->session($session) ->option('bypassDocumentValidation', true) ->execute(); // Debug $query = $users->updateOne()->filter(['x' => 1])->update(['$set' => ['y' => 2]])->upsert(); $query->getFilter(); // ['x' => 1] $query->getUpdate(); // ['$set' => ['y' => 2]] $query->getOptions(); // ['upsert' => true]
Delete (Fluent Builder)
deleteOne() and deleteMany() return a DeleteQuery.
// Delete one $result = $users->deleteOne() ->filter(['email' => 'omar@example.com']) ->execute(); $result->getDeletedCount(); // 1 // Delete with where callback $sessions->deleteMany() ->where(fn(Filter $f) => $f->lt('expires_at', new UTCDateTime())) ->execute(); // Options $users->deleteOne() ->filter(['_id' => $id]) ->hint('email_1') ->collation(['locale' => 'en']) ->session($session) ->option('comment', 'cleanup') ->execute();
Replace
Replace the entire document (not a partial update).
$result = $users->replaceOne( ['email' => 'omar@example.com'], // filter ['name' => 'Omar', 'email' => 'omar@example.com', 'v' => 2], // replacement ); $result->getMatchedCount(); // 1 $result->getModifiedCount(); // 1
Atomic Find-and-Modify
Find a document and atomically modify it. Returns the document before modification (by default).
// Find and update $doc = $users->findOneAndUpdate( ['email' => 'omar@example.com'], ['$set' => ['last_login' => new UTCDateTime()]], ); // $doc is the document BEFORE the update (or null if not found) // Return the document AFTER update $doc = $users->findOneAndUpdate( ['email' => 'omar@example.com'], ['$inc' => ['visits' => 1]], ['returnDocument' => \MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER], ); // Find and replace $doc = $users->findOneAndReplace( ['email' => 'omar@example.com'], ['name' => 'Omar', 'email' => 'omar@example.com'], ); // Find and delete $doc = $users->findOneAndDelete(['status' => 'expired']); // $doc is the deleted document (or null)
Bulk Write
Execute multiple write operations in a single command.
$result = $users->bulkWrite([ ['insertOne' => [['name' => 'New', 'age' => 20]]], ['updateOne' => [['name' => 'Existing'], ['$set' => ['active' => true]]]], ['updateMany' => [['status' => 'trial'], ['$set' => ['status' => 'expired']]]], ['replaceOne' => [['name' => 'Old'], ['name' => 'Replaced']]], ['deleteOne' => [['name' => 'Remove']]], ['deleteMany' => [['status' => 'deleted']]], ]); $result->getInsertedCount(); $result->getMatchedCount(); $result->getModifiedCount(); $result->getDeletedCount(); $result->getUpsertedCount();
Count and Distinct
// Count with filter $total = $users->countDocuments(); $active = $users->countDocuments(['status' => 'active']); // Estimated count (fast, metadata-based — no filter) $approx = $users->estimatedDocumentCount(); // Distinct values $statuses = $users->distinct('status'); // ['active', 'inactive', 'suspended'] $names = $users->distinct('name', ['status' => 'active']); // ['Alice', 'Bob', 'Omar']
Aggregation
PHPdot delegates aggregation building to mongodb/mongodb's Pipeline builder (46 stages, 37 accumulators, 190 expressions). PHPdot wraps execution with resilience and Document wrapping.
// Raw pipeline arrays $results = $orders->aggregate([ ['$match' => ['status' => 'completed']], ['$group' => [ '_id' => '$category', 'total' => ['$sum' => '$amount'], 'count' => ['$sum' => 1], ]], ['$sort' => ['total' => -1]], ['$limit' => 10], ]); foreach ($results as $doc) { echo $doc->_id; // category name echo $doc->total; // aggregated sum } // mongodb/mongodb's type-safe builder use MongoDB\Builder\Stage; use MongoDB\Builder\Accumulator; $results = $orders->aggregate( Stage::match(status: 'completed') ->lookup(from: 'customers', localField: 'customer_id', foreignField: '_id', as: 'customer') ->unwind('$customer') ->group( _id: '$customer_id', totalSpent: Accumulator::sum('$amount'), orderCount: Accumulator::count(), lastOrder: Accumulator::max('$created_at'), ) ->sort(totalSpent: -1) ->limit(10) ->getPipeline() );
Index Management
// Single field index $users->createIndex(['email' => 1]); // 'email_1' // Compound index $users->createIndex(['status' => 1, 'created_at' => -1]); // 'status_1_created_at_-1' // Unique index $users->createIndex(['email' => 1], ['unique' => true]); // Named index $users->createIndex(['email' => 1], ['name' => 'custom_idx']); // 'custom_idx' // Text index $users->createIndex(['content' => 'text']); // TTL index (auto-expire documents) $users->createIndex(['expires_at' => 1], ['expireAfterSeconds' => 3600]); // Sparse index $users->createIndex(['optional_field' => 1], ['sparse' => true]); // Multiple indexes at once $names = $users->createIndexes([ ['key' => ['name' => 1]], ['key' => ['age' => -1]], ['key' => ['status' => 1, 'name' => 1]], ]); // ['name_1', 'age_-1', 'status_1_name_1'] // List all indexes $indexes = $users->listIndexes(); // Drop indexes $users->dropIndex('email_1'); $users->dropIndexes(); // drops all except _id
Explain
Analyze query execution plans.
// Explain find $plan = $users->explain(['status' => 'active']); $plan = $users->explain(); // empty filter // Via builder $plan = $users->find() ->filter(['status' => 'active']) ->sort(['created_at' => -1]) ->explain(); // Explain aggregation $plan = $users->explainAggregate([ ['$match' => ['status' => 'completed']], ['$group' => ['_id' => '$category', 'total' => ['$sum' => '$amount']]], ]);
Change Streams
Watch for real-time changes on a collection (requires replica set).
$stream = $users->watch( [['$match' => ['operationType' => 'insert']]], // pipeline filter ['maxAwaitTimeMS' => 1000], // options ); foreach ($stream as $event) { echo $event->operationType; echo $event->fullDocument->name; }
Document
Every document returned from queries is a Document instance. Immutable, with automatic BSON-to-PHP type conversion on access.
Field Access
$doc = $users->findOne(['email' => 'omar@example.com']); // Property access (with type conversion) $doc->name; // 'Omar' $doc->age; // 30 $doc->active; // true $doc->missing; // null (missing fields return null) // Nested documents become Document instances $doc->address->city; // 'Amman' $doc->address->geo->lat; // 31.95 // Arrays stay as arrays $doc->tags; // ['php', 'mongodb'] $doc->scores; // [95, 87, 92] // Dates become DateTimeImmutable $doc->created_at; // DateTimeImmutable $doc->created_at->format('Y-m-d'); // '2026-04-04' // ObjectId $doc->id(); // ObjectId instance echo $doc->_id; // '507f...' via __toString // Existence checks $doc->has('email'); // true $doc->has('missing'); // false isset($doc->email); // true // Default values $doc->get('role', 'user'); // 'user' if role is missing $doc->get('name', 'Anonymous'); // 'Omar' (field exists)
Type Conversions
Automatic on __get() access:
| BSON Type | PHP Type | Notes |
|---|---|---|
| UTCDateTime | DateTimeImmutable | UTC timezone |
| Nested document (associative array) | Document | Recursive — $doc->address->city |
| Array (list) | PHP array | Native foreach, count |
| Int64 | int | Native arithmetic |
| Binary | string | Via getData() |
| ObjectId | ObjectId (unchanged) | Has __toString |
| Decimal128 | Decimal128 (unchanged) | Has __toString |
| string, int, float, bool, null | unchanged | PHP natives |
toArray() converts everything to plain PHP (ObjectId and Decimal128 become strings).
Conversion Methods
// Plain PHP array — no Document objects, no BSON types $array = $doc->toArray(); $array['address']['city']; // 'Amman' (plain array, not Document) $array['_id']; // '507f...' (string, not ObjectId) // JSON string $json = $doc->toJson(); $json = $doc->toJson(JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE); // Raw data — no conversion, original BSON types preserved $raw = $doc->getRaw(); $raw['_id']; // ObjectId instance
ArrayAccess and JsonSerializable
// ArrayAccess (read-only) $doc['name']; // 'Omar' isset($doc['name']); // true $doc['name'] = 'x'; // throws LogicException ('Document is immutable') unset($doc['name']); // throws LogicException // JsonSerializable json_encode($doc); // '{"_id":"507f...","name":"Omar",...}'
Cursor
Cursor wraps MongoDB's cursor and yields Document instances. Implements IteratorAggregate.
$cursor = $users->find()->filter(['status' => 'active'])->execute(); // Iterate foreach ($cursor as $index => $doc) { echo "$index: {$doc->name}"; } // Collect all $docs = $cursor->toArray(); // list<Document> // First document $first = $cursor->first(); // Document|null // Lazy generator $gen = $cursor->lazy(); foreach ($gen as $doc) { ... } // Count (consumes the cursor) $count = $cursor->count(); // int
Filter Builder
Build MongoDB query filters with a fluent API. All methods return self for chaining.
use PHPdot\MongoDB\Filter\Filter; $filter = Filter::new() ->eq('status', 'active') ->gte('age', 18) ->toArray(); // ['status' => 'active', 'age' => ['$gte' => 18]]
Use with builders via where():
$users->find()->where(fn(Filter $f) => $f->eq('status', 'active')->gte('age', 18))->execute(); $users->updateOne()->where(fn(Filter $f) => $f->eq('name', 'Omar'))->update([...])->execute(); $users->deleteMany()->where(fn(Filter $f) => $f->lt('expires_at', $now))->execute();
Comparison Operators
$f->eq('status', 'active'); // status = 'active' $f->ne('status', 'deleted'); // status != 'deleted' $f->gt('age', 18); // age > 18 $f->gte('age', 18); // age >= 18 $f->lt('age', 65); // age < 65 $f->lte('age', 65); // age <= 65 // Range — operators stack on the same field $f->gte('age', 18)->lte('age', 65); // ['age' => ['$gte' => 18, '$lte' => 65]]
Array Operators
$f->in('status', ['active', 'pending']); // status in [...] $f->nin('role', ['banned', 'suspended']); // status not in [...] $f->all('tags', ['php', 'mongodb']); // array contains all $f->size('tags', 3); // array has exactly 3 elements $f->elemMatch('scores', ['$gte' => 80, '$lt' => 100]); // array element matches
Logical Operators
$f->or( Filter::new()->eq('role', 'admin'), Filter::new()->gt('score', 90), ); $f->and( Filter::new()->eq('status', 'active'), Filter::new()->gte('age', 18), ); $f->nor( Filter::new()->eq('status', 'banned'), Filter::new()->eq('role', 'bot'), ); $f->not('age', ['$gt' => 100]);
Element Operators
$f->exists('email'); // field exists $f->exists('deletedAt', false); // field does not exist $f->type('age', 'int'); // field is of BSON type
String Operators
$f->regex('name', '^Omar', 'i'); // regex with flags $f->regex('email', '@example\.com$'); $f->text('mongodb php'); // full-text search (requires text index) $f->text('mongodb', ['$language' => 'en', '$caseSensitive' => true]);
Geospatial Operators
$f->near('location', [35.9, 31.9], maxDistance: 1000.0); $f->near('location', [35.9, 31.9], maxDistance: 5000.0, minDistance: 100.0);
Raw Filters
$f->raw(['$where' => 'this.age > 18']); $f->raw(['age' => ['$mod' => [5, 0]]]);
Query Logging
Ring buffer logger with slow query tracking. Shared across all collections via Database.
use PHPdot\MongoDB\Logging\QueryLogger; $logger = new QueryLogger( maxEntries: 100, // ring buffer size (overwrites oldest when full) slowThresholdMs: 50.0, // threshold for slow query flag ); $db = new Database($connection, $logger); // Use normally — all operations are logged $db->collection('users')->findOne(['status' => 'active']); $db->collection('users')->insertOne(['name' => 'Test']); // Query the log $logger->getAll(); // list<QueryLog> $logger->getSlow(); // list<QueryLog> (only slow queries) $logger->count(); // int $logger->getSlowThreshold(); // float $logger->clear(); // reset // Each entry (readonly) foreach ($logger->getAll() as $log) { $log->operation; // 'findOne', 'insertOne', 'aggregate', etc. $log->collection; // 'users' $log->filter; // ['status' => 'active'] $log->durationMs; // 2.34 $log->slow; // false }
Logged operations: findOne, find, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany, replaceOne, countDocuments, estimatedDocumentCount, distinct, findOneAndUpdate, findOneAndReplace, findOneAndDelete, bulkWrite, aggregate.
GridFS
Store and retrieve large files. Wraps MongoDB's GridFS with a clean API.
use PHPdot\MongoDB\GridFS\Bucket; $bucket = new Bucket($connection); // default 'fs' bucket $bucket = new Bucket($connection, 'uploads'); // custom bucket name $bucket = new Bucket($connection, 'uploads', [ // with options 'chunkSizeBytes' => 1048576, ]); // Upload from stream $source = fopen('/path/to/file.pdf', 'r'); $id = $bucket->uploadFromStream('document.pdf', $source, [ 'metadata' => ['author' => 'Omar', 'type' => 'invoice'], ]); fclose($source); // Download to stream $dest = fopen('/tmp/download.pdf', 'w'); $bucket->downloadToStream($id, $dest); fclose($dest); // Open streams directly $readStream = $bucket->openDownloadStream($id); $content = stream_get_contents($readStream); fclose($readStream); $writeStream = $bucket->openUploadStream('output.txt'); fwrite($writeStream, 'Hello, GridFS!'); fclose($writeStream); // Find files $files = $bucket->find(); // all files $files = $bucket->find(['filename' => 'document.pdf']); // with filter // Manage $bucket->rename($id, 'renamed.pdf'); $bucket->delete($id); $bucket->drop(); // drop entire bucket (files + chunks) $bucket->raw(); // MongoDB\GridFS\Bucket escape hatch
Exception Handling
Exception Hierarchy
MongoException (extends RuntimeException)
├── ConnectionException — connection lost after retry
│ └── getHost(): string
├── AuthenticationException — authentication failure
├── QueryException — read operation failure
│ ├── getOperation(): string
│ └── getCollection(): string
├── WriteException — write operation failure
│ ├── getOperation(): string
│ ├── getCollection(): string
│ ├── DuplicateKeyException — unique constraint violation (11000/11001)
│ │ └── getDuplicateKey(): string
│ ├── ValidationException — server-side schema validation (121)
│ └── BulkWriteException — bulk write partial/total failure
│ ├── getPartialResult(): ?BulkWriteResult
│ └── setPartialResult(BulkWriteResult): void
└── TimeoutException — execution timeout (50)
├── getOperation(): string
└── getCollection(): string
All exceptions carry the original MongoDB driver exception as getPrevious().
Exception Translation
| MongoDB Error | PHPdot Exception | Error Code |
|---|---|---|
| Duplicate key violation | DuplicateKeyException |
11000, 11001 |
| Document validation failure | ValidationException |
121 |
| Execution timeout | TimeoutException |
50 |
| Connection lost after retry | ConnectionException |
6, 7, 89, 9001, ... |
| Authentication failure | AuthenticationException |
varies |
| Other write errors | WriteException |
varies |
| Other read errors | QueryException |
varies |
| Bulk write failure | BulkWriteException |
varies |
Catching Exceptions
use PHPdot\MongoDB\Exception\DuplicateKeyException; use PHPdot\MongoDB\Exception\ValidationException; use PHPdot\MongoDB\Exception\TimeoutException; use PHPdot\MongoDB\Exception\WriteException; use PHPdot\MongoDB\Exception\QueryException; use PHPdot\MongoDB\Exception\ConnectionException; use PHPdot\MongoDB\Exception\MongoException; try { $users->insertOne(['email' => 'omar@example.com']); } catch (DuplicateKeyException $e) { // Unique index violation $e->getCollection(); // 'users' $e->getDuplicateKey(); // 'email_1' $e->getCode(); // 11000 $e->getPrevious(); // original MongoDB\Driver\Exception\RuntimeException } catch (ValidationException $e) { // Server-side schema validation failed $e->getCollection(); // 'users' $e->getCode(); // 121 } catch (TimeoutException $e) { // Operation exceeded maxTimeMS $e->getOperation(); // 'insertOne' $e->getCollection(); // 'users' } catch (WriteException $e) { // Any other write error $e->getOperation(); $e->getCollection(); } catch (QueryException $e) { // Read operation error $e->getOperation(); $e->getCollection(); } catch (ConnectionException $e) { // Connection lost even after retry $e->getHost(); } catch (MongoException $e) { // Catch-all for any PHPdot MongoDB exception }
Escape Hatches
Every wrapper exposes the underlying mongodb/mongodb object for edge cases:
$users->raw(); // MongoDB\Collection $db->raw(); // MongoDB\Database $connection->getClient(); // MongoDB\Client $bucket->raw(); // MongoDB\GridFS\Bucket
API Reference
ConnectionConfig API
final readonly class ConnectionConfig
__construct(hosts, port, username, password, database, deployment, replicaSet,
timeoutMs, readPreference, writeConcern, readConcern,
maxStalenessSeconds, tags, retryWrites, retryReads, maxRetries, options)
buildUri(): string
buildUriOptions(): array<string, mixed>
getHostString(): string
Connection API
final class Connection
__construct(ConnectionConfig $config)
connect(): void throws ConnectionException, AuthenticationException
close(): void
isConnected(): bool
ping(): bool
ensureConnected(): void throws ConnectionException
reconnect(): void throws ConnectionException
getClient(): MongoDB\Client throws ConnectionException
getDatabase(): MongoDB\Database throws ConnectionException
getConfig(): ConnectionConfig
Database API
final class Database
__construct(Connection $connection, ?QueryLogger $logger = null)
collection(string $name): Collection
transaction(callable $callback, array $options = []): mixed
command(array $command): array<string, mixed>
createCollection(string $name, array $options = []): void
dropCollection(string $name): void
listCollections(array $options = []): list<array<string, mixed>>
renameCollection(string $from, string $to): void
raw(): MongoDB\Database
Collection API
final class Collection
__construct(MongoDB\Collection, Connection, ?QueryLogger)
Fluent Builders:
find(): FindQuery
updateOne(): UpdateQuery
updateMany(): UpdateQuery
deleteOne(): DeleteQuery
deleteMany(): DeleteQuery
Quick Access:
findOne(array $filter = [], array $options = []): ?Document
insertOne(array $document, array $options = []): InsertOneResult
insertMany(array $documents, array $options = []): InsertManyResult
replaceOne(array $filter, array $replacement, array $options = []): UpdateResult
countDocuments(array $filter = [], array $options = []): int
estimatedDocumentCount(array $options = []): int
distinct(string $fieldName, array $filter = [], array $options = []): list<mixed>
findOneAndUpdate(array $filter, array $update, array $options = []): ?Document
findOneAndReplace(array $filter, array $replacement, array $options = []): ?Document
findOneAndDelete(array $filter, array $options = []): ?Document
bulkWrite(array $operations, array $options = []): BulkWriteResult
Aggregation:
aggregate(Pipeline|array $pipeline, array $options = []): Cursor
Indexes:
createIndex(array $keys, array $options = []): string
createIndexes(array $indexes): list<string>
dropIndex(string $name): void
dropIndexes(): void
listIndexes(): list<array<string, mixed>>
Change Streams:
watch(array $pipeline = [], array $options = []): ChangeStream
Explain:
explain(array $filter = [], array $options = []): array<string, mixed>
explainAggregate(Pipeline|array $pipeline, array $options = []): array<string, mixed>
Utilities:
filter(): Filter
raw(): MongoDB\Collection
getName(): string
getNamespace(): string
Builder Internals (used by FindQuery, UpdateQuery, DeleteQuery):
executeFindQuery(array $filter, array $options): Cursor
executeCountQuery(array $filter, array $options): int
executeFindExplain(array $filter, array $options): array
executeUpdateQuery(array $filter, array $update, array $options, bool $many): UpdateResult
executeDeleteQuery(array $filter, array $options, bool $many): DeleteResult
FindQuery API
final class FindQuery
filter(array $filter): self
where(callable(Filter): Filter $callback): self
projection(array $fields): self
sort(array $sort): self
limit(int $limit): self
skip(int $skip): self
hint(string|array $hint): self
collation(array $collation): self
maxTimeMS(int $ms): self
batchSize(int $size): self
allowDiskUse(bool $allow = true): self
comment(string $comment): self
session(Session $session): self
option(string $key, mixed $value): self
execute(): Cursor
first(): ?Document
count(): int
explain(): array
getFilter(): array
getOptions(): array
UpdateQuery API
final class UpdateQuery
filter(array $filter): self
where(callable(Filter): Filter $callback): self
update(array $update): self
upsert(bool $upsert = true): self
arrayFilters(array $filters): self
hint(string|array $hint): self
collation(array $collation): self
session(Session $session): self
option(string $key, mixed $value): self
execute(): UpdateResult
getFilter(): array
getUpdate(): array
getOptions(): array
DeleteQuery API
final class DeleteQuery
filter(array $filter): self
where(callable(Filter): Filter $callback): self
hint(string|array $hint): self
collation(array $collation): self
session(Session $session): self
option(string $key, mixed $value): self
execute(): DeleteResult
getFilter(): array
getOptions(): array
Document API
final class Document implements ArrayAccess<string, mixed>, JsonSerializable
__construct(array<string, mixed> $data)
static fromBSON(object|array $source): self
id(): ?ObjectId
__get(string $name): mixed
__isset(string $name): bool
has(string $key): bool
get(string $key, mixed $default = null): mixed
toArray(): array<string, mixed>
toJson(int $flags = 0): string
getRaw(): array<string, mixed>
offsetExists(mixed $offset): bool
offsetGet(mixed $offset): mixed
offsetSet(mixed $offset, mixed $value): void throws LogicException
offsetUnset(mixed $offset): void throws LogicException
jsonSerialize(): array<string, mixed>
Cursor API
final class Cursor implements IteratorAggregate<int, Document>
__construct(CursorInterface $cursor)
getIterator(): Generator<int, Document>
toArray(): list<Document>
first(): ?Document
lazy(): Generator<int, Document>
count(): int
Filter API
final class Filter
static new(): self
Comparison: eq, ne, gt, gte, lt, lte
Array: in, nin, all, size, elemMatch
Logical: or, and, nor, not
Element: exists, type
String: regex, text
Geospatial: near
Raw: raw
Compile: toArray(): array<string, mixed>
QueryLogger API
final class QueryLogger
__construct(int $maxEntries = 100, float $slowThresholdMs = 100.0)
log(string $operation, string $collection, array $filter, float $durationMs): void
getAll(): list<QueryLog>
getSlow(): list<QueryLog>
count(): int
clear(): void
getSlowThreshold(): float
final readonly class QueryLog
string $operation
string $collection
array $filter
float $durationMs
bool $slow
Bucket API
final class Bucket
__construct(Connection $connection, string $bucketName = 'fs', array $options = [])
uploadFromStream(string $filename, mixed $source, array $options = []): ObjectId
downloadToStream(ObjectId $id, mixed $destination): void
openDownloadStream(ObjectId $id): mixed (resource)
openUploadStream(string $filename, array $options = []): mixed (resource)
delete(ObjectId $id): void
find(array $filter = [], array $options = []): CursorInterface
rename(ObjectId $id, string $newFilename): void
drop(): void
raw(): GridFSBucket
Exceptions API
MongoException(string $message, int $code, ?Throwable $previous)
ConnectionException(string $message, string $host, int $code, ?Throwable $previous)
getHost(): string
AuthenticationException — no additional methods
QueryException(string $message, string $operation, string $collection, int $code, ?Throwable $previous)
getOperation(): string
getCollection(): string
WriteException(string $message, string $operation, string $collection, int $code, ?Throwable $previous)
getOperation(): string
getCollection(): string
DuplicateKeyException(string $message, string $collection, string $duplicateKey, int $code, ?Throwable $previous)
getDuplicateKey(): string
getCollection(): string
ValidationException — inherits WriteException
BulkWriteException — inherits WriteException
setPartialResult(BulkWriteResult $result): void
getPartialResult(): ?BulkWriteResult
TimeoutException(string $message, string $operation, string $collection, int $code, ?Throwable $previous)
getOperation(): string
getCollection(): string
License
MIT