flix-tech/confluent-schema-registry-api

A PHP 7.4+ library to consume the Confluent Schema Registry REST API.

8.1.0 2024-01-26 13:22 UTC

README

schema-registry-ci Actions Status Maintainability Test Coverage Latest Stable Version Total Downloads License

A PHP 7.4+ library to consume the Confluent Schema Registry REST API. It provides low level functions to create PSR-7 compliant requests that can be used as well as high level abstractions to ease developer experience.

Contents

Requirements

Hard dependencies

Optional dependencies

Installation

This library is installed via composer.

composer require "flix-tech/confluent-schema-registry-api=^7.4"

NOTE

If you are still running on a version less than 5.0.3 we recommend updating it immediately since there was a critical bug with the exception handling.

Compatibility

This library follows strict semantic versioning, so you can expect any minor and patch release to be compatible, while major version upgrades will have incompatibilities that will be released in the UPGRADE.md file.

Usage

Asynchronous API

Interface declaration

Example PromisingRegistry

<?php

use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;

$registry = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

// Register a schema with a subject
$schema = AvroSchema::parse('{"type": "string"}');

// The promise will either contain a schema id as int when fulfilled,
// or a SchemaRegistryException instance when rejected.
// If the subject does not exist, it will be created implicitly
$promise = $registry->register('test-subject', $schema);

// If you want to resolve the promise, you might either get the value or an instance of a SchemaRegistryException
// It is more like an Either Monad, since returning Exceptions from rejection callbacks will throw them.
// We want to leave that decision to the user of the lib.
// TODO: Maybe return an Either Monad instead
$promise = $promise->then(
    static function ($schemaIdOrSchemaRegistryException) {
        if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) {
            throw $schemaIdOrSchemaRegistryException;
        }
        
        return $schemaIdOrSchemaRegistryException;
    }
);

// Resolve the promise
$schemaId = $promise->wait();

// Get a schema by schema id
$promise = $registry->schemaForId($schemaId);
// As above you could add additional callbacks to the promise
$schema = $promise->wait();

// Get the version of a schema for a given subject.
$version = $registry->schemaVersion(
    'test-subject',
    $schema
)->wait();

// You can also get a schema by subject and version
$schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait();

// You can additionally just query for the currently latest schema within a subject.
// *NOTE*: Once you requested this it might not be the latest version anymore.
$latestSchema = $registry->latestVersion('test-subject')->wait();

// Sometimes you want to find out the global schema id for a given schema
$schemaId = $registry->schemaId('test-subject', $schema)->wait();

Synchronous API

Interface declaration

Example BlockingRegistry

<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;

$registry = new BlockingRegistry(
    new PromisingRegistry(
        new Client(['base_uri' => 'registry.example.com'])
    )
);

// What the blocking registry does is actually resolving the promises
// with `wait` and adding a throwing rejection callback.
$schema = AvroSchema::parse('{"type": "string"}');

// This will be an int, and not a promise
$schemaId = $registry->register('test-subject', $schema);

Caching

There is a CachedRegistry that accepts a CacheAdapter together with a Registry. It supports both async and sync APIs.

NOTE:

From version 4.x of this library the API for the CacheAdapterInterface has been changed in order to allow caching of schema ids by hash of a given schema.

Example

<?php

use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;

$asyncApi = new PromisingRegistry(
    new Client(['base_uri' => 'registry.example.com'])
);

$syncApi = new BlockingRegistry($asyncApi);

$doctrineCachedSyncApi = new CachedRegistry(
    $asyncApi,
    new DoctrineCacheAdapter(
        new ArrayCache()
    )
);

// All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered.
$avroObjectCachedAsyncApi = new CachedRegistry(
    $syncApi,
    new AvroObjectCacheAdapter()
);

// NEW in version 4.x, passing in custom hash functions to cache schema ids via the schema hash
// By default the following function is used internally
$defaultHashFunction = static function (AvroSchema $schema) {
   return md5((string) $schema); 
};

// You can also define your own hash callable
$sha1HashFunction = static function (AvroSchema $schema) {
   return sha1((string) $schema); 
};

// Pass the hash function as optional 3rd parameter to the CachedRegistry constructor
$avroObjectCachedAsyncApi = new CachedRegistry(
    $syncApi,
    new AvroObjectCacheAdapter(),
    $sha1HashFunction
);

Low Level API

There is a low-level API that provides simple functions that return PSR-7 request objects for the different endpoints of the registry. See Requests/Functions for more information.

There are also requests to use the new DELETE API of the schema registry.

Testing

This library uses a Makefile to run the test suite and requires docker.

You can set the default variables by copying variables.mk.dist to variables.mk and change them to your liking.

Build the local docker image

PHP_VERSION=7.3 XDEBUG_VERSION=2.9.8 make docker

Unit tests, Coding standards and static analysis

PHP_VERSION=7.3 make ci-local

Integration tests

This library uses a docker-compose configuration to fire up a schema registry for integration testing, hence docker-compose from version 1.18.x is required to run those tests.

The platform can be controlled with the following environment variables
CONFLUENT_VERSION=latest
CONFLUENT_NETWORK_SUBNET=172.68.0.0/24
SCHEMA_REGISTRY_IPV4=172.68.0.103
KAFKA_BROKER_IPV4=172.68.0.102
ZOOKEEPER_IPV4=172.68.0.101
Building the confluent platform with a specific version and run the integration tests
CONFLUENT_VERSION=5.2.3 make platform
make phpunit-integration
make clean

Contributing

In order to contribute to this library, follow this workflow:

  • Fork the repository
  • Create a feature branch
  • Work on the feature
  • Run tests to verify that the tests are passing
  • Open a PR to the upstream
  • Be happy about contributing to open source!