[HTTPRequest] Body streaming decompresison.

Using a StreamPeerGZIP (which acts as a ringbuffer).
This commit is contained in:
Fabio Alessandrelli 2022-07-31 18:02:41 +02:00
parent 273ba0794f
commit 98047e791a
2 changed files with 42 additions and 38 deletions

View File

@ -48,6 +48,7 @@ Error HTTPRequest::_parse_url(const String &p_url) {
body_len = -1;
body.clear();
downloaded.set(0);
final_body_size.set(0);
redirections = 0;
String scheme;
@ -198,6 +199,7 @@ void HTTPRequest::cancel_request() {
}
file.unref();
decompressor.unref();
client->close();
body.clear();
got_response = false;
@ -219,6 +221,9 @@ bool HTTPRequest::_handle_response(bool *ret_value) {
client->get_response_headers(&rheaders);
response_headers.clear();
downloaded.set(0);
final_body_size.set(0);
decompressor.unref();
for (const String &E : rheaders) {
response_headers.push_back(E);
}
@ -259,6 +264,7 @@ bool HTTPRequest::_handle_response(bool *ret_value) {
body_len = -1;
body.clear();
downloaded.set(0);
final_body_size.set(0);
redirections = new_redirs;
*ret_value = false;
return true;
@ -266,6 +272,19 @@ bool HTTPRequest::_handle_response(bool *ret_value) {
}
}
// Check if we need to start streaming decompression.
String content_encoding;
if (accept_gzip) {
content_encoding = get_header_value(response_headers, "Content-Encoding").to_lower();
}
if (content_encoding == "gzip") {
decompressor.instantiate();
decompressor->start_decompression(false, get_download_chunk_size() * 2);
} else if (content_encoding == "deflate") {
decompressor.instantiate();
decompressor->start_decompression(true, get_download_chunk_size() * 2);
}
return false;
}
@ -375,9 +394,28 @@ bool HTTPRequest::_update_connection() {
}
PackedByteArray chunk = client->read_response_body_chunk();
downloaded.add(chunk.size());
// Decompress chunk if needed.
if (decompressor.is_valid()) {
Error err = decompressor->put_data(chunk.ptr(), chunk.size());
if (err == OK) {
chunk.resize(decompressor->get_available_bytes());
err = decompressor->get_data(chunk.ptrw(), chunk.size());
}
if (err != OK) {
call_deferred(SNAME("_request_done"), RESULT_BODY_DECOMPRESS_FAILED, response_code, response_headers, PackedByteArray());
return true;
}
}
final_body_size.add(chunk.size());
if (body_size_limit >= 0 && final_body_size.get() > body_size_limit) {
call_deferred(SNAME("_request_done"), RESULT_BODY_SIZE_LIMIT_EXCEEDED, response_code, response_headers, PackedByteArray());
return true;
}
if (chunk.size()) {
downloaded.add(chunk.size());
if (file.is_valid()) {
const uint8_t *r = chunk.ptr();
file->store_buffer(r, chunk.size());
@ -390,11 +428,6 @@ bool HTTPRequest::_update_connection() {
}
}
if (body_size_limit >= 0 && downloaded.get() > body_size_limit) {
call_deferred(SNAME("_request_done"), RESULT_BODY_SIZE_LIMIT_EXCEEDED, response_code, response_headers, PackedByteArray());
return true;
}
if (body_len >= 0) {
if (downloaded.get() == body_len) {
call_deferred(SNAME("_request_done"), RESULT_SUCCESS, response_code, response_headers, body);
@ -425,38 +458,6 @@ bool HTTPRequest::_update_connection() {
void HTTPRequest::_request_done(int p_status, int p_code, const PackedStringArray &p_headers, const PackedByteArray &p_data) {
cancel_request();
// Determine if the request body is compressed.
bool is_compressed;
String content_encoding = get_header_value(p_headers, "Content-Encoding").to_lower();
Compression::Mode mode;
if (content_encoding == "gzip") {
mode = Compression::Mode::MODE_GZIP;
is_compressed = true;
} else if (content_encoding == "deflate") {
mode = Compression::Mode::MODE_DEFLATE;
is_compressed = true;
} else {
is_compressed = false;
}
if (accept_gzip && is_compressed && p_data.size() > 0) {
// Decompress request body
PackedByteArray decompressed;
int result = Compression::decompress_dynamic(&decompressed, body_size_limit, p_data.ptr(), p_data.size(), mode);
if (result == OK) {
emit_signal(SNAME("request_completed"), p_status, p_code, p_headers, decompressed);
return;
} else if (result == -5) {
WARN_PRINT("Decompressed size of HTTP response body exceeded body_size_limit");
p_status = RESULT_BODY_SIZE_LIMIT_EXCEEDED;
// Just return the raw data if we failed to decompress it.
} else {
WARN_PRINT("Failed to decompress HTTP response body");
p_status = RESULT_BODY_DECOMPRESS_FAILED;
// Just return the raw data if we failed to decompress it.
}
}
emit_signal(SNAME("request_completed"), p_status, p_code, p_headers, p_data);
}

View File

@ -32,6 +32,7 @@
#define HTTP_REQUEST_H
#include "core/io/http_client.h"
#include "core/io/stream_peer_gzip.h"
#include "core/os/thread.h"
#include "core/templates/safe_refcount.h"
#include "scene/main/node.h"
@ -84,10 +85,12 @@ private:
String download_to_file;
Ref<StreamPeerGZIP> decompressor;
Ref<FileAccess> file;
int body_len = -1;
SafeNumeric<int> downloaded;
SafeNumeric<int> final_body_size;
int body_size_limit = -1;
int redirections = 0;