[MP] Handle cleanup of "scene cache" nodes

Make sure we delete the relevant ObjectID from the cache when the nodes
are removed from tree.

(cherry picked from commit 853df2895a)
This commit is contained in:
Fabio Alessandrelli 2023-12-10 14:40:18 +01:00 committed by Yuri Sizov
parent 6e4cbdc144
commit 8544106b7e
2 changed files with 109 additions and 82 deletions

View File

@ -35,25 +35,61 @@
#include "core/io/marshalls.h" #include "core/io/marshalls.h"
#include "scene/main/node.h" #include "scene/main/node.h"
#include "scene/main/window.h" #include "scene/main/window.h"
#include "scene/scene_string_names.h"
SceneCacheInterface::NodeCache &SceneCacheInterface::_track(Node *p_node) {
const ObjectID oid = p_node->get_instance_id();
NodeCache *nc = nodes_cache.getptr(oid);
if (!nc) {
nodes_cache[oid] = NodeCache();
p_node->connect(SceneStringNames::get_singleton()->tree_exited, callable_mp(this, &SceneCacheInterface::_remove_node_cache).bind(oid), Object::CONNECT_ONE_SHOT);
}
return nodes_cache[oid];
}
void SceneCacheInterface::_remove_node_cache(ObjectID p_oid) {
NodeCache *nc = nodes_cache.getptr(p_oid);
if (!nc) {
return;
}
for (KeyValue<int, int> &E : nc->recv_ids) {
PeerInfo *pinfo = peers_info.getptr(E.key);
ERR_CONTINUE(!pinfo);
pinfo->recv_nodes.erase(E.value);
}
for (KeyValue<int, bool> &E : nc->confirmed_peers) {
PeerInfo *pinfo = peers_info.getptr(E.key);
ERR_CONTINUE(!pinfo);
pinfo->sent_nodes.erase(p_oid);
}
nodes_cache.erase(p_oid);
}
void SceneCacheInterface::on_peer_change(int p_id, bool p_connected) { void SceneCacheInterface::on_peer_change(int p_id, bool p_connected) {
if (p_connected) { if (p_connected) {
path_get_cache.insert(p_id, PathGetCache()); peers_info.insert(p_id, PeerInfo());
} else { } else {
// Cleanup get cache. PeerInfo *pinfo = peers_info.getptr(p_id);
path_get_cache.erase(p_id); ERR_FAIL_NULL(pinfo); // Bug.
// Cleanup sent cache. for (KeyValue<int, ObjectID> E : pinfo->recv_nodes) {
// Some refactoring is needed to make this faster and do paths GC. NodeCache *nc = nodes_cache.getptr(E.value);
for (KeyValue<ObjectID, PathSentCache> &E : path_send_cache) { ERR_CONTINUE(!nc);
E.value.confirmed_peers.erase(p_id); nc->recv_ids.erase(E.key);
} }
for (const ObjectID &oid : pinfo->sent_nodes) {
NodeCache *nc = nodes_cache.getptr(oid);
ERR_CONTINUE(!nc);
nc->confirmed_peers.erase(p_id);
}
peers_info.erase(p_id);
} }
} }
void SceneCacheInterface::process_simplify_path(int p_from, const uint8_t *p_packet, int p_packet_len) { void SceneCacheInterface::process_simplify_path(int p_from, const uint8_t *p_packet, int p_packet_len) {
ERR_FAIL_COND(!peers_info.has(p_from)); // Bug.
ERR_FAIL_COND_MSG(p_packet_len < 38, "Invalid packet received. Size too small.");
Node *root_node = SceneTree::get_singleton()->get_root()->get_node(multiplayer->get_root_path()); Node *root_node = SceneTree::get_singleton()->get_root()->get_node(multiplayer->get_root_path());
ERR_FAIL_NULL(root_node); ERR_FAIL_NULL(root_node);
ERR_FAIL_COND_MSG(p_packet_len < 38, "Invalid packet received. Size too small.");
int ofs = 1; int ofs = 1;
String methods_md5; String methods_md5;
@ -63,15 +99,13 @@ void SceneCacheInterface::process_simplify_path(int p_from, const uint8_t *p_pac
int id = decode_uint32(&p_packet[ofs]); int id = decode_uint32(&p_packet[ofs]);
ofs += 4; ofs += 4;
ERR_FAIL_COND_MSG(peers_info[p_from].recv_nodes.has(id), vformat("Duplicate remote cache ID %d for peer %d", id, p_from));
String paths; String paths;
paths.parse_utf8((const char *)(p_packet + ofs), p_packet_len - ofs); paths.parse_utf8((const char *)(p_packet + ofs), p_packet_len - ofs);
const NodePath path = paths; const NodePath path = paths;
if (!path_get_cache.has(p_from)) {
path_get_cache[p_from] = PathGetCache();
}
Node *node = root_node->get_node(path); Node *node = root_node->get_node(path);
ERR_FAIL_NULL(node); ERR_FAIL_NULL(node);
const bool valid_rpc_checksum = multiplayer->get_rpc_md5(node) == methods_md5; const bool valid_rpc_checksum = multiplayer->get_rpc_md5(node) == methods_md5;
@ -79,10 +113,9 @@ void SceneCacheInterface::process_simplify_path(int p_from, const uint8_t *p_pac
ERR_PRINT("The rpc node checksum failed. Make sure to have the same methods on both nodes. Node path: " + path); ERR_PRINT("The rpc node checksum failed. Make sure to have the same methods on both nodes. Node path: " + path);
} }
PathGetCache::NodeInfo ni; peers_info[p_from].recv_nodes.insert(id, node->get_instance_id());
ni.path = node->get_path(); NodeCache &cache = _track(node);
cache.recv_ids.insert(p_from, id);
path_get_cache[p_from].nodes[id] = ni;
// Encode path to send ack. // Encode path to send ack.
CharString pname = String(path).utf8(); CharString pname = String(path).utf8();
@ -122,15 +155,15 @@ void SceneCacheInterface::process_confirm_path(int p_from, const uint8_t *p_pack
Node *node = root_node->get_node(path); Node *node = root_node->get_node(path);
ERR_FAIL_NULL(node); ERR_FAIL_NULL(node);
PathSentCache *psc = path_send_cache.getptr(node->get_instance_id()); NodeCache *cache = nodes_cache.getptr(node->get_instance_id());
ERR_FAIL_NULL_MSG(psc, "Invalid packet received. Tries to confirm a path which was not found in cache."); ERR_FAIL_NULL_MSG(cache, "Invalid packet received. Tries to confirm a node which was not requested.");
HashMap<int, bool>::Iterator E = psc->confirmed_peers.find(p_from); bool *confirmed = cache->confirmed_peers.getptr(p_from);
ERR_FAIL_COND_MSG(!E, "Invalid packet received. Source peer was not found in cache for the given path."); ERR_FAIL_NULL_MSG(confirmed, "Invalid packet received. Tries to confirm a node which was not requested.");
E->value = true; *confirmed = true;
} }
Error SceneCacheInterface::_send_confirm_path(Node *p_node, PathSentCache *psc, const List<int> &p_peers) { Error SceneCacheInterface::_send_confirm_path(Node *p_node, NodeCache &p_cache, const List<int> &p_peers) {
// Encode function name. // Encode function name.
const CharString path = String(multiplayer->get_root_path().rel_path_to(p_node->get_path())).utf8(); const CharString path = String(multiplayer->get_root_path().rel_path_to(p_node->get_path())).utf8();
const int path_len = encode_cstring(path.get_data(), nullptr); const int path_len = encode_cstring(path.get_data(), nullptr);
@ -148,7 +181,7 @@ Error SceneCacheInterface::_send_confirm_path(Node *p_node, PathSentCache *psc,
ofs += encode_cstring(methods_md5.utf8().get_data(), &packet.write[ofs]); ofs += encode_cstring(methods_md5.utf8().get_data(), &packet.write[ofs]);
ofs += encode_uint32(psc->id, &packet.write[ofs]); ofs += encode_uint32(p_cache.cache_id, &packet.write[ofs]);
ofs += encode_cstring(path.get_data(), &packet.write[ofs]); ofs += encode_cstring(path.get_data(), &packet.write[ofs]);
@ -162,80 +195,74 @@ Error SceneCacheInterface::_send_confirm_path(Node *p_node, PathSentCache *psc,
err = multiplayer->send_command(peer_id, packet.ptr(), packet.size()); err = multiplayer->send_command(peer_id, packet.ptr(), packet.size());
ERR_FAIL_COND_V(err != OK, err); ERR_FAIL_COND_V(err != OK, err);
// Insert into confirmed, but as false since it was not confirmed. // Insert into confirmed, but as false since it was not confirmed.
psc->confirmed_peers.insert(peer_id, false); p_cache.confirmed_peers.insert(peer_id, false);
ERR_CONTINUE(!peers_info.has(peer_id));
peers_info[peer_id].sent_nodes.insert(p_node->get_instance_id());
} }
return err; return err;
} }
bool SceneCacheInterface::is_cache_confirmed(Node *p_node, int p_peer) { bool SceneCacheInterface::is_cache_confirmed(Node *p_node, int p_peer) {
ERR_FAIL_NULL_V(p_node, false); ERR_FAIL_NULL_V(p_node, false);
const PathSentCache *psc = path_send_cache.getptr(p_node->get_instance_id()); const ObjectID oid = p_node->get_instance_id();
ERR_FAIL_NULL_V(psc, false); NodeCache *cache = nodes_cache.getptr(oid);
HashMap<int, bool>::ConstIterator F = psc->confirmed_peers.find(p_peer); bool *confirmed = cache ? cache->confirmed_peers.getptr(p_peer) : nullptr;
ERR_FAIL_COND_V(!F, false); // Should never happen. return confirmed && *confirmed;
return F->value;
} }
int SceneCacheInterface::make_object_cache(Object *p_obj) { int SceneCacheInterface::make_object_cache(Object *p_obj) {
Node *node = Object::cast_to<Node>(p_obj); Node *node = Object::cast_to<Node>(p_obj);
ERR_FAIL_NULL_V(node, -1); ERR_FAIL_NULL_V(node, -1);
const ObjectID oid = node->get_instance_id(); NodeCache &cache = _track(node);
// See if the path is cached. if (cache.cache_id == 0) {
PathSentCache *psc = path_send_cache.getptr(oid); cache.cache_id = last_send_cache_id++;
if (!psc) {
// Path is not cached, create.
path_send_cache[oid] = PathSentCache();
psc = path_send_cache.getptr(oid);
psc->id = last_send_cache_id++;
} }
return psc->id; return cache.cache_id;
} }
bool SceneCacheInterface::send_object_cache(Object *p_obj, int p_peer_id, int &r_id) { bool SceneCacheInterface::send_object_cache(Object *p_obj, int p_peer_id, int &r_id) {
Node *node = Object::cast_to<Node>(p_obj); Node *node = Object::cast_to<Node>(p_obj);
ERR_FAIL_NULL_V(node, false); ERR_FAIL_NULL_V(node, false);
const ObjectID oid = node->get_instance_id();
// See if the path is cached. // See if the path is cached.
PathSentCache *psc = path_send_cache.getptr(oid); NodeCache &cache = _track(node);
if (!psc) { if (cache.cache_id == 0) {
// Path is not cached, create. cache.cache_id = last_send_cache_id++;
path_send_cache[oid] = PathSentCache();
psc = path_send_cache.getptr(oid);
psc->id = last_send_cache_id++;
} }
r_id = psc->id; r_id = cache.cache_id;
bool has_all_peers = true; bool has_all_peers = true;
List<int> peers_to_add; // If one is missing, take note to add it. List<int> peers_to_add; // If one is missing, take note to add it.
if (p_peer_id > 0) { if (p_peer_id > 0) {
// Fast single peer check. // Fast single peer check.
HashMap<int, bool>::Iterator F = psc->confirmed_peers.find(p_peer_id); ERR_FAIL_COND_V_MSG(!peers_info.has(p_peer_id), false, "Peer doesn't exist: " + itos(p_peer_id));
if (!F) {
bool *confirmed = cache.confirmed_peers.getptr(p_peer_id);
if (!confirmed) {
peers_to_add.push_back(p_peer_id); // Need to also be notified. peers_to_add.push_back(p_peer_id); // Need to also be notified.
has_all_peers = false; has_all_peers = false;
} else if (!F->value) { } else if (!(*confirmed)) {
has_all_peers = false; has_all_peers = false;
} }
} else { } else {
// Long and painful. // Long and painful.
for (const int &E : multiplayer->get_connected_peers()) { for (KeyValue<int, PeerInfo> &E : peers_info) {
if (p_peer_id < 0 && E == -p_peer_id) { if (p_peer_id < 0 && E.key == -p_peer_id) {
continue; // Continue, excluded. continue; // Continue, excluded.
} }
HashMap<int, bool>::Iterator F = psc->confirmed_peers.find(E); bool *confirmed = cache.confirmed_peers.getptr(E.key);
if (!F) { if (!confirmed) {
peers_to_add.push_back(E); // Need to also be notified. peers_to_add.push_back(E.key); // Need to also be notified.
has_all_peers = false; has_all_peers = false;
} else if (!F->value) { } else if (!(*confirmed)) {
has_all_peers = false; has_all_peers = false;
} }
} }
} }
if (peers_to_add.size()) { if (peers_to_add.size()) {
_send_confirm_path(node, psc, peers_to_add); _send_confirm_path(node, cache, peers_to_add);
} }
return has_all_peers; return has_all_peers;
@ -244,22 +271,23 @@ bool SceneCacheInterface::send_object_cache(Object *p_obj, int p_peer_id, int &r
Object *SceneCacheInterface::get_cached_object(int p_from, uint32_t p_cache_id) { Object *SceneCacheInterface::get_cached_object(int p_from, uint32_t p_cache_id) {
Node *root_node = SceneTree::get_singleton()->get_root()->get_node(multiplayer->get_root_path()); Node *root_node = SceneTree::get_singleton()->get_root()->get_node(multiplayer->get_root_path());
ERR_FAIL_NULL_V(root_node, nullptr); ERR_FAIL_NULL_V(root_node, nullptr);
HashMap<int, PathGetCache>::Iterator E = path_get_cache.find(p_from); PeerInfo *pinfo = peers_info.getptr(p_from);
ERR_FAIL_COND_V_MSG(!E, nullptr, vformat("No cache found for peer %d.", p_from)); ERR_FAIL_NULL_V(pinfo, nullptr);
HashMap<int, PathGetCache::NodeInfo>::Iterator F = E->value.nodes.find(p_cache_id); const ObjectID *oid = pinfo->recv_nodes.getptr(p_cache_id);
ERR_FAIL_COND_V_MSG(!F, nullptr, vformat("ID %d not found in cache of peer %d.", p_cache_id, p_from)); ERR_FAIL_NULL_V_MSG(oid, nullptr, vformat("ID %d not found in cache of peer %d.", p_cache_id, p_from));
Node *node = Object::cast_to<Node>(ObjectDB::get_instance(*oid));
PathGetCache::NodeInfo *ni = &F->value; ERR_FAIL_NULL_V_MSG(node, nullptr, vformat("Failed to get cached node from peer %d with cache ID %d.", p_from, p_cache_id));
Node *node = root_node->get_node(ni->path);
if (!node) {
ERR_PRINT("Failed to get cached path: " + String(ni->path) + ".");
}
return node; return node;
} }
void SceneCacheInterface::clear() { void SceneCacheInterface::clear() {
path_get_cache.clear(); for (KeyValue<ObjectID, NodeCache> &E : nodes_cache) {
path_send_cache.clear(); Object *obj = ObjectDB::get_instance(E.key);
ERR_CONTINUE(!obj);
obj->disconnect(SceneStringNames::get_singleton()->tree_exited, callable_mp(this, &SceneCacheInterface::_remove_node_cache));
}
peers_info.clear();
nodes_cache.clear();
last_send_cache_id = 1; last_send_cache_id = 1;
} }

View File

@ -43,27 +43,26 @@ private:
SceneMultiplayer *multiplayer = nullptr; SceneMultiplayer *multiplayer = nullptr;
//path sent caches //path sent caches
struct PathSentCache { struct NodeCache {
HashMap<int, bool> confirmed_peers; int cache_id;
int id; HashMap<int, int> recv_ids; // peer id, remote cache id
HashMap<int, bool> confirmed_peers; // peer id, confirmed
}; };
//path get caches struct PeerInfo {
struct PathGetCache { HashMap<int, ObjectID> recv_nodes; // remote cache id, ObjectID
struct NodeInfo { HashSet<ObjectID> sent_nodes;
NodePath path;
ObjectID instance;
}; };
HashMap<int, NodeInfo> nodes; HashMap<ObjectID, NodeCache> nodes_cache;
}; HashMap<int, PeerInfo> peers_info;
HashMap<ObjectID, PathSentCache> path_send_cache;
HashMap<int, PathGetCache> path_get_cache;
int last_send_cache_id = 1; int last_send_cache_id = 1;
void _remove_node_cache(ObjectID p_oid);
NodeCache &_track(Node *p_node);
protected: protected:
Error _send_confirm_path(Node *p_node, PathSentCache *psc, const List<int> &p_peers); Error _send_confirm_path(Node *p_node, NodeCache &p_cache, const List<int> &p_peers);
public: public:
void clear(); void clear();