From ef91fb3aeb4bdb3f4883e1c0db174f8ecb001504 Mon Sep 17 00:00:00 2001 From: crazywhalecc Date: Mon, 26 Dec 2022 19:29:47 +0800 Subject: [PATCH] add base client --- .../Http/Client/AsyncClientInterface.php | 19 ++ src/Choir/Http/Client/CurlClient.php | 129 +++++++ .../Http/Client/Exception/ClientException.php | 11 + .../Client/Exception/NetworkException.php | 24 ++ .../Client/Exception/RequestException.php | 27 ++ src/Choir/Http/Client/StreamClient.php | 323 ++++++++++++++++++ src/Choir/Http/Client/SwooleClient.php | 219 ++++++++++++ src/Choir/Http/Client/TimeoutInterface.php | 15 + .../Http/Client/UpgradableClientInterface.php | 56 +++ src/Choir/globals.php | 25 +- 10 files changed, 847 insertions(+), 1 deletion(-) create mode 100644 src/Choir/Http/Client/AsyncClientInterface.php create mode 100644 src/Choir/Http/Client/CurlClient.php create mode 100644 src/Choir/Http/Client/Exception/ClientException.php create mode 100644 src/Choir/Http/Client/Exception/NetworkException.php create mode 100644 src/Choir/Http/Client/Exception/RequestException.php create mode 100644 src/Choir/Http/Client/StreamClient.php create mode 100644 src/Choir/Http/Client/SwooleClient.php create mode 100644 src/Choir/Http/Client/TimeoutInterface.php create mode 100644 src/Choir/Http/Client/UpgradableClientInterface.php diff --git a/src/Choir/Http/Client/AsyncClientInterface.php b/src/Choir/Http/Client/AsyncClientInterface.php new file mode 100644 index 0000000..4139713 --- /dev/null +++ b/src/Choir/Http/Client/AsyncClientInterface.php @@ -0,0 +1,19 @@ +curl_options = $curl_options; + } + + /** + * {@inheritDoc} + */ + public function setTimeout(int $timeout) + { + $this->curl_options[CURLOPT_TIMEOUT_MS] = $timeout; + } + + /** + * {@inheritDoc} + */ + public function sendRequest(RequestInterface $request): ResponseInterface + { + $handle = $this->createHandle($request); + $success = curl_exec($handle); + if ($success === false) { + throw new NetworkException($request, curl_error($handle), curl_errno($handle)); + } + $response = $this->createResponse($handle); + curl_close($handle); + return $response; + } + + /** + * @throws ClientException + * @return \CurlHandle|false|resource + */ + private function createHandle(RequestInterface $request) /* @phpstan-ignore-line */ + { + $this->curl_options[CURLOPT_RETURNTRANSFER] = true; // 返回的内容作为变量储存,而不是直接输出 + $this->curl_options[CURLOPT_HEADER] = true; // 获取结果返回时包含Header数据 + $this->curl_options[CURLOPT_CUSTOMREQUEST] = $request->getMethod(); // 设置请求方式 + $this->curl_options[CURLOPT_URL] = (string) $request->getUri(); // 设置请求的URL + $this->curl_options[CURLOPT_POSTFIELDS] = (string) $request->getBody(); // 设置请求的Body + $this->curl_options[CURLOPT_SSL_VERIFYHOST] = false; // 取消认证ssl + $this->curl_options[CURLOPT_SSL_VERIFYPEER] = false; // 取消认证ssl + // 设置请求头 + foreach ($request->getHeaders() as $name => $values) { + foreach ($values as $value) { + $this->curl_options[CURLOPT_HTTPHEADER][] = sprintf('%s: %s', $name, $value); + } + } + + /** @var \CurlHandle|false|resource $curl_handle */ + $curl_handle = curl_init(); /* @phpstan-ignore-line */ + if ($curl_handle === false) { + throw new ClientException('Unable to initialize a cURL handle'); + } + + $success = curl_setopt_array($curl_handle, $this->curl_options); + if ($success === false) { + throw new ClientException('Unable to configure a cURL handle'); + } + + return $curl_handle; + } + + /** + * @param \CurlHandle|int|resource $handle + */ + private function createResponse($handle): ResponseInterface /* @phpstan-ignore-line */ + { + $status_code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE); + $response = HttpFactory::createResponse($status_code); + + /** @var null|string $message */ + $message = curl_multi_getcontent($handle); /* @phpstan-ignore-line */ + if ($message === null) { + return $response; + } + + $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE); + $header = substr($message, 0, $header_size); + + $fields = explode("\n", $header); + foreach ($fields as $field) { + $colpos = strpos($field, ':'); + if ($colpos === false) { // Status Line + continue; + } + if ($colpos === 0) { // HTTP/2 Field + continue; + } + + [$name, $value] = explode(':', $field, 2); + + $response = $response->withAddedHeader(trim($name), trim($value)); + } + + $body = substr($message, $header_size); + $response->getBody()->write($body); + + return $response; + } +} diff --git a/src/Choir/Http/Client/Exception/ClientException.php b/src/Choir/Http/Client/Exception/ClientException.php new file mode 100644 index 0000000..081bbcf --- /dev/null +++ b/src/Choir/Http/Client/Exception/ClientException.php @@ -0,0 +1,11 @@ +request = $request; + parent::__construct($message, $code, $previous); + } + + public function getRequest(): RequestInterface + { + return $this->request; + } +} diff --git a/src/Choir/Http/Client/Exception/RequestException.php b/src/Choir/Http/Client/Exception/RequestException.php new file mode 100644 index 0000000..d28205c --- /dev/null +++ b/src/Choir/Http/Client/Exception/RequestException.php @@ -0,0 +1,27 @@ +request = $request; + parent::__construct($message, $code, $previous); + } + + /** + * {@inheritDoc} + */ + public function getRequest(): RequestInterface + { + return $this->request; + } +} diff --git a/src/Choir/Http/Client/StreamClient.php b/src/Choir/Http/Client/StreamClient.php new file mode 100644 index 0000000..08948bf --- /dev/null +++ b/src/Choir/Http/Client/StreamClient.php @@ -0,0 +1,323 @@ + null, + 'timeout' => 1000, // 单位:毫秒 + 'stream_context_options' => [], + 'stream_context_param' => [], + 'ssl' => null, + 'write_buffer_size' => 8192, + 'ssl_method' => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT, + 'event_loop' => null, + ]; + + /** + * Constructor. + */ + public function __construct(array $config = []) + { + $this->config = array_merge($this->config, $config); + $this->config['stream_context'] = stream_context_create($this->config['stream_context_options'], $this->config['stream_context_param']); + } + + public function setTimeout(int $timeout): void + { + $this->config['timeout'] = $timeout; + } + + /** + * {@inheritdoc} + * @throws \Exception + */ + public function sendRequest(RequestInterface $request): ResponseInterface + { + $remote = $this->config['remote_socket']; + $useSsl = $this->config['ssl']; + + if (!$request->hasHeader('Connection')) { + $request = $request->withHeader('Connection', 'close'); + } + + if ($remote === null) { + $remote = $this->determineRemoteFromRequest($request); + } + + if ($useSsl === null) { + $useSsl = ($request->getUri()->getScheme() === 'https'); + } + + $socket = $this->createSocket($request, $remote, $useSsl); + stream_set_timeout($socket, (int) floor($this->config['timeout'] / 1000), $this->config['timeout'] % 1000); + + try { + $this->writeRequest($socket, $request, $this->config['write_buffer_size']); + $response = $this->readResponse($request, $socket); + } catch (\Exception $e) { + $this->closeSocket($socket); + throw $e; + } + + return $response; + } + + /** + * Create the socket to write request and read response on it. + * + * @param RequestInterface $request Request for + * @param string $remote Entrypoint for the connection + * @param bool $useSsl Whether to use ssl or not + * + * @throws NetworkException + * @return resource Socket resource + */ + protected function createSocket(RequestInterface $request, string $remote, bool $useSsl) + { + $errNo = null; + $errMsg = null; + $socket = @stream_socket_client($remote, $errNo, $errMsg, floor($this->config['timeout'] / 1000), STREAM_CLIENT_CONNECT, $this->config['stream_context']); + + if ($socket === false) { + if ($errNo === 110) { + throw new NetworkException($request, $errMsg); + } + + throw new NetworkException($request, $errMsg); + } + + if ($useSsl && @stream_socket_enable_crypto($socket, true, $this->config['ssl_method']) === false) { + throw new NetworkException($request, sprintf('Cannot enable tls: %s', error_get_last()['message'])); + } + + return $socket; + } + + /** + * Close the socket, used when having an error. + * + * @param resource $socket + */ + protected function closeSocket($socket): void + { + fclose($socket); + } + + /** + * Write a request to a socket. + * + * @param resource $socket + * @throws NetworkException + */ + protected function writeRequest($socket, RequestInterface $request, int $bufferSize = 8192): void + { + if ($this->fwrite($socket, $this->transformRequestHeadersToString($request)) === false) { + throw new NetworkException($request, 'Failed to send request, underlying socket not accessible, (BROKEN EPIPE)'); + } + + if ($request->getBody()->isReadable()) { + $this->writeBody($socket, $request, $bufferSize); + } + } + + /** + * Write Body of the request. + * + * @param resource $socket + * @throws NetworkException + */ + protected function writeBody($socket, RequestInterface $request, int $bufferSize = 8192): void + { + $body = $request->getBody(); + + if ($body->isSeekable()) { + $body->rewind(); + } + + while (!$body->eof()) { + $buffer = $body->read($bufferSize); + + if ($this->fwrite($socket, $buffer) === false) { + throw new NetworkException($request, 'An error occur when writing request to client (BROKEN EPIPE)'); + } + } + } + + /** + * Produce the header of request as a string based on a PSR Request. + */ + protected function transformRequestHeadersToString(RequestInterface $request): string + { + $message = vsprintf('%s %s HTTP/%s', [ + strtoupper($request->getMethod()), + $request->getRequestTarget(), + $request->getProtocolVersion(), + ]) . "\r\n"; + + foreach ($request->getHeaders() as $name => $values) { + $message .= $name . ': ' . implode(', ', $values) . "\r\n"; + } + + $message .= "\r\n"; + + return $message; + } + + /** + * Read a response from a socket. + * + * @param resource $socket + * + * @throws NetworkException + */ + protected function readResponse(RequestInterface $request, $socket): ResponseInterface + { + $headers = []; + $reason = null; + + while (false !== ($line = fgets($socket))) { + if (rtrim($line) === '') { + break; + } + $headers[] = trim($line); + } + + $metadatas = stream_get_meta_data($socket); + + if (array_key_exists('timed_out', $metadatas) && $metadatas['timed_out'] === true) { + throw new NetworkException($request, 'Error while reading response, stream timed out'); + } + + $parts = explode(' ', array_shift($headers), 3); + + if (count($parts) <= 1) { + throw new NetworkException($request, 'Cannot read the response'); + } + + $protocol = substr($parts[0], -3); + $status = $parts[1]; + + if (isset($parts[2])) { + $reason = $parts[2]; + } + + // Set the size on the stream if it was returned in the response + $responseHeaders = []; + + foreach ($headers as $header) { + $headerParts = explode(':', $header, 2); + + if (!array_key_exists(trim($headerParts[0]), $responseHeaders)) { + $responseHeaders[trim($headerParts[0])] = []; + } + + $responseHeaders[trim($headerParts[0])][] = isset($headerParts[1]) + ? trim($headerParts[1]) + : ''; + } + + $response = new Response($status, $responseHeaders, null, $protocol, $reason); + $stream = Stream::create($socket); + + return $response->withBody($stream); + } + + /** + * Replace fwrite behavior as api is broken in PHP. + * + * @see https://secure.phabricator.com/rPHU69490c53c9c2ef2002bc2dd4cecfe9a4b080b497 + * + * @param resource $stream The stream resource + * + * @return bool|int false if pipe is broken, number of bytes written otherwise + */ + private function fwrite($stream, string $bytes) + { + if (empty($bytes)) { + return 0; + } + $result = @fwrite($stream, $bytes); + if ($result !== 0) { + // In cases where some bytes are witten (`$result > 0`) or + // an error occurs (`$result === false`), the behavior of fwrite() is + // correct. We can return the value as-is. + return $result; + } + // If we make it here, we performed a 0-length write. Try to distinguish + // between EAGAIN and EPIPE. To do this, we're going to `stream_select()` + // the stream, write to it again if PHP claims that it's writable, and + // consider the pipe broken if the write fails. + $read = []; + $write = [$stream]; + $except = []; + $ss = @stream_select($read, $write, $except, 0); + // 这里做了个修改,原来下面是 !$write,但静态分析出来它是永久的false,所以改成了 !$ss + if (!$ss) { + // The stream isn't writable, so we conclude that it probably really is + // blocked and the underlying error was EAGAIN. Return 0 to indicate that + // no data could be written yet. + return 0; + } + // If we make it here, PHP **just** claimed that this stream is writable, so + // perform a write. If the write also fails, conclude that these failures are + // EPIPE or some other permanent failure. + $result = @fwrite($stream, $bytes); + if ($result !== 0) { + // The write worked or failed explicitly. This value is fine to return. + return $result; + } + // We performed a 0-length write, were told that the stream was writable, and + // then immediately performed another 0-length write. Conclude that the pipe + // is broken and return `false`. + return false; + } + + /** + * Return remote socket from the request. + * + * @throws RequestException + */ + private function determineRemoteFromRequest(RequestInterface $request): string + { + if (!$request->hasHeader('Host') && $request->getUri()->getHost() === '') { + throw new RequestException($request, 'Remote is not defined and we cannot determine a connection endpoint for this request (no Host header)'); + } + + $endpoint = ''; + + $host = $request->getUri()->getHost(); + if (!empty($host)) { + $endpoint .= $host; + if ($request->getUri()->getPort() !== null) { + $endpoint .= ':' . $request->getUri()->getPort(); + } elseif ($request->getUri()->getScheme() === 'https') { + $endpoint .= ':443'; + } else { + $endpoint .= ':80'; + } + } + + // If use the host header if present for the endpoint + if (empty($host) && $request->hasHeader('Host')) { + $endpoint = $request->getHeaderLine('Host'); + } + + return sprintf('tcp://%s', $endpoint); + } +} diff --git a/src/Choir/Http/Client/SwooleClient.php b/src/Choir/Http/Client/SwooleClient.php new file mode 100644 index 0000000..a122a2d --- /dev/null +++ b/src/Choir/Http/Client/SwooleClient.php @@ -0,0 +1,219 @@ +withSwooleSet($set); + } + + public function withSwooleSet(array $set = []): SwooleClient + { + if (!empty($set)) { + $this->set = $set; + } + return $this; + } + + public function setTimeout(int $timeout) + { + $this->set['timeout'] = $timeout / 1000; + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + $this->client = $client = $this->buildBaseClient($request); + if ($client->errCode !== 0) { + throw new NetworkException($request, $client->errMsg, $client->errCode); + } + return HttpFactory::createResponse($client->statusCode, null, $client->getHeaders(), $client->getBody()); + } + + public function sendRequestAsync(RequestInterface $request, callable $success_callback, callable $error_callback): bool + { + go(function () use ($request, $success_callback, $error_callback) { + $this->client = $client = $this->buildBaseClient($request); + if ($client->errCode !== 0) { + call_user_func($error_callback, $request); + } else { + $response = HttpFactory::createResponse($client->statusCode, null, $client->getHeaders(), $client->getBody()); + call_user_func($success_callback, $response); + } + }); + return true; + } + + /** + * 通过 PSR-7 的 Request 对象创建一个 Swoole Client + * + * @param RequestInterface $request PSR-7 Request 对象 + */ + public function buildBaseClient(RequestInterface $request): Client + { + $uri = $request->getUri(); + $client = new Client($uri->getHost(), $uri->getPort() ?? ($uri->getScheme() === 'https' ? 443 : 80), $uri->getScheme() === 'https'); + // 设置 Swoole 专有的 set 参数 + $client->set($this->set); + // 设置 HTTP Method (POST、GET 等) + $client->setMethod($request->getMethod()); + // 设置 HTTP Headers + $headers = []; + foreach ($request->getHeaders() as $name => $values) { + $headers[$name] = implode(', ', $values); + } + $client->setHeaders($headers); + // 如果是 POST 带 body,则设置 body + if (($data = $request->getBody()->getContents()) !== '') { + $client->setData($data); + } + $uri = $request->getUri()->getPath(); + if ($uri === '') { + $uri = '/'; + } + if (($query = $request->getUri()->getQuery()) !== '') { + $uri .= '?' . $query; + } + if (($fragment = $request->getUri()->getFragment()) !== '') { + $uri .= '?' . $fragment; + } + $client->execute($uri); + return $client; + } + + public function getStatus(): int + { + return $this->status; + } + + public function send($frame): bool + { + $swoole_frame = new Frame(); + if ($frame instanceof FrameInterface) { + $swoole_frame->data = $frame->getData(); + $swoole_frame->opcode = $frame->getOpcode(); + } else { + $swoole_frame->data = $frame; + } + return (bool) $this->client->push($swoole_frame); + } + + public function onMessage(callable $callback) + { + $this->on_message = $callback; + } + + public function onClose(callable $callback) + { + $this->on_close = $callback; + } + + public function upgrade(UriInterface $uri, array $headers = [], bool $reconnect = false): bool + { + if ($this->on_message === null) { + $this->on_message = function () {}; + } + if ($this->on_close === null) { + $this->on_close = function () {}; + } + if (!$reconnect && $this->status !== CHOIR_TCP_INITIAL) { + return false; + } + $this->status = CHOIR_TCP_CONNECTING; + $this->client = new Client($uri->getHost(), $uri->getPort() ?? ($uri->getScheme() === 'https' ? 443 : 80), $uri->getScheme() === 'https'); + // 设置 Swoole 参数 + $this->set['websocket_mask'] = true; + $this->client->set($this->set); + // 设置请求方法为 GET + $this->client->setMethod('GET'); + // 设置 Headers + $headers_total = []; + foreach ($headers as $h_name => $header) { + if (is_array($header)) { + $headers_total[$h_name] = implode(', ', $header); + } else { + $headers_total[$h_name] = $header; + } + } + $this->client->setHeaders($headers_total); + // 设置请求的 URI + $uri_total = $uri->getPath(); + if ($uri_total === '') { + $uri_total = '/'; + } + if (($query = $uri->getQuery()) !== '') { + $uri_total .= '?' . $query; + } + if (($fragment = $uri->getFragment()) !== '') { + $uri_total .= '?' . $fragment; + } + $code = $this->client->upgrade($uri_total); + if ($this->client->errCode !== 0) { + return false; + } + if (!$code) { + $this->status = CHOIR_TCP_CLOSED; + return $code; + } + go(function () { + while (true) { + $result = $this->client->recv(60); + if ($result === false && !$this->client->connected) { + $this->status = CHOIR_TCP_CLOSED; + go(function () { + $frame = FrameFactory::createCloseFrame($this->client->statusCode, ''); + call_user_func($this->on_close, $frame, $this); + }); + break; + } + if ($result instanceof Frame) { + go(function () use ($result) { + $frame = new \Choir\WebSocket\Frame($result->data, $result->opcode, true, true); + call_user_func($this->on_message, $frame, $this); + }); + } + } + }); + $this->status = CHOIR_TCP_ESTABLISHED; + return $code; + } +} diff --git a/src/Choir/Http/Client/TimeoutInterface.php b/src/Choir/Http/Client/TimeoutInterface.php new file mode 100644 index 0000000..7f9f4b0 --- /dev/null +++ b/src/Choir/Http/Client/TimeoutInterface.php @@ -0,0 +1,15 @@ +