Merge pull request #22940 from Faless/lws_buffers_bis

Better buffering for WebSocket
This commit is contained in:
Rémi Verschelde 2018-11-13 09:54:06 +01:00 committed by GitHub
commit b2f96b2892
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 292 additions and 149 deletions

View File

@ -135,6 +135,12 @@ public:
return p_n; return p_n;
}; };
inline int decrease_write(int p_n) {
p_n = MIN(p_n, data_left());
inc(write_pos, size_mask + 1 - p_n);
return p_n;
}
Error write(const T &p_v) { Error write(const T &p_v) {
ERR_FAIL_COND_V(space_left() < 1, FAILED); ERR_FAIL_COND_V(space_left() < 1, FAILED);
data.write[inc(write_pos, 1)] = p_v; data.write[inc(write_pos, 1)] = p_v;

View File

@ -31,6 +31,7 @@
#include "emws_client.h" #include "emws_client.h"
#include "core/io/ip.h" #include "core/io/ip.h"
#include "core/project_settings.h"
#include "emscripten.h" #include "emscripten.h"
extern "C" { extern "C" {
@ -43,8 +44,9 @@ EMSCRIPTEN_KEEPALIVE void _esws_on_connect(void *obj, char *proto) {
EMSCRIPTEN_KEEPALIVE void _esws_on_message(void *obj, uint8_t *p_data, int p_data_size, int p_is_string) { EMSCRIPTEN_KEEPALIVE void _esws_on_message(void *obj, uint8_t *p_data, int p_data_size, int p_is_string) {
EMWSClient *client = static_cast<EMWSClient *>(obj); EMWSClient *client = static_cast<EMWSClient *>(obj);
static_cast<EMWSPeer *>(*client->get_peer(1))->read_msg(p_data, p_data_size, p_is_string == 1); Error err = static_cast<EMWSPeer *>(*client->get_peer(1))->read_msg(p_data, p_data_size, p_is_string == 1);
client->_on_peer_packet(); if (err == OK)
client->_on_peer_packet();
} }
EMSCRIPTEN_KEEPALIVE void _esws_on_error(void *obj) { EMSCRIPTEN_KEEPALIVE void _esws_on_error(void *obj) {
@ -159,7 +161,7 @@ Error EMWSClient::connect_to_host(String p_host, String p_path, uint16_t p_port,
}, _js_id, str.utf8().get_data(), proto_string.utf8().get_data()); }, _js_id, str.utf8().get_data(), proto_string.utf8().get_data());
/* clang-format on */ /* clang-format on */
static_cast<Ref<EMWSPeer> >(_peer)->set_sock(peer_sock); static_cast<Ref<EMWSPeer> >(_peer)->set_sock(peer_sock, _in_buf_size, _in_pkt_size);
return OK; return OK;
}; };
@ -198,7 +200,13 @@ uint16_t EMWSClient::get_connected_port() const {
return 1025; return 1025;
}; };
int EMWSClient::get_max_packet_size() const {
return (1 << _in_buf_size) - PROTO_SIZE;
}
EMWSClient::EMWSClient() { EMWSClient::EMWSClient() {
_in_buf_size = GLOBAL_GET(WSC_IN_BUF);
_in_pkt_size = GLOBAL_GET(WSC_IN_PKT);
_is_connecting = false; _is_connecting = false;
_peer = Ref<EMWSPeer>(memnew(EMWSPeer)); _peer = Ref<EMWSPeer>(memnew(EMWSPeer));
/* clang-format off */ /* clang-format off */

View File

@ -41,6 +41,8 @@ class EMWSClient : public WebSocketClient {
GDCIIMPL(EMWSClient, WebSocketClient); GDCIIMPL(EMWSClient, WebSocketClient);
private: private:
int _in_buf_size;
int _in_pkt_size;
int _js_id; int _js_id;
public: public:
@ -52,6 +54,7 @@ public:
IP_Address get_connected_host() const; IP_Address get_connected_host() const;
uint16_t get_connected_port() const; uint16_t get_connected_port() const;
virtual ConnectionStatus get_connection_status() const; virtual ConnectionStatus get_connection_status() const;
int get_max_packet_size() const;
virtual void poll(); virtual void poll();
EMWSClient(); EMWSClient();
~EMWSClient(); ~EMWSClient();

View File

@ -32,11 +32,11 @@
#include "emws_peer.h" #include "emws_peer.h"
#include "core/io/ip.h" #include "core/io/ip.h"
void EMWSPeer::set_sock(int p_sock) { void EMWSPeer::set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size) {
peer_sock = p_sock; peer_sock = p_sock;
in_buffer.clear(); _in_buffer.resize(p_in_pkt_size, p_in_buf_size);
queue_count = 0; _packet_buffer.resize((1 << p_in_buf_size));
} }
void EMWSPeer::set_write_mode(WriteMode p_mode) { void EMWSPeer::set_write_mode(WriteMode p_mode) {
@ -47,18 +47,10 @@ EMWSPeer::WriteMode EMWSPeer::get_write_mode() const {
return write_mode; return write_mode;
} }
void EMWSPeer::read_msg(uint8_t *p_data, uint32_t p_size, bool p_is_string) { Error EMWSPeer::read_msg(uint8_t *p_data, uint32_t p_size, bool p_is_string) {
if (in_buffer.space_left() < p_size + 5) {
ERR_EXPLAIN("Buffer full! Dropping data");
ERR_FAIL();
}
uint8_t is_string = p_is_string ? 1 : 0; uint8_t is_string = p_is_string ? 1 : 0;
in_buffer.write((uint8_t *)&p_size, 4); return _in_buffer.write_packet(p_data, p_size, &is_string);
in_buffer.write((uint8_t *)&is_string, 1);
in_buffer.write(p_data, p_size);
queue_count++;
} }
Error EMWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) { Error EMWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
@ -89,40 +81,28 @@ Error EMWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
Error EMWSPeer::get_packet(const uint8_t **r_buffer, int &r_buffer_size) { Error EMWSPeer::get_packet(const uint8_t **r_buffer, int &r_buffer_size) {
if (queue_count == 0) if (_in_buffer.packets_left() == 0)
return ERR_UNAVAILABLE; return ERR_UNAVAILABLE;
uint32_t to_read = 0; PoolVector<uint8_t>::Write rw = _packet_buffer.write();
uint32_t left = 0; int read = 0;
uint8_t is_string = 0; Error err = _in_buffer.read_packet(rw.ptr(), _packet_buffer.size(), &_is_string, read);
r_buffer_size = 0; ERR_FAIL_COND_V(err != OK, err);
in_buffer.read((uint8_t *)&to_read, 4); *r_buffer = rw.ptr();
--queue_count; r_buffer_size = read;
left = in_buffer.data_left();
if (left < to_read + 1) {
in_buffer.advance_read(left);
return FAILED;
}
in_buffer.read(&is_string, 1);
_was_string = is_string == 1;
in_buffer.read(packet_buffer, to_read);
*r_buffer = packet_buffer;
r_buffer_size = to_read;
return OK; return OK;
}; };
int EMWSPeer::get_available_packet_count() const { int EMWSPeer::get_available_packet_count() const {
return queue_count; return _in_buffer.packets_left();
}; };
bool EMWSPeer::was_string_packet() const { bool EMWSPeer::was_string_packet() const {
return _was_string; return _is_string;
}; };
bool EMWSPeer::is_connected_to_host() const { bool EMWSPeer::is_connected_to_host() const {
@ -143,9 +123,9 @@ void EMWSPeer::close(int p_code, String p_reason) {
}, peer_sock, p_code, p_reason.utf8().get_data()); }, peer_sock, p_code, p_reason.utf8().get_data());
/* clang-format on */ /* clang-format on */
} }
_is_string = 0;
_in_buffer.clear();
peer_sock = -1; peer_sock = -1;
queue_count = 0;
in_buffer.clear();
}; };
IP_Address EMWSPeer::get_connected_host() const { IP_Address EMWSPeer::get_connected_host() const {
@ -162,15 +142,12 @@ uint16_t EMWSPeer::get_connected_port() const {
EMWSPeer::EMWSPeer() { EMWSPeer::EMWSPeer() {
peer_sock = -1; peer_sock = -1;
queue_count = 0;
_was_string = false;
in_buffer.resize(16);
write_mode = WRITE_MODE_BINARY; write_mode = WRITE_MODE_BINARY;
close();
}; };
EMWSPeer::~EMWSPeer() { EMWSPeer::~EMWSPeer() {
in_buffer.resize(0);
close(); close();
}; };

View File

@ -36,6 +36,7 @@
#include "core/io/packet_peer.h" #include "core/io/packet_peer.h"
#include "core/ring_buffer.h" #include "core/ring_buffer.h"
#include "emscripten.h" #include "emscripten.h"
#include "packet_buffer.h"
#include "websocket_peer.h" #include "websocket_peer.h"
class EMWSPeer : public WebSocketPeer { class EMWSPeer : public WebSocketPeer {
@ -43,25 +44,20 @@ class EMWSPeer : public WebSocketPeer {
GDCIIMPL(EMWSPeer, WebSocketPeer); GDCIIMPL(EMWSPeer, WebSocketPeer);
private: private:
enum {
PACKET_BUFFER_SIZE = 65536 - 5 // 4 bytes for the size, 1 for for type
};
int peer_sock; int peer_sock;
WriteMode write_mode; WriteMode write_mode;
uint8_t packet_buffer[PACKET_BUFFER_SIZE]; PoolVector<uint8_t> _packet_buffer;
RingBuffer<uint8_t> in_buffer; PacketBuffer<uint8_t> _in_buffer;
int queue_count; uint8_t _is_string;
bool _was_string;
public: public:
void read_msg(uint8_t *p_data, uint32_t p_size, bool p_is_string); Error read_msg(uint8_t *p_data, uint32_t p_size, bool p_is_string);
void set_sock(int sock); void set_sock(int p_sock, unsigned int p_in_buf_size, unsigned int p_in_pkt_size);
virtual int get_available_packet_count() const; virtual int get_available_packet_count() const;
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size); virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size); virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);
virtual int get_max_packet_size() const { return PACKET_BUFFER_SIZE; }; virtual int get_max_packet_size() const { return _packet_buffer.size(); };
virtual void close(int p_code = 1000, String p_reason = ""); virtual void close(int p_code = 1000, String p_reason = "");
virtual bool is_connected_to_host() const; virtual bool is_connected_to_host() const;
@ -72,10 +68,6 @@ public:
virtual void set_write_mode(WriteMode p_mode); virtual void set_write_mode(WriteMode p_mode);
virtual bool was_string_packet() const; virtual bool was_string_packet() const;
void set_wsi(struct lws *wsi);
Error read_wsi(void *in, size_t len);
Error write_wsi();
EMWSPeer(); EMWSPeer();
~EMWSPeer(); ~EMWSPeer();
}; };

View File

@ -74,6 +74,10 @@ void EMWSServer::disconnect_peer(int p_peer_id, int p_code, String p_reason) {
void EMWSServer::poll() { void EMWSServer::poll() {
} }
int EMWSServer::get_max_packet_size() const {
return 0;
}
EMWSServer::EMWSServer() { EMWSServer::EMWSServer() {
} }

View File

@ -49,6 +49,7 @@ public:
IP_Address get_peer_address(int p_peer_id) const; IP_Address get_peer_address(int p_peer_id) const;
int get_peer_port(int p_peer_id) const; int get_peer_port(int p_peer_id) const;
void disconnect_peer(int p_peer_id, int p_code = 1000, String p_reason = ""); void disconnect_peer(int p_peer_id, int p_code = 1000, String p_reason = "");
int get_max_packet_size() const;
virtual void poll(); virtual void poll();
virtual PoolVector<String> get_protocols() const; virtual PoolVector<String> get_protocols() const;

View File

@ -32,6 +32,7 @@
#include "lws_client.h" #include "lws_client.h"
#include "core/io/ip.h" #include "core/io/ip.h"
#include "core/io/stream_peer_ssl.h" #include "core/io/stream_peer_ssl.h"
#include "core/project_settings.h"
#include "tls/mbedtls/wrapper/include/openssl/ssl.h" #include "tls/mbedtls/wrapper/include/openssl/ssl.h"
Error LWSClient::connect_to_host(String p_host, String p_path, uint16_t p_port, bool p_ssl, PoolVector<String> p_protocols) { Error LWSClient::connect_to_host(String p_host, String p_path, uint16_t p_port, bool p_ssl, PoolVector<String> p_protocols) {
@ -104,6 +105,10 @@ Error LWSClient::connect_to_host(String p_host, String p_path, uint16_t p_port,
return OK; return OK;
}; };
int LWSClient::get_max_packet_size() const {
return (1 << _out_buf_size) - PROTO_SIZE;
}
void LWSClient::poll() { void LWSClient::poll() {
_lws_poll(); _lws_poll();
@ -124,7 +129,7 @@ int LWSClient::_handle_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
} break; } break;
case LWS_CALLBACK_CLIENT_ESTABLISHED: case LWS_CALLBACK_CLIENT_ESTABLISHED:
peer->set_wsi(wsi); peer->set_wsi(wsi, _in_buf_size, _in_pkt_size, _out_buf_size, _out_pkt_size);
peer_data->peer_id = 0; peer_data->peer_id = 0;
peer_data->force_close = false; peer_data->force_close = false;
peer_data->clean_close = false; peer_data->clean_close = false;
@ -207,6 +212,11 @@ uint16_t LWSClient::get_connected_port() const {
}; };
LWSClient::LWSClient() { LWSClient::LWSClient() {
_in_buf_size = nearest_shift((int)GLOBAL_GET(WSC_IN_BUF) - 1) + 10;
_in_pkt_size = nearest_shift((int)GLOBAL_GET(WSC_IN_PKT) - 1);
_out_buf_size = nearest_shift((int)GLOBAL_GET(WSC_OUT_BUF) - 1) + 10;
_out_pkt_size = nearest_shift((int)GLOBAL_GET(WSC_OUT_PKT) - 1);
context = NULL; context = NULL;
_lws_ref = NULL; _lws_ref = NULL;
_peer = Ref<LWSPeer>(memnew(LWSPeer)); _peer = Ref<LWSPeer>(memnew(LWSPeer));

View File

@ -43,8 +43,15 @@ class LWSClient : public WebSocketClient {
LWS_HELPER(LWSClient); LWS_HELPER(LWSClient);
private:
int _in_buf_size;
int _in_pkt_size;
int _out_buf_size;
int _out_pkt_size;
public: public:
Error connect_to_host(String p_host, String p_path, uint16_t p_port, bool p_ssl, PoolVector<String> p_protocol = PoolVector<String>()); Error connect_to_host(String p_host, String p_path, uint16_t p_port, bool p_ssl, PoolVector<String> p_protocol = PoolVector<String>());
int get_max_packet_size() const;
Ref<WebSocketPeer> get_peer(int p_peer_id) const; Ref<WebSocketPeer> get_peer(int p_peer_id) const;
void disconnect_from_host(int p_code = 1000, String p_reason = ""); void disconnect_from_host(int p_code = 1000, String p_reason = "");
IP_Address get_connected_host() const; IP_Address get_connected_host() const;

View File

@ -41,11 +41,12 @@
#include "drivers/unix/net_socket_posix.h" #include "drivers/unix/net_socket_posix.h"
void LWSPeer::set_wsi(struct lws *p_wsi) { void LWSPeer::set_wsi(struct lws *p_wsi, unsigned int p_in_buf_size, unsigned int p_in_pkt_size, unsigned int p_out_buf_size, unsigned int p_out_pkt_size) {
ERR_FAIL_COND(wsi != NULL); ERR_FAIL_COND(wsi != NULL);
rbw.resize(16); _in_buffer.resize(p_in_pkt_size, p_in_buf_size);
rbr.resize(16); _out_buffer.resize(p_out_pkt_size, p_out_buf_size);
_packet_buffer.resize((1 << MAX(p_in_buf_size, p_out_buf_size)) + LWS_PRE);
wsi = p_wsi; wsi = p_wsi;
}; };
@ -61,24 +62,29 @@ Error LWSPeer::read_wsi(void *in, size_t len) {
ERR_FAIL_COND_V(!is_connected_to_host(), FAILED); ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
uint32_t size = in_size; if (lws_is_first_fragment(wsi))
uint8_t is_string = lws_frame_is_binary(wsi) ? 0 : 1; _in_size = 0;
else if (_in_size == -1) // Trash this frame
return ERR_FILE_CORRUPT;
if (rbr.space_left() < len + 5) { Error err = _in_buffer.write_packet((const uint8_t *)in, len, NULL);
ERR_EXPLAIN("Buffer full! Dropping data");
ERR_FAIL_V(FAILED); if (err != OK) {
_in_buffer.discard_payload(_in_size);
_in_size = -1;
ERR_FAIL_V(err);
} }
copymem(&(input_buffer[size]), in, len); _in_size += len;
size += len;
in_size = size;
if (lws_is_final_fragment(wsi)) { if (lws_is_final_fragment(wsi)) {
rbr.write((uint8_t *)&size, 4); uint8_t is_string = lws_frame_is_binary(wsi) ? 0 : 1;
rbr.write((uint8_t *)&is_string, 1); err = _in_buffer.write_packet(NULL, _in_size, &is_string);
rbr.write(input_buffer, size); if (err != OK) {
in_count++; _in_buffer.discard_payload(_in_size);
in_size = 0; _in_size = -1;
ERR_FAIL_V(err);
}
} }
return OK; return OK;
@ -89,26 +95,20 @@ Error LWSPeer::write_wsi() {
ERR_FAIL_COND_V(!is_connected_to_host(), FAILED); ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
PoolVector<uint8_t> tmp; PoolVector<uint8_t> tmp;
int left = rbw.data_left(); int count = _out_buffer.packets_left();
uint32_t to_write = 0;
if (left == 0 || out_count == 0) if (count == 0)
return OK; return OK;
rbw.read((uint8_t *)&to_write, 4); int read = 0;
out_count--; uint8_t is_string;
PoolVector<uint8_t>::Write rw = _packet_buffer.write();
_out_buffer.read_packet(&(rw[LWS_PRE]), _packet_buffer.size() - LWS_PRE, &is_string, read);
if (left < to_write) { enum lws_write_protocol mode = is_string ? LWS_WRITE_TEXT : LWS_WRITE_BINARY;
rbw.advance_read(left); lws_write(wsi, &(rw[LWS_PRE]), read, mode);
return FAILED;
}
tmp.resize(LWS_PRE + to_write); if (count > 1)
rbw.read(&(tmp.write()[LWS_PRE]), to_write);
lws_write(wsi, &(tmp.write()[LWS_PRE]), to_write, (enum lws_write_protocol)write_mode);
tmp.resize(0);
if (out_count > 0)
lws_callback_on_writable(wsi); // we want to write more! lws_callback_on_writable(wsi); // we want to write more!
return OK; return OK;
@ -118,40 +118,27 @@ Error LWSPeer::put_packet(const uint8_t *p_buffer, int p_buffer_size) {
ERR_FAIL_COND_V(!is_connected_to_host(), FAILED); ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
rbw.write((uint8_t *)&p_buffer_size, 4); uint8_t is_string = write_mode == WRITE_MODE_TEXT;
rbw.write(p_buffer, MIN(p_buffer_size, rbw.space_left())); _out_buffer.write_packet(p_buffer, p_buffer_size, &is_string);
out_count++;
lws_callback_on_writable(wsi); // notify that we want to write lws_callback_on_writable(wsi); // notify that we want to write
return OK; return OK;
}; };
Error LWSPeer::get_packet(const uint8_t **r_buffer, int &r_buffer_size) { Error LWSPeer::get_packet(const uint8_t **r_buffer, int &r_buffer_size) {
ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
if (in_count == 0)
return ERR_UNAVAILABLE;
uint32_t to_read = 0;
uint32_t left = 0;
uint8_t is_string = 0;
r_buffer_size = 0; r_buffer_size = 0;
rbr.read((uint8_t *)&to_read, 4); ERR_FAIL_COND_V(!is_connected_to_host(), FAILED);
in_count--;
left = rbr.data_left();
if (left < to_read + 1) { if (_in_buffer.packets_left() == 0)
rbr.advance_read(left); return ERR_UNAVAILABLE;
return FAILED;
}
rbr.read(&is_string, 1); int read = 0;
rbr.read(packet_buffer, to_read); PoolVector<uint8_t>::Write rw = _packet_buffer.write();
*r_buffer = packet_buffer; _in_buffer.read_packet(rw.ptr(), _packet_buffer.size(), &_is_string, read);
r_buffer_size = to_read;
_was_string = is_string; *r_buffer = rw.ptr();
r_buffer_size = read;
return OK; return OK;
}; };
@ -161,12 +148,12 @@ int LWSPeer::get_available_packet_count() const {
if (!is_connected_to_host()) if (!is_connected_to_host())
return 0; return 0;
return in_count; return _in_buffer.packets_left();
}; };
bool LWSPeer::was_string_packet() const { bool LWSPeer::was_string_packet() const {
return _was_string; return _is_string;
}; };
bool LWSPeer::is_connected_to_host() const { bool LWSPeer::is_connected_to_host() const {
@ -219,12 +206,11 @@ void LWSPeer::close(int p_code, String p_reason) {
close_reason = ""; close_reason = "";
} }
wsi = NULL; wsi = NULL;
rbw.resize(0); _in_buffer.clear();
rbr.resize(0); _out_buffer.clear();
in_count = 0; _in_size = 0;
in_size = 0; _is_string = 0;
out_count = 0; _packet_buffer.resize(0);
_was_string = false;
}; };
IP_Address LWSPeer::get_connected_host() const { IP_Address LWSPeer::get_connected_host() const {

View File

@ -37,6 +37,7 @@
#include "core/ring_buffer.h" #include "core/ring_buffer.h"
#include "libwebsockets.h" #include "libwebsockets.h"
#include "lws_config.h" #include "lws_config.h"
#include "packet_buffer.h"
#include "websocket_peer.h" #include "websocket_peer.h"
class LWSPeer : public WebSocketPeer { class LWSPeer : public WebSocketPeer {
@ -44,14 +45,16 @@ class LWSPeer : public WebSocketPeer {
GDCIIMPL(LWSPeer, WebSocketPeer); GDCIIMPL(LWSPeer, WebSocketPeer);
private: private:
enum { int _in_size;
PACKET_BUFFER_SIZE = 65536 - 5 // 4 bytes for the size, 1 for the type uint8_t _is_string;
}; // Our packet info is just a boolean (is_string), using uint8_t for it.
PacketBuffer<uint8_t> _in_buffer;
PacketBuffer<uint8_t> _out_buffer;
PoolVector<uint8_t> _packet_buffer;
uint8_t packet_buffer[PACKET_BUFFER_SIZE];
struct lws *wsi; struct lws *wsi;
WriteMode write_mode; WriteMode write_mode;
bool _was_string;
int close_code; int close_code;
String close_reason; String close_reason;
@ -63,17 +66,10 @@ public:
bool clean_close; bool clean_close;
}; };
RingBuffer<uint8_t> rbw;
RingBuffer<uint8_t> rbr;
uint8_t input_buffer[PACKET_BUFFER_SIZE];
uint32_t in_size;
int in_count;
int out_count;
virtual int get_available_packet_count() const; virtual int get_available_packet_count() const;
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size); virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size); virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);
virtual int get_max_packet_size() const { return PACKET_BUFFER_SIZE; }; virtual int get_max_packet_size() const { return _packet_buffer.size(); };
virtual void close(int p_code = 1000, String p_reason = ""); virtual void close(int p_code = 1000, String p_reason = "");
virtual bool is_connected_to_host() const; virtual bool is_connected_to_host() const;
@ -84,7 +80,7 @@ public:
virtual void set_write_mode(WriteMode p_mode); virtual void set_write_mode(WriteMode p_mode);
virtual bool was_string_packet() const; virtual bool was_string_packet() const;
void set_wsi(struct lws *wsi); void set_wsi(struct lws *wsi, unsigned int _in_buf_size, unsigned int _in_pkt_size, unsigned int _out_buf_size, unsigned int _out_pkt_size);
Error read_wsi(void *in, size_t len); Error read_wsi(void *in, size_t len);
Error write_wsi(); Error write_wsi();
void send_close_status(struct lws *wsi); void send_close_status(struct lws *wsi);

View File

@ -31,6 +31,7 @@
#include "lws_server.h" #include "lws_server.h"
#include "core/os/os.h" #include "core/os/os.h"
#include "core/project_settings.h"
Error LWSServer::listen(int p_port, PoolVector<String> p_protocols, bool gd_mp_api) { Error LWSServer::listen(int p_port, PoolVector<String> p_protocols, bool gd_mp_api) {
@ -67,6 +68,10 @@ bool LWSServer::is_listening() const {
return context != NULL; return context != NULL;
} }
int LWSServer::get_max_packet_size() const {
return (1 << _out_buf_size) - PROTO_SIZE;
}
int LWSServer::_handle_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int LWSServer::_handle_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) {
LWSPeer::PeerData *peer_data = (LWSPeer::PeerData *)user; LWSPeer::PeerData *peer_data = (LWSPeer::PeerData *)user;
@ -85,7 +90,7 @@ int LWSServer::_handle_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
int32_t id = _gen_unique_id(); int32_t id = _gen_unique_id();
Ref<LWSPeer> peer = Ref<LWSPeer>(memnew(LWSPeer)); Ref<LWSPeer> peer = Ref<LWSPeer>(memnew(LWSPeer));
peer->set_wsi(wsi); peer->set_wsi(wsi, _in_buf_size, _in_pkt_size, _out_buf_size, _out_pkt_size);
_peer_map[id] = peer; _peer_map[id] = peer;
peer_data->peer_id = id; peer_data->peer_id = id;
@ -192,6 +197,10 @@ void LWSServer::disconnect_peer(int p_peer_id, int p_code, String p_reason) {
} }
LWSServer::LWSServer() { LWSServer::LWSServer() {
_in_buf_size = nearest_shift((int)GLOBAL_GET(WSS_IN_BUF) - 1) + 10;
_in_pkt_size = nearest_shift((int)GLOBAL_GET(WSS_IN_PKT) - 1);
_out_buf_size = nearest_shift((int)GLOBAL_GET(WSS_OUT_BUF) - 1) + 10;
_out_pkt_size = nearest_shift((int)GLOBAL_GET(WSS_OUT_PKT) - 1);
context = NULL; context = NULL;
_lws_ref = NULL; _lws_ref = NULL;
} }

View File

@ -45,11 +45,16 @@ class LWSServer : public WebSocketServer {
private: private:
Map<int, Ref<LWSPeer> > peer_map; Map<int, Ref<LWSPeer> > peer_map;
int _in_buf_size;
int _in_pkt_size;
int _out_buf_size;
int _out_pkt_size;
public: public:
Error listen(int p_port, PoolVector<String> p_protocols = PoolVector<String>(), bool gd_mp_api = false); Error listen(int p_port, PoolVector<String> p_protocols = PoolVector<String>(), bool gd_mp_api = false);
void stop(); void stop();
bool is_listening() const; bool is_listening() const;
int get_max_packet_size() const;
bool has_peer(int p_id) const; bool has_peer(int p_id) const;
Ref<WebSocketPeer> get_peer(int p_id) const; Ref<WebSocketPeer> get_peer(int p_id) const;
IP_Address get_peer_address(int p_peer_id) const; IP_Address get_peer_address(int p_peer_id) const;

View File

@ -0,0 +1,122 @@
/*************************************************************************/
/* packet_buffer.h */
/*************************************************************************/
/* This file is part of: */
/* GODOT ENGINE */
/* https://godotengine.org */
/*************************************************************************/
/* Copyright (c) 2007-2018 Juan Linietsky, Ariel Manzur. */
/* Copyright (c) 2014-2018 Godot Engine contributors (cf. AUTHORS.md) */
/* */
/* Permission is hereby granted, free of charge, to any person obtaining */
/* a copy of this software and associated documentation files (the */
/* "Software"), to deal in the Software without restriction, including */
/* without limitation the rights to use, copy, modify, merge, publish, */
/* distribute, sublicense, and/or sell copies of the Software, and to */
/* permit persons to whom the Software is furnished to do so, subject to */
/* the following conditions: */
/* */
/* The above copyright notice and this permission notice shall be */
/* included in all copies or substantial portions of the Software. */
/* */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
/*************************************************************************/
#ifndef PACKET_BUFFER_H
#define PACKET_BUFFER_H
#include "core/os/copymem.h"
#include "core/ring_buffer.h"
template <class T>
class PacketBuffer {
private:
typedef struct {
uint32_t size;
T info;
} _Packet;
RingBuffer<_Packet> _packets;
RingBuffer<uint8_t> _payload;
public:
Error write_packet(const uint8_t *p_payload, uint32_t p_size, const T *p_info) {
#ifdef TOOLS_ENABLED
// Verbose buffer warnings
if (p_payload && _payload.space_left() < p_size) {
ERR_PRINT("Buffer payload full! Dropping data.");
ERR_FAIL_V(ERR_OUT_OF_MEMORY);
}
if (p_info && _packets.space_left() < 1) {
ERR_PRINT("Too many packets in queue! Dropping data.");
ERR_FAIL_V(ERR_OUT_OF_MEMORY);
}
#else
ERR_FAIL_COND_V(p_payload && _payload.space_left() < p_size, ERR_OUT_OF_MEMORY);
ERR_FAIL_COND_V(p_info && _packets.space_left() < 1, ERR_OUT_OF_MEMORY);
#endif
// If p_info is NULL, only the payload is written
if (p_info) {
_Packet p;
p.size = p_size;
copymem(&p.info, p_info, sizeof(T));
_packets.write(p);
}
// If p_payload is NULL, only the packet information is written.
if (p_payload) {
_payload.write((const uint8_t *)p_payload, p_size);
}
return OK;
}
Error read_packet(uint8_t *r_payload, int p_bytes, T *r_info, int &r_read) {
ERR_FAIL_COND_V(_packets.data_left() < 1, ERR_UNAVAILABLE);
_Packet p;
_packets.read(&p, 1);
ERR_FAIL_COND_V(_payload.data_left() < p.size, ERR_BUG);
ERR_FAIL_COND_V(p_bytes < p.size, ERR_OUT_OF_MEMORY);
r_read = p.size;
copymem(r_info, &p.info, sizeof(T));
_payload.read(r_payload, p.size);
return OK;
}
void discard_payload(int p_size) {
_packets.decrease_write(p_size);
}
void resize(int p_pkt_shift, int p_buf_shift) {
_packets.resize(p_pkt_shift);
_payload.resize(p_buf_shift);
}
int packets_left() const {
return _packets.data_left();
}
void clear() {
_payload.resize(0);
_packets.resize(0);
}
PacketBuffer() {
clear();
}
~PacketBuffer() {
clear();
}
};
#endif // PACKET_BUFFER_H

View File

@ -29,6 +29,7 @@
/*************************************************************************/ /*************************************************************************/
#include "register_types.h" #include "register_types.h"
#include "core/error_macros.h" #include "core/error_macros.h"
#include "core/project_settings.h"
#ifdef JAVASCRIPT_ENABLED #ifdef JAVASCRIPT_ENABLED
#include "emscripten.h" #include "emscripten.h"
#include "emws_client.h" #include "emws_client.h"
@ -41,6 +42,22 @@
#endif #endif
void register_websocket_types() { void register_websocket_types() {
#define _SET_HINT(NAME, _VAL_, _MAX_) \
GLOBAL_DEF(NAME, _VAL_); \
ProjectSettings::get_singleton()->set_custom_property_info(NAME, PropertyInfo(Variant::INT, NAME, PROPERTY_HINT_RANGE, "2," #_MAX_ ",1,or_greater"));
// Client buffers project settings
_SET_HINT(WSC_IN_BUF, 64, 4096);
_SET_HINT(WSC_IN_PKT, 1024, 16384);
_SET_HINT(WSC_OUT_BUF, 64, 4096);
_SET_HINT(WSC_OUT_PKT, 1024, 16384);
// Server buffers project settings
_SET_HINT(WSS_IN_BUF, 64, 4096);
_SET_HINT(WSS_IN_PKT, 1024, 16384);
_SET_HINT(WSS_OUT_BUF, 64, 4096);
_SET_HINT(WSS_OUT_PKT, 1024, 16384);
#ifdef JAVASCRIPT_ENABLED #ifdef JAVASCRIPT_ENABLED
EM_ASM({ EM_ASM({
var IDHandler = {}; var IDHandler = {};

View File

@ -30,6 +30,16 @@
#ifndef WEBSOCKETMACTOS_H #ifndef WEBSOCKETMACTOS_H
#define WEBSOCKETMACTOS_H #define WEBSOCKETMACTOS_H
#define WSC_IN_BUF "network/limits/websocket_client/max_in_buffer_kb"
#define WSC_IN_PKT "network/limits/websocket_client/max_in_packets"
#define WSC_OUT_BUF "network/limits/websocket_client/max_out_buffer_kb"
#define WSC_OUT_PKT "network/limits/websocket_client/max_out_packets"
#define WSS_IN_BUF "network/limits/websocket_server/max_in_buffer_kb"
#define WSS_IN_PKT "network/limits/websocket_server/max_in_packets"
#define WSS_OUT_BUF "network/limits/websocket_server/max_out_buffer_kb"
#define WSS_OUT_PKT "network/limits/websocket_server/max_out_packets"
/* clang-format off */ /* clang-format off */
#define GDCICLASS(CNAME) \ #define GDCICLASS(CNAME) \
public:\ public:\

View File

@ -100,13 +100,6 @@ int WebSocketMultiplayerPeer::get_available_packet_count() const {
return _incoming_packets.size(); return _incoming_packets.size();
} }
int WebSocketMultiplayerPeer::get_max_packet_size() const {
ERR_FAIL_COND_V(!_is_multiplayer, ERR_UNCONFIGURED);
return MAX_PACKET_SIZE;
}
Error WebSocketMultiplayerPeer::get_packet(const uint8_t **r_buffer, int &r_buffer_size) { Error WebSocketMultiplayerPeer::get_packet(const uint8_t **r_buffer, int &r_buffer_size) {
r_buffer_size = 0; r_buffer_size = 0;

View File

@ -51,9 +51,7 @@ protected:
SYS_DEL = 2, SYS_DEL = 2,
SYS_ID = 3, SYS_ID = 3,
PROTO_SIZE = 9, PROTO_SIZE = 9
SYS_PACKET_SIZE = 13,
MAX_PACKET_SIZE = 65536 - 14 // 5 websocket, 9 multiplayer
}; };
struct Packet { struct Packet {
@ -93,7 +91,7 @@ public:
/* PacketPeer */ /* PacketPeer */
virtual int get_available_packet_count() const; virtual int get_available_packet_count() const;
virtual int get_max_packet_size() const; virtual int get_max_packet_size() const = 0;
virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size); virtual Error get_packet(const uint8_t **r_buffer, int &r_buffer_size);
virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size); virtual Error put_packet(const uint8_t *p_buffer, int p_buffer_size);

View File

@ -32,7 +32,6 @@
#include "core/error_list.h" #include "core/error_list.h"
#include "core/io/packet_peer.h" #include "core/io/packet_peer.h"
#include "core/ring_buffer.h"
#include "websocket_macros.h" #include "websocket_macros.h"
class WebSocketPeer : public PacketPeer { class WebSocketPeer : public PacketPeer {