add base client

This commit is contained in:
crazywhalecc 2022-12-26 19:29:47 +08:00
parent 1382114238
commit ef91fb3aeb
10 changed files with 847 additions and 1 deletions

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client;
use Psr\Http\Message\RequestInterface;
interface AsyncClientInterface
{
/**
* 以异步的形式发送 HTTP Request
*
* @param RequestInterface $request 请求对象
* @param callable $success_callback 成功请求的回调
* @param callable $error_callback 失败请求的回调
*/
public function sendRequestAsync(RequestInterface $request, callable $success_callback, callable $error_callback): bool;
}

View File

@ -0,0 +1,129 @@
<?php
/** @noinspection PhpComposerExtensionStubsInspection */
declare(strict_types=1);
namespace Choir\Http\Client;
use Choir\Http\Client\Exception\ClientException;
use Choir\Http\Client\Exception\NetworkException;
use Choir\Http\HttpFactory;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
/**
* Curl HTTP Client based on PSR-18.
* @see https://github.com/sunrise-php/http-client-curl/blob/master/src/Client.php
*/
class CurlClient implements ClientInterface, TimeoutInterface
{
protected array $curl_options;
/**
* @throws ClientException
*/
public function __construct(array $curl_options = [])
{
if (!extension_loaded('curl')) { // 必须安装 Curl 扩展才能使用
throw new ClientException('Curl extension is not loaded');
}
$this->curl_options = $curl_options;
}
/**
* {@inheritDoc}
*/
public function setTimeout(int $timeout)
{
$this->curl_options[CURLOPT_TIMEOUT_MS] = $timeout;
}
/**
* {@inheritDoc}
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
$handle = $this->createHandle($request);
$success = curl_exec($handle);
if ($success === false) {
throw new NetworkException($request, curl_error($handle), curl_errno($handle));
}
$response = $this->createResponse($handle);
curl_close($handle);
return $response;
}
/**
* @throws ClientException
* @return \CurlHandle|false|resource
*/
private function createHandle(RequestInterface $request) /* @phpstan-ignore-line */
{
$this->curl_options[CURLOPT_RETURNTRANSFER] = true; // 返回的内容作为变量储存,而不是直接输出
$this->curl_options[CURLOPT_HEADER] = true; // 获取结果返回时包含Header数据
$this->curl_options[CURLOPT_CUSTOMREQUEST] = $request->getMethod(); // 设置请求方式
$this->curl_options[CURLOPT_URL] = (string) $request->getUri(); // 设置请求的URL
$this->curl_options[CURLOPT_POSTFIELDS] = (string) $request->getBody(); // 设置请求的Body
$this->curl_options[CURLOPT_SSL_VERIFYHOST] = false; // 取消认证ssl
$this->curl_options[CURLOPT_SSL_VERIFYPEER] = false; // 取消认证ssl
// 设置请求头
foreach ($request->getHeaders() as $name => $values) {
foreach ($values as $value) {
$this->curl_options[CURLOPT_HTTPHEADER][] = sprintf('%s: %s', $name, $value);
}
}
/** @var \CurlHandle|false|resource $curl_handle */
$curl_handle = curl_init(); /* @phpstan-ignore-line */
if ($curl_handle === false) {
throw new ClientException('Unable to initialize a cURL handle');
}
$success = curl_setopt_array($curl_handle, $this->curl_options);
if ($success === false) {
throw new ClientException('Unable to configure a cURL handle');
}
return $curl_handle;
}
/**
* @param \CurlHandle|int|resource $handle
*/
private function createResponse($handle): ResponseInterface /* @phpstan-ignore-line */
{
$status_code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE);
$response = HttpFactory::createResponse($status_code);
/** @var null|string $message */
$message = curl_multi_getcontent($handle); /* @phpstan-ignore-line */
if ($message === null) {
return $response;
}
$header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
$header = substr($message, 0, $header_size);
$fields = explode("\n", $header);
foreach ($fields as $field) {
$colpos = strpos($field, ':');
if ($colpos === false) { // Status Line
continue;
}
if ($colpos === 0) { // HTTP/2 Field
continue;
}
[$name, $value] = explode(':', $field, 2);
$response = $response->withAddedHeader(trim($name), trim($value));
}
$body = substr($message, $header_size);
$response->getBody()->write($body);
return $response;
}
}

