/*
 * This file is part of the Monero P2Pool
 * Copyright (c) 2021-2026 SChernykh
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "common.h"
#include "zmq_reader.h"
#include "json_parsers.h"
#include "rapidjson_wrapper.h"

LOG_CATEGORY(ZMQReader)

namespace p2pool {

/**
 * Sets up the ZMQ sockets and starts the worker thread.
 *
 * Steps, in order:
 *  1. Drop the proxy setting when `address` is a localhost address.
 *  2. Bind m_publisher to a random loopback port in [49152, 65535] (up to 100 attempts).
 *  3. Connect m_subscriber to that internal publisher (used by stop() to wake the worker).
 *  4. Connect m_subscriber to `address`:`zmq_port`, through the SOCKS proxy if one is set.
 *  5. Subscribe to the three message topics and start the worker thread.
 *
 * @param address   host of the remote ZMQ publisher; a ':' in it is treated as an IPv6 address
 * @param zmq_port  port of the remote ZMQ publisher
 * @param proxy     SOCKS proxy address, empty for a direct connection
 * @param handler   receiver of parsed messages
 * @throws zmq::error_t(EFSM) if no port could be bound or a connection failed,
 *         zmq::error_t(EMTHREAD) if the worker thread could not be started
 */
ZMQReader::ZMQReader(const std::string& address, uint32_t zmq_port, const std::string& proxy, MinerCallbackHandler* handler)
	: m_monitor(nullptr)
	, m_zmqPort(zmq_port)
	, m_proxy(proxy)
	, m_handler(handler)
	, m_tx()
	, m_minerData()
	, m_chainmainData()
{
	if (!m_proxy.empty() && is_localhost(address)) {
		LOGINFO(5, "not using proxy to connect to localhost address " << log::Gray() << address);
		m_proxy.clear();
	}

	const bool is_v6 = address.find_first_of(':') != std::string::npos;
	if (is_v6) {
		m_publisher.set(zmq::sockopt::ipv6, 1);
		m_subscriber.set(zmq::sockopt::ipv6, 1);
	}

	char addr_buf[log::Stream::BUF_SIZE + 1];
	addr_buf[0] = '\0';

	std::random_device rd;
	std::mt19937_64 rng(rd());

	for (uint32_t i = 0; i < 100; ++i) {
		// 49152 + [0, 16383] always fits into 16 bits
		const uint32_t port = 49152 + (rng() % 16384);

		try {
			log::Stream s(addr_buf);
			s << (is_v6 ? "tcp://[::1]:" : "tcp://127.0.0.1:") << port << '\0';
			m_publisher.bind(addr_buf);
			m_publisherPort = static_cast<uint16_t>(port);
			break;
		}
		catch (const std::exception& e) {
			LOGWARN(1, "failed to to bind port " << port << " for ZMQ publisher, error " << e.what());
			std::this_thread::sleep_for(std::chrono::milliseconds(100));
		}
	}

	if (!m_publisherPort) {
		LOGERR(1, "failed to to bind ZMQ publisher port, aborting");
		throw zmq::error_t(EFSM);
	}

	std::string addr(addr_buf);
	LOGINFO(5, "listening on " << addr << " for internal communications");

	m_subscriber.set(zmq::sockopt::connect_timeout, 1000);

	// Internal loopback connection: the monitor is only needed while connecting
	if (!connect(addr, false)) {
		throw zmq::error_t(EFSM);
	}

	if (!m_proxy.empty()) {
		m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer(m_proxy.c_str(), m_proxy.length()));
	}

	log::Stream s(addr_buf);
	s << "tcp://" << address << ':' << m_zmqPort << '\0';
	addr.assign(addr_buf);

	// External connection: on success connect() stores its monitor in m_monitor
	if (!connect(addr, true)) {
		throw zmq::error_t(EFSM);
	}

	m_address = std::move(addr);

	// From here on m_monitor is owned by this object, but the destructor will not run
	// if the constructor throws, so it has to be released manually on failure
	try {
		m_subscriber.set(zmq::sockopt::socks_proxy, zmq::const_buffer());

		m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main");
		m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data");
		m_subscriber.set(zmq::sockopt::subscribe, "json-minimal-txpool_add");

		const int err = uv_thread_create(&m_worker, run_wrapper, this);
		if (err) {
			LOGERR(1, "failed to start ZMQ worker thread, error " << uv_err_name(err));
			throw zmq::error_t(EMTHREAD);
		}
	}
	catch (...) {
		delete m_monitor;
		m_monitor = nullptr;
		throw;
	}
}

