phpdot/mongodb

Resilient MongoDB client with fluent CRUD builders, Document object, exception translation, and query logging.

Maintainers

Package info

github.com/phpdot/mongodb

pkg:composer/phpdot/mongodb

Statistics

Installs: 0

Dependents: 0

Suggesters: 0

Stars: 0

Open Issues: 0

v1.0.0 2026-04-04 11:59 UTC

This package is auto-updated.

Last update: 2026-04-04 12:05:48 UTC


README

Resilient MongoDB client for PHP 8.3+. Fluent CRUD builders, typed Document object, auto-reconnect, exception translation, and query logging. Wraps mongodb/mongodb v2.x.

Table of Contents

Install

composer require phpdot/mongodb
Requirement Version
PHP >= 8.3
ext-mongodb >= 2.1
mongodb/mongodb >= 2.1

Quick Start

use PHPdot\MongoDB\Connection\Connection;
use PHPdot\MongoDB\Connection\ConnectionConfig;
use PHPdot\MongoDB\Database\Database;

$connection = new Connection(new ConnectionConfig(
    hosts: 'localhost',
    database: 'myapp',
));
$connection->connect();

$db = new Database($connection);
$users = $db->collection('users');

// Insert
$users->insertOne(['name' => 'Omar', 'email' => 'omar@example.com']);

// Find
$user = $users->findOne(['email' => 'omar@example.com']);
echo $user->name; // 'Omar'

// Fluent query
$active = $users->find()
    ->filter(['status' => 'active'])
    ->sort(['created_at' => -1])
    ->limit(10)
    ->execute();

foreach ($active as $doc) {
    echo $doc->name;
}

Architecture

ConnectionConfig (readonly DTO)
    ↓
Connection (resilient, exponential backoff, auto-reconnect)
    ↓
Database (collection access, transactions, admin commands)
    ↓
Collection (entry point for all operations)
    ├── find()          → FindQuery (fluent) → Cursor of Documents
    ├── updateOne()     → UpdateQuery (fluent) → UpdateResult
    ├── updateMany()    → UpdateQuery (fluent) → UpdateResult
    ├── deleteOne()     → DeleteQuery (fluent) → DeleteResult
    ├── deleteMany()    → DeleteQuery (fluent) → DeleteResult
    ├── findOne()       → Document|null
    ├── insertOne()     → InsertOneResult
    ├── insertMany()    → InsertManyResult
    ├── aggregate()     → Cursor of Documents
    ├── bulkWrite()     → BulkWriteResult
    └── ...30+ methods

Every operation pipeline:
    Developer call
        → runWithReconnect (retry on connection loss)
            → exception translation (typed errors with context)
                → query logging (operation, filter, duration, slow flag)
                    → Document wrapping (BSON → PHP types)
                        → return to developer

Package structure:

src/
├── Connection/
│   ├── ConnectionConfig.php    # Immutable config DTO (17 parameters)
│   └── Connection.php          # Connect, reconnect, ping, backoff
├── Database/
│   └── Database.php            # Collection access, transactions, admin
├── Collection/
│   ├── Collection.php          # 34 public methods — all CRUD + indexes + aggregation
│   ├── FindQuery.php           # Fluent find builder (16 chainable methods)
│   ├── UpdateQuery.php         # Fluent update builder (10 chainable methods)
│   └── DeleteQuery.php         # Fluent delete builder (7 chainable methods)
├── Document/
│   ├── Document.php            # Typed wrapper — BSON→PHP, ArrayAccess, JSON
│   └── Cursor.php              # Iterator yielding Documents
├── Filter/
│   └── Filter.php              # Fluent filter builder (22 operators)
├── Logging/
│   ├── QueryLogger.php         # Ring buffer logger with slow tracking
│   └── QueryLog.php            # Immutable log entry DTO
├── GridFS/
│   └── Bucket.php              # File storage wrapper
└── Exception/
    ├── MongoException.php      # Base (extends RuntimeException)
    ├── ConnectionException.php # + getHost()
    ├── AuthenticationException.php
    ├── QueryException.php      # + getOperation(), getCollection()
    ├── WriteException.php      # + getOperation(), getCollection()
    ├── DuplicateKeyException.php # + getDuplicateKey()
    ├── ValidationException.php
    ├── BulkWriteException.php  # + getPartialResult()
    └── TimeoutException.php    # + getOperation(), getCollection()

Connection

ConnectionConfig

Immutable readonly DTO. All 17 parameters have sensible defaults.

$config = new ConnectionConfig(
    hosts: 'localhost',                    // string or list of strings
    port: 27017,                           // default port (ignored if host includes port)
    username: '',                          // auth username
    password: '',                          // auth password
    database: 'myapp',                     // default database
    deployment: 'single',                  // 'single', 'replicaSet', 'sharded'
    replicaSet: '',                        // replica set name
    timeoutMs: 1000,                       // connection timeout
    readPreference: 'primary',             // read preference mode
    writeConcern: 'majority',              // string or int
    readConcern: 'local',                  // read concern level
    maxStalenessSeconds: -1,               // -1 = no limit
    tags: [],                              // tag sets for read preference
    retryWrites: true,                     // retryable writes
    retryReads: true,                      // retryable reads
    maxRetries: 3,                         // reconnection retry attempts
    options: [],                           // additional MongoDB URI options
);