View File

@ -0,0 +1,11 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client\Exception;
use Psr\Http\Client\ClientExceptionInterface;
class ClientException extends \Exception implements ClientExceptionInterface
{
}

View File

@ -0,0 +1,24 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client\Exception;
use Psr\Http\Client\NetworkExceptionInterface;
use Psr\Http\Message\RequestInterface;
class NetworkException extends \Exception implements NetworkExceptionInterface
{
private RequestInterface $request;
public function __construct(RequestInterface $request, $message = '', $code = 0, \Throwable $previous = null)
{
$this->request = $request;
parent::__construct($message, $code, $previous);
}
public function getRequest(): RequestInterface
{
return $this->request;
}
}

View File

@ -0,0 +1,27 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client\Exception;
use Psr\Http\Client\RequestExceptionInterface;
use Psr\Http\Message\RequestInterface;
class RequestException extends \Exception implements RequestExceptionInterface
{
private RequestInterface $request;
public function __construct(RequestInterface $request, $message = '', $code = 0, \Throwable $previous = null)
{
$this->request = $request;
parent::__construct($message, $code, $previous);
}
/**
* {@inheritDoc}
*/
public function getRequest(): RequestInterface
{
return $this->request;
}
}

View File

@ -0,0 +1,323 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client;
use Choir\Http\Client\Exception\NetworkException;
use Choir\Http\Client\Exception\RequestException;
use Choir\Http\Response;
use Choir\Http\Stream;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
/**
* Stream HTTP Client based on PSR-18.
* @see https://github.com/php-http/socket-client
*/
class StreamClient implements TimeoutInterface, ClientInterface
{
private array $config = [
'remote_socket' => null,
'timeout' => 1000, // 单位:毫秒
'stream_context_options' => [],
'stream_context_param' => [],
'ssl' => null,
'write_buffer_size' => 8192,
'ssl_method' => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT,
'event_loop' => null,
];
/**
* Constructor.
*/
public function __construct(array $config = [])
{
$this->config = array_merge($this->config, $config);
$this->config['stream_context'] = stream_context_create($this->config['stream_context_options'], $this->config['stream_context_param']);
}
public function setTimeout(int $timeout): void
{
$this->config['timeout'] = $timeout;
}
/**
* {@inheritdoc}
* @throws \Exception
*/
public function sendRequest(RequestInterface $request): ResponseInterface
{
$remote = $this->config['remote_socket'];
$useSsl = $this->config['ssl'];
if (!$request->hasHeader('Connection')) {
$request = $request->withHeader('Connection', 'close');
}
if ($remote === null) {
$remote = $this->determineRemoteFromRequest($request);
}
if ($useSsl === null) {
$useSsl = ($request->getUri()->getScheme() === 'https');
}
$socket = $this->createSocket($request, $remote, $useSsl);
stream_set_timeout($socket, (int) floor($this->config['timeout'] / 1000), $this->config['timeout'] % 1000);
try {
$this->writeRequest($socket, $request, $this->config['write_buffer_size']);
$response = $this->readResponse($request, $socket);
} catch (\Exception $e) {
$this->closeSocket($socket);
throw $e;
}
return $response;
}
/**
* Create the socket to write request and read response on it.
*
* @param RequestInterface $request Request for
* @param string $remote Entrypoint for the connection
* @param bool $useSsl Whether to use ssl or not
*
* @throws NetworkException
* @return resource Socket resource
*/
protected function createSocket(RequestInterface $request, string $remote, bool $useSsl)
{
$errNo = null;
$errMsg = null;
$socket = @stream_socket_client($remote, $errNo, $errMsg, floor($this->config['timeout'] / 1000), STREAM_CLIENT_CONNECT, $this->config['stream_context']);
if ($socket === false) {
if ($errNo === 110) {
throw new NetworkException($request, $errMsg);
}
throw new NetworkException($request, $errMsg);
}
if ($useSsl && @stream_socket_enable_crypto($socket, true, $this->config['ssl_method']) === false) {
throw new NetworkException($request, sprintf('Cannot enable tls: %s', error_get_last()['message']));
}
return $socket;
}
/**
* Close the socket, used when having an error.
*
* @param resource $socket
*/
protected function closeSocket($socket): void
{
fclose($socket);
}
/**
* Write a request to a socket.
*
* @param resource $socket
* @throws NetworkException
*/
protected function writeRequest($socket, RequestInterface $request, int $bufferSize = 8192): void
{
if ($this->fwrite($socket, $this->transformRequestHeadersToString($request)) === false) {
throw new NetworkException($request, 'Failed to send request, underlying socket not accessible, (BROKEN EPIPE)');
}
if ($request->getBody()->isReadable()) {
$this->writeBody($socket, $request, $bufferSize);
}
}
/**
* Write Body of the request.
*
* @param resource $socket
* @throws NetworkException
*/
protected function writeBody($socket, RequestInterface $request, int $bufferSize = 8192): void
{
$body = $request->getBody();
if ($body->isSeekable()) {
$body->rewind();
}
while (!$body->eof()) {
$buffer = $body->read($bufferSize);
if ($this->fwrite($socket, $buffer) === false) {
throw new NetworkException($request, 'An error occur when writing request to client (BROKEN EPIPE)');
}
}
}
/**
* Produce the header of request as a string based on a PSR Request.
*/
protected function transformRequestHeadersToString(RequestInterface $request): string
{
$message = vsprintf('%s %s HTTP/%s', [
strtoupper($request->getMethod()),
$request->getRequestTarget(),
$request->getProtocolVersion(),
]) . "\r\n";
foreach ($request->getHeaders() as $name => $values) {
$message .= $name . ': ' . implode(', ', $values) . "\r\n";
}
$message .= "\r\n";
return $message;
}
/**
* Read a response from a socket.
*
* @param resource $socket
*
* @throws NetworkException
*/
protected function readResponse(RequestInterface $request, $socket): ResponseInterface
{
$headers = [];
$reason = null;
while (false !== ($line = fgets($socket))) {
if (rtrim($line) === '') {
break;
}
$headers[] = trim($line);
}
$metadatas = stream_get_meta_data($socket);
if (array_key_exists('timed_out', $metadatas) && $metadatas['timed_out'] === true) {
throw new NetworkException($request, 'Error while reading response, stream timed out');
}
$parts = explode(' ', array_shift($headers), 3);
if (count($parts) <= 1) {
throw new NetworkException($request, 'Cannot read the response');
}
$protocol = substr($parts[0], -3);
$status = $parts[1];
if (isset($parts[2])) {
$reason = $parts[2];
}
// Set the size on the stream if it was returned in the response
$responseHeaders = [];
foreach ($headers as $header) {
$headerParts = explode(':', $header, 2);
if (!array_key_exists(trim($headerParts[0]), $responseHeaders)) {
$responseHeaders[trim($headerParts[0])] = [];
}
$responseHeaders[trim($headerParts[0])][] = isset($headerParts[1])
? trim($headerParts[1])
: '';
}
$response = new Response($status, $responseHeaders, null, $protocol, $reason);
$stream = Stream::create($socket);
return $response->withBody($stream);
}
/**
* Replace fwrite behavior as api is broken in PHP.
*
* @see https://secure.phabricator.com/rPHU69490c53c9c2ef2002bc2dd4cecfe9a4b080b497
*
* @param resource $stream The stream resource
*
* @return bool|int false if pipe is broken, number of bytes written otherwise
*/
private function fwrite($stream, string $bytes)
{
if (empty($bytes)) {
return 0;
}
$result = @fwrite($stream, $bytes);
if ($result !== 0) {
// In cases where some bytes are witten (`$result > 0`) or
// an error occurs (`$result === false`), the behavior of fwrite() is
// correct. We can return the value as-is.
return $result;
}
// If we make it here, we performed a 0-length write. Try to distinguish
// between EAGAIN and EPIPE. To do this, we're going to `stream_select()`
// the stream, write to it again if PHP claims that it's writable, and
// consider the pipe broken if the write fails.
$read = [];
$write = [$stream];
$except = [];
$ss = @stream_select($read, $write, $except, 0);
// 这里做了个修改,原来下面是 !$write但静态分析出来它是永久的false所以改成了 !$ss
if (!$ss) {
// The stream isn't writable, so we conclude that it probably really is
// blocked and the underlying error was EAGAIN. Return 0 to indicate that
// no data could be written yet.
return 0;
}
// If we make it here, PHP **just** claimed that this stream is writable, so
// perform a write. If the write also fails, conclude that these failures are
// EPIPE or some other permanent failure.
$result = @fwrite($stream, $bytes);
if ($result !== 0) {
// The write worked or failed explicitly. This value is fine to return.
return $result;
}
// We performed a 0-length write, were told that the stream was writable, and
// then immediately performed another 0-length write. Conclude that the pipe
// is broken and return `false`.
return false;
}
/**
* Return remote socket from the request.
*
* @throws RequestException
*/
private function determineRemoteFromRequest(RequestInterface $request): string
{
if (!$request->hasHeader('Host') && $request->getUri()->getHost() === '') {
throw new RequestException($request, 'Remote is not defined and we cannot determine a connection endpoint for this request (no Host header)');
}
$endpoint = '';
$host = $request->getUri()->getHost();
if (!empty($host)) {
$endpoint .= $host;
if ($request->getUri()->getPort() !== null) {
$endpoint .= ':' . $request->getUri()->getPort();
} elseif ($request->getUri()->getScheme() === 'https') {
$endpoint .= ':443';
} else {
$endpoint .= ':80';
}
}
// If use the host header if present for the endpoint
if (empty($host) && $request->hasHeader('Host')) {
$endpoint = $request->getHeaderLine('Host');
}
return sprintf('tcp://%s', $endpoint);
}
}