/// Requests a stop, waits for the worker thread to finish and releases the monitor.
ZMQReader::~ZMQReader()
{
	LOGINFO(1, "stopping");

	stop();
	uv_thread_join(&m_worker);

	delete m_monitor;

	LOGINFO(1, "stopped");
}

/**
 * Sets the stop flag and publishes a dummy message on the internal publisher.
 * The worker thread checks m_stopped after every received message, so the dummy
 * message is what makes it leave its blocking receive. Only the first call has an effect.
 */
void ZMQReader::stop()
{
	if (m_stopped.exchange(true)) {
		return;
	}

	try {
		static constexpr char dummy_msg[] = "json-minimal-txpool_add:[]";
		m_publisher.send(zmq::const_buffer(dummy_msg, sizeof(dummy_msg) - 1));
	}
	catch (const std::exception& e) {
		LOGERR(1, "exception " << e.what());
	}
}

/**
 * Entry point of the monitor thread.
 * Keeps polling monitor events until the reader is stopped, the monitor reports
 * a disconnect, or check_event() returns false; then calls stop().
 *
 * @param arg  the owning ZMQReader
 */
void ZMQReader::monitor_thread(void* arg)
{
	LOGINFO(1, "monitor thread ready");
	set_thread_name("ZMQ monitor");

	ZMQReader* r = reinterpret_cast<ZMQReader*>(arg);

	do {} while (!r->m_stopped && r->m_monitor->m_connected && r->m_monitor->check_event(-1));

	// If not connected anymore, shut down ZMQReader entirely
	r->stop();

	LOGINFO(1, "monitor thread stopped");
}

/// Entry point of the worker thread: runs ZMQReader::run() on the object passed in `arg`.
void ZMQReader::run_wrapper(void* arg)
{
	reinterpret_cast<ZMQReader*>(arg)->run();
	LOGINFO(1, "worker thread stopped");
}

/**
 * Worker loop: starts the monitor thread, then receives messages from m_subscriber
 * and hands each one to parse() until m_stopped is set or an exception is thrown.
 * m_workerThreadRunning is true for the duration of this call.
 */
void ZMQReader::run()
{
	m_workerThreadRunning = true;
	ON_SCOPE_LEAVE([this]() { m_workerThreadRunning = false; });

	set_thread_name("ZMQ worker");

	zmq_msg_t message = {};

	try {
		int rc = zmq_msg_init(&message);
		if (rc != 0) {
			throw zmq::error_t(errno);
		}

		const int err = uv_thread_create(&m_monitorThread, monitor_thread, this);
		if (err) {
			LOGERR(1, "failed to start ZMQ monitor thread, error " << uv_err_name(err));
			throw zmq::error_t(EMTHREAD);
		}

		// Registered only after the monitor thread exists, so there is always something to join
		ON_SCOPE_LEAVE([this]() { m_monitor->abort(); uv_thread_join(&m_monitorThread); });

		LOGINFO(1, "worker thread ready");

		do {
			rc = zmq_msg_recv(&message, m_subscriber, 0);
			if (rc < 0) {
				throw zmq::error_t(errno);
			}

			if (m_stopped) {
				break;
			}

			parse(reinterpret_cast<char*>(zmq_msg_data(&message)), zmq_msg_size(&message));
		} while (true);
	}
	catch (const std::exception& e) {
		LOGERR(1, "exception " << e.what());
	}

	zmq_msg_close(&message);
}

/// Monitor callback: records that the socket is connected.
void ZMQReader::Monitor::on_event_connected(const zmq_event_t&, const char* address)
{
	LOGINFO(1, "connected to " << address);
	m_connected = true;
}