Multiple hosts:

$config = new ConnectionConfig(
    hosts: ['mongo1.example.com', 'mongo2.example.com:27018'],
    database: 'myapp',
);
// URI: mongodb://mongo1.example.com:27017,mongo2.example.com:27018

With authentication:

$config = new ConnectionConfig(
    hosts: 'mongo.example.com',
    username: 'admin',
    password: 'p@ss w0rd!',   // auto URL-encoded
    database: 'myapp',
);
// URI: mongodb://admin:p%40ss%20w0rd%21@mongo.example.com:27017

Connection Lifecycle

$connection = new Connection($config);

$connection->connect();         // connect with exponential backoff retries
$connection->isConnected();     // local flag check (no server round-trip)
$connection->ping();            // actual server ping
$connection->ensureConnected(); // throws ConnectionException if not connected
$connection->reconnect();       // close + connect
$connection->close();           // disconnect

$connection->getClient();       // MongoDB\Client
$connection->getDatabase();     // MongoDB\Database (uses config database)
$connection->getConfig();       // ConnectionConfig

Topologies

// Single server
new ConnectionConfig(hosts: 'localhost', database: 'myapp');

// Replica set
new ConnectionConfig(
    hosts: ['rs1.example.com', 'rs2.example.com', 'rs3.example.com'],
    deployment: 'replicaSet',
    replicaSet: 'rs0',
    readPreference: 'secondaryPreferred',
);

// Sharded cluster
new ConnectionConfig(
    hosts: ['mongos1.example.com', 'mongos2.example.com'],
    deployment: 'sharded',
);

Resilience

Every collection operation runs through runWithReconnect:

  1. Execute the operation
  2. On ConnectionException or connection error code: reconnect with exponential backoff (100ms, 200ms, 400ms...), refresh the collection reference, retry once
  3. On other errors: translate to typed exception

Recognized connection error codes: HostUnreachable (6), HostNotFound (7), NetworkTimeout (89), SocketException (9001), NotWritablePrimary (10107), InterruptedAtShutdown (11600), InterruptedDueToReplStateChange (11602), NotPrimaryNoSecondaryOk (13435), NotPrimaryOrSecondary (13436).

Database

Collection Access

$db = new Database($connection);
$db = new Database($connection, $logger); // with query logging

$users = $db->collection('users');        // Collection instance

Transactions

Transactions require a replica set. The callback receives the Database and a Session.

$result = $db->transaction(function (Database $db, Session $session) {
    $db->collection('accounts')->updateOne()
        ->filter(['_id' => $from])
        ->update(['$inc' => ['balance' => -100]])
        ->session($session)
        ->execute();

    $db->collection('accounts')->updateOne()
        ->filter(['_id' => $to])
        ->update(['$inc' => ['balance' => 100]])
        ->session($session)
        ->execute();

    return 'transferred';
});
// $result === 'transferred'

If the callback throws, the transaction is automatically aborted and the exception re-thrown.

// Transaction options
$db->transaction($callback, [
    'readConcern' => new ReadConcern('snapshot'),
    'writeConcern' => new WriteConcern('majority'),
]);

Database Commands

$result = $db->command(['ping' => 1]);
// ['ok' => 1]

$result = $db->command(['serverStatus' => 1]);

Collection Management

$db->createCollection('users');
$db->createCollection('validated', [
    'validator' => [
        '$jsonSchema' => [
            'bsonType' => 'object',
            'required' => ['name', 'email'],
        ],
    ],
]);

$db->dropCollection('users');
$db->renameCollection('old_name', 'new_name');

$collections = $db->listCollections();
// [['name' => 'users', 'type' => 'collection', 'options' => []], ...]

$db->raw(); // MongoDB\Database escape hatch

Collection

Insert

// Single document
$result = $users->insertOne(['name' => 'Omar', 'email' => 'omar@example.com']);
$result->getInsertedId();    // ObjectId
$result->getInsertedCount(); // 1

// Custom ID
$users->insertOne(['_id' => new ObjectId(), 'name' => 'Omar']);

// Multiple documents
$result = $users->insertMany([
    ['name' => 'Alice', 'age' => 25],
    ['name' => 'Bob', 'age' => 30],
    ['name' => 'Charlie', 'age' => 35],
]);
$result->getInsertedCount(); // 3
$result->getInsertedIds();   // [ObjectId, ObjectId, ObjectId]

Find (Quick Access)

// Find one — returns Document or null
$user = $users->findOne(['email' => 'omar@example.com']);
$user = $users->findOne(); // first document in collection