View File

@ -0,0 +1,219 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client;
use Choir\Http\Client\Exception\ClientException;
use Choir\Http\Client\Exception\NetworkException;
use Choir\Http\HttpFactory;
use Choir\WebSocket\FrameFactory;
use Choir\WebSocket\FrameInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\UriInterface;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\WebSocket\Frame;
/**
* Swoole HTTP Client based on PSR-18.
*/
class SwooleClient implements TimeoutInterface, ClientInterface, AsyncClientInterface, UpgradableClientInterface
{
/** @var null|Client Swoole Coroutine Client 对象 */
protected ?Client $client = null;
/** @var int TCP/WebSocket 连接状态标记 */
protected int $status = CHOIR_TCP_INITIAL;
/** @var null|callable|\Closure onMessage 回调 */
protected $on_message;
/** @var null|callable onClose 回调 */
protected $on_close;
private array $set = [];
/**
* @throws ClientException
*/
public function __construct(array $set = [])
{
if (Coroutine::getCid() === -1) {
throw new ClientException('API must be called in the coroutine');
}
$this->withSwooleSet($set);
}
public function withSwooleSet(array $set = []): SwooleClient
{
if (!empty($set)) {
$this->set = $set;
}
return $this;
}
public function setTimeout(int $timeout)
{
$this->set['timeout'] = $timeout / 1000;
}
public function sendRequest(RequestInterface $request): ResponseInterface
{
$this->client = $client = $this->buildBaseClient($request);
if ($client->errCode !== 0) {
throw new NetworkException($request, $client->errMsg, $client->errCode);
}
return HttpFactory::createResponse($client->statusCode, null, $client->getHeaders(), $client->getBody());
}
public function sendRequestAsync(RequestInterface $request, callable $success_callback, callable $error_callback): bool
{
go(function () use ($request, $success_callback, $error_callback) {
$this->client = $client = $this->buildBaseClient($request);
if ($client->errCode !== 0) {
call_user_func($error_callback, $request);
} else {
$response = HttpFactory::createResponse($client->statusCode, null, $client->getHeaders(), $client->getBody());
call_user_func($success_callback, $response);
}
});
return true;
}
/**
* 通过 PSR-7 Request 对象创建一个 Swoole Client
*
* @param RequestInterface $request PSR-7 Request 对象
*/
public function buildBaseClient(RequestInterface $request): Client
{
$uri = $request->getUri();
$client = new Client($uri->getHost(), $uri->getPort() ?? ($uri->getScheme() === 'https' ? 443 : 80), $uri->getScheme() === 'https');
// 设置 Swoole 专有的 set 参数
$client->set($this->set);
// 设置 HTTP Method POST、GET 等)
$client->setMethod($request->getMethod());
// 设置 HTTP Headers
$headers = [];
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}
$client->setHeaders($headers);
// 如果是 POST 带 body则设置 body
if (($data = $request->getBody()->getContents()) !== '') {
$client->setData($data);
}
$uri = $request->getUri()->getPath();
if ($uri === '') {
$uri = '/';
}
if (($query = $request->getUri()->getQuery()) !== '') {
$uri .= '?' . $query;
}
if (($fragment = $request->getUri()->getFragment()) !== '') {
$uri .= '?' . $fragment;
}
$client->execute($uri);
return $client;
}
public function getStatus(): int
{
return $this->status;
}
public function send($frame): bool
{
$swoole_frame = new Frame();
if ($frame instanceof FrameInterface) {
$swoole_frame->data = $frame->getData();
$swoole_frame->opcode = $frame->getOpcode();
} else {
$swoole_frame->data = $frame;
}
return (bool) $this->client->push($swoole_frame);
}
public function onMessage(callable $callback)
{
$this->on_message = $callback;
}
public function onClose(callable $callback)
{
$this->on_close = $callback;
}
public function upgrade(UriInterface $uri, array $headers = [], bool $reconnect = false): bool
{
if ($this->on_message === null) {
$this->on_message = function () {};
}
if ($this->on_close === null) {
$this->on_close = function () {};
}
if (!$reconnect && $this->status !== CHOIR_TCP_INITIAL) {
return false;
}
$this->status = CHOIR_TCP_CONNECTING;
$this->client = new Client($uri->getHost(), $uri->getPort() ?? ($uri->getScheme() === 'https' ? 443 : 80), $uri->getScheme() === 'https');
// 设置 Swoole 参数
$this->set['websocket_mask'] = true;
$this->client->set($this->set);
// 设置请求方法为 GET
$this->client->setMethod('GET');
// 设置 Headers
$headers_total = [];
foreach ($headers as $h_name => $header) {
if (is_array($header)) {
$headers_total[$h_name] = implode(', ', $header);
} else {
$headers_total[$h_name] = $header;
}
}
$this->client->setHeaders($headers_total);
// 设置请求的 URI
$uri_total = $uri->getPath();
if ($uri_total === '') {
$uri_total = '/';
}
if (($query = $uri->getQuery()) !== '') {
$uri_total .= '?' . $query;
}
if (($fragment = $uri->getFragment()) !== '') {
$uri_total .= '?' . $fragment;
}
$code = $this->client->upgrade($uri_total);
if ($this->client->errCode !== 0) {
return false;
}
if (!$code) {
$this->status = CHOIR_TCP_CLOSED;
return $code;
}
go(function () {
while (true) {
$result = $this->client->recv(60);
if ($result === false && !$this->client->connected) {
$this->status = CHOIR_TCP_CLOSED;
go(function () {
$frame = FrameFactory::createCloseFrame($this->client->statusCode, '');
call_user_func($this->on_close, $frame, $this);
});
break;
}
if ($result instanceof Frame) {
go(function () use ($result) {
$frame = new \Choir\WebSocket\Frame($result->data, $result->opcode, true, true);
call_user_func($this->on_message, $frame, $this);
});
}
}
});
$this->status = CHOIR_TCP_ESTABLISHED;
return $code;
}
}

