opencck / server
HTTP/WS/SSE non-blocking server based on revolt event loop and amphp-v3
1.0.0
2023-12-30 11:48 UTC
Requires
- amphp/http: v2.1.0
- amphp/http-server: v3.3.0
- amphp/http-server-form-parser: v2.0.0
- amphp/http-server-router: v2.0.0
- amphp/http-server-static-content: v2.0.0
- amphp/log: v2.0.0
- amphp/websocket: v2.0.3
- amphp/websocket-server: v4.0.0
Requires (Dev)
- amphp/http-client: v5.0.1
- amphp/phpunit-util: ^3
- amphp/websocket-client: v2.0.0
- phpunit/phpunit: ^9
This package is auto-updated.
Last update: 2025-03-22 05:33:51 UTC
README
This package provides a non-blocking HTTP/1.1 and HTTP/2 application server (with WebSocket and SSE) written in PHP based on AmpPHP v3 and Revolt Event Loop.
Requirements
- PHP 8.1
- php sockets extension
Installation
This package can be installed as a Composer dependency.
composer require opencck/server
Documentation
Example server
<?php use Amp\Http\Server\Driver\ConnectionLimitingClientFactory; use Amp\Http\Server\Driver\ConnectionLimitingServerSocketFactory; use Amp\Http\Server\Driver\SocketClientFactory; use Amp\Http\Server\SocketHttpServer; use Amp\Http\Server\DefaultErrorHandler; use Amp\Http\Server\RequestHandler\ClosureRequestHandler; use Amp\Http\Server\Request; use Amp\Http\Server\Response; use Amp\Http\Server\Router; use Amp\Http\Server\StaticContent\DocumentRoot; use Amp\Http\Server\FormParser; use Amp\Http\HttpStatus; use Amp\Sync\LocalSemaphore; use Amp\Websocket\Server\Websocket; use Amp\Websocket\Server\WebsocketGateway; use Amp\Websocket\Server\WebsocketClientGateway; use Amp\Websocket\Server\WebsocketClientHandler; use Amp\Websocket\Server\AllowOriginAcceptor; use Amp\Websocket\WebsocketClient; use Amp\Log\ConsoleFormatter; use Amp\Log\StreamHandler; use Monolog\Level; use Monolog\Logger; use Amp\Socket; use Amp\ByteStream\WritableResourceStream; use Revolt\EventLoop; use function Amp\trapSignal; require __DIR__ . '/vendor/autoload.php'; /** * @param int|string $val * @return int */ function return_bytes(int|string $val): int { $val = trim($val); $units = ['g' => 1_073_741_824, 'm' => 1_048_576, 'k' => 1024]; $unit = strtolower($val[strlen($val) - 1]); return intval($val) * $units[$unit]; } // logger $logHandler = new StreamHandler(new WritableResourceStream(STDOUT)); $logHandler->setFormatter(new ConsoleFormatter()); $logHandler->setLevel(Level::Info); $logger = new Logger('server'); $logger->pushHandler($logHandler); // server $serverSocketFactory = new ConnectionLimitingServerSocketFactory(new LocalSemaphore(maxLocks: 1000)); $clientFactory = new ConnectionLimitingClientFactory( clientFactory: new SocketClientFactory($logger), logger: $logger, connectionsPerIpLimit: 10 ); $server = new SocketHttpServer($logger, $serverSocketFactory, $clientFactory); // router $errorHandler = new DefaultErrorHandler(); $router = new Router($server, $logger, $errorHandler); // http route (GET) $router->addRoute( method: 'GET', uri: '/', requestHandler: new ClosureRequestHandler(function (Request $request): Response { return new Response( HttpStatus::OK, ['content-type' => 'text/html; charset=utf-8'], ' <!DOCTYPE html> <html lang="en"> <body> <script> const ws = new WebSocket(`ws://${location.host}/ws`); ws.onopen = function () { console.log("Connected"); } ws.onmessage = function (messageEvent) { console.log(messageEvent.data); } </script> <script> const eventSource = new EventSource("/events"); const eventList = document.createElement("ol"); document.body.appendChild(eventList); eventSource.addEventListener("notification", function (e) { const element = document.createElement("li"); element.textContent = "Message: " + e.data; eventList.appendChild(element); }); </script> </body> </html>' ); }) ); // http route (POST) $router->addRoute( method: 'POST', uri: '/', requestHandler: new ClosureRequestHandler(function (Request $request): Response { $form = FormParser\Form::fromRequest($request, new FormParser\FormParser(return_bytes(ini_get('post_max_size')))); return new Response( HttpStatus::OK, ['content-type' => 'text/html; charset=utf-8'], '<!DOCTYPE html><html lang="en"><body><pre>' . print_r($form->getValues(), true) . '</pre></body></html>' ); }) ); // websocket route $gateway = new WebsocketClientGateway(); $router->addRoute( method: 'GET', uri: '/ws', requestHandler: new Websocket( httpServer: $server, logger: $logger, acceptor: new AllowOriginAcceptor([ 'http://' . ($_ENV['HOST_MACHINE_IP'] ?? '127.0.0.1') . ':8080', 'http://localhost:8080', 'http://[::1]:8080', ]), clientHandler: new class ($gateway) implements WebsocketClientHandler { public function __construct(private readonly WebsocketGateway $gateway = new WebsocketClientGateway()) { } public function handleClient(WebsocketClient $client, Request $request, Response $response): void { $this->gateway->addClient($client); while ($message = $client->receive()) { $this->gateway->broadcastText($message->read()); } } } ) ); // SSE route $router->addRoute( method: 'GET', uri: '/events', requestHandler: new ClosureRequestHandler(function (Request $request): Response { // We stream the response here, one event every 500 ms. return new Response( status: HttpStatus::OK, headers: ['content-type' => 'text/event-stream; charset=utf-8'], body: new \Amp\ByteStream\ReadableIterableStream( (function () { for ($i = 0; $i < 30; $i++) { \Amp\delay(0.5); yield "event: notification\ndata: Event {$i}\n\n"; } })() ) ); }) ); // set static files from directory for fallback $dir = __DIR__ . DIRECTORY_SEPARATOR . 'public'; if (!is_dir($dir)) { mkdir($dir); } $router->setFallback(new DocumentRoot($server, $errorHandler, $dir)); // send micro time every 1 second to all WS clients EventLoop::repeat(1, static function () use ($gateway) { $gateway->broadcastText(microtime(true)); }); // handle event loop errors EventLoop::setErrorHandler(function ($e) use ($logger) { $logger->error($e->getMessage()); }); // start server try { $server->expose(new Socket\InternetAddress('0.0.0.0', 8080)); //$server->expose(new Socket\InternetAddress('[::]', 8080)); $server->start($router, $errorHandler); if (defined('SIGINT') && defined('SIGTERM')) { // Await SIGINT or SIGTERM to be received. $signal = trapSignal([SIGINT, SIGTERM]); $logger->info(\sprintf('Received signal %d, stopping HTTP server', $signal)); $server->stop(); } else { EventLoop::run(); } } catch (Throwable $e) { $logger->error($e->getMessage()); }
Start example server (php)
php server.php
Start example server (docker)
cp .env.example .env
# set HOST_MACHINE_IP env for ws origin handshake
docker-compose up
License
The MIT License (MIT). Please see LICENSE
for more information.