// p2pool-salvium/src/redis_storage.cpp

/*
* This file is part of p2pool-salvium-redis
* Redis storage backend for p2pool-salvium observer mode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*/
#ifdef WITH_REDIS
#include "common.h"
#include "redis_storage.h"
#include <hiredis/hiredis.h>
#include <cstring>
LOG_CATEGORY(RedisStorage)
namespace p2pool {
// Global instance. Allocated on first use by get_redis_storage(); nothing in
// this file deletes it.
static RedisStorage* g_redis_storage = nullptr;

/**
 * Returns the process-wide RedisStorage instance, creating it on first call.
 *
 * The instance is created inside the initializer of a function-local static,
 * which the language guarantees runs exactly once even when several threads
 * make the first call concurrently. The previous unsynchronized
 * "check pointer, then new" sequence could race and construct more than one
 * instance.
 */
RedisStorage& get_redis_storage() {
    static RedisStorage* const instance = [] {
        g_redis_storage = new RedisStorage();
        return g_redis_storage;
    }();
    return *instance;
}
RedisStorage::RedisStorage()
: m_ctx(nullptr)
, m_port(6379)
, m_db(0)
, m_prefix("p2pool:")
, m_connected(false)
{
}
/** Tears down the Redis connection (if one is open) on destruction. */
RedisStorage::~RedisStorage()
{
    this->disconnect();
}
/**
 * Opens a connection to a Redis server and optionally selects a database.
 *
 * @param host  Server host name or address handed to hiredis.
 * @param port  Server TCP port.
 * @param db    Database index; SELECT is only issued when it is non-zero.
 * @return true if a connection already exists or the new one is ready;
 *         false if the context could not be allocated, the connection
 *         attempt failed, or SELECT was rejected.
 */
bool RedisStorage::connect(const std::string& host, int port, int db) {
    std::lock_guard<std::mutex> guard(m_mutex);

    // An existing connection is kept as is; the new parameters are ignored.
    if (m_connected && m_ctx) {
        return true;
    }

    // Remember the parameters so reconnect_if_needed() can reuse them.
    m_host = host;
    m_port = port;
    m_db = db;

    const struct timeval connect_timeout = { 5, 0 }; // 5 seconds
    m_ctx = redisConnectWithTimeout(host.c_str(), port, connect_timeout);
    if (m_ctx == nullptr) {
        LOGERR(1, "Failed to allocate Redis context");
        return false;
    }
    if (m_ctx->err != 0) {
        LOGERR(1, "Redis connection error: " << static_cast<const char*>(m_ctx->errstr));
        redisFree(m_ctx);
        m_ctx = nullptr;
        return false;
    }

    // Select database
    if (db != 0) {
        auto* select_reply = static_cast<redisReply*>(redisCommand(m_ctx, "SELECT %d", db));
        const bool selected = (select_reply != nullptr) && (select_reply->type != REDIS_REPLY_ERROR);
        if (select_reply != nullptr) {
            freeReplyObject(select_reply);
        }
        if (!selected) {
            LOGERR(1, "Failed to select Redis database " << db);
            redisFree(m_ctx);
            m_ctx = nullptr;
            return false;
        }
    }

    m_connected = true;
    LOGINFO(1, "Connected to Redis at " << host << ":" << port << " db=" << db);
    return true;
}
/**
 * Closes the connection under the instance mutex: frees the hiredis context
 * when one exists and marks the object as disconnected.
 */
void RedisStorage::disconnect() {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (m_ctx != nullptr) {
        redisFree(m_ctx);
    }
    m_ctx = nullptr;
    m_connected = false;
}
/**
 * Ensures a usable connection exists, re-establishing it with the stored
 * host/port/db when the current context is missing or flagged with an error.
 *
 * Does not take m_mutex itself: every caller in this file acquires it before
 * calling.
 *
 * @return true if the connection is usable on return, false otherwise.
 *         Each failure path logs the reason, including the hiredis error
 *         string and a rejected SELECT, which were previously not reported.
 */
bool RedisStorage::reconnect_if_needed() {
    if (m_connected && m_ctx && m_ctx->err == 0) {
        return true;
    }
    LOGWARN(3, "Redis connection lost, reconnecting...");

    // Drop the stale context before creating a new one.
    if (m_ctx) {
        redisFree(m_ctx);
        m_ctx = nullptr;
    }
    m_connected = false;

    // Try to reconnect (without lock - caller holds it)
    struct timeval timeout = { 5, 0 };
    m_ctx = redisConnectWithTimeout(m_host.c_str(), m_port, timeout);
    if (!m_ctx) {
        LOGERR(1, "Redis reconnection failed: could not allocate context");
        return false;
    }
    if (m_ctx->err) {
        LOGERR(1, "Redis reconnection failed: " << static_cast<const char*>(m_ctx->errstr));
        redisFree(m_ctx);
        m_ctx = nullptr;
        return false;
    }

    // Select database
    if (m_db != 0) {
        redisReply* reply = static_cast<redisReply*>(redisCommand(m_ctx, "SELECT %d", m_db));
        if (!reply || reply->type == REDIS_REPLY_ERROR) {
            LOGERR(1, "Failed to select Redis database " << m_db << " after reconnect");
            if (reply) freeReplyObject(reply);
            redisFree(m_ctx);
            m_ctx = nullptr;
            return false;
        }
        freeReplyObject(reply);
    }

    m_connected = true;
    LOGINFO(1, "Redis reconnected successfully");
    return true;
}
/**
 * Builds the key actually sent to Redis.
 *
 * @param key  Caller-visible key.
 * @return m_prefix followed by key.
 */
std::string RedisStorage::prefixed_key(const std::string& key) const {
    std::string full_key(m_prefix);
    full_key += key;
    return full_key;
}
/**
 * Sends PING to the server.
 *
 * @return true only when a status reply with the text "PONG" is received;
 *         false if no connection is available, no reply is received, or the
 *         reply is anything else.
 */
bool RedisStorage::ping() {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!reconnect_if_needed()) {
        return false;
    }