View File

@ -0,0 +1,15 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client;
interface TimeoutInterface
{
/**
* 设置 Client 的超时时间
*
* @param int $timeout 超时时间(毫秒)
*/
public function setTimeout(int $timeout);
}

View File

@ -0,0 +1,56 @@
<?php
declare(strict_types=1);
namespace Choir\Http\Client;
use Choir\WebSocket\FrameInterface;
use Psr\Http\Message\UriInterface;
interface UpgradableClientInterface
{
/**
* 获取 WebSocket 连接状态
*/
public function getStatus(): int;
/**
* 发送 WebSocket Frame 消息帧
*
* 如果发送的是字符串,则自动生成一个文本类型的帧
* 如果发送的是帧,则直接发送
* 发送失败时返回 False
*
* @param FrameInterface|string $frame 消息帧
*/
public function send($frame): bool;
/**
* 设置接收到对端消息时的回调
*
* @param callable $callback 回调函数
* @return mixed
*/
public function onMessage(callable $callback);
/**
* 设置连接断开时的回调
*
* @param callable $callback 回调函数
* @return mixed
*/
public function onClose(callable $callback);
/**
* 发起一个 WebSocket 连接升级的请求
*
* Uri 必须包含 Scheme、目标地址即完整的 URL例如 http://localhost:8089/
* headers 参数可为空
* reconnect 参数为 False 的时候,必须重新声明 Client 对象才可重新链接,传入 True 时会直接复用资源
*
* @param UriInterface $uri 请求的链接
* @param array $headers 请求的 Headers
* @param bool $reconnect 是否重新链接,默认为 False
*/
public function upgrade(UriInterface $uri, array $headers = [], bool $reconnect = false): bool;
}