// With options
$user = $users->findOne(
    ['name' => 'Omar'],
    ['projection' => ['name' => 1, 'email' => 1, '_id' => 0]],
);

Find (Fluent Builder)

find() returns a FindQuery with 16 chainable methods. Call execute(), first(), or count() to run.

// Full example
$cursor = $users->find()
    ->filter(['status' => 'active'])           // array filter
    ->projection(['name' => 1, 'email' => 1])  // fields to return
    ->sort(['created_at' => -1, 'name' => 1])  // sort order
    ->limit(10)                                 // max documents
    ->skip(20)                                  // pagination offset
    ->hint('status_1_created_at_-1')            // index hint (string or array)
    ->collation(['locale' => 'en', 'strength' => 2])
    ->maxTimeMS(5000)                           // timeout
    ->batchSize(100)                            // cursor batch size
    ->allowDiskUse()                            // large sorts
    ->comment('admin dashboard query')          // profiler comment
    ->session($session)                         // transaction session
    ->option('noCursorTimeout', true)           // any additional option
    ->execute();                                // → Cursor of Documents

// Iterate results
foreach ($cursor as $doc) {
    echo $doc->name;
}

// Shortcuts
$first = $users->find()->filter(['status' => 'active'])->first(); // Document|null
$count = $users->find()->filter(['status' => 'active'])->count(); // int
$plan  = $users->find()->filter(['status' => 'active'])->explain(); // array

// Filter builder callback
$users->find()
    ->where(fn(Filter $f) => $f
        ->eq('status', 'active')
        ->gte('age', 18)
        ->in('tags', ['vip', 'premium'])
        ->or(
            Filter::new()->eq('role', 'admin'),
            Filter::new()->gt('score', 90),
        )
    )
    ->sort(['score' => -1])
    ->limit(20)
    ->execute();

// Debug compiled state
$query = $users->find()->filter(['status' => 'active'])->sort(['name' => 1]);
$query->getFilter();  // ['status' => 'active']
$query->getOptions(); // ['sort' => ['name' => 1]]

Update (Fluent Builder)

updateOne() and updateMany() return an UpdateQuery.

// Update one
$result = $users->updateOne()
    ->filter(['email' => 'omar@example.com'])
    ->update(['$set' => ['name' => 'Omar H.'], '$inc' => ['logins' => 1]])
    ->execute();

$result->getMatchedCount();  // 1
$result->getModifiedCount(); // 1

// Update with where callback
$users->updateOne()
    ->where(fn(Filter $f) => $f->eq('status', 'inactive')->lt('last_login', $threshold))
    ->update(['$set' => ['archived' => true]])
    ->execute();

// Upsert (insert if not found)
$users->updateOne()
    ->filter(['email' => 'new@example.com'])
    ->update(['$set' => ['name' => 'New User']])
    ->upsert()
    ->execute();

// Update many
$result = $users->updateMany()
    ->filter(['status' => 'trial'])
    ->update(['$set' => ['status' => 'expired']])
    ->execute();

// Array filters for positional operators
$users->updateOne()
    ->filter(['_id' => $id])
    ->update(['$set' => ['items.$[elem].qty' => 0]])
    ->arrayFilters([['elem.status' => 'inactive']])
    ->execute();

// Options
$users->updateOne()
    ->filter(['_id' => $id])
    ->update(['$set' => ['name' => 'Omar']])
    ->hint('email_1')
    ->collation(['locale' => 'en'])
    ->session($session)
    ->option('bypassDocumentValidation', true)
    ->execute();

// Debug
$query = $users->updateOne()->filter(['x' => 1])->update(['$set' => ['y' => 2]])->upsert();
$query->getFilter();  // ['x' => 1]
$query->getUpdate();  // ['$set' => ['y' => 2]]
$query->getOptions(); // ['upsert' => true]

Delete (Fluent Builder)

deleteOne() and deleteMany() return a DeleteQuery.

// Delete one
$result = $users->deleteOne()
    ->filter(['email' => 'omar@example.com'])
    ->execute();

$result->getDeletedCount(); // 1

// Delete with where callback
$sessions->deleteMany()
    ->where(fn(Filter $f) => $f->lt('expires_at', new UTCDateTime()))
    ->execute();

// Options
$users->deleteOne()
    ->filter(['_id' => $id])
    ->hint('email_1')
    ->collation(['locale' => 'en'])
    ->session($session)
    ->option('comment', 'cleanup')
    ->execute();

Replace

Replace the entire document (not a partial update).

$result = $users->replaceOne(
    ['email' => 'omar@example.com'],                           // filter
    ['name' => 'Omar', 'email' => 'omar@example.com', 'v' => 2], // replacement
);

$result->getMatchedCount();  // 1
$result->getModifiedCount(); // 1

Atomic Find-and-Modify

Find a document and atomically modify it. Returns the document before modification (by default).

