ytake / php-ksql
KSQL is the streaming SQL engine for Apache Kafka. REST client for PHP.
Requires
- php: ^7.1
- ext-curl: *
- ext-json: *
- fig/http-message-util: ^1.1.4
- guzzlehttp/guzzle: ^6.5.5
Requires (Dev)
- monolog/monolog: ^1.23
- pdepend/pdepend: ^2.5.2
- phploc/phploc: *
- phpmd/phpmd: @stable
- phpunit/phpunit: ^7.1.4
- satooshi/php-coveralls: ^2.0.0
- sebastian/phpcpd: *
- sensiolabs/security-checker: ^4.1.8
README
Apache Kafka / Confluent KSQL REST client for PHP
What is KSQL
KSQL is the streaming SQL engine for Apache Kafka.
Install
Requires PHP 7.1 or later.
$ composer require ytake/php-ksql
Usage
Request Preset
Get Command Status
<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\CommandStatus;
use Ytake\KsqlClient\Computation\CommandId;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new CommandStatus(CommandId::fromString('stream/MESSAGE_STREAM/create'))
)->result();
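The command ID passed to `CommandId::fromString()` above appears to follow the `type/entity/action` shape that the KSQL server uses for its status endpoint. The sketch below only illustrates that shape; `parseCommandId()` is a hypothetical helper and not part of php-ksql, and the `/status/<commandId>` path is an assumption based on the KSQL REST API rather than on this library's source.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-ksql): split a KSQL command ID
 * of the form "<type>/<entity>/<action>" into named parts.
 */
function parseCommandId(string $commandId): array
{
    $parts = explode('/', $commandId);
    if (count($parts) !== 3) {
        throw new InvalidArgumentException(
            sprintf('Expected "type/entity/action", got "%s"', $commandId)
        );
    }

    [$type, $entity, $action] = $parts;

    return ['type' => $type, 'entity' => $entity, 'action' => $action];
}

$parsed = parseCommandId('stream/MESSAGE_STREAM/create');

// Assumed REST path for the status lookup; the client presumably builds
// something equivalent internally.
$statusPath = '/status/' . implode('/', $parsed);
```

Here `$parsed['type']` is `stream`, `$parsed['entity']` is `MESSAGE_STREAM`, and `$parsed['action']` is `create`.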
Get Statuses
<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Status;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new Status())->result();
Get KSQL Server Information
<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\ServerInfo;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(new ServerInfo())->result();
Query KSQL
<?php

use Ytake\KsqlClient\RestClient;
use Ytake\KsqlClient\Query\Ksql;

$client = new RestClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Ksql('DESCRIBE users_original;')
)->result();
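For orientation, the KSQL REST API accepts statements as a JSON body posted to `/ksql`, with a `ksql` string and a `streamsProperties` object. The sketch below shows roughly what such a body looks like; `buildKsqlRequestBody()` is a hypothetical helper, and whether php-ksql builds its request exactly this way is an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-ksql): build the JSON body for a
 * POST to the KSQL server's /ksql endpoint.
 */
function buildKsqlRequestBody(string $statement, array $streamsProperties = []): string
{
    // KSQL statements are terminated by a semicolon; add one if it is missing.
    $statement = rtrim($statement);
    if (substr($statement, -1) !== ';') {
        $statement .= ';';
    }

    $json = json_encode(
        [
            'ksql' => $statement,
            // Cast to object so an empty property list encodes as {} and not [].
            'streamsProperties' => (object) $streamsProperties,
        ],
        JSON_UNESCAPED_SLASHES
    );

    if ($json === false) {
        throw new RuntimeException('Could not encode request: ' . json_last_error_msg());
    }

    return $json;
}

$body = buildKsqlRequestBody('DESCRIBE users_original');
// $body is {"ksql":"DESCRIBE users_original;","streamsProperties":{}}
```

A body like this would typically be sent with a `Content-Type` of `application/vnd.ksql.v1+json`.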
Client for Stream Response
<?php

use Ytake\KsqlClient\StreamClient;
use Ytake\KsqlClient\Query\Stream;
use Ytake\KsqlClient\StreamConsumable;
use Ytake\KsqlClient\Entity\StreamedRow;

$client = new StreamClient(
    "http://localhost:8088"
);
$result = $client->requestQuery(
    new Stream(
        'SELECT * FROM testing',
        new class() implements StreamConsumable {
            public function __invoke(StreamedRow $row)
            {
                // stream response consumer
            }
        }
    )
)->result();
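To make the consumer callback more concrete, the sketch below imitates how a streamed response might be handed to it row by row. It assumes the server sends one JSON object per line, shaped like `{"row":{"columns":[...]},"errorMessage":null}`, with blank lines as keep-alives. `consumeRows()` is a hypothetical helper that works on an already buffered string; a real stream client would read the response incrementally, and php-ksql's internals may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-ksql): feed newline-delimited
 * JSON rows to a callback and return how many rows were delivered.
 */
function consumeRows(string $body, callable $consumer): int
{
    $delivered = 0;

    foreach (preg_split('/\r?\n/', $body) as $line) {
        $line = trim($line);
        if ($line === '') {
            continue; // skip blank keep-alive lines
        }

        $decoded = json_decode($line, true);
        if (!is_array($decoded) || !isset($decoded['row']['columns'])) {
            continue; // ignore anything that is not a data row
        }

        $consumer($decoded['row']['columns']);
        $delivered++;
    }

    return $delivered;
}

// Sample payload in the assumed response shape, including a keep-alive line.
$sample = '{"row":{"columns":[1,"alice"]},"errorMessage":null}' . "\n"
    . "\n"
    . '{"row":{"columns":[2,"bob"]},"errorMessage":null}' . "\n";

$rows = [];
$count = consumeRows($sample, function (array $columns) use (&$rows) {
    $rows[] = $columns;
});
```

With the sample payload, the callback runs twice and the blank line is skipped.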