View File

@ -2,4 +2,27 @@
declare(strict_types=1); declare(strict_types=1);
const CHOIR_PSR_HTTP_VERSION = '1.0.0'; const CHOIR_PSR_HTTP_VERSION = '1.0.1';
// Choir TCP 连接状态
const CHOIR_TCP_INITIAL = 0;
const CHOIR_TCP_CONNECTING = 1;
const CHOIR_TCP_ESTABLISHED = 2;
const CHOIR_TCP_CLOSING = 4;
const CHOIR_TCP_CLOSED = 8;
// Choir TCP 错误码
const CHOIR_TCP_SEND_FAILED = 2;
const CHOIR_WS_CLOSE_NORMAL = 1000;
const CHOIR_WS_CLOSE_GOING_AWAY = 1001;
const CHOIR_WS_CLOSE_PROTOCOL_ERROR = 1002;
const CHOIR_WS_CLOSE_DATA_ERROR = 1003;
const CHOIR_WS_CLOSE_STATUS_ERROR = 1005;
const CHOIR_WS_CLOSE_ABNORMAL = 1006;
const CHOIR_WS_CLOSE_MESSAGE_ERROR = 1007;
const CHOIR_WS_CLOSE_POLICY_ERROR = 1008;
const CHOIR_WS_CLOSE_MESSAGE_TOO_BIG = 1009;
const CHOIR_WS_CLOSE_EXTENSION_MISSING = 1010;
const CHOIR_WS_CLOSE_SERVER_ERROR = 1011;
const CHOIR_WS_CLOSE_TLS = 1015;