/// Monitor callback: records that the socket is no longer connected.
void ZMQReader::Monitor::on_event_disconnected(const zmq_event_t&, const char* address)
{
	LOGERR(1, "disconnected from " << address);
	m_connected = false;
}

/**
 * Connects m_subscriber to `address` and waits for the "connected" monitor event.
 *
 * @param address       ZMQ endpoint to connect to
 * @param keep_monitor  if true, the monitor is stored in m_monitor on success,
 *                      otherwise it is deleted before returning
 * @return false if at least 1000 ms have passed and the monitor still reports
 *         "not connected" after an event was processed; true otherwise
 *
 * Exceptions from the monitor or the socket propagate to the caller; the monitor
 * allocated here is released first.
 */
bool ZMQReader::connect(const std::string& address, bool keep_monitor)
{
	// Each monitor needs its own inproc endpoint name: random start value, incremented per call
	static uint64_t id = 0;

	if (!id) {
		std::random_device rd;
		id = (static_cast<uint64_t>(rd()) << 32) | static_cast<uint64_t>(rd());
		id >>= 1; // to avoid MAX_UINT64 case
	}

	char buf[64];
	// cppcheck-suppress uninitvar
	log::Stream s(buf);
	s << "inproc://p2pool-connect-mon-" << id << '\0';
	++id;

	using namespace std::chrono;
	const auto start_time = steady_clock::now();

	Monitor* monitor = new Monitor();

	try {
		monitor->init(m_subscriber, buf);
		m_subscriber.connect(address);

		while (!monitor->m_connected && monitor->check_event(-1)) {
			// The event that was just processed may have been the "connected" one
			if (monitor->m_connected) {
				break;
			}
			if (duration_cast<milliseconds>(steady_clock::now() - start_time).count() >= 1000) {
				LOGERR(1, "failed to connect to " << address);
				delete monitor;
				return false;
			}
		}
	}
	catch (...) {
		// Don't leak the monitor when init/connect/check_event throws
		delete monitor;
		throw;
	}

	if (keep_monitor) {
		m_monitor = monitor;
	}
	else {
		delete monitor;
	}

	return true;
}

/**
 * Serializes one element of a "json-full-chain_main" message into a binary blob.
 *
 * Layout of the result: a MoneroBlockBroadcastHeader (header_size, miner_tx_size),
 * followed by the block header fields, the miner transaction and the list of
 * transaction hashes.
 *
 * @param value                   JSON value describing the block
 * @param out_transaction_hashes  cleared on entry; receives the hashes from "tx_hashes"
 * @return the blob, or an empty vector if any required field is missing or invalid
 */
