Add CAP exchange protocol, Deadlock fix, Sync stuck fix with retry mechanism

This commit is contained in:
Matt Hess
2025-12-17 00:29:41 +00:00
parent 940f1cabe8
commit 11b545e91b
10 changed files with 1101 additions and 117 deletions
+162 -6
View File
@@ -436,7 +436,7 @@ void P2PServer::update_peer_connections()
uint32_t N = m_maxOutgoingPeers;
// Special case: when we can't find p2pool peers, scan through monerod peers (try 25 peers at a time)
// Special case: when we can't find p2pool peers, scan through salviumd peers (try 25 peers at a time)
if (has_good_peers) {
m_seenGoodPeers = true;
}
@@ -1500,7 +1500,7 @@ void P2PServer::check_host()
if (cur_time >= last_active + 300) {
const uint64_t dt = static_cast<uint64_t>(cur_time - last_active);
const Params::Host& host = m_pool->current_host();
LOGERR(1, "no ZMQ messages received from host " << host.m_displayName << " in the last " << dt << " seconds, check your monerod/p2pool/network/firewall setup!!!");
LOGERR(1, "no ZMQ messages received from host " << host.m_displayName << " in the last " << dt << " seconds, check your salviumd/p2pool/network/firewall setup!!!");
m_pool->reconnect_to_host();
}
}
@@ -2262,6 +2262,28 @@ bool P2PServer::P2PClient::on_read(const char* data, uint32_t size)
}
break;
case MessageId::CHECKPOINT_REQUEST:
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor() << " sent CHECKPOINT_REQUEST");
bytes_read = 1;
if (!on_checkpoint_request()) {
return false;
}
break;
case MessageId::CHECKPOINT_RESPONSE:
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor() << " sent CHECKPOINT_RESPONSE");
if (bytes_left >= 1 + sizeof(uint32_t)) {
const uint32_t count = read_unaligned(reinterpret_cast<const uint32_t*>(buf + 1));
const uint32_t msg_size = count * (sizeof(uint64_t) + HASH_SIZE + sizeof(uint64_t) + sizeof(uint64_t));
if (bytes_left >= 1 + sizeof(uint32_t) + msg_size) {
bytes_read = 1 + sizeof(uint32_t) + msg_size;
if (!on_checkpoint_response(buf + 1 + sizeof(uint32_t), count)) {
return false;
}
}
}
break;
}
if (bytes_read) {
@@ -2300,11 +2322,11 @@ void P2PServer::P2PClient::on_disconnected()
if (!m_handshakeComplete) {
LOGWARN(5, "peer " << static_cast<char*>(m_addrString) << " disconnected before finishing handshake");
ban(DEFAULT_BAN_TIME);
// ban(DEFAULT_BAN_TIME);
if (server) {
server->remove_peer_from_list(this);
}
}
}
}
bool P2PServer::P2PClient::send_handshake_challenge()
@@ -2637,6 +2659,11 @@ void P2PServer::P2PClient::on_after_handshake(uint8_t* &p)
memcpy(p, &version, sizeof(uint32_t));
p += sizeof(uint32_t);
}
// Request checkpoints from peer (CAP exchange)
// TODO: Only send to peers with compatible protocol version to avoid disconnects
// LOGINFO(5, "sending CHECKPOINT_REQUEST to " << static_cast<char*>(m_addrString));
// *(p++) = static_cast<uint8_t>(MessageId::CHECKPOINT_REQUEST);
}
bool P2PServer::P2PClient::on_listen_port(const uint8_t* buf)
@@ -2825,7 +2852,7 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size,
}
else if (peer_height > our_height) {
if (peer_height >= our_height + 2) {
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " is ahead on mainchain (height " << peer_height << ", your height " << our_height << "). Is your monerod stuck or lagging?");
LOGWARN(3, "peer " << static_cast<char*>(m_addrString) << " is ahead on mainchain (height " << peer_height << ", your height " << our_height << "). Is your salviumd stuck or lagging?");
}
}
else {
@@ -2833,7 +2860,10 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size,
}
}
block->m_wantBroadcast = true;
// Only rebroadcast after initial sync complete - avoid relaying stale blocks
if (static_cast<P2PServer*>(m_owner)->m_pool->side_chain().is_ready_to_mine()) {
block->m_wantBroadcast = true;
}
m_lastBroadcastTimestamp = seconds_since_epoch();
@@ -3133,6 +3163,132 @@ bool P2PServer::P2PClient::on_genesis_info(const uint8_t* buf)
return true;
}
bool P2PServer::P2PClient::on_checkpoint_request()
{
P2PServer* server = static_cast<P2PServer*>(m_owner);
SideChain& side_chain = server->m_pool->side_chain();
// Get checkpoint history from sidechain
const std::vector<Checkpoint> checkpoints = side_chain.get_checkpoint_history();
LOGINFO(4, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor()
<< " requested checkpoints, sending " << checkpoints.size() << " checkpoints");
// Send checkpoint response
const bool result = server->send(this,
[&checkpoints, this](uint8_t* buf, size_t buf_size) -> size_t
{
const uint32_t count = static_cast<uint32_t>(checkpoints.size());
const size_t msg_size = 1 + sizeof(uint32_t) + count * (sizeof(uint64_t) + HASH_SIZE + sizeof(uint64_t) + sizeof(uint64_t));
if (buf_size < msg_size) {
LOGWARN(3, "Buffer too small for CHECKPOINT_RESPONSE");
return 0;
}
uint8_t* p = buf;
// Message ID
*(p++) = static_cast<uint8_t>(MessageId::CHECKPOINT_RESPONSE);
// Count
memcpy(p, &count, sizeof(uint32_t));
p += sizeof(uint32_t);
// Checkpoint data
for (const Checkpoint& cp : checkpoints) {
// Height
memcpy(p, &cp.height, sizeof(uint64_t));
p += sizeof(uint64_t);
// ID (hash)
memcpy(p, cp.id.h, HASH_SIZE);
p += HASH_SIZE;
// Cumulative difficulty (lo, hi)
memcpy(p, &cp.cumulative_difficulty.lo, sizeof(uint64_t));
p += sizeof(uint64_t);
memcpy(p, &cp.cumulative_difficulty.hi, sizeof(uint64_t));
p += sizeof(uint64_t);
}
LOGINFO(5, "Sent CHECKPOINT_RESPONSE with " << count << " checkpoints to " << static_cast<char*>(m_addrString));
return p - buf;
});
return result;
}
// Handles a CHECKPOINT_RESPONSE from this peer.
//
// buf   - points at the first checkpoint record (just past the message id and the
//         32-bit count). The caller has already verified that `count` complete
//         records are available. Each record is: 8 bytes height, HASH_SIZE bytes id,
//         8 bytes cumulative difficulty (lo), 8 bytes cumulative difficulty (hi).
// count - number of records the peer announced.
//
// Every received (height, id) pair is passed to SideChain::validate_peer_checkpoint().
// If more than half of them are rejected, SideChain::trigger_recovery() is called
// with the height of the first received checkpoint.
//
// Returns false only for a malformed message (count above SideChain::CHECKPOINT_HISTORY);
// the caller (on_read) then fails the read. Returns true otherwise, including for an
// empty response.
//
// NOTE(review): nothing here checks that a CHECKPOINT_REQUEST is outstanding for this
// peer, so an unsolicited response can also reach trigger_recovery() - confirm that
// this is acceptable or gate it on a "request pending" flag.
bool P2PServer::P2PClient::on_checkpoint_response(const uint8_t* buf, uint32_t count)
{
	P2PServer* server = static_cast<P2PServer*>(m_owner);
	SideChain& side_chain = server->m_pool->side_chain();

	LOGINFO(4, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor()
		<< " sent " << count << " checkpoints");

	if (count > SideChain::CHECKPOINT_HISTORY) {
		LOGWARN(3, "Invalid checkpoint count: " << count);
		return false;
	}

	// on_checkpoint_request() replies with whatever history the peer has, which is
	// empty (count == 0) for a peer that has not recorded any checkpoints yet.
	// That is a legitimate answer with nothing to compare, not a protocol violation.
	if (count == 0) {
		LOGINFO(4, "peer " << log::Gray() << static_cast<char*>(m_addrString) << log::NoColor()
			<< " has no checkpoints to compare");
		return true;
	}

	// Deserialize checkpoints
	std::vector<Checkpoint> peer_checkpoints;
	peer_checkpoints.reserve(count);

	const uint8_t* p = buf;

	for (uint32_t i = 0; i < count; ++i) {
		Checkpoint cp;

		// Height
		cp.height = read_unaligned(reinterpret_cast<const uint64_t*>(p));
		p += sizeof(uint64_t);

		// ID (hash)
		memcpy(cp.id.h, p, HASH_SIZE);
		p += HASH_SIZE;

		// Cumulative difficulty (lo, hi)
		cp.cumulative_difficulty.lo = read_unaligned(reinterpret_cast<const uint64_t*>(p));
		p += sizeof(uint64_t);
		cp.cumulative_difficulty.hi = read_unaligned(reinterpret_cast<const uint64_t*>(p));
		p += sizeof(uint64_t);

		peer_checkpoints.push_back(cp);

		LOGINFO(5, "Received checkpoint: height=" << cp.height << " id=" << cp.id << " diff=" << cp.cumulative_difficulty);
	}

	// Validate peer checkpoints against our chain
	uint32_t mismatch_count = 0;

	for (const Checkpoint& peer_cp : peer_checkpoints) {
		if (!side_chain.validate_peer_checkpoint(peer_cp.height, peer_cp.id)) {
			++mismatch_count;
			LOGWARN(3, "Checkpoint mismatch at height " << peer_cp.height
				<< ": peer has " << peer_cp.id);
		}
	}

	if (mismatch_count == 0) {
		LOGINFO(3, "All " << count << " checkpoints validated successfully with peer "
			<< static_cast<char*>(m_addrString));
		return true;
	}

	LOGWARN(2, "Checkpoint validation: " << mismatch_count << "/" << count
		<< " mismatches with peer " << static_cast<char*>(m_addrString));

	// If more than half the checkpoints mismatch, we may be on wrong chain
	if (mismatch_count > count / 2) {
		LOGERR(1, "DIVERGENCE DETECTED: More than 50% checkpoint mismatch with peer "
			<< static_cast<char*>(m_addrString) << " (" << mismatch_count << "/" << count << ")");

		// count >= 1 here, so the first received checkpoint always exists
		side_chain.trigger_recovery(peer_checkpoints[0].height);
	}

	return true;
}
bool P2PServer::P2PClient::on_aux_job_donation(const uint8_t* buf, uint32_t size)
{
P2PServer* server = static_cast<P2PServer*>(m_owner);