// Find and update
$doc = $users->findOneAndUpdate(
    ['email' => 'omar@example.com'],
    ['$set' => ['last_login' => new UTCDateTime()]],
);
// $doc is the document BEFORE the update (or null if not found)

// Return the document AFTER update
$doc = $users->findOneAndUpdate(
    ['email' => 'omar@example.com'],
    ['$inc' => ['visits' => 1]],
    ['returnDocument' => \MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER],
);

// Find and replace
$doc = $users->findOneAndReplace(
    ['email' => 'omar@example.com'],
    ['name' => 'Omar', 'email' => 'omar@example.com'],
);

// Find and delete
$doc = $users->findOneAndDelete(['status' => 'expired']);
// $doc is the deleted document (or null)

Bulk Write

Execute multiple write operations in a single command.

$result = $users->bulkWrite([
    ['insertOne' => [['name' => 'New', 'age' => 20]]],
    ['updateOne' => [['name' => 'Existing'], ['$set' => ['active' => true]]]],
    ['updateMany' => [['status' => 'trial'], ['$set' => ['status' => 'expired']]]],
    ['replaceOne' => [['name' => 'Old'], ['name' => 'Replaced']]],
    ['deleteOne' => [['name' => 'Remove']]],
    ['deleteMany' => [['status' => 'deleted']]],
]);

$result->getInsertedCount();
$result->getMatchedCount();
$result->getModifiedCount();
$result->getDeletedCount();
$result->getUpsertedCount();

Count and Distinct

// Count with filter
$total = $users->countDocuments();
$active = $users->countDocuments(['status' => 'active']);

// Estimated count (fast, metadata-based — no filter)
$approx = $users->estimatedDocumentCount();

// Distinct values
$statuses = $users->distinct('status');
// ['active', 'inactive', 'suspended']

$names = $users->distinct('name', ['status' => 'active']);
// ['Alice', 'Bob', 'Omar']

Aggregation

PHPdot delegates aggregation building to mongodb/mongodb's Pipeline builder (46 stages, 37 accumulators, 190 expressions). PHPdot wraps execution with resilience and Document wrapping.

// Raw pipeline arrays
$results = $orders->aggregate([
    ['$match' => ['status' => 'completed']],
    ['$group' => [
        '_id' => '$category',
        'total' => ['$sum' => '$amount'],
        'count' => ['$sum' => 1],
    ]],
    ['$sort' => ['total' => -1]],
    ['$limit' => 10],
]);

foreach ($results as $doc) {
    echo $doc->_id;    // category name
    echo $doc->total;  // aggregated sum
}

// mongodb/mongodb's type-safe builder
use MongoDB\Builder\Stage;
use MongoDB\Builder\Accumulator;

$results = $orders->aggregate(
    Stage::match(status: 'completed')
        ->lookup(from: 'customers', localField: 'customer_id', foreignField: '_id', as: 'customer')
        ->unwind('$customer')
        ->group(
            _id: '$customer_id',
            totalSpent: Accumulator::sum('$amount'),
            orderCount: Accumulator::count(),
            lastOrder: Accumulator::max('$created_at'),
        )
        ->sort(totalSpent: -1)
        ->limit(10)
        ->getPipeline()
);

Index Management

// Single field index
$users->createIndex(['email' => 1]);                           // 'email_1'

// Compound index
$users->createIndex(['status' => 1, 'created_at' => -1]);     // 'status_1_created_at_-1'

// Unique index
$users->createIndex(['email' => 1], ['unique' => true]);

// Named index
$users->createIndex(['email' => 1], ['name' => 'custom_idx']); // 'custom_idx'

// Text index
$users->createIndex(['content' => 'text']);

// TTL index (auto-expire documents)
$users->createIndex(['expires_at' => 1], ['expireAfterSeconds' => 3600]);

// Sparse index
$users->createIndex(['optional_field' => 1], ['sparse' => true]);

// Multiple indexes at once
$names = $users->createIndexes([
    ['key' => ['name' => 1]],
    ['key' => ['age' => -1]],
    ['key' => ['status' => 1, 'name' => 1]],
]);
// ['name_1', 'age_-1', 'status_1_name_1']

// List all indexes
$indexes = $users->listIndexes();

// Drop indexes
$users->dropIndex('email_1');
$users->dropIndexes(); // drops all except _id

Explain

Analyze query execution plans.

// Explain find
$plan = $users->explain(['status' => 'active']);
$plan = $users->explain(); // empty filter

// Via builder
$plan = $users->find()
    ->filter(['status' => 'active'])
    ->sort(['created_at' => -1])
    ->explain();

// Explain aggregation
$plan = $users->explainAggregate([
    ['$match' => ['status' => 'completed']],
    ['$group' => ['_id' => '$category', 'total' => ['$sum' => '$amount']]],
]);

Change Streams

Watch for real-time changes on a collection (requires replica set).

$stream = $users->watch(
    [['$match' => ['operationType' => 'insert']]],  // pipeline filter
    ['maxAwaitTimeMS' => 1000],                       // options
);