static std::vector<uint8_t> construct_monero_block_blob(rapidjson::Value* value, std::vector<hash>& out_transaction_hashes)
{
	out_transaction_hashes.clear();

	std::vector<uint8_t> empty_blob;

	// Declares a local variable and fills it from the JSON member with the same name
#define X(type, name) type name; if (!parseValue(*value, #name, name)) return empty_blob;

	X(uint8_t, major_version);
	X(uint8_t, minor_version);
	X(uint64_t, timestamp);
	X(std::string, prev_id);
	X(uint32_t, nonce);

#undef X

	auto miner_tx = value->FindMember("miner_tx");
	if ((miner_tx == value->MemberEnd()) || !miner_tx->value.IsObject()) {
		LOGWARN(3, "construct_monero_block_blob: miner_tx not found or is not an object");
		return empty_blob;
	}

	uint8_t version;
	if (!parseValue(miner_tx->value, "version", version)) {
		LOGWARN(3, "construct_monero_block_blob: version not found");
		return empty_blob;
	}

	uint64_t unlock_height;
	if (!parseValue(miner_tx->value, "unlock_time", unlock_height)) {
		LOGWARN(3, "construct_monero_block_blob: unlock_time not found");
		return empty_blob;
	}

	// The block height is derived below as unlock_height - MINER_REWARD_UNLOCK_TIME,
	// which would wrap around for a smaller value
	if (unlock_height < MINER_REWARD_UNLOCK_TIME) {
		LOGWARN(3, "construct_monero_block_blob: invalid unlock_time " << unlock_height);
		return empty_blob;
	}

	std::string extra;
	if (!parseValue(miner_tx->value, "extra", extra)) {
		LOGWARN(3, "construct_monero_block_blob: extra not found");
		return empty_blob;
	}

	auto outputs = miner_tx->value.FindMember("outputs");
	if ((outputs == miner_tx->value.MemberEnd()) || !outputs->value.IsArray()) {
		LOGWARN(3, "construct_monero_block_blob: outputs not found or is not an array");
		return empty_blob;
	}

	auto tx_hashes = value->FindMember("tx_hashes");
	if ((tx_hashes == value->MemberEnd()) || !tx_hashes->value.IsArray()) {
		LOGWARN(3, "construct_monero_block_blob: tx_hashes not found or is not an array");
		return empty_blob;
	}

	std::vector<uint8_t> blob;
	blob.reserve(16384);

	// Block header
	blob.push_back(major_version);
	blob.push_back(minor_version);
	writeVarint(timestamp, blob);

	hash h;
	if (!from_hex(prev_id.c_str(), prev_id.length(), h)) {
		LOGWARN(3, "construct_monero_block_blob: invalid prev_id " << prev_id);
		return empty_blob;
	}
	blob.insert(blob.end(), h.h, h.h + HASH_SIZE);

	blob.insert(blob.end(), reinterpret_cast<uint8_t*>(&nonce), reinterpret_cast<uint8_t*>(&nonce) + NONCE_SIZE);

	MoneroBlockBroadcastHeader data;
	data.header_size = static_cast<uint32_t>(blob.size());

	// Miner transaction
	blob.insert(blob.end(), version);
	writeVarint(unlock_height, blob);

	// Exactly one input, of type TXIN_GEN
	blob.push_back(1);
	blob.push_back(TXIN_GEN);
	writeVarint(unlock_height - MINER_REWARD_UNLOCK_TIME, blob);

	auto arr = outputs->value.GetArray();
	if (arr.Empty()) {
		LOGWARN(3, "construct_monero_block_blob: outputs array is empty");
		return empty_blob;
	}

	writeVarint(arr.Size(), blob);

	for (auto* i = arr.begin(); i != arr.end(); ++i) {
		// FindMember() may only be called on objects
		if (!i->IsObject()) {
			LOGWARN(3, "construct_monero_block_blob: output is not an object");
			return empty_blob;
		}

		auto amount = i->FindMember("amount");
		if ((amount == i->MemberEnd()) || !amount->value.IsUint64()) {
			LOGWARN(3, "construct_monero_block_blob: amount not found or is not UInt64");
			return empty_blob;
		}

		auto to_tagged_key = i->FindMember("to_tagged_key");
		if ((to_tagged_key == i->MemberEnd()) || !to_tagged_key->value.IsObject()) {
			LOGWARN(3, "construct_monero_block_blob: to_tagged_key not found or is not an object");
			return empty_blob;
		}

		auto key = to_tagged_key->value.FindMember("key");
		if ((key == to_tagged_key->value.MemberEnd()) || !key->value.IsString()) {
			LOGWARN(3, "construct_monero_block_blob: key not found or is not a string");
			return empty_blob;
		}

		auto view_tag = to_tagged_key->value.FindMember("view_tag");
		if ((view_tag == to_tagged_key->value.MemberEnd()) || !view_tag->value.IsString()) {
			LOGWARN(3, "construct_monero_block_blob: view_tag not found or is not a string");
			return empty_blob;
		}

		writeVarint(amount->value.GetUint64(), blob);
		blob.push_back(TXOUT_TO_TAGGED_KEY);

		if (!from_hex(key->value.GetString(), key->value.GetStringLength(), h)) {
			LOGWARN(3, "construct_monero_block_blob: invalid key " << key->value.GetString());
			return empty_blob;
		}
		blob.insert(blob.end(), h.h, h.h + HASH_SIZE);

		// The view tag must decode to exactly one byte
		std::vector<uint8_t> t;
		if (!from_hex(view_tag->value.GetString(), view_tag->value.GetStringLength(), t) || (t.size() != 1)) {
			LOGWARN(3, "construct_monero_block_blob: invalid view_tag " << view_tag->value.GetString());
			return empty_blob;
		}
		blob.push_back(t[0]);
	}

	std::vector<uint8_t> t;
	if (!from_hex(extra.c_str(), extra.length(), t) || (t.size() < HASH_SIZE + 1)) {
		LOGWARN(3, "construct_monero_block_blob: invalid extra " << extra);
		return empty_blob;
	}

	writeVarint(t.size(), blob);
	blob.insert(blob.end(), t.begin(), t.end());

	blob.push_back(0);

	data.miner_tx_size = static_cast<uint32_t>(blob.size()) - data.header_size;

	// Transaction hashes
	auto arr2 = tx_hashes->value.GetArray();

	writeVarint(arr2.Size(), blob);
	out_transaction_hashes.reserve(arr2.Size());

	for (auto* i = arr2.begin(); i != arr2.end(); ++i) {
		if (!i->IsString()) {
			LOGWARN(3, "construct_monero_block_blob: tx_hash is not a string");
			return empty_blob;
		}

		if (!from_hex(i->GetString(), i->GetStringLength(), h)) {
			LOGWARN(3, "construct_monero_block_blob: invalid tx_hash " << i->GetString());
			return empty_blob;
		}

		blob.insert(blob.end(), h.h, h.h + HASH_SIZE);
		out_transaction_hashes.emplace_back(h);
	}

	// The sizes are only known now, so the broadcast header is prepended at the very end
	const uint8_t* p = reinterpret_cast<const uint8_t*>(&data);
	blob.insert(blob.begin(), p, p + sizeof(data));

	return blob;
}

