Change llhttp to libcurl
This commit is contained in:
+291
-208
@@ -18,235 +18,318 @@
|
||||
#include "common.h"
|
||||
#include "uv_util.h"
|
||||
#include "json_rpc_request.h"
|
||||
#include "llhttp.h"
|
||||
#include <string>
|
||||
#include <curl/curl.h>
|
||||
|
||||
static constexpr char log_category_prefix[] = "JSONRPCRequest ";
|
||||
|
||||
namespace p2pool {
|
||||
namespace JSONRPCRequest {
|
||||
|
||||
JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
: m_socket{}
|
||||
, m_connect{}
|
||||
, m_write{}
|
||||
struct CurlContext
|
||||
{
|
||||
CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop);
|
||||
~CurlContext();
|
||||
|
||||
static int socket_func(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp)
|
||||
{
|
||||
CurlContext* ctx = reinterpret_cast<CurlContext*>(socketp ? socketp : userp);
|
||||
return ctx->on_socket(easy, s, action);
|
||||
}
|
||||
|
||||
static int timer_func(CURLM* multi, long timeout_ms, void* ctx)
|
||||
{
|
||||
return reinterpret_cast<CurlContext*>(ctx)->on_timer(multi, timeout_ms);
|
||||
}
|
||||
|
||||
static size_t write_func(const void* buffer, size_t size, size_t count, void* ctx)
|
||||
{
|
||||
return reinterpret_cast<CurlContext*>(ctx)->on_write(buffer, size, count);
|
||||
}
|
||||
|
||||
int on_socket(CURL* easy, curl_socket_t s, int action);
|
||||
int on_timer(CURLM* multi, long timeout_ms);
|
||||
|
||||
static void on_timeout(uv_handle_t* req);
|
||||
|
||||
size_t on_write(const void* buffer, size_t size, size_t count);
|
||||
|
||||
static void curl_perform(uv_poll_t* req, int status, int events);
|
||||
void check_multi_info();
|
||||
|
||||
static void on_close(uv_handle_t* h);
|
||||
|
||||
uv_poll_t m_pollHandle;
|
||||
curl_socket_t m_socket;
|
||||
|
||||
CallbackBase* m_callback;
|
||||
CallbackBase* m_closeCallback;
|
||||
|
||||
uv_loop_t* m_loop;
|
||||
uv_timer_t m_timer;
|
||||
uv_async_t m_async;
|
||||
CURLM* m_multiHandle;
|
||||
CURL* m_handle;
|
||||
|
||||
std::string m_url;
|
||||
std::string m_req;
|
||||
std::string m_auth;
|
||||
|
||||
std::vector<char> m_response;
|
||||
std::string m_error;
|
||||
};
|
||||
|
||||
CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
: m_pollHandle{}
|
||||
, m_socket{}
|
||||
, m_callback(cb)
|
||||
, m_closeCallback(close_cb)
|
||||
, m_contentLength(0)
|
||||
, m_contentLengthHeader(false)
|
||||
, m_readBufInUse(false)
|
||||
, m_valid(true)
|
||||
, m_loop(loop)
|
||||
, m_timer{}
|
||||
, m_async{}
|
||||
, m_multiHandle(nullptr)
|
||||
, m_handle(nullptr)
|
||||
, m_req(req)
|
||||
, m_auth(auth)
|
||||
{
|
||||
m_readBuf[0] = '\0';
|
||||
{
|
||||
char buf[log::Stream::BUF_SIZE + 1];
|
||||
buf[0] = '\0';
|
||||
|
||||
uv_tcp_init(loop ? loop : uv_default_loop_checked(), &m_socket);
|
||||
uv_tcp_nodelay(&m_socket, 1);
|
||||
log::Stream s(buf);
|
||||
s << "http://" << address << ':' << port;
|
||||
|
||||
sockaddr_storage addr;
|
||||
if (uv_ip4_addr(address, port, reinterpret_cast<sockaddr_in*>(&addr)) != 0) {
|
||||
const int err = uv_ip6_addr(address, port, reinterpret_cast<sockaddr_in6*>(&addr));
|
||||
if (err) {
|
||||
LOGERR(1, "invalid IP address " << address << " or port " << port);
|
||||
m_valid = false;
|
||||
return;
|
||||
if (!m_req.empty() && (m_req.front() == '/')) {
|
||||
s << m_req.c_str() << '\0';
|
||||
m_req.clear();
|
||||
}
|
||||
else {
|
||||
s << "/json_rpc\0";
|
||||
}
|
||||
|
||||
m_url = buf;
|
||||
}
|
||||
|
||||
m_socket.data = this;
|
||||
m_connect.data = this;
|
||||
m_write.data = this;
|
||||
|
||||
const char* uri = "/json_rpc";
|
||||
|
||||
size_t len = req ? strlen(req) : 0;
|
||||
if (!len) {
|
||||
LOGERR(1, "Empty JSONRPCRequest, fix the code!");
|
||||
m_valid = false;
|
||||
return;
|
||||
}
|
||||
|
||||
if (req[0] == '/') {
|
||||
uri = req;
|
||||
len = 0;
|
||||
}
|
||||
|
||||
m_request.reserve(std::max<size_t>(len + 128, log::Stream::BUF_SIZE + 1));
|
||||
m_request.resize(log::Stream::BUF_SIZE + 1);
|
||||
|
||||
log::Stream s(m_request.data(), m_request.size());
|
||||
s << "POST " << uri << " HTTP/1.1\nContent-Type: application/json\nContent-Length: " << len << "\n\n";
|
||||
|
||||
m_request.resize(s.m_pos);
|
||||
m_request.insert(m_request.end(), req, req + len);
|
||||
|
||||
m_response.reserve(sizeof(m_readBuf));
|
||||
|
||||
const int err = uv_tcp_connect(&m_connect, &m_socket, reinterpret_cast<const sockaddr*>(&addr), on_connect);
|
||||
int err = uv_timer_init(m_loop, &m_timer);
|
||||
if (err) {
|
||||
LOGERR(1, "failed to initiate tcp connection to " << address << ", error " << uv_err_name(err));
|
||||
m_valid = false;
|
||||
LOGERR(1, "uv_timer_init failed, error " << uv_err_name(err));
|
||||
throw std::runtime_error("uv_timer_init failed");
|
||||
}
|
||||
m_timer.data = this;
|
||||
|
||||
err = uv_async_init(m_loop, &m_async, reinterpret_cast<uv_async_cb>(on_timeout));
|
||||
if (err) {
|
||||
LOGERR(1, "uv_async_init failed, error " << uv_err_name(err));
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
||||
throw std::runtime_error("uv_async_init failed");
|
||||
}
|
||||
m_async.data = this;
|
||||
|
||||
m_multiHandle = curl_multi_init();
|
||||
if (!m_multiHandle) {
|
||||
constexpr char msg[] = "curl_multi_init() failed";
|
||||
LOGERR(1, msg);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_async), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
curl_multi_setopt(m_multiHandle, CURLMOPT_SOCKETFUNCTION, socket_func);
|
||||
curl_multi_setopt(m_multiHandle, CURLMOPT_SOCKETDATA, this);
|
||||
|
||||
curl_multi_setopt(m_multiHandle, CURLMOPT_TIMERFUNCTION, timer_func);
|
||||
curl_multi_setopt(m_multiHandle, CURLMOPT_TIMERDATA, this);
|
||||
|
||||
m_handle = curl_easy_init();
|
||||
if (!m_handle) {
|
||||
constexpr char msg[] = "curl_easy_init() failed";
|
||||
LOGERR(1, msg);
|
||||
curl_multi_cleanup(m_multiHandle);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_async), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
curl_easy_setopt(m_handle, CURLOPT_WRITEFUNCTION, write_func);
|
||||
curl_easy_setopt(m_handle, CURLOPT_WRITEDATA, this);
|
||||
|
||||
curl_easy_setopt(m_handle, CURLOPT_URL, m_url.c_str());
|
||||
curl_easy_setopt(m_handle, CURLOPT_POSTFIELDS, m_req.c_str());
|
||||
curl_easy_setopt(m_handle, CURLOPT_CONNECTTIMEOUT, 1);
|
||||
curl_easy_setopt(m_handle, CURLOPT_TIMEOUT, 10);
|
||||
|
||||
if (!m_auth.empty()) {
|
||||
curl_easy_setopt(m_handle, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST | CURLAUTH_ONLY);
|
||||
curl_easy_setopt(m_handle, CURLOPT_USERPWD, m_auth.c_str());
|
||||
}
|
||||
|
||||
CURLMcode curl_err = curl_multi_add_handle(m_multiHandle, m_handle);
|
||||
if (curl_err != CURLM_OK) {
|
||||
LOGERR(1, "curl_multi_add_handle failed, error " << curl_multi_strerror(curl_err));
|
||||
curl_easy_cleanup(m_handle);
|
||||
curl_multi_cleanup(m_multiHandle);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_async), nullptr);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), nullptr);
|
||||
throw std::runtime_error("curl_multi_add_handle failed");
|
||||
}
|
||||
}
|
||||
|
||||
void JSONRPCRequest::on_connect(uv_connect_t* req, int status)
|
||||
CurlContext::~CurlContext()
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(req->data);
|
||||
|
||||
if (status != 0) {
|
||||
pThis->m_error = uv_err_name(status);
|
||||
LOGERR(1, "failed to connect, error " << pThis->m_error);
|
||||
pThis->close();
|
||||
return;
|
||||
if (m_error.empty() && !m_response.empty()) {
|
||||
(*m_callback)(m_response.data(), m_response.size());
|
||||
}
|
||||
|
||||
uv_buf_t buf[1];
|
||||
buf[0].base = pThis->m_request.data();
|
||||
buf[0].len = static_cast<uint32_t>(pThis->m_request.size());
|
||||
|
||||
uv_write(&pThis->m_write, reinterpret_cast<uv_stream_t*>(&pThis->m_socket), buf, 1, on_write);
|
||||
}
|
||||
|
||||
void JSONRPCRequest::on_write(uv_write_t* handle, int status)
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(handle->data);
|
||||
|
||||
if (status != 0) {
|
||||
pThis->m_error = uv_err_name(status);
|
||||
LOGERR(1, "failed to send request, error " << pThis->m_error);
|
||||
pThis->close();
|
||||
return;
|
||||
}
|
||||
|
||||
uv_read_start(reinterpret_cast<uv_stream_t*>(&pThis->m_socket), on_alloc, on_read);
|
||||
}
|
||||
|
||||
void JSONRPCRequest::on_alloc(uv_handle_t* handle, size_t /*suggested_size*/, uv_buf_t* buf)
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(handle->data);
|
||||
|
||||
if (pThis->m_readBufInUse) {
|
||||
LOGERR(1, "read buffer is already in use");
|
||||
}
|
||||
|
||||
buf->len = sizeof(pThis->m_readBuf);
|
||||
buf->base = pThis->m_readBuf;
|
||||
pThis->m_readBufInUse = true;
|
||||
}
|
||||
|
||||
void JSONRPCRequest::on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(stream->data);
|
||||
pThis->m_readBufInUse = false;
|
||||
|
||||
if (nread > 0) {
|
||||
pThis->on_read(buf->base, nread);
|
||||
}
|
||||
else if (nread < 0) {
|
||||
if (nread != UV_EOF){
|
||||
pThis->m_error = uv_err_name(static_cast<int>(nread));
|
||||
LOGERR(1, "failed to read response, error " << pThis->m_error);
|
||||
}
|
||||
pThis->close();
|
||||
}
|
||||
}
|
||||
|
||||
void JSONRPCRequest::on_read(const char* data, size_t size)
|
||||
{
|
||||
m_response.append(data, size);
|
||||
|
||||
static constexpr char headers_end[] = "\r\n\r\n";
|
||||
if (m_response.find(headers_end) == std::string::npos) {
|
||||
return;
|
||||
}
|
||||
|
||||
llhttp_settings_t settings{};
|
||||
|
||||
settings.on_status = [](llhttp_t*, const char* at, size_t length)
|
||||
{
|
||||
if ((length == 2) && (!memcmp(at, "Ok", 2) || !memcmp(at, "OK", 2))) {
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
};
|
||||
|
||||
settings.on_header_field = [](llhttp_t* parser, const char* at, size_t length)
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(parser->data);
|
||||
static const char header[] = "Content-Length";
|
||||
pThis->m_contentLengthHeader = ((length == sizeof(header) - 1) && (memcmp(at, header, length) == 0));
|
||||
return 0;
|
||||
};
|
||||
|
||||
settings.on_header_value = [](llhttp_t* parser, const char* at, size_t length)
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(parser->data);
|
||||
if (pThis->m_contentLengthHeader) {
|
||||
uint32_t k = 0;
|
||||
for (const char* p = at; p < at + length; ++p) {
|
||||
if ('0' <= *p && *p <= '9') {
|
||||
k = k * 10 + (*p - '0');
|
||||
}
|
||||
else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
if (!k) {
|
||||
return -1;
|
||||
}
|
||||
pThis->m_contentLength = k;
|
||||
}
|
||||
return 0;
|
||||
};
|
||||
|
||||
settings.on_body = [](llhttp_t* parser, const char* at, size_t length)
|
||||
{
|
||||
JSONRPCRequest* pThis = static_cast<JSONRPCRequest*>(parser->data);
|
||||
if (pThis->m_contentLength && (length >= pThis->m_contentLength) && pThis->m_callback) {
|
||||
(*pThis->m_callback)(at, length);
|
||||
delete pThis->m_callback;
|
||||
pThis->m_callback = nullptr;
|
||||
}
|
||||
return 0;
|
||||
};
|
||||
|
||||
llhttp_t parser;
|
||||
llhttp_init(&parser, HTTP_RESPONSE, &settings);
|
||||
|
||||
parser.data = this;
|
||||
|
||||
const llhttp_errno result = llhttp_execute(&parser, m_response.c_str(), m_response.length());
|
||||
if (result != HPE_OK) {
|
||||
m_error = "failed to parse response";
|
||||
LOGERR(1, m_error << ", result = " << static_cast<int>(result));
|
||||
close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!m_callback) {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
void JSONRPCRequest::close()
|
||||
{
|
||||
uv_handle_t* h = reinterpret_cast<uv_handle_t*>(&m_socket);
|
||||
if (!uv_is_closing(h)) {
|
||||
uv_close(h, on_close);
|
||||
}
|
||||
}
|
||||
|
||||
void JSONRPCRequest::on_close(uv_handle_t* handle)
|
||||
{
|
||||
JSONRPCRequest* req = static_cast<JSONRPCRequest*>(handle->data);
|
||||
if (req->m_closeCallback) {
|
||||
(*req->m_closeCallback)(req->m_error.c_str(), req->m_error.length());
|
||||
}
|
||||
delete req;
|
||||
}
|
||||
|
||||
JSONRPCRequest::~JSONRPCRequest()
|
||||
{
|
||||
delete m_callback;
|
||||
|
||||
(*m_closeCallback)(m_error.c_str(), m_error.length());
|
||||
delete m_closeCallback;
|
||||
}
|
||||
|
||||
int CurlContext::on_socket(CURL* /*easy*/, curl_socket_t s, int action)
|
||||
{
|
||||
switch (action) {
|
||||
case CURL_POLL_IN:
|
||||
case CURL_POLL_OUT:
|
||||
case CURL_POLL_INOUT:
|
||||
{
|
||||
if (!m_socket) {
|
||||
m_socket = s;
|
||||
curl_multi_assign(m_multiHandle, s, this);
|
||||
}
|
||||
else if (m_socket != s) {
|
||||
LOGERR(1, "This code can't work with multiple parallel requests. Fix the code!");
|
||||
}
|
||||
|
||||
int events = 0;
|
||||
if (action != CURL_POLL_IN) events |= UV_WRITABLE;
|
||||
if (action != CURL_POLL_OUT) events |= UV_READABLE;
|
||||
|
||||
if (!m_pollHandle.data) {
|
||||
uv_poll_init_socket(m_loop, &m_pollHandle, s);
|
||||
m_pollHandle.data = this;
|
||||
}
|
||||
|
||||
uv_poll_start(&m_pollHandle, events, curl_perform);
|
||||
}
|
||||
break;
|
||||
|
||||
case CURL_POLL_REMOVE:
|
||||
default:
|
||||
curl_multi_assign(m_multiHandle, s, nullptr);
|
||||
uv_poll_stop(&m_pollHandle);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_async), on_close);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_timer), on_close);
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(&m_pollHandle), on_close);
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int CurlContext::on_timer(CURLM* /*multi*/, long timeout_ms)
|
||||
{
|
||||
if (timeout_ms < 0) {
|
||||
uv_timer_stop(&m_timer);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (timeout_ms == 0) {
|
||||
// 0 ms timeout, but we can't just call on_timeout() here - we have to kick the UV loop
|
||||
uv_async_send(&m_async);
|
||||
return 0;
|
||||
}
|
||||
|
||||
uv_timer_start(&m_timer, reinterpret_cast<uv_timer_cb>(on_timeout), timeout_ms, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void CurlContext::on_timeout(uv_handle_t* req)
|
||||
{
|
||||
CurlContext* ctx = reinterpret_cast<CurlContext*>(req->data);
|
||||
|
||||
int running_handles;
|
||||
curl_multi_socket_action(ctx->m_multiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles);
|
||||
ctx->check_multi_info();
|
||||
}
|
||||
|
||||
size_t CurlContext::on_write(const void* buffer, size_t size, size_t count)
|
||||
{
|
||||
const char* p = reinterpret_cast<const char*>(buffer);
|
||||
m_response.insert(m_response.end(), p, p + size * count);
|
||||
return count;
|
||||
}
|
||||
|
||||
void CurlContext::curl_perform(uv_poll_t* req, int status, int events)
|
||||
{
|
||||
int flags = 0;
|
||||
if (status < 0) {
|
||||
flags |= CURL_CSELECT_ERR;
|
||||
LOGERR(1, "uv_poll_start returned error " << uv_err_name(status));
|
||||
}
|
||||
else {
|
||||
if (events & UV_READABLE) flags |= CURL_CSELECT_IN;
|
||||
if (events & UV_WRITABLE) flags |= CURL_CSELECT_OUT;
|
||||
}
|
||||
|
||||
CurlContext* ctx = reinterpret_cast<CurlContext*>(req->data);
|
||||
|
||||
int running_handles;
|
||||
curl_multi_socket_action(ctx->m_multiHandle, ctx->m_socket, flags, &running_handles);
|
||||
ctx->check_multi_info();
|
||||
}
|
||||
|
||||
void CurlContext::check_multi_info()
|
||||
{
|
||||
int pending;
|
||||
while (CURLMsg* message = curl_multi_info_read(m_multiHandle, &pending)) {
|
||||
if (message->msg == CURLMSG_DONE) {
|
||||
if ((message->data.result != CURLE_OK) || m_response.empty()) {
|
||||
m_error = m_response.empty() ? "empty response" : curl_easy_strerror(message->data.result);
|
||||
}
|
||||
|
||||
long http_code = 0;
|
||||
curl_easy_getinfo(message->easy_handle, CURLINFO_RESPONSE_CODE, &http_code);
|
||||
|
||||
if (http_code != 200) {
|
||||
char buf[32] = {};
|
||||
log::Stream s(buf);
|
||||
s << "HTTP error " << static_cast<int>(http_code) << '\0';
|
||||
m_error = buf;
|
||||
}
|
||||
|
||||
curl_multi_remove_handle(m_multiHandle, m_handle);
|
||||
curl_easy_cleanup(m_handle);
|
||||
curl_multi_cleanup(m_multiHandle);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CurlContext::on_close(uv_handle_t* h)
|
||||
{
|
||||
CurlContext* ctx = reinterpret_cast<CurlContext*>(h->data);
|
||||
h->data = nullptr;
|
||||
|
||||
if (ctx->m_timer.data || ctx->m_async.data || ctx->m_pollHandle.data) {
|
||||
return;
|
||||
}
|
||||
|
||||
delete ctx;
|
||||
}
|
||||
|
||||
void Call(const std::string& address, int port, const std::string& req, const std::string& auth, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop)
|
||||
{
|
||||
CallOnLoop(loop,
|
||||
[=]()
|
||||
{
|
||||
try {
|
||||
new CurlContext(address, port, req, auth, cb, close_cb, loop);
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
const char* msg = e.what();
|
||||
(*close_cb)(msg, strlen(msg));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace JSONRPCRequest
|
||||
} // namespace p2pool
|
||||
|
||||
Reference in New Issue
Block a user