foreach ($stream as $event) {
    echo $event->operationType;
    echo $event->fullDocument->name;
}

Document

Every document returned from queries is a Document instance. Immutable, with automatic BSON-to-PHP type conversion on access.

Field Access

$doc = $users->findOne(['email' => 'omar@example.com']);

// Property access (with type conversion)
$doc->name;                          // 'Omar'
$doc->age;                           // 30
$doc->active;                        // true
$doc->missing;                       // null (missing fields return null)

// Nested documents become Document instances
$doc->address->city;                 // 'Amman'
$doc->address->geo->lat;             // 31.95

// Arrays stay as arrays
$doc->tags;                          // ['php', 'mongodb']
$doc->scores;                        // [95, 87, 92]

// Dates become DateTimeImmutable
$doc->created_at;                    // DateTimeImmutable
$doc->created_at->format('Y-m-d');   // '2026-04-04'

// ObjectId
$doc->id();                          // ObjectId instance
echo $doc->_id;                      // '507f...' via __toString

// Existence checks
$doc->has('email');                   // true
$doc->has('missing');                 // false
isset($doc->email);                  // true

// Default values
$doc->get('role', 'user');           // 'user' if role is missing
$doc->get('name', 'Anonymous');      // 'Omar' (field exists)

Type Conversions

Automatic on __get() access:

BSON Type PHP Type Notes
UTCDateTime DateTimeImmutable UTC timezone
Nested document (associative array) Document Recursive — $doc->address->city
Array (list) PHP array Native foreach, count
Int64 int Native arithmetic
Binary string Via getData()
ObjectId ObjectId (unchanged) Has __toString
Decimal128 Decimal128 (unchanged) Has __toString
string, int, float, bool, null unchanged PHP natives

toArray() converts everything to plain PHP (ObjectId and Decimal128 become strings).

Conversion Methods

// Plain PHP array — no Document objects, no BSON types
$array = $doc->toArray();
$array['address']['city']; // 'Amman' (plain array, not Document)
$array['_id'];             // '507f...' (string, not ObjectId)

// JSON string
$json = $doc->toJson();
$json = $doc->toJson(JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);

// Raw data — no conversion, original BSON types preserved
$raw = $doc->getRaw();
$raw['_id']; // ObjectId instance

ArrayAccess and JsonSerializable

// ArrayAccess (read-only)
$doc['name'];           // 'Omar'
isset($doc['name']);    // true
$doc['name'] = 'x';    // throws LogicException ('Document is immutable')
unset($doc['name']);    // throws LogicException

// JsonSerializable
json_encode($doc);      // '{"_id":"507f...","name":"Omar",...}'

Cursor

Cursor wraps MongoDB's cursor and yields Document instances. Implements IteratorAggregate.

$cursor = $users->find()->filter(['status' => 'active'])->execute();

// Iterate
foreach ($cursor as $index => $doc) {
    echo "$index: {$doc->name}";
}

// Collect all
$docs = $cursor->toArray();     // list<Document>

// First document
$first = $cursor->first();     // Document|null

// Lazy generator
$gen = $cursor->lazy();
foreach ($gen as $doc) { ... }

// Count (consumes the cursor)
$count = $cursor->count();     // int

Filter Builder

Build MongoDB query filters with a fluent API. All methods return self for chaining.

use PHPdot\MongoDB\Filter\Filter;

$filter = Filter::new()
    ->eq('status', 'active')
    ->gte('age', 18)
    ->toArray();
// ['status' => 'active', 'age' => ['$gte' => 18]]

Use with builders via where():

$users->find()->where(fn(Filter $f) => $f->eq('status', 'active')->gte('age', 18))->execute();
$users->updateOne()->where(fn(Filter $f) => $f->eq('name', 'Omar'))->update([...])->execute();
$users->deleteMany()->where(fn(Filter $f) => $f->lt('expires_at', $now))->execute();

Comparison Operators

$f->eq('status', 'active');     // status = 'active'
$f->ne('status', 'deleted');    // status != 'deleted'
$f->gt('age', 18);              // age > 18
$f->gte('age', 18);             // age >= 18
$f->lt('age', 65);              // age < 65
$f->lte('age', 65);             // age <= 65

// Range — operators stack on the same field
$f->gte('age', 18)->lte('age', 65);
// ['age' => ['$gte' => 18, '$lte' => 65]]

Array Operators

$f->in('status', ['active', 'pending']);          // status in [...]
$f->nin('role', ['banned', 'suspended']);          // status not in [...]
$f->all('tags', ['php', 'mongodb']);               // array contains all
$f->size('tags', 3);                               // array has exactly 3 elements
$f->elemMatch('scores', ['$gte' => 80, '$lt' => 100]); // array element matches

Logical Operators

$f->or(
    Filter::new()->eq('role', 'admin'),
    Filter::new()->gt('score', 90),
);