    auto* pong = static_cast<redisReply*>(redisCommand(m_ctx, "PING"));
    if (pong == nullptr) {
        return false;
    }

    bool alive = false;
    if (pong->type == REDIS_REPLY_STATUS) {
        alive = (strcmp(pong->str, "PONG") == 0);
    }
    freeReplyObject(pong);
    return alive;
}
bool RedisStorage::set(const std::string& key, const std::string& value) {
return set(key, reinterpret_cast<const uint8_t*>(value.data()), value.size());
}
/**
 * Stores a binary value under the prefixed key using SET.
 *
 * @param key   Key (without prefix).
 * @param data  Pointer to the value bytes.
 * @param size  Number of bytes at data.
 * @return true only when the server answers with status "OK"; false if no
 *         connection is available, no reply is received, or the reply is
 *         anything else.
 */
bool RedisStorage::set(const std::string& key, const uint8_t* data, size_t size) {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!reconnect_if_needed()) {
        return false;
    }

    const std::string full_key = prefixed_key(key);
    void* raw = redisCommand(m_ctx, "SET %s %b", full_key.c_str(), data, size);
    if (raw == nullptr) {
        LOGERR(3, "Redis SET failed for key: " << key);
        return false;
    }

    auto* reply = static_cast<redisReply*>(raw);
    const bool stored = (reply->type == REDIS_REPLY_STATUS) && (strcmp(reply->str, "OK") == 0);
    freeReplyObject(reply);
    return stored;
}
/**
 * Fetches a value as a string via the byte-vector overload.
 *
 * @param key    Key (without prefix).
 * @param value  Receives the stored bytes on success; untouched on failure.
 * @return Result of the byte-vector get() overload.
 */
bool RedisStorage::get(const std::string& key, std::string& value) {
    std::vector<uint8_t> raw;
    const bool found = get(key, raw);
    if (found) {
        value.assign(raw.begin(), raw.end());
    }
    return found;
}
/**
 * Fetches the raw bytes stored under the prefixed key using GET.
 *
 * @param key   Key (without prefix).
 * @param data  Receives the value on success; untouched on failure.
 * @return true when a string reply was received; false if no connection is
 *         available, no reply is received, or the reply is nil or any other
 *         non-string type.
 */
bool RedisStorage::get(const std::string& key, std::vector<uint8_t>& data) {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!reconnect_if_needed()) {
        return false;
    }

    const std::string full_key = prefixed_key(key);
    auto* reply = static_cast<redisReply*>(redisCommand(m_ctx, "GET %s", full_key.c_str()));
    if (reply == nullptr) {
        return false;
    }

    // Nil (missing key) and every other non-string reply are treated alike.
    const bool has_value = (reply->type == REDIS_REPLY_STRING);
    if (has_value) {
        const uint8_t* const first = reinterpret_cast<uint8_t*>(reply->str);
        data.assign(first, first + reply->len);
    }
    freeReplyObject(reply);
    return has_value;
}
bool RedisStorage::del(const std::string& key) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "DEL %s", pkey.c_str())
);
if (!reply) return false;
freeReplyObject(reply);
return true;
}
/**
 * Checks whether the prefixed key is present using EXISTS.
 *
 * @param key  Key (without prefix).
 * @return true when the server returns a positive integer; false otherwise,
 *         including when no connection or reply is available.
 */
