Block cache WIP and other fixes

- Block cache is implemented only on Windows for now
- Tracking of background jobs
- More robust sidechain syncing
This commit is contained in:
SChernykh
2021-08-24 11:42:41 +02:00
parent 86b31ea821
commit aba3bc50b8
13 changed files with 461 additions and 57 deletions
+100 -14
View File
@@ -22,6 +22,7 @@
#include "keccak.h"
#include "side_chain.h"
#include "pool_block.h"
#include "block_cache.h"
#include <fstream>
#include <numeric>
@@ -38,6 +39,8 @@ namespace p2pool {
P2PServer::P2PServer(p2pool* pool)
: TCPServer(P2PClient::allocate, pool->params().m_p2pAddresses)
, m_pool(pool)
, m_cache(new BlockCache())
, m_cacheLoaded(false)
, m_rd{}
, m_rng(m_rd())
, m_block(new PoolBlock())
@@ -49,6 +52,7 @@ P2PServer::P2PServer(p2pool* pool)
uv_mutex_init_checked(&m_blockLock);
uv_mutex_init_checked(&m_peerListLock);
uv_mutex_init_checked(&m_broadcastLock);
uv_rwlock_init_checked(&m_cachedBlocksLock);
int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast);
if (err) {
@@ -64,6 +68,12 @@ P2PServer::P2PServer(p2pool* pool)
panic();
}
if (m_cache) {
WriteLock lock(m_cachedBlocksLock);
m_cache->load_all(m_pool->side_chain(), *this);
m_cacheLoaded = true;
}
m_timer.data = this;
err = uv_timer_start(&m_timer, on_timer, 10000, 2000);
if (err) {
@@ -86,8 +96,33 @@ P2PServer::~P2PServer()
uv_mutex_destroy(&m_blockLock);
uv_mutex_destroy(&m_peerListLock);
uv_mutex_destroy(&m_broadcastLock);
uv_rwlock_destroy(&m_cachedBlocksLock);
delete m_block;
for (auto it : m_cachedBlocks) {
delete it.second;
}
delete m_cache;
}
void P2PServer::add_cached_block(const PoolBlock& block)
{
if (m_cacheLoaded) {
LOGERR(1, "add_cached_block can only be called on startup. Fix the code!");
return;
}
PoolBlock* new_block = new PoolBlock(block);
m_cachedBlocks.insert({ new_block->m_sidechainId, new_block });
}
void P2PServer::store_in_cache(const PoolBlock& block)
{
if (m_cache) {
m_cache->store(block);
}
}
void P2PServer::connect_to_peers(const std::string& peer_list)
@@ -200,13 +235,13 @@ void P2PServer::save_peer_list_async()
const int err = uv_queue_work(&m_loop, &work->req,
[](uv_work_t* req)
{
num_running_jobs.fetch_add(1);
bkg_jobs_tracker.start("P2PServer::save_peer_list_async");
reinterpret_cast<Work*>(req->data)->server->save_peer_list();
},
[](uv_work_t* req, int /*status*/)
{
delete reinterpret_cast<Work*>(req->data);
num_running_jobs.fetch_sub(1);
bkg_jobs_tracker.stop("P2PServer::save_peer_list_async");
});
if (err) {
@@ -384,7 +419,7 @@ void P2PServer::broadcast(const PoolBlock& block)
data->ancestor_hashes = block.m_uncles;
data->ancestor_hashes.push_back(block.m_parent);
LOGINFO(5, "Broadcasting block " << block.m_sidechainId << ": " << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (pruned/full)");
LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (pruned/full)");
{
MutexLock lock(m_broadcastLock);
@@ -504,12 +539,46 @@ void P2PServer::print_status()
void P2PServer::on_timer()
{
flush_cache();
download_missing_blocks();
update_peer_connections();
update_peer_list();
save_peer_list_async();
}
void P2PServer::flush_cache()
{
if (!m_cache) {
return;
}
struct Work
{
uv_work_t req;
BlockCache* cache;
};
Work* work = new Work{};
work->req.data = work;
work->cache = m_cache;
const int err = uv_queue_work(uv_default_loop(), &work->req,
[](uv_work_t* req)
{
bkg_jobs_tracker.start("P2PServer::flush_cache");
reinterpret_cast<Work*>(req->data)->cache->flush();
},
[](uv_work_t* req, int)
{
delete reinterpret_cast<Work*>(req->data);
bkg_jobs_tracker.stop("P2PServer::flush_cache");
});
if (err) {
LOGERR(1, "flush_cache: uv_queue_work failed, error " << uv_err_name(err));
}
}
void P2PServer::download_missing_blocks()
{
std::vector<hash> missing_blocks;
@@ -822,7 +891,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH
const int err = uv_queue_work(&server->m_loop, &work->req,
[](uv_work_t* req)
{
num_running_jobs.fetch_add(1);
bkg_jobs_tracker.start("P2PServer::send_handshake_solution");
Work* work = reinterpret_cast<Work*>(req->data);
const std::vector<uint8_t>& consensus_id = work->server->m_pool->side_chain().consensus_id();
@@ -879,7 +948,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH
[work]()
{
delete work;
num_running_jobs.fetch_sub(1);
bkg_jobs_tracker.stop("P2PServer::send_handshake_solution");
});
// We might've been disconnected while working on the challenge, do nothing in this case
@@ -1118,7 +1187,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size)
return false;
}
return handle_incoming_block_async();
return handle_incoming_block_async(server->m_block);
}
bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size)
@@ -1145,13 +1214,13 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size)
if ((server->m_block->m_prevId != server->m_pool->miner_data().prev_id) &&
(server->m_block->m_txinGenHeight < server->m_pool->miner_data().height)){
LOGINFO(4, "peer " << static_cast<char*>(m_addrString) << " broadcasted a stale block, ignoring it");
LOGINFO(4, "peer " << static_cast<char*>(m_addrString) << " broadcasted a stale block (mainchain height " << server->m_block->m_txinGenHeight << ", expected >= " << server->m_pool->miner_data().height << "), ignoring it");
return true;
}
server->m_block->m_wantBroadcast = true;
return handle_incoming_block_async();
return handle_incoming_block_async(server->m_block);
}
bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*)
@@ -1245,15 +1314,17 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const
return true;
}
bool P2PServer::P2PClient::handle_incoming_block_async()
bool P2PServer::P2PClient::handle_incoming_block_async(PoolBlock* block)
{
P2PServer* server = static_cast<P2PServer*>(m_owner);
if (server->m_pool->side_chain().block_seen(*server->m_block)) {
LOGINFO(5, "block " << server->m_block->m_sidechainId << " was received before, skipping it");
if (server->m_pool->side_chain().block_seen(*block)) {
LOGINFO(5, "block " << block->m_sidechainId << " was received before, skipping it");
return true;
}
server->store_in_cache(*block);
struct Work
{
uv_work_t req;
@@ -1264,13 +1335,13 @@ bool P2PServer::P2PClient::handle_incoming_block_async()
std::vector<hash> missing_blocks;
};
Work* work = new Work{ {}, *server->m_block, this, server, m_resetCounter.load(), {} };
Work* work = new Work{ {}, *block, this, server, m_resetCounter.load(), {} };
work->req.data = work;
const int err = uv_queue_work(&server->m_loop, &work->req,
[](uv_work_t* req)
{
num_running_jobs.fetch_add(1);
bkg_jobs_tracker.start("P2PServer::handle_incoming_block_async");
Work* work = reinterpret_cast<Work*>(req->data);
work->client->handle_incoming_block(work->server->m_pool, work->block, work->client_reset_counter, work->missing_blocks);
},
@@ -1279,7 +1350,7 @@ bool P2PServer::P2PClient::handle_incoming_block_async()
Work* work = reinterpret_cast<Work*>(req->data);
work->client->post_handle_incoming_block(work->client_reset_counter, work->missing_blocks);
delete work;
num_running_jobs.fetch_sub(1);
bkg_jobs_tracker.stop("P2PServer::handle_incoming_block_async");
});
if (err != 0) {
@@ -1311,7 +1382,22 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count
return;
}
if (missing_blocks.empty()) {
return;
}
P2PServer* server = static_cast<P2PServer*>(m_owner);
ReadLock lock(server->m_cachedBlocksLock);
for (const hash& id : missing_blocks) {
auto it = server->m_cachedBlocks.find(id);
if (it != server->m_cachedBlocks.end()) {
LOGINFO(5, "using cached block for id = " << id);
handle_incoming_block_async(it->second);
continue;
}
const bool result = m_owner->send(this,
[this, &id](void* buf)
{