$f->and(
    Filter::new()->eq('status', 'active'),
    Filter::new()->gte('age', 18),
);

$f->nor(
    Filter::new()->eq('status', 'banned'),
    Filter::new()->eq('role', 'bot'),
);

$f->not('age', ['$gt' => 100]);

Element Operators

$f->exists('email');              // field exists
$f->exists('deletedAt', false);   // field does not exist
$f->type('age', 'int');           // field is of BSON type

String Operators

$f->regex('name', '^Omar', 'i');  // regex with flags
$f->regex('email', '@example\.com$');

$f->text('mongodb php');          // full-text search (requires text index)
$f->text('mongodb', ['$language' => 'en', '$caseSensitive' => true]);

Geospatial Operators

$f->near('location', [35.9, 31.9], maxDistance: 1000.0);
$f->near('location', [35.9, 31.9], maxDistance: 5000.0, minDistance: 100.0);

Raw Filters

$f->raw(['$where' => 'this.age > 18']);
$f->raw(['age' => ['$mod' => [5, 0]]]);

Query Logging

Ring buffer logger with slow query tracking. Shared across all collections via Database.

use PHPdot\MongoDB\Logging\QueryLogger;

$logger = new QueryLogger(
    maxEntries: 100,        // ring buffer size (overwrites oldest when full)
    slowThresholdMs: 50.0,  // threshold for slow query flag
);

$db = new Database($connection, $logger);

// Use normally — all operations are logged
$db->collection('users')->findOne(['status' => 'active']);
$db->collection('users')->insertOne(['name' => 'Test']);

// Query the log
$logger->getAll();            // list<QueryLog>
$logger->getSlow();           // list<QueryLog> (only slow queries)
$logger->count();             // int
$logger->getSlowThreshold();  // float
$logger->clear();             // reset

// Each entry (readonly)
foreach ($logger->getAll() as $log) {
    $log->operation;   // 'findOne', 'insertOne', 'aggregate', etc.
    $log->collection;  // 'users'
    $log->filter;      // ['status' => 'active']
    $log->durationMs;  // 2.34
    $log->slow;        // false
}

Logged operations: findOne, find, insertOne, insertMany, updateOne, updateMany, deleteOne, deleteMany, replaceOne, countDocuments, estimatedDocumentCount, distinct, findOneAndUpdate, findOneAndReplace, findOneAndDelete, bulkWrite, aggregate.

GridFS

Store and retrieve large files. Wraps MongoDB's GridFS with a clean API.

use PHPdot\MongoDB\GridFS\Bucket;

$bucket = new Bucket($connection);                    // default 'fs' bucket
$bucket = new Bucket($connection, 'uploads');         // custom bucket name
$bucket = new Bucket($connection, 'uploads', [        // with options
    'chunkSizeBytes' => 1048576,
]);

// Upload from stream
$source = fopen('/path/to/file.pdf', 'r');
$id = $bucket->uploadFromStream('document.pdf', $source, [
    'metadata' => ['author' => 'Omar', 'type' => 'invoice'],
]);
fclose($source);

// Download to stream
$dest = fopen('/tmp/download.pdf', 'w');
$bucket->downloadToStream($id, $dest);
fclose($dest);

// Open streams directly
$readStream = $bucket->openDownloadStream($id);
$content = stream_get_contents($readStream);
fclose($readStream);

$writeStream = $bucket->openUploadStream('output.txt');
fwrite($writeStream, 'Hello, GridFS!');
fclose($writeStream);

// Find files
$files = $bucket->find();                               // all files
$files = $bucket->find(['filename' => 'document.pdf']);  // with filter

// Manage
$bucket->rename($id, 'renamed.pdf');
$bucket->delete($id);
$bucket->drop();       // drop entire bucket (files + chunks)

$bucket->raw();        // MongoDB\GridFS\Bucket escape hatch

Exception Handling

Exception Hierarchy

MongoException (extends RuntimeException)
├── ConnectionException          — connection lost after retry
│   └── getHost(): string
├── AuthenticationException      — authentication failure
├── QueryException               — read operation failure
│   ├── getOperation(): string
│   └── getCollection(): string
├── WriteException               — write operation failure
│   ├── getOperation(): string
│   ├── getCollection(): string
│   ├── DuplicateKeyException    — unique constraint violation (11000/11001)
│   │   └── getDuplicateKey(): string
│   ├── ValidationException      — server-side schema validation (121)
│   └── BulkWriteException       — bulk write partial/total failure
│       ├── getPartialResult(): ?BulkWriteResult
│       └── setPartialResult(BulkWriteResult): void
└── TimeoutException             — execution timeout (50)
    ├── getOperation(): string
    └── getCollection(): string

All exceptions carry the original MongoDB driver exception as getPrevious().

Exception Translation

