From 66cab76ce661b749d4571bb720ed19ec77fb96fd Mon Sep 17 00:00:00 2001 From: Chunting Gu Date: Mon, 12 Aug 2019 17:32:29 +0800 Subject: [PATCH] Rework the data streaming --- webcc/client.cc | 8 +- webcc/parser.cc | 161 ++++++++++++++------------------------- webcc/parser.h | 68 ++++++++--------- webcc/request_parser.cc | 15 ++-- webcc/request_parser.h | 4 +- webcc/response_parser.cc | 21 +++-- webcc/response_parser.h | 12 ++- 7 files changed, 122 insertions(+), 167 deletions(-) diff --git a/webcc/client.cc b/webcc/client.cc index 8336e3b..b47b0bb 100644 --- a/webcc/client.cc +++ b/webcc/client.cc @@ -21,13 +21,7 @@ Error Client::Request(RequestPtr request, bool connect, bool stream) { error_ = Error{}; response_.reset(new Response{}); - - if (!response_parser_.Init(response_.get(), stream)) { - // Failed to generate the temp file for streaming. - // I don't know when this would happen. Keep the error handling here just - // for preciseness. - return Error{ Error::kFileError, "Streaming temp file error" }; - } + response_parser_.Init(response_.get(), stream); if (buffer_.size() != buffer_size_) { LOG_VERB("Resize buffer: %u -> %u.", buffer_.size(), buffer_size_); diff --git a/webcc/parser.cc b/webcc/parser.cc index bc7010d..04d4837 100644 --- a/webcc/parser.cc +++ b/webcc/parser.cc @@ -17,65 +17,29 @@ namespace webcc { // ----------------------------------------------------------------------------- -ParseHandlerBase::ParseHandlerBase(Message* message) - : message_(message), content_length_(kInvalidLength) { -} - -void ParseHandlerBase::OnStartLine(const std::string& start_line) { - message_->set_start_line(start_line); -} - -void ParseHandlerBase::OnContentLength(std::size_t content_length) { - content_length_ = content_length; -} - -void ParseHandlerBase::OnHeader(Header&& header) { - message_->SetHeader(std::move(header)); -} - -bool ParseHandlerBase::IsCompressed() const { +bool BodyHandler::IsCompressed() const { return message_->GetContentEncoding() != ContentEncoding::kUnknown; } // ----------------------------------------------------------------------------- -ParseHandler::ParseHandler(Message* message) : ParseHandlerBase(message) { -} - -void ParseHandler::OnContentLength(std::size_t content_length) { - ParseHandlerBase::OnContentLength(content_length); - - // Reserve memory to avoid frequent reallocation when append. - try { - content_.reserve(content_length_); - } catch (const std::exception& e) { - LOG_ERRO("Failed to reserve content memory: %s.", e.what()); - } -} +// TODO +// // Reserve memory to avoid frequent reallocation when append. +// try { +// content_.reserve(content_length_); +// } catch (const std::exception& e) { +// LOG_ERRO("Failed to reserve content memory: %s.", e.what()); +// } -void ParseHandler::AddContent(const char* data, std::size_t count) { +void StringBodyHandler::AddContent(const char* data, std::size_t count) { content_.append(data, count); } -void ParseHandler::AddContent(const std::string& data) { +void StringBodyHandler::AddContent(const std::string& data) { content_.append(data); } -bool ParseHandler::IsFixedContentFull() const { - if (content_length_ == kInvalidLength) { - // Shouldn't be here. - // See Parser::ParseFixedContent(). - return false; - } - - return content_.length() >= content_length_; -} - -bool ParseHandler::Finish() { - // Could be `kInvalidLength` (chunked). - // Could be `0` (empty body and `Content-Length : 0`). - message_->set_content_length(content_length_); - +bool StringBodyHandler::Finish() { if (content_.empty()) { // The call to message_->SetBody() is not necessary since message is // always initialized with an empty body. @@ -95,60 +59,41 @@ bool ParseHandler::Finish() { #endif // WEBCC_ENABLE_GZIP message_->SetBody(body, false); + return true; } // ----------------------------------------------------------------------------- -StreamedParseHandler::StreamedParseHandler(Message* message) - : ParseHandlerBase(message) { -} - -bool StreamedParseHandler::Init() { +FileBodyHandler::FileBodyHandler(Message* message) : BodyHandler(message) { try { temp_path_ = bfs::temp_directory_path() / bfs::unique_path(); LOG_VERB("Generate a temp path for streaming: %s", temp_path_.string().c_str()); } catch (const bfs::filesystem_error&) { LOG_ERRO("Failed to generate temp path: %s", temp_path_.string().c_str()); - return false; + throw Error{ Error::kFileError }; } ofstream_.open(temp_path_, std::ios::binary); if (ofstream_.fail()) { LOG_ERRO("Failed to open the temp file: %s", temp_path_.string().c_str()); - return false; + throw Error{ Error::kFileError }; } - - return true; } -void StreamedParseHandler::AddContent(const char* data, std::size_t count) { +void FileBodyHandler::AddContent(const char* data, std::size_t count) { ofstream_.write(data, count); streamed_size_ += count; } -void StreamedParseHandler::AddContent(const std::string& data) { +void FileBodyHandler::AddContent(const std::string& data) { ofstream_ << data; streamed_size_ += data.size(); } -bool StreamedParseHandler::IsFixedContentFull() const { - if (content_length_ == kInvalidLength) { - // Shouldn't be here. - // See Parser::ParseFixedContent(). - return false; - } - - return streamed_size_ >= content_length_; -} - -bool StreamedParseHandler::Finish() { - // Could be `kInvalidLength` (chunked). - // Could be `0` (empty body and `Content-Length : 0`). - message_->set_content_length(content_length_); - +bool FileBodyHandler::Finish() { ofstream_.close(); // Create a file body based on the streamed temp file. @@ -157,35 +102,19 @@ bool StreamedParseHandler::Finish() { // TODO: Compress message_->SetBody(body, false); + return true; } // ----------------------------------------------------------------------------- -Parser::Parser() - : start_line_parsed_(false), - content_length_parsed_(false), - header_ended_(false), - chunked_(false), - chunk_size_(kInvalidLength), - finished_(false) { +Parser::Parser() { + Reset(); } -bool Parser::Init(Message* message, bool stream) { +void Parser::Init(Message* message) { Reset(); - - if (stream) { - handler_.reset(new StreamedParseHandler{ message }); - } else { - handler_.reset(new ParseHandler{ message }); - } - - if (!handler_->Init()) { - // Failed to generate temp file for streaming. - return false; - } - - return true; + message_ = message; } bool Parser::Parse(const char* data, std::size_t length) { @@ -207,13 +136,26 @@ bool Parser::Parse(const char* data, std::size_t length) { LOG_INFO("HTTP headers just ended."); + CreateBodyHandler(); + + if (!body_handler_) { + // The only reason to reach here is that it was failed to generate the temp + // file for streaming. Normally, it shouldn't happen. + // TODO: Keep a member |error_| for the user to query. + return false; + } + // The left data, if any, is still in the pending data. return ParseContent("", 0); } void Parser::Reset() { + message_ = nullptr; + body_handler_.reset(); + pending_data_.clear(); + content_length_ = kInvalidLength; content_type_.Reset(); start_line_parsed_ = false; content_length_parsed_ = false; @@ -242,7 +184,7 @@ bool Parser::ParseHeaders() { if (!start_line_parsed_) { start_line_parsed_ = true; - handler_->OnStartLine(line); + message_->set_start_line(line); if (!ParseStartLine(line)) { return false; @@ -297,7 +239,7 @@ bool Parser::ParseHeaderLine(const std::string& line) { } LOG_INFO("Content length: %u.", content_length); - handler_->OnContentLength(content_length); + content_length_ = content_length; } else if (boost::iequals(header.first, headers::kContentType)) { content_type_.Parse(header.second); @@ -312,7 +254,8 @@ bool Parser::ParseHeaderLine(const std::string& line) { } } - handler_->OnHeader(std::move(header)); + message_->SetHeader(std::move(header)); + return true; } @@ -331,21 +274,21 @@ bool Parser::ParseFixedContent(const char* data, std::size_t length) { return true; } - if (handler_->content_length() == kInvalidLength) { + if (content_length_ == kInvalidLength) { // Invalid content length (syntax error). return false; } if (!pending_data_.empty()) { // This is the data left after the headers are parsed. - handler_->AddContent(pending_data_); + body_handler_->AddContent(pending_data_); pending_data_.clear(); } // Don't have to firstly put the data to the pending data. - handler_->AddContent(data, length); + body_handler_->AddContent(data, length); - if (handler_->IsFixedContentFull()) { + if (IsFixedContentFull()) { // All content has been read. Finish(); } @@ -372,7 +315,7 @@ bool Parser::ParseChunkedContent(const char* data, std::size_t length) { } if (chunk_size_ + 2 <= pending_data_.size()) { // +2 for CRLF - handler_->AddContent(pending_data_.c_str(), chunk_size_); + body_handler_->AddContent(pending_data_.c_str(), chunk_size_); pending_data_.erase(0, chunk_size_ + 2); @@ -383,7 +326,7 @@ bool Parser::ParseChunkedContent(const char* data, std::size_t length) { continue; } else if (chunk_size_ > pending_data_.size()) { - handler_->AddContent(pending_data_); + body_handler_->AddContent(pending_data_); chunk_size_ -= pending_data_.size(); @@ -431,9 +374,19 @@ bool Parser::ParseChunkSize() { return true; } +bool Parser::IsFixedContentFull() const { + assert(content_length_ != kInvalidLength); + return body_handler_->GetContentLength() >= content_length_; +} + bool Parser::Finish() { finished_ = true; - return handler_->Finish(); + + // Could be `kInvalidLength` (chunked). + // Could be `0` (empty body and `Content-Length : 0`). + message_->set_content_length(content_length_); + + return body_handler_->Finish(); } } // namespace webcc diff --git a/webcc/parser.h b/webcc/parser.h index ace5729..7294bab 100644 --- a/webcc/parser.h +++ b/webcc/parser.h @@ -14,29 +14,18 @@ class Message; // ----------------------------------------------------------------------------- -class ParseHandlerBase { +class BodyHandler { public: - ParseHandlerBase(Message* message); - - virtual ~ParseHandlerBase() = default; - - virtual bool Init() = 0; - - std::size_t content_length() const { - return content_length_; + explicit BodyHandler(Message* message) : message_(message) { } - void OnStartLine(const std::string& start_line); - - void OnHeader(Header&& header); - - virtual void OnContentLength(std::size_t content_length); + virtual ~BodyHandler() = default; virtual void AddContent(const char* data, std::size_t count) = 0; virtual void AddContent(const std::string& data) = 0; - virtual bool IsFixedContentFull() const = 0; + virtual std::size_t GetContentLength() const = 0; virtual bool Finish() = 0; @@ -45,46 +34,46 @@ protected: protected: Message* message_; - std::size_t content_length_; }; -class ParseHandler : public ParseHandlerBase { -public: - explicit ParseHandler(Message* message); - - ~ParseHandler() override = default; +// ----------------------------------------------------------------------------- - bool Init() override { - return true; +class StringBodyHandler : public BodyHandler { +public: + explicit StringBodyHandler(Message* message) : BodyHandler(message) { } - void OnContentLength(std::size_t content_length) override; + ~StringBodyHandler() override = default; void AddContent(const char* data, std::size_t count) override; void AddContent(const std::string& data) override; - bool IsFixedContentFull() const override; + std::size_t GetContentLength() const override { + return content_.size(); + } + bool Finish() override; private: std::string content_; }; -// If |stream| is true, the data will be streamed to a temp file, and the -// body of the message will be FileBody instead of StringBody. -class StreamedParseHandler : public ParseHandlerBase { -public: - explicit StreamedParseHandler(Message* message); +// ----------------------------------------------------------------------------- - ~StreamedParseHandler() override = default; +class FileBodyHandler : public BodyHandler { +public: + // NOTE: Might throw Error::kFileError. + explicit FileBodyHandler(Message* message); - // Generate a temp file. - bool Init() override; + ~FileBodyHandler() override = default; void AddContent(const char* data, std::size_t count) override; void AddContent(const std::string& data) override; - bool IsFixedContentFull() const override; + std::size_t GetContentLength() const override { + return streamed_size_; + } + bool Finish() override; private: @@ -104,7 +93,7 @@ public: Parser(const Parser&) = delete; Parser& operator=(const Parser&) = delete; - bool Init(Message* message, bool stream = false); + void Init(Message* message); bool finished() const { return finished_; @@ -113,13 +102,14 @@ public: bool Parse(const char* data, std::size_t length); protected: - // Reset for next parse. void Reset(); // Parse headers from pending data. // Return false only on syntax errors. bool ParseHeaders(); + virtual void CreateBodyHandler() = 0; + // Get next line (using delimiter CRLF) from the pending data. // The line will not contain a trailing CRLF. // If |erase| is true, the line, as well as the trailing CRLF, will be erased @@ -137,16 +127,20 @@ protected: bool ParseChunkedContent(const char* data, std::size_t length); bool ParseChunkSize(); + bool IsFixedContentFull() const; + // Return false if the compressed content cannot be decompressed. bool Finish(); protected: - std::unique_ptr handler_; + Message* message_; + std::unique_ptr body_handler_; // Data waiting to be parsed. std::string pending_data_; // Temporary data and helper flags for parsing. + std::size_t content_length_; ContentType content_type_; bool start_line_parsed_; bool content_length_parsed_; diff --git a/webcc/request_parser.cc b/webcc/request_parser.cc index 900c944..c716e35 100644 --- a/webcc/request_parser.cc +++ b/webcc/request_parser.cc @@ -13,13 +13,14 @@ namespace webcc { RequestParser::RequestParser() : request_(nullptr) { } -bool RequestParser::Init(Request* request, bool stream) { - if (!Parser::Init(request, stream)) { - return false; - } - +void RequestParser::Init(Request* request) { + Parser::Init(request); request_ = request; - return true; +} + +// TODO +void RequestParser::CreateBodyHandler() { + body_handler_.reset(new StringBodyHandler{ message_ }); } bool RequestParser::ParseStartLine(const std::string& line) { @@ -54,7 +55,7 @@ bool RequestParser::ParseMultipartContent(const char* data, std::size_t length) { pending_data_.append(data, length); - if (!content_length_parsed_ || handler_->content_length() == kInvalidLength) { + if (!content_length_parsed_ || content_length_ == kInvalidLength) { // Invalid content length (syntax error). return false; } diff --git a/webcc/request_parser.h b/webcc/request_parser.h index 1607f63..61e68b0 100644 --- a/webcc/request_parser.h +++ b/webcc/request_parser.h @@ -15,9 +15,11 @@ public: ~RequestParser() override = default; - bool Init(Request* request, bool stream = false); + void Init(Request* request); private: + void CreateBodyHandler() override; + bool ParseStartLine(const std::string& line) override; // Override to handle multipart form data which is request only. diff --git a/webcc/response_parser.cc b/webcc/response_parser.cc index 4d44bbe..28eba43 100644 --- a/webcc/response_parser.cc +++ b/webcc/response_parser.cc @@ -38,16 +38,23 @@ void SplitStartLine(const std::string& line, std::vector* parts) { // ----------------------------------------------------------------------------- -ResponseParser::ResponseParser() : response_(nullptr) { +void ResponseParser::Init(Response* response, bool stream) { + Parser::Init(response); + + response_ = response; + stream_ = stream; } -bool ResponseParser::Init(Response* response, bool stream) { - if (!Parser::Init(response, stream)) { - return false; +void ResponseParser::CreateBodyHandler() { + if (stream_) { + try { + body_handler_.reset(new FileBodyHandler{ message_ }); + } catch (const Error&) { + body_handler_.reset(); + } + } else { + body_handler_.reset(new StringBodyHandler{ message_ }); } - - response_ = response; - return true; } bool ResponseParser::ParseStartLine(const std::string& line) { diff --git a/webcc/response_parser.h b/webcc/response_parser.h index 1f411a4..d165f6c 100644 --- a/webcc/response_parser.h +++ b/webcc/response_parser.h @@ -11,17 +11,18 @@ class Response; class ResponseParser : public Parser { public: - ResponseParser(); - + ResponseParser() = default; ~ResponseParser() override = default; - bool Init(Response* response, bool stream = false); + void Init(Response* response, bool stream = false); void set_ignroe_body(bool ignroe_body) { ignroe_body_ = ignroe_body; } private: + void CreateBodyHandler() override; + // Parse HTTP start line; E.g., "HTTP/1.1 200 OK". bool ParseStartLine(const std::string& line) override; @@ -30,7 +31,10 @@ private: private: // The result response message. - Response* response_; + Response* response_ = nullptr; + + // Data streaming or not. + bool stream_ = false; // The response for HEAD request could also have `Content-Length` header, // set this flag to ignore it.