bool RedisStorage::exists(const std::string& key) {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!reconnect_if_needed()) {
        return false;
    }

    const std::string full_key = prefixed_key(key);
    auto* reply = static_cast<redisReply*>(redisCommand(m_ctx, "EXISTS %s", full_key.c_str()));
    if (reply == nullptr) {
        return false;
    }

    bool present = false;
    if (reply->type == REDIS_REPLY_INTEGER) {
        present = (reply->integer > 0);
    }
    freeReplyObject(reply);
    return present;
}
bool RedisStorage::hset(const std::string& key, const std::string& field, const uint8_t* data, size_t size) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "HSET %s %s %b", pkey.c_str(), field.c_str(), data, size)
);
if (!reply) {
LOGERR(3, "Redis HSET failed for key: " << key << " field: " << field);
return false;
}
freeReplyObject(reply);
return true;
}
/**
 * Reads one field of the hash stored under the prefixed key using HGET.
 *
 * The field name is sent with an explicit length (%b) so a field containing
 * a NUL byte is looked up in full instead of being truncated.
 *
 * @param key    Hash key (without prefix).
 * @param field  Field name.
 * @param data   Receives the value on success; untouched on failure.
 * @return true when a string reply was received; false if no connection is
 *         available, no reply is received, or the reply is nil or any other
 *         non-string type.
 */
bool RedisStorage::hget(const std::string& key, const std::string& field, std::vector<uint8_t>& data) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (!reconnect_if_needed()) return false;

    const std::string pkey = prefixed_key(key);
    redisReply* reply = static_cast<redisReply*>(
        redisCommand(m_ctx, "HGET %s %b", pkey.c_str(), field.data(), field.size())
    );
    if (!reply) return false;

    // Nil (missing field) and every other non-string reply are failures.
    if (reply->type != REDIS_REPLY_STRING) {
        freeReplyObject(reply);
        return false;
    }

    data.assign(
        reinterpret_cast<uint8_t*>(reply->str),
        reinterpret_cast<uint8_t*>(reply->str) + reply->len
    );
    freeReplyObject(reply);
    return true;
}
bool RedisStorage::hdel(const std::string& key, const std::string& field) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "HDEL %s %s", pkey.c_str(), field.c_str())
);
if (!reply) return false;
freeReplyObject(reply);
return true;
}
bool RedisStorage::hgetall(const std::string& key, std::vector<std::pair<std::string, std::vector<uint8_t>>>& result) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "HGETALL %s", pkey.c_str())
);
if (!reply || reply->type != REDIS_REPLY_ARRAY) {
if (reply) freeReplyObject(reply);
return false;
}
result.clear();
for (size_t i = 0; i + 1 < reply->elements; i += 2) {
redisReply* field = reply->element[i];
redisReply* value = reply->element[i + 1];
if (field->type == REDIS_REPLY_STRING && value->type == REDIS_REPLY_STRING) {
std::string fieldStr(field->str, field->len);
std::vector<uint8_t> valueData(
reinterpret_cast<uint8_t*>(value->str),
reinterpret_cast<uint8_t*>(value->str) + value->len
);
result.emplace_back(std::move(fieldStr), std::move(valueData));
}
}
freeReplyObject(reply);
return true;
}
/**
 * Returns the number of fields in the hash under the prefixed key using HLEN.
 *
 * @param key  Hash key (without prefix).
 * @return The integer reported by the server, or -1 if no connection is
 *         available, no reply is received, or the reply is not an integer.
 */