MongoDB Error PHPdot Exception Error Code
Duplicate key violation DuplicateKeyException 11000, 11001
Document validation failure ValidationException 121
Execution timeout TimeoutException 50
Connection lost after retry ConnectionException 6, 7, 89, 9001, ...
Authentication failure AuthenticationException varies
Other write errors WriteException varies
Other read errors QueryException varies
Bulk write failure BulkWriteException varies

Catching Exceptions

use PHPdot\MongoDB\Exception\DuplicateKeyException;
use PHPdot\MongoDB\Exception\ValidationException;
use PHPdot\MongoDB\Exception\TimeoutException;
use PHPdot\MongoDB\Exception\WriteException;
use PHPdot\MongoDB\Exception\QueryException;
use PHPdot\MongoDB\Exception\ConnectionException;
use PHPdot\MongoDB\Exception\MongoException;

try {
    $users->insertOne(['email' => 'omar@example.com']);
} catch (DuplicateKeyException $e) {
    // Unique index violation
    $e->getCollection();   // 'users'
    $e->getDuplicateKey(); // 'email_1'
    $e->getCode();         // 11000
    $e->getPrevious();     // original MongoDB\Driver\Exception\RuntimeException
} catch (ValidationException $e) {
    // Server-side schema validation failed
    $e->getCollection();   // 'users'
    $e->getCode();         // 121
} catch (TimeoutException $e) {
    // Operation exceeded maxTimeMS
    $e->getOperation();    // 'insertOne'
    $e->getCollection();   // 'users'
} catch (WriteException $e) {
    // Any other write error
    $e->getOperation();
    $e->getCollection();
} catch (QueryException $e) {
    // Read operation error
    $e->getOperation();
    $e->getCollection();
} catch (ConnectionException $e) {
    // Connection lost even after retry
    $e->getHost();
} catch (MongoException $e) {
    // Catch-all for any PHPdot MongoDB exception
}

Escape Hatches

Every wrapper exposes the underlying mongodb/mongodb object for edge cases:

$users->raw();            // MongoDB\Collection
$db->raw();               // MongoDB\Database
$connection->getClient(); // MongoDB\Client
$bucket->raw();           // MongoDB\GridFS\Bucket

API Reference

ConnectionConfig API

final readonly class ConnectionConfig

__construct(hosts, port, username, password, database, deployment, replicaSet,
            timeoutMs, readPreference, writeConcern, readConcern,
            maxStalenessSeconds, tags, retryWrites, retryReads, maxRetries, options)
buildUri(): string
buildUriOptions(): array<string, mixed>
getHostString(): string

Connection API

final class Connection

__construct(ConnectionConfig $config)
connect(): void                          throws ConnectionException, AuthenticationException
close(): void
isConnected(): bool
ping(): bool
ensureConnected(): void                  throws ConnectionException
reconnect(): void                        throws ConnectionException
getClient(): MongoDB\Client              throws ConnectionException
getDatabase(): MongoDB\Database          throws ConnectionException
getConfig(): ConnectionConfig

Database API

final class Database

__construct(Connection $connection, ?QueryLogger $logger = null)
collection(string $name): Collection
transaction(callable $callback, array $options = []): mixed
command(array $command): array<string, mixed>
createCollection(string $name, array $options = []): void
dropCollection(string $name): void
listCollections(array $options = []): list<array<string, mixed>>
renameCollection(string $from, string $to): void
raw(): MongoDB\Database

Collection API

final class Collection

__construct(MongoDB\Collection, Connection, ?QueryLogger)

Fluent Builders:
  find(): FindQuery
  updateOne(): UpdateQuery
  updateMany(): UpdateQuery
  deleteOne(): DeleteQuery
  deleteMany(): DeleteQuery

Quick Access:
  findOne(array $filter = [], array $options = []): ?Document
  insertOne(array $document, array $options = []): InsertOneResult
  insertMany(array $documents, array $options = []): InsertManyResult
  replaceOne(array $filter, array $replacement, array $options = []): UpdateResult
  countDocuments(array $filter = [], array $options = []): int
  estimatedDocumentCount(array $options = []): int
  distinct(string $fieldName, array $filter = [], array $options = []): list<mixed>
  findOneAndUpdate(array $filter, array $update, array $options = []): ?Document
  findOneAndReplace(array $filter, array $replacement, array $options = []): ?Document
  findOneAndDelete(array $filter, array $options = []): ?Document
  bulkWrite(array $operations, array $options = []): BulkWriteResult

Aggregation:
  aggregate(Pipeline|array $pipeline, array $options = []): Cursor

Indexes:
  createIndex(array $keys, array $options = []): string
  createIndexes(array $indexes): list<string>
  dropIndex(string $name): void
  dropIndexes(): void
  listIndexes(): list<array<string, mixed>>

Change Streams:
  watch(array $pipeline = [], array $options = []): ChangeStream

Explain:
  explain(array $filter = [], array $options = []): array<string, mixed>
  explainAggregate(Pipeline|array $pipeline, array $options = []): array<string, mixed>

Utilities:
  filter(): Filter
  raw(): MongoDB\Collection
  getName(): string
  getNamespace(): string