/**
 * Parses one received message of the form "<topic>:<json>" and dispatches it to m_handler.
 *
 * Handled topics: "json-minimal-txpool_add", "json-full-miner_data" and
 * "json-full-chain_main". Messages that fail to parse are logged and skipped.
 *
 * @param data  message bytes; modified in place (the ':' delimiter is replaced with '\0')
 * @param size  number of bytes in `data`
 */
void ZMQReader::parse(char* data, size_t size)
{
	char* value = data;
	const char* end = data + size;

	while ((value < end) && (*value != ':')) {
		++value;
	}

	if (value >= end) {
		LOGWARN(1, "ZeroMQ message doesn't have ':' delimiter, skipping it");
		return;
	}

	// Terminate the topic so it can be compared with strcmp(); the JSON payload starts right after
	*value = '\0';
	++value;

	using namespace rapidjson;

	Document doc;
	if (doc.Parse(value, end - value).HasParseError()) {
		LOGWARN(1, "ZeroMQ message failed to parse, skipping it");
		return;
	}

	if (strcmp(data, "json-minimal-txpool_add") == 0) {
		if (!doc.IsArray()) {
			LOGWARN(1, "json-minimal-txpool_add is not an array, skipping it");
			return;
		}

		m_tx.time_received = seconds_since_epoch();

		for (SizeType i = 0, n = doc.Size(); i < n; ++i) {
			const auto& v = doc[i];
			if (PARSE(v, m_tx, id) && PARSE(v, m_tx, blob_size) && PARSE(v, m_tx, weight) && PARSE(v, m_tx, fee)) {
				m_handler->handle_tx(m_tx);
			}
			else {
				LOGWARN(1, "transaction #" << (i + 1) << " in json-minimal-txpool_add failed to parse, skipped it");
			}
		}
	}
	else if (strcmp(data, "json-full-miner_data") == 0) {
		if (!doc.IsObject()) {
			LOGWARN(1, "json-full-miner_data is not an object, skipping it");
			return;
		}

		if (!PARSE(doc, m_minerData, major_version) ||
			!PARSE(doc, m_minerData, height) ||
			!PARSE(doc, m_minerData, prev_id) ||
			!PARSE(doc, m_minerData, seed_hash) ||
			!PARSE(doc, m_minerData, median_weight) ||
			!PARSE(doc, m_minerData, already_generated_coins) ||
			!PARSE(doc, m_minerData, difficulty)) {
			LOGWARN(1, "json-full-miner_data failed to parse, skipping it");
			return;
		}

		if (!doc.HasMember("tx_backlog")) {
			LOGWARN(1, "json-full-miner_data doesn't have 'tx_backlog', skipping it");
			return;
		}

		const auto& tx_backlog = doc["tx_backlog"];

		if (!tx_backlog.IsArray()) {
			LOGWARN(1, "'tx_backlog' in json-full-miner_data is not an array, skipping it");
			return;
		}

		m_minerData.tx_backlog.clear();

		const SizeType n = tx_backlog.Size();
		m_minerData.tx_backlog.reserve(n);

		for (SizeType i = 0; i < n; ++i) {
			const auto& v = tx_backlog[i];
			if (PARSE(v, m_tx, id) && PARSE(v, m_tx, weight) && PARSE(v, m_tx, fee)) {
				m_minerData.tx_backlog.push_back(m_tx);
			}
			else {
				LOGWARN(1, "transaction #" << (i + 1) << " in json-full-miner_data `tx_backlog` failed to parse, skipped it");
			}
		}

		m_handler->handle_miner_data(m_minerData);
	}
	else if (strcmp(data, "json-full-chain_main") == 0) {
		if (!doc.IsArray()) {
			LOGWARN(1, "json-full-chain_main is not an array, skipping it");
			return;
		}

		auto arr = doc.GetArray();

		std::vector<std::vector<uint8_t>> blobs;
		blobs.reserve(arr.Size());

		for (auto* i = arr.begin(); i != arr.end(); ++i) {
			// A blob (possibly empty) is stored for every element, even if the element is skipped below
			std::vector<hash> tx_hashes;
			blobs.emplace_back(construct_monero_block_blob(i, tx_hashes));

			if (!PARSE(*i, m_chainmainData, timestamp)) {
				LOGWARN(1, "json-full-chain_main timestamp failed to parse, skipping it");
				continue;
			}

			auto it = i->FindMember("miner_tx");
			if ((it == i->MemberEnd()) || !it->value.IsObject()) {
				LOGWARN(1, "json-full-chain_main miner_tx not found, skipping it");
				continue;
			}

			auto extra_it = it->value.FindMember("extra");
			if ((extra_it == it->value.MemberEnd()) || !extra_it->value.IsString()) {
				LOGWARN(1, "json-full-chain_main extra not found, skipping it");
				continue;
			}

			auto inputs_it = it->value.FindMember("inputs");
			if ((inputs_it == it->value.MemberEnd()) || !inputs_it->value.IsArray()) {
				LOGWARN(1, "json-full-chain_main inputs not found, skipping it");
				continue;
			}

			// Get block reward from miner_tx outputs
			m_chainmainData.reward = 0;

			auto outputs_it = it->value.FindMember("outputs");
			if ((outputs_it != it->value.MemberEnd()) && outputs_it->value.IsArray()) {
				auto outputs = outputs_it->value.GetArray();

				for (SizeType j = 0, n = outputs.Size(); j < n; ++j) {
					if (outputs[j].IsObject()) {
						auto amount_it = outputs[j].FindMember("amount");
						if (amount_it != outputs[j].MemberEnd() && amount_it->value.IsUint64()) {
							m_chainmainData.reward += amount_it->value.GetUint64();
						}
					}
				}
			}
			else {
				// Not fatal: the block is still reported, with a reward of 0
				LOGWARN(1, "json-full-chain_main outputs not found");
			}

			auto inputs = inputs_it->value.GetArray();
			if ((inputs.Size() == 0) || !inputs[0].IsObject()) {
				LOGWARN(1, "json-full-chain_main inputs is not an array, skipping it");
				continue;
			}

			it = inputs[0].FindMember("gen");
			if ((it == inputs[0].MemberEnd()) || !it->value.IsObject()) {
				LOGWARN(1, "json-full-chain_main gen not found, skipping it");
				continue;
			}

			if (!PARSE(it->value, m_chainmainData, height)) {
				LOGWARN(1, "json-full-chain_main height not found, skipping it");
				continue;
			}

			m_handler->handle_chain_main(m_chainmainData, extra_it->value.GetString(), tx_hashes);
		}

		m_handler->handle_monero_block_broadcast(std::move(blobs));
	}
}

} // namespace p2pool