int64_t RedisStorage::hlen(const std::string& key) {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!reconnect_if_needed()) {
        return -1;
    }

    const std::string full_key = prefixed_key(key);
    auto* reply = static_cast<redisReply*>(redisCommand(m_ctx, "HLEN %s", full_key.c_str()));
    if (reply == nullptr) {
        return -1;
    }

    int64_t field_count = -1;
    if (reply->type == REDIS_REPLY_INTEGER) {
        field_count = reply->integer;
    }
    freeReplyObject(reply);
    return field_count;
}
bool RedisStorage::lpush(const std::string& key, const std::string& value) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "LPUSH %s %s", pkey.c_str(), value.c_str())
);
if (!reply) return false;
freeReplyObject(reply);
return true;
}
bool RedisStorage::rpush(const std::string& key, const std::string& value) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "RPUSH %s %s", pkey.c_str(), value.c_str())
);
if (!reply) return false;
freeReplyObject(reply);
return true;
}
bool RedisStorage::lrange(const std::string& key, int64_t start, int64_t stop, std::vector<std::string>& result) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "LRANGE %s %lld %lld", pkey.c_str(), start, stop)
);
if (!reply || reply->type != REDIS_REPLY_ARRAY) {
if (reply) freeReplyObject(reply);
return false;
}
result.clear();
for (size_t i = 0; i < reply->elements; ++i) {
if (reply->element[i]->type == REDIS_REPLY_STRING) {
result.emplace_back(reply->element[i]->str, reply->element[i]->len);
}
}
freeReplyObject(reply);
return true;
}
/**
 * Returns the length of the list under the prefixed key using LLEN.
 *
 * @param key  List key (without prefix).
 * @return The integer reported by the server, or -1 if no connection is
 *         available, no reply is received, or the reply is not an integer.
 */
int64_t RedisStorage::llen(const std::string& key) {
    std::lock_guard<std::mutex> guard(m_mutex);
    if (!reconnect_if_needed()) {
        return -1;
    }

    const std::string full_key = prefixed_key(key);
    auto* reply = static_cast<redisReply*>(redisCommand(m_ctx, "LLEN %s", full_key.c_str()));
    if (reply == nullptr) {
        return -1;
    }

    int64_t item_count = -1;
    if (reply->type == REDIS_REPLY_INTEGER) {
        item_count = reply->integer;
    }
    freeReplyObject(reply);
    return item_count;
}
bool RedisStorage::sadd(const std::string& key, const std::string& member) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "SADD %s %s", pkey.c_str(), member.c_str())
);
if (!reply) return false;
freeReplyObject(reply);
return true;
}
bool RedisStorage::srem(const std::string& key, const std::string& member) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "SREM %s %s", pkey.c_str(), member.c_str())
);
if (!reply) return false;
freeReplyObject(reply);
return true;
}
bool RedisStorage::smembers(const std::string& key, std::vector<std::string>& result) {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
std::string pkey = prefixed_key(key);
redisReply* reply = static_cast<redisReply*>(
redisCommand(m_ctx, "SMEMBERS %s", pkey.c_str())
);
if (!reply || reply->type != REDIS_REPLY_ARRAY) {
if (reply) freeReplyObject(reply);
return false;
}
result.clear();
for (size_t i = 0; i < reply->elements; ++i) {
if (reply->element[i]->type == REDIS_REPLY_STRING) {
result.emplace_back(reply->element[i]->str, reply->element[i]->len);
}
}
freeReplyObject(reply);
return true;
}
/**
 * Tests set membership under the prefixed key using SISMEMBER.
 *
 * The member is sent with an explicit length (%b) so that a member
 * containing a NUL byte is compared in full instead of being truncated.
 *
 * @param key     Set key (without prefix).
 * @param member  Member to look for.
 * @return true when the server returns a positive integer; false otherwise,
 *         including when no connection is available, no reply is received,
 *         or the reply is not an integer.
 */
bool RedisStorage::sismember(const std::string& key, const std::string& member) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (!reconnect_if_needed()) return false;

    const std::string pkey = prefixed_key(key);
    redisReply* reply = static_cast<redisReply*>(
        redisCommand(m_ctx, "SISMEMBER %s %b", pkey.c_str(), member.data(), member.size())
    );
    if (!reply || reply->type != REDIS_REPLY_INTEGER) {
        if (reply) freeReplyObject(reply);
        return false;
    }

    const bool is_member = (reply->integer > 0);
    freeReplyObject(reply);
    return is_member;
}
bool RedisStorage::flushdb() {
std::lock_guard<std::mutex> lock(m_mutex);
if (!reconnect_if_needed()) return false;
redisReply* reply = static_cast<redisReply*>(redisCommand(m_ctx, "FLUSHDB"));
if (!reply) return false;
freeReplyObject(reply);
LOGWARN(1, "Redis database flushed!");
return true;
}
} // namespace p2pool
#endif // WITH_REDIS