Builder Internals (used by FindQuery, UpdateQuery, DeleteQuery):
  executeFindQuery(array $filter, array $options): Cursor
  executeCountQuery(array $filter, array $options): int
  executeFindExplain(array $filter, array $options): array
  executeUpdateQuery(array $filter, array $update, array $options, bool $many): UpdateResult
  executeDeleteQuery(array $filter, array $options, bool $many): DeleteResult

FindQuery API

final class FindQuery

filter(array $filter): self
where(callable(Filter): Filter $callback): self
projection(array $fields): self
sort(array $sort): self
limit(int $limit): self
skip(int $skip): self
hint(string|array $hint): self
collation(array $collation): self
maxTimeMS(int $ms): self
batchSize(int $size): self
allowDiskUse(bool $allow = true): self
comment(string $comment): self
session(Session $session): self
option(string $key, mixed $value): self
execute(): Cursor
first(): ?Document
count(): int
explain(): array
getFilter(): array
getOptions(): array

UpdateQuery API

final class UpdateQuery

filter(array $filter): self
where(callable(Filter): Filter $callback): self
update(array $update): self
upsert(bool $upsert = true): self
arrayFilters(array $filters): self
hint(string|array $hint): self
collation(array $collation): self
session(Session $session): self
option(string $key, mixed $value): self
execute(): UpdateResult
getFilter(): array
getUpdate(): array
getOptions(): array

DeleteQuery API

final class DeleteQuery

filter(array $filter): self
where(callable(Filter): Filter $callback): self
hint(string|array $hint): self
collation(array $collation): self
session(Session $session): self
option(string $key, mixed $value): self
execute(): DeleteResult
getFilter(): array
getOptions(): array

Document API

final class Document implements ArrayAccess<string, mixed>, JsonSerializable

__construct(array<string, mixed> $data)
static fromBSON(object|array $source): self
id(): ?ObjectId
__get(string $name): mixed
__isset(string $name): bool
has(string $key): bool
get(string $key, mixed $default = null): mixed
toArray(): array<string, mixed>
toJson(int $flags = 0): string
getRaw(): array<string, mixed>
offsetExists(mixed $offset): bool
offsetGet(mixed $offset): mixed
offsetSet(mixed $offset, mixed $value): void    throws LogicException
offsetUnset(mixed $offset): void                throws LogicException
jsonSerialize(): array<string, mixed>

Cursor API

final class Cursor implements IteratorAggregate<int, Document>

__construct(CursorInterface $cursor)
getIterator(): Generator<int, Document>
toArray(): list<Document>
first(): ?Document
lazy(): Generator<int, Document>
count(): int

Filter API

final class Filter

static new(): self

Comparison:  eq, ne, gt, gte, lt, lte
Array:       in, nin, all, size, elemMatch
Logical:     or, and, nor, not
Element:     exists, type
String:      regex, text
Geospatial:  near
Raw:         raw
Compile:     toArray(): array<string, mixed>

QueryLogger API

final class QueryLogger

__construct(int $maxEntries = 100, float $slowThresholdMs = 100.0)
log(string $operation, string $collection, array $filter, float $durationMs): void
getAll(): list<QueryLog>
getSlow(): list<QueryLog>
count(): int
clear(): void
getSlowThreshold(): float
final readonly class QueryLog

string $operation
string $collection
array  $filter
float  $durationMs
bool   $slow

Bucket API

final class Bucket

__construct(Connection $connection, string $bucketName = 'fs', array $options = [])
uploadFromStream(string $filename, mixed $source, array $options = []): ObjectId
downloadToStream(ObjectId $id, mixed $destination): void
openDownloadStream(ObjectId $id): mixed (resource)
openUploadStream(string $filename, array $options = []): mixed (resource)
delete(ObjectId $id): void
find(array $filter = [], array $options = []): CursorInterface
rename(ObjectId $id, string $newFilename): void
drop(): void
raw(): GridFSBucket

Exceptions API

MongoException(string $message, int $code, ?Throwable $previous)

ConnectionException(string $message, string $host, int $code, ?Throwable $previous)
  getHost(): string

AuthenticationException — no additional methods

QueryException(string $message, string $operation, string $collection, int $code, ?Throwable $previous)
  getOperation(): string
  getCollection(): string

WriteException(string $message, string $operation, string $collection, int $code, ?Throwable $previous)
  getOperation(): string
  getCollection(): string

DuplicateKeyException(string $message, string $collection, string $duplicateKey, int $code, ?Throwable $previous)
  getDuplicateKey(): string
  getCollection(): string

ValidationException — inherits WriteException

BulkWriteException — inherits WriteException
  setPartialResult(BulkWriteResult $result): void
  getPartialResult(): ?BulkWriteResult

TimeoutException(string $message, string $operation, string $collection, int $code, ?Throwable $previous)
  getOperation(): string
  getCollection(): string

License

MIT