Support persistent connection for server.

master
Chunting Gu 6 years ago
parent 6c4ac75c35
commit 6c5dedb807

@ -19,6 +19,7 @@ set(HEADERS
http_client_pool.h http_client_pool.h
http_client_session.h http_client_session.h
http_connection.h http_connection.h
http_connection_pool.h
http_message.h http_message.h
http_parser.h http_parser.h
http_request.h http_request.h
@ -46,6 +47,7 @@ set(SOURCES
http_client_pool.cc http_client_pool.cc
http_client_session.cc http_client_session.cc
http_connection.cc http_connection.cc
http_connection_pool.cc
http_message.cc http_message.cc
http_parser.cc http_parser.cc
http_request.cc http_request.cc

@ -22,8 +22,8 @@ HttpClient::HttpClient()
bool HttpClient::Request(HttpRequestPtr request, bool connect) { bool HttpClient::Request(HttpRequestPtr request, bool connect) {
io_context_.restart(); io_context_.restart();
response_.reset(new HttpResponse()); response_.reset(new HttpResponse{});
response_parser_.reset(new HttpResponseParser(response_.get())); response_parser_.Init(response_.get());
closed_ = false; closed_ = false;
timer_canceled_ = false; timer_canceled_ = false;
@ -166,7 +166,7 @@ void HttpClient::DoReadResponse(Error* error) {
LOG_VERB("Socket async read handler."); LOG_VERB("Socket async read handler.");
// Stop the deadline timer once the read has started. // Stop the deadline timer once the read has started (or failed).
CancelTimer(); CancelTimer();
if (ec || length == 0) { if (ec || length == 0) {
@ -179,20 +179,17 @@ void HttpClient::DoReadResponse(Error* error) {
LOG_INFO("Read data, length: %u.", length); LOG_INFO("Read data, length: %u.", length);
// Parse the response piece just read. // Parse the response piece just read.
if (!response_parser_->Parse(buffer_.data(), length)) { if (!response_parser_.Parse(buffer_.data(), length)) {
//CancelTimer();
Close(); Close();
*error = kHttpError; *error = kHttpError;
LOG_ERRO("Failed to parse HTTP response."); LOG_ERRO("Failed to parse HTTP response.");
return; return;
} }
if (response_parser_->finished()) { if (response_parser_.finished()) {
// Stop trying to read once all content has been received, because // Stop trying to read once all content has been received, because
// some servers will block extra call to read_some(). // some servers will block extra call to read_some().
//CancelTimer();
if (response_->IsConnectionKeepAlive()) { if (response_->IsConnectionKeepAlive()) {
// Close the timer but keep the socket connection. // Close the timer but keep the socket connection.
LOG_INFO("Keep the socket connection alive."); LOG_INFO("Keep the socket connection alive.");
@ -220,22 +217,24 @@ void HttpClient::DoReadResponse(Error* error) {
} }
void HttpClient::DoWaitTimer() { void HttpClient::DoWaitTimer() {
LOG_VERB("Wait timer asynchronously.");
timer_.async_wait( timer_.async_wait(
std::bind(&HttpClient::OnTimer, this, std::placeholders::_1)); std::bind(&HttpClient::OnTimer, this, std::placeholders::_1));
} }
void HttpClient::OnTimer(boost::system::error_code ec) { void HttpClient::OnTimer(boost::system::error_code ec) {
if (closed_) { LOG_VERB("On deadline timer.");
// timer_.cancel() was called.
if (ec == boost::asio::error::operation_aborted) {
LOG_VERB("Deadline timer canceled.");
return; return;
} }
LOG_VERB("On deadline timer."); if (closed_) {
LOG_VERB("Socket has been closed.");
// NOTE: Can't check this: return;
// if (ec == boost::asio::error::operation_aborted) { }
// LOG_VERB("Deadline timer canceled.");
// return;
// }
if (timer_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) { if (timer_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
// The deadline has passed. // The deadline has passed.

@ -83,7 +83,7 @@ private:
std::unique_ptr<HttpSocketBase> socket_; std::unique_ptr<HttpSocketBase> socket_;
HttpResponsePtr response_; HttpResponsePtr response_;
std::unique_ptr<HttpResponseParser> response_parser_; HttpResponseParser response_parser_;
// Timer for the timeout control. // Timer for the timeout control.
boost::asio::deadline_timer timer_; boost::asio::deadline_timer timer_;

@ -1,12 +1,9 @@
#ifndef WEBCC_HTTP_CLIENT_SESSION_H_ #ifndef WEBCC_HTTP_CLIENT_SESSION_H_
#define WEBCC_HTTP_CLIENT_SESSION_H_ #define WEBCC_HTTP_CLIENT_SESSION_H_
#include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
#include "boost/optional.hpp"
#include "webcc/http_client_pool.h" #include "webcc/http_client_pool.h"
#include "webcc/http_request_builder.h" #include "webcc/http_request_builder.h"
#include "webcc/http_response.h" #include "webcc/http_response.h"

@ -4,6 +4,7 @@
#include "boost/asio/write.hpp" #include "boost/asio/write.hpp"
#include "webcc/http_connection_pool.h"
#include "webcc/http_request_handler.h" #include "webcc/http_request_handler.h"
#include "webcc/logger.h" #include "webcc/logger.h"
@ -11,14 +12,17 @@ using boost::asio::ip::tcp;
namespace webcc { namespace webcc {
HttpConnection::HttpConnection(tcp::socket socket, HttpRequestHandler* handler) HttpConnection::HttpConnection(tcp::socket socket, HttpConnectionPool* pool,
HttpRequestHandler* handler)
: socket_(std::move(socket)), : socket_(std::move(socket)),
pool_(pool),
buffer_(kBufferSize), buffer_(kBufferSize),
request_handler_(handler), request_handler_(handler) {
request_parser_(&request_) {
} }
void HttpConnection::Start() { void HttpConnection::Start() {
request_.reset(new HttpRequest{});
request_parser_.Init(request_.get());
DoRead(); DoRead();
} }
@ -37,8 +41,11 @@ void HttpConnection::SendResponse(HttpResponsePtr response) {
response_ = response; response_ = response;
// TODO: Support keep-alive. if (request_->IsConnectionKeepAlive()) {
response_->SetHeader(http::headers::kConnection, "Close"); response_->SetHeader(http::headers::kConnection, "Keep-Alive");
} else {
response_->SetHeader(http::headers::kConnection, "Close");
}
response_->Prepare(); response_->Prepare();
@ -60,7 +67,7 @@ void HttpConnection::OnRead(boost::system::error_code ec, std::size_t length) {
if (ec) { if (ec) {
LOG_ERRO("Socket read error (%s).", ec.message().c_str()); LOG_ERRO("Socket read error (%s).", ec.message().c_str());
if (ec != boost::asio::error::operation_aborted) { if (ec != boost::asio::error::operation_aborted) {
Close(); pool_->Close(shared_from_this());
} }
return; return;
} }
@ -79,7 +86,7 @@ void HttpConnection::OnRead(boost::system::error_code ec, std::size_t length) {
return; return;
} }
LOG_VERB("HTTP request:\n%s", request_.Dump(4, "> ").c_str()); LOG_VERB("HTTP request:\n%s", request_->Dump(4, "> ").c_str());
// Enqueue this connection. // Enqueue this connection.
// Some worker thread will handle it later. // Some worker thread will handle it later.
@ -104,13 +111,19 @@ void HttpConnection::OnWrite(boost::system::error_code ec, std::size_t length) {
LOG_ERRO("Socket write error (%s).", ec.message().c_str()); LOG_ERRO("Socket write error (%s).", ec.message().c_str());
if (ec != boost::asio::error::operation_aborted) { if (ec != boost::asio::error::operation_aborted) {
Close(); pool_->Close(shared_from_this());
} }
} else { } else {
LOG_INFO("Response has been sent back, length: %u.", length); LOG_INFO("Response has been sent back, length: %u.", length);
Shutdown(); if (request_->IsConnectionKeepAlive()) {
Close(); // Necessary even after shutdown! LOG_INFO("The client asked for keep-alive connection.");
LOG_INFO("Continue to read next request...");
Start();
} else {
Shutdown();
pool_->Close(shared_from_this());
}
} }
} }

@ -15,13 +15,14 @@
namespace webcc { namespace webcc {
class HttpConnection; class HttpConnection;
class HttpConnectionPool;
class HttpRequestHandler; class HttpRequestHandler;
typedef std::shared_ptr<HttpConnection> HttpConnectionPtr; typedef std::shared_ptr<HttpConnection> HttpConnectionPtr;
class HttpConnection : public std::enable_shared_from_this<HttpConnection> { class HttpConnection : public std::enable_shared_from_this<HttpConnection> {
public: public:
HttpConnection(boost::asio::ip::tcp::socket socket, HttpConnection(boost::asio::ip::tcp::socket socket, HttpConnectionPool* pool,
HttpRequestHandler* handler); HttpRequestHandler* handler);
~HttpConnection() = default; ~HttpConnection() = default;
@ -29,7 +30,7 @@ public:
HttpConnection(const HttpConnection&) = delete; HttpConnection(const HttpConnection&) = delete;
HttpConnection& operator=(const HttpConnection&) = delete; HttpConnection& operator=(const HttpConnection&) = delete;
const HttpRequest& request() const { HttpRequestPtr request() const {
return request_; return request_;
} }
@ -57,6 +58,9 @@ private:
// Socket for the connection. // Socket for the connection.
boost::asio::ip::tcp::socket socket_; boost::asio::ip::tcp::socket socket_;
// The pool for this connection.
HttpConnectionPool* pool_;
// Buffer for incoming data. // Buffer for incoming data.
std::vector<char> buffer_; std::vector<char> buffer_;
@ -64,7 +68,7 @@ private:
HttpRequestHandler* request_handler_; HttpRequestHandler* request_handler_;
// The incoming request. // The incoming request.
HttpRequest request_; HttpRequestPtr request_;
// The parser for the incoming request. // The parser for the incoming request.
HttpRequestParser request_parser_; HttpRequestParser request_parser_;

@ -0,0 +1,30 @@
#include "webcc/http_connection_pool.h"
#include "webcc/logger.h"
namespace webcc {
HttpConnectionPool::HttpConnectionPool() {
}
void HttpConnectionPool::Start(HttpConnectionPtr c) {
LOG_VERB("Starting connection...");
connections_.insert(c);
c->Start();
}
void HttpConnectionPool::Close(HttpConnectionPtr c) {
LOG_VERB("Closing connection...");
connections_.erase(c);
c->Close();
}
void HttpConnectionPool::CloseAll() {
LOG_VERB("Closing all (%u) connections...", connections_.size());
for (auto& c : connections_) {
c->Close();
}
connections_.clear();
}
} // namespace webcc

@ -0,0 +1,33 @@
#ifndef WEBCC_HTTP_CONNECTION_POOL_H_
#define WEBCC_HTTP_CONNECTION_POOL_H_
#include <set>
#include "webcc/http_connection.h"
namespace webcc {
class HttpConnectionPool {
public:
HttpConnectionPool(const HttpConnectionPool&) = delete;
HttpConnectionPool& operator=(const HttpConnectionPool&) = delete;
HttpConnectionPool();
// Add a connection to the pool and start it.
void Start(HttpConnectionPtr c);
// Close a connection.
void Close(HttpConnectionPtr c);
// Close all connections.
void CloseAll();
private:
/// The managed connections.
std::set<HttpConnectionPtr> connections_;
};
} // namespace webcc
#endif // WEBCC_HTTP_CONNECTION_POOL_H_

@ -48,6 +48,10 @@ public:
// optional |existed| parameter will be set to false. // optional |existed| parameter will be set to false.
const std::string& Get(const std::string& key, bool* existed = nullptr) const; const std::string& Get(const std::string& key, bool* existed = nullptr) const;
void Clear() {
headers_.clear();
}
private: private:
std::vector<HttpHeader>::iterator Find(const std::string& key); std::vector<HttpHeader>::iterator Find(const std::string& key);

@ -33,6 +33,11 @@ HttpParser::HttpParser(HttpMessage* message)
finished_(false) { finished_(false) {
} }
void HttpParser::Init(HttpMessage* message) {
Reset();
message_ = message;
}
bool HttpParser::Parse(const char* data, std::size_t length) { bool HttpParser::Parse(const char* data, std::size_t length) {
// Append the new data to the pending data. // Append the new data to the pending data.
pending_data_.append(data, length); pending_data_.append(data, length);
@ -63,6 +68,19 @@ bool HttpParser::Parse(const char* data, std::size_t length) {
} }
} }
void HttpParser::Reset() {
pending_data_.clear();
content_.clear();
content_length_ = kInvalidLength;
start_line_parsed_ = false;
content_length_parsed_ = false;
header_ended_ = false;
chunked_ = false;
chunk_size_ = kInvalidLength;
finished_ = false;
}
bool HttpParser::ParseHeaders() { bool HttpParser::ParseHeaders() {
std::size_t off = 0; std::size_t off = 0;

@ -19,6 +19,8 @@ public:
HttpParser(const HttpParser&) = delete; HttpParser(const HttpParser&) = delete;
HttpParser& operator=(const HttpParser&) = delete; HttpParser& operator=(const HttpParser&) = delete;
void Init(HttpMessage* message);
bool finished() const { return finished_; } bool finished() const { return finished_; }
std::size_t content_length() const { return content_length_; } std::size_t content_length() const { return content_length_; }
@ -26,6 +28,9 @@ public:
bool Parse(const char* data, std::size_t length); bool Parse(const char* data, std::size_t length);
protected: protected:
// Reset for next parse.
void Reset();
// Parse headers from pending data. // Parse headers from pending data.
// Return false only on syntax errors. // Return false only on syntax errors.
bool ParseHeaders(); bool ParseHeaders();

@ -64,10 +64,12 @@ public:
return port().empty() ? default_port : port(); return port().empty() ? default_port : port();
} }
// TODO: Remove
std::size_t buffer_size() const { std::size_t buffer_size() const {
return buffer_size_; return buffer_size_;
} }
// TODO: Remove
bool ssl_verify() const { bool ssl_verify() const {
return ssl_verify_; return ssl_verify_;
} }

@ -24,11 +24,10 @@ void HttpRequestHandler::Start(std::size_t count) {
void HttpRequestHandler::Stop() { void HttpRequestHandler::Stop() {
LOG_INFO("Stopping workers..."); LOG_INFO("Stopping workers...");
// Close pending connections. // Clear pending connections.
for (HttpConnectionPtr s = queue_.Pop(); s; s = queue_.Pop()) { // The connections will be closed later (see HttpServer::DoAwaitStop).
LOG_INFO("Closing pending connection..."); LOG_INFO("Clear pending connections...");
s->Close(); queue_.Clear();
}
// Enqueue a null connection to trigger the first worker to stop. // Enqueue a null connection to trigger the first worker to stop.
queue_.Push(HttpConnectionPtr()); queue_.Push(HttpConnectionPtr());

@ -29,7 +29,7 @@ public:
// Start worker threads. // Start worker threads.
void Start(std::size_t count); void Start(std::size_t count);
// Close pending connections and stop worker threads. // Clear pending connections from the queue and stop worker threads.
void Stop(); void Stop();
private: private:

@ -11,6 +11,11 @@ HttpRequestParser::HttpRequestParser(HttpRequest* request)
: HttpParser(request), request_(request) { : HttpParser(request), request_(request) {
} }
void HttpRequestParser::Init(HttpRequest* request) {
HttpParser::Init(request);
request_ = request;
}
bool HttpRequestParser::ParseStartLine(const std::string& line) { bool HttpRequestParser::ParseStartLine(const std::string& line) {
std::vector<std::string> strs; std::vector<std::string> strs;
boost::split(strs, line, boost::is_any_of(" "), boost::token_compress_on); boost::split(strs, line, boost::is_any_of(" "), boost::token_compress_on);

@ -11,12 +11,14 @@ class HttpRequest;
class HttpRequestParser : public HttpParser { class HttpRequestParser : public HttpParser {
public: public:
explicit HttpRequestParser(HttpRequest* request); explicit HttpRequestParser(HttpRequest* request = nullptr);
~HttpRequestParser() override = default; ~HttpRequestParser() override = default;
void Init(HttpRequest* request);
private: private:
bool ParseStartLine(const std::string& line) override; bool ParseStartLine(const std::string& line) final;
HttpRequest* request_; HttpRequest* request_;
}; };

@ -2,8 +2,8 @@
#include "boost/algorithm/string.hpp" #include "boost/algorithm/string.hpp"
#include "webcc/logger.h"
#include "webcc/http_response.h" #include "webcc/http_response.h"
#include "webcc/logger.h"
namespace webcc { namespace webcc {
@ -11,6 +11,11 @@ HttpResponseParser::HttpResponseParser(HttpResponse* response)
: HttpParser(response), response_(response) { : HttpParser(response), response_(response) {
} }
void HttpResponseParser::Init(HttpResponse* response) {
HttpParser::Init(response);
response_ = response;
}
bool HttpResponseParser::ParseStartLine(const std::string& line) { bool HttpResponseParser::ParseStartLine(const std::string& line) {
std::vector<std::string> parts; std::vector<std::string> parts;
boost::split(parts, line, boost::is_any_of(" "), boost::token_compress_on); boost::split(parts, line, boost::is_any_of(" "), boost::token_compress_on);

@ -11,13 +11,15 @@ class HttpResponse;
class HttpResponseParser : public HttpParser { class HttpResponseParser : public HttpParser {
public: public:
explicit HttpResponseParser(HttpResponse* response); explicit HttpResponseParser(HttpResponse* response = nullptr);
~HttpResponseParser() override = default; ~HttpResponseParser() override = default;
void Init(HttpResponse* response);
private: private:
// Parse HTTP start line; E.g., "HTTP/1.1 200 OK". // Parse HTTP start line; E.g., "HTTP/1.1 200 OK".
bool ParseStartLine(const std::string& line) override; bool ParseStartLine(const std::string& line) final;
// The result response message. // The result response message.
HttpResponse* response_; HttpResponse* response_;

@ -96,8 +96,10 @@ void HttpServer::DoAccept() {
if (!ec) { if (!ec) {
LOG_INFO("Accepted a connection."); LOG_INFO("Accepted a connection.");
std::make_shared<HttpConnection>(std::move(socket), auto connection = std::make_shared<HttpConnection>(
GetRequestHandler())->Start(); std::move(socket), &pool_, GetRequestHandler());
pool_.Start(connection);
} }
DoAccept(); DoAccept();
@ -111,8 +113,14 @@ void HttpServer::DoAwaitStop() {
// operations. Once all operations have finished the io_context::run() // operations. Once all operations have finished the io_context::run()
// call will exit. // call will exit.
LOG_INFO("On signal %d, stopping the server...", signo); LOG_INFO("On signal %d, stopping the server...", signo);
acceptor_.close(); acceptor_.close();
// Stop worker threads.
GetRequestHandler()->Stop(); GetRequestHandler()->Stop();
// Close all connections.
pool_.CloseAll();
}); });
} }

@ -9,6 +9,7 @@
#include "webcc/globals.h" #include "webcc/globals.h"
#include "webcc/http_connection.h" #include "webcc/http_connection.h"
#include "webcc/http_connection_pool.h"
namespace webcc { namespace webcc {
@ -47,6 +48,9 @@ private:
// Acceptor used to listen for incoming connections. // Acceptor used to listen for incoming connections.
boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::acceptor acceptor_;
// The connection pool which owns all live connections.
HttpConnectionPool pool_;
// The signal_set is used to register for process termination notifications. // The signal_set is used to register for process termination notifications.
boost::asio::signal_set signals_; boost::asio::signal_set signals_;

@ -41,6 +41,11 @@ public:
return message; return message;
} }
void Clear() {
std::lock_guard<std::mutex> lock(mutex_);
message_list_.clear();
}
void Push(const T& message) { void Push(const T& message) {
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);

@ -15,12 +15,13 @@ bool RestRequestHandler::Bind(RestServicePtr service, const std::string& url,
} }
void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) { void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) {
const HttpRequest& http_request = connection->request(); HttpRequestPtr http_request = connection->request();
assert(http_request);
const Url& url = http_request.url(); const Url& url = http_request->url();
RestRequest rest_request{ RestRequest rest_request{
http_request.method(), http_request.content(), url.query() http_request->method(), http_request->content(), url.query()
}; };
// Get service by URL path. // Get service by URL path.
@ -46,7 +47,7 @@ void RestRequestHandler::HandleConnection(HttpConnectionPtr connection) {
// Only support gzip for response compression. // Only support gzip for response compression.
if (rest_response.content.size() > kGzipThreshold && if (rest_response.content.size() > kGzipThreshold &&
http_request.AcceptEncodingGzip()) { http_request->AcceptEncodingGzip()) {
std::string compressed; std::string compressed;
if (Compress(rest_response.content, &compressed)) { if (Compress(rest_response.content, &compressed)) {
http_response->SetHeader(http::headers::kContentEncoding, "gzip"); http_response->SetHeader(http::headers::kContentEncoding, "gzip");

@ -16,12 +16,15 @@ bool SoapRequestHandler::Bind(SoapServicePtr service, const std::string& url) {
} }
void SoapRequestHandler::HandleConnection(HttpConnectionPtr connection) { void SoapRequestHandler::HandleConnection(HttpConnectionPtr connection) {
HttpRequestPtr http_request = connection->request();
assert(http_request);
auto http_response = std::make_shared<HttpResponse>(); auto http_response = std::make_shared<HttpResponse>();
// TODO: Support keep-alive. // TODO: Support keep-alive.
http_response->SetHeader(http::headers::kConnection, "Close"); http_response->SetHeader(http::headers::kConnection, "Close");
std::string path = "/" + connection->request().url().path(); std::string path = "/" + http_request->url().path();
SoapServicePtr service = GetServiceByUrl(path); SoapServicePtr service = GetServiceByUrl(path);
if (!service) { if (!service) {
http_response->set_status(http::Status::kBadRequest); http_response->set_status(http::Status::kBadRequest);
@ -31,7 +34,7 @@ void SoapRequestHandler::HandleConnection(HttpConnectionPtr connection) {
// Parse the SOAP request XML. // Parse the SOAP request XML.
SoapRequest soap_request; SoapRequest soap_request;
if (!soap_request.FromXml(connection->request().content())) { if (!soap_request.FromXml(http_request->content())) {
http_response->set_status(http::Status::kBadRequest); http_response->set_status(http::Status::kBadRequest);
connection->SendResponse(http_response); connection->SendResponse(http_response);
return; return;

@ -3,7 +3,6 @@
#include <iosfwd> #include <iosfwd>
#include <string> #include <string>
#include <vector>
#include "boost/asio/ip/tcp.hpp" #include "boost/asio/ip/tcp.hpp"
@ -23,25 +22,6 @@ std::string EndpointToString(const TcpEndpoint& endpoint);
// See: https://tools.ietf.org/html/rfc7231#section-7.1.1.2 // See: https://tools.ietf.org/html/rfc7231#section-7.1.1.2
std::string GetHttpDateTimestamp(); std::string GetHttpDateTimestamp();
// Resize a buffer in ctor and restore its original size in dtor.
struct BufferResizer {
BufferResizer(std::vector<char>* buffer, std::size_t new_size)
: buffer_(buffer), old_size_(buffer->size()) {
if (new_size != 0 && new_size != old_size_) {
buffer_->resize(new_size);
}
}
~BufferResizer() {
if (buffer_->size() != old_size_) {
buffer_->resize(old_size_);
}
}
std::vector<char>* buffer_;
std::size_t old_size_;
};
} // namespace webcc } // namespace webcc
#endif // WEBCC_UTILITY_H_ #endif // WEBCC_UTILITY_H_

Loading…
Cancel
Save