Add distributed multi-node fetch and explorer FFI exports

Distribute block fetch requests across up to 4 fastest nodes in the
  NodePool, selected by latency-weighted racing. All configured nodes
  are probed but only the top 4 are used for parallel fetching.

  NodePool (salvium-rpc):
  - Add max_fetch_nodes to PoolConfig (default 4)
  - Add force_race() to probe all nodes on demand
  - Add fetch_batch_distributed() with latency-weighted range splitting
  - Add compute_assignments() with 6 unit tests
  - Add DistributedBatchResult type

  Sync engine (salvium-wallet):
  - Call force_race() at sync start to populate latency data
  - Replace all 3 fetch sites with fetch_batch_distributed
  - Simplify PrefetchResult to use DistributedBatchResult

  Explorer FFI (salvium-ffi):
  - Add salvium_daemon_get_blocks_by_height (JSON heights → blocks)
  - Add salvium_daemon_get_transactions (JSON hashes → tx hex)
  - Add salvium_daemon_add_nodes (batch add from JSON array)
  - Add salvium_daemon_force_race (probe all nodes)

  CLI & bench:
  - Add --nodes flag to salvium-wallet-cli and salvium-sync-bench
  - Wire extra nodes into NodePool for sync commands

  Docs:
  - Document multi-node setup in wallet-sync-spec.md
  - Document FFI block/tx fetching in explorer-spec.md
This commit is contained in:
Matt Hess
2026-02-27 18:56:31 +00:00
parent 8c02a452e8
commit aa30b213d2
23 changed files with 2236 additions and 797 deletions
Generated
+1
View File
@@ -2187,6 +2187,7 @@ version = "1.0.7-r012"
dependencies = [
"base64",
"hex",
"log",
"reqwest",
"salvium-types",
"serde",
+10 -3
View File
@@ -121,9 +121,16 @@ pub async fn save_bc(ctx: &AppContext) -> Result {
pub async fn sync_wallet(ctx: &AppContext) -> Result {
let mut wallet = open_wallet(ctx)?;
let daemon = DaemonRpc::new(&ctx.daemon_url);
let pool = salvium_rpc::NodePool::new(salvium_rpc::PoolConfig {
network: wallet.network(),
primary_url: Some(ctx.daemon_url.clone()),
..Default::default()
});
for url in &ctx.extra_nodes {
pool.add_node(url.trim()).await;
}
let info = daemon.get_info().await?;
let info = pool.get_info().await?;
println!("Connected to daemon at {} (height: {})", ctx.daemon_url, info.height);
let wallet_height = wallet.sync_height().unwrap_or(0);
@@ -182,7 +189,7 @@ pub async fn sync_wallet(ctx: &AppContext) -> Result {
});
let no_cancel = std::sync::atomic::AtomicBool::new(false);
let _final_height = wallet.sync(&daemon, Some(&tx), &no_cancel).await?;
let _final_height = wallet.sync(&pool, Some(&tx), &no_cancel).await?;
drop(tx);
let _ = progress_task.await;
+6 -1
View File
@@ -19,6 +19,10 @@ struct Cli {
#[arg(long)]
daemon: Option<String>,
/// Additional node URLs (comma-separated) for distributed sync.
#[arg(long, value_delimiter = ',')]
nodes: Vec<String>,
/// Wallet file path.
#[arg(long)]
wallet_file: Option<String>,
@@ -824,6 +828,7 @@ enum MmsAction {
pub struct AppContext {
pub network: Network,
pub daemon_url: String,
pub extra_nodes: Vec<String>,
pub wallet_path: PathBuf,
}
@@ -838,7 +843,7 @@ impl AppContext {
default_wallet_dir(&cli.network).join("wallet.db")
};
Self { network, daemon_url, wallet_path }
Self { network, daemon_url, extra_nodes: cli.nodes.clone(), wallet_path }
}
}
+328 -13
View File
@@ -3,26 +3,26 @@
use std::ffi::{c_char, c_void};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::error::{ffi_try_ptr, ffi_try_string};
use crate::error::{ffi_try, ffi_try_ptr, ffi_try_string};
use crate::handles::{borrow_handle, drop_handle};
use crate::strings::c_str_to_str;
use salvium_rpc::DaemonRpc;
use salvium_rpc::{NodePool, PoolConfig};
/// Wrapper that pairs a DaemonRpc with a usage counter.
/// Wrapper that pairs a NodePool with a usage counter.
///
/// `in_use` tracks how many long-running operations (sync) currently hold a
/// reference. `salvium_daemon_close` waits for `in_use == 0` before dropping,
/// preventing use-after-free when the app closes the daemon while a sync is
/// still running on another thread.
pub(crate) struct DaemonHandle {
pub daemon: DaemonRpc,
pub pool: NodePool,
pub in_use: AtomicUsize,
}
impl DaemonHandle {
fn new(daemon: DaemonRpc) -> Self {
Self { daemon, in_use: AtomicUsize::new(0) }
fn new(pool: NodePool) -> Self {
Self { pool, in_use: AtomicUsize::new(0) }
}
}
@@ -39,12 +39,118 @@ impl Drop for DaemonUseGuard<'_> {
///
/// - `url`: null-terminated URL (e.g. "http://127.0.0.1:19081")
///
/// Creates a NodePool with seed nodes for the detected network (based on port)
/// plus the provided URL as the primary/active node.
///
/// Returns an opaque daemon handle, or null on error.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_connect(url: *const c_char) -> *mut c_void {
ffi_try_ptr(|| {
let url_str = unsafe { c_str_to_str(url) }?;
Ok(DaemonHandle::new(DaemonRpc::new(url_str)))
let network = detect_network_from_url(url_str);
let pool = NodePool::new(PoolConfig {
network,
primary_url: Some(url_str.to_string()),
..Default::default()
});
Ok(DaemonHandle::new(pool))
})
}
/// Create a pool with seed nodes for a given network.
///
/// - `network`: 0 = Mainnet, 1 = Testnet, 2 = Stagenet
///
/// Returns an opaque daemon handle, or null on error.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_pool_create(network: i32) -> *mut c_void {
ffi_try_ptr(|| {
let net = match network {
0 => salvium_types::constants::Network::Mainnet,
1 => salvium_types::constants::Network::Testnet,
2 => salvium_types::constants::Network::Stagenet,
_ => {
return Err(format!(
"invalid network: {network} (expected 0=Mainnet, 1=Testnet, 2=Stagenet)"
))
}
};
let pool = NodePool::new(PoolConfig {
network: net,
..Default::default()
});
Ok(DaemonHandle::new(pool))
})
}
/// Add a user node to the pool.
///
/// Returns 0 on success, -1 on error.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_add_node(
handle: *mut c_void,
url: *const c_char,
) -> i32 {
ffi_try(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let url_str = unsafe { c_str_to_str(url) }?;
let rt = crate::runtime();
rt.block_on(dh.pool.add_node(url_str));
Ok(())
})
}
/// Get the URL of the currently active node.
///
/// Returns a string the caller must free with `salvium_string_free()`.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_active_node(handle: *mut c_void) -> *mut c_char {
ffi_try_string(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let rt = crate::runtime();
Ok(rt.block_on(dh.pool.active_url()))
})
}
/// Add multiple user nodes to the pool.
///
/// - `urls_json` / `urls_json_len`: UTF-8 JSON array of URL strings,
/// e.g. `["http://node1:19081", "http://node2:19081"]`
///
/// Returns 0 on success, -1 on error.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_add_nodes(
handle: *mut c_void,
urls_json: *const u8,
urls_json_len: usize,
) -> i32 {
ffi_try(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice = unsafe { std::slice::from_raw_parts(urls_json, urls_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let urls: Vec<String> =
serde_json::from_str(json_str).map_err(|e| format!("invalid URLs JSON: {e}"))?;
let rt = crate::runtime();
for url in &urls {
rt.block_on(dh.pool.add_node(url.trim()));
}
Ok(())
})
}
/// Force a race across all nodes to populate latency data.
///
/// Call once after connecting / adding nodes so that distributed fetch
/// can distribute work effectively. Blocks until all probes complete.
///
/// Returns 0 on success, -1 on error.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_force_race(handle: *mut c_void) -> i32 {
ffi_try(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let rt = crate::runtime();
rt.block_on(dh.pool.force_race());
Ok(())
})
}
@@ -76,7 +182,7 @@ pub unsafe extern "C" fn salvium_daemon_get_info(handle: *mut c_void) -> *mut c_
ffi_try_string(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let rt = crate::runtime();
let info = rt.block_on(dh.daemon.get_info()).map_err(|e| e.to_string())?;
let info = rt.block_on(dh.pool.get_info()).map_err(|e| e.to_string())?;
serde_json::to_string(&info).map_err(|e| e.to_string())
})
}
@@ -89,7 +195,7 @@ pub unsafe extern "C" fn salvium_daemon_get_height(handle: *mut c_void) -> u64 {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }.ok()?;
let rt = crate::runtime();
rt.block_on(dh.daemon.get_height()).ok()
rt.block_on(dh.pool.get_height()).ok()
}));
match result {
Ok(Some(h)) => h,
@@ -105,7 +211,7 @@ pub unsafe extern "C" fn salvium_daemon_is_synchronized(handle: *mut c_void) ->
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }.ok()?;
let rt = crate::runtime();
rt.block_on(dh.daemon.is_synchronized()).ok()
rt.block_on(dh.pool.is_synchronized()).ok()
}));
match result {
Ok(Some(true)) => 1,
@@ -123,7 +229,7 @@ pub unsafe extern "C" fn salvium_daemon_get_fee_estimate(handle: *mut c_void) ->
ffi_try_string(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let rt = crate::runtime();
let fee = rt.block_on(dh.daemon.get_fee_estimate(0)).map_err(|e| e.to_string())?;
let fee = rt.block_on(dh.pool.get_fee_estimate(0)).map_err(|e| e.to_string())?;
serde_json::to_string(&fee).map_err(|e| e.to_string())
})
}
@@ -136,7 +242,7 @@ pub unsafe extern "C" fn salvium_daemon_get_supply_info(handle: *mut c_void) ->
ffi_try_string(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let rt = crate::runtime();
let info = rt.block_on(dh.daemon.get_supply_info()).map_err(|e| e.to_string())?;
let info = rt.block_on(dh.pool.get_supply_info()).map_err(|e| e.to_string())?;
serde_json::to_string(&info).map_err(|e| e.to_string())
})
}
@@ -149,7 +255,216 @@ pub unsafe extern "C" fn salvium_daemon_get_yield_info(handle: *mut c_void) -> *
ffi_try_string(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let rt = crate::runtime();
let info = rt.block_on(dh.daemon.get_yield_info()).map_err(|e| e.to_string())?;
let info = rt.block_on(dh.pool.get_yield_info()).map_err(|e| e.to_string())?;
serde_json::to_string(&info).map_err(|e| e.to_string())
})
}
// ─────────────────────────────────────────────────────────────────────────────
// Explorer endpoints
// ─────────────────────────────────────────────────────────────────────────────
/// Fetch blocks by height for explorer use.
///
/// - `heights_json` / `heights_json_len`: UTF-8 JSON array of heights, e.g. `[100, 101, 102]`
/// - `out_buf` / `out_len`: caller-allocated output buffer
///
/// Returns bytes written to `out_buf`, or 0 on error (check `salvium_last_error()`).
///
/// Output JSON: `[{"height":N,"block_blob":"hex...","miner_tx_hash":"...","tx_hashes":["..."]}]`
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_get_blocks_by_height(
handle: *mut c_void,
heights_json: *const u8,
heights_json_len: usize,
out_buf: *mut u8,
out_len: usize,
) -> usize {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result<usize, String> {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice =
unsafe { std::slice::from_raw_parts(heights_json, heights_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let heights: Vec<u64> =
serde_json::from_str(json_str).map_err(|e| format!("invalid heights JSON: {e}"))?;
if heights.is_empty() {
let out = b"[]";
if out.len() > out_len {
return Err("output buffer too small".into());
}
unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) };
return Ok(out.len());
}
let rt = crate::runtime();
// Determine if heights form a contiguous range for distributed fetch.
let min_h = *heights.iter().min().unwrap();
let max_h = *heights.iter().max().unwrap();
let is_contiguous = (max_h - min_h + 1) as usize == heights.len();
let (headers, bin_blocks) = if is_contiguous && heights.len() > 1 {
let result = rt
.block_on(dh.pool.fetch_batch_distributed(min_h, max_h))
.map_err(|e| e.to_string())?;
(result.headers, result.bin_blocks)
} else {
let (h, b) = rt.block_on(async {
let h = dh.pool.get_block_headers_range(min_h, max_h).await;
let b = dh.pool.get_blocks_by_height_bin(&heights).await;
(h, b)
});
(h.map_err(|e| e.to_string())?, b.map_err(|e| e.to_string())?)
};
// Build JSON output array.
let mut entries = Vec::with_capacity(heights.len());
for (i, height) in heights.iter().enumerate() {
let header = headers.iter().find(|h| h.height == *height);
let block_blob_hex = if i < bin_blocks.len() {
hex::encode(&bin_blocks[i].block)
} else {
String::new()
};
let miner_tx_hash = header
.and_then(|h| h.miner_tx_hash.as_deref())
.unwrap_or("");
let tx_hashes: Vec<&str> = header
.and_then(|h| h.extra.get("tx_hashes"))
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
.unwrap_or_default();
entries.push(serde_json::json!({
"height": height,
"block_blob": block_blob_hex,
"miner_tx_hash": miner_tx_hash,
"tx_hashes": tx_hashes,
}));
}
let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?;
let bytes = json_out.as_bytes();
if bytes.len() > out_len {
return Err(format!(
"output buffer too small: need {} bytes, have {}",
bytes.len(),
out_len
));
}
unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) };
Ok(bytes.len())
}));
match result {
Ok(Ok(n)) => n,
Ok(Err(msg)) => {
crate::error::set_last_error(&msg);
0
}
Err(_) => {
crate::error::set_last_error("panic in salvium_daemon_get_blocks_by_height");
0
}
}
}
/// Fetch transactions by hash for explorer use.
///
/// - `hashes_json` / `hashes_json_len`: UTF-8 JSON array of tx hash strings
/// - `out_buf` / `out_len`: caller-allocated output buffer
///
/// Returns bytes written to `out_buf`, or 0 on error (check `salvium_last_error()`).
///
/// Output JSON: `[{"tx_hash":"...","as_hex":"..."}]`
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_get_transactions(
handle: *mut c_void,
hashes_json: *const u8,
hashes_json_len: usize,
out_buf: *mut u8,
out_len: usize,
) -> usize {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result<usize, String> {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice =
unsafe { std::slice::from_raw_parts(hashes_json, hashes_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let hashes: Vec<String> =
serde_json::from_str(json_str).map_err(|e| format!("invalid hashes JSON: {e}"))?;
if hashes.is_empty() {
let out = b"[]";
if out.len() > out_len {
return Err("output buffer too small".into());
}
unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) };
return Ok(out.len());
}
let rt = crate::runtime();
let hash_refs: Vec<&str> = hashes.iter().map(|s| s.as_str()).collect();
let txs = rt
.block_on(dh.pool.get_transactions(&hash_refs, false))
.map_err(|e| e.to_string())?;
let entries: Vec<serde_json::Value> = txs
.iter()
.map(|tx| {
serde_json::json!({
"tx_hash": tx.tx_hash,
"as_hex": tx.as_hex,
})
})
.collect();
let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?;
let bytes = json_out.as_bytes();
if bytes.len() > out_len {
return Err(format!(
"output buffer too small: need {} bytes, have {}",
bytes.len(),
out_len
));
}
unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) };
Ok(bytes.len())
}));
match result {
Ok(Ok(n)) => n,
Ok(Err(msg)) => {
crate::error::set_last_error(&msg);
0
}
Err(_) => {
crate::error::set_last_error("panic in salvium_daemon_get_transactions");
0
}
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Helpers
// ─────────────────────────────────────────────────────────────────────────────
/// Detect network from the port in a daemon URL.
fn detect_network_from_url(url: &str) -> salvium_types::constants::Network {
use salvium_rpc::ports;
use salvium_types::constants::Network;
// Try to extract port from URL.
if let Some(port_str) = url.rsplit(':').next() {
// Strip trailing path components.
let port_str = port_str.split('/').next().unwrap_or(port_str);
if let Ok(port) = port_str.parse::<u16>() {
return match port {
p if p == ports::DAEMON_TESTNET => Network::Testnet,
p if p == ports::DAEMON_STAGENET => Network::Stagenet,
_ => Network::Mainnet,
};
}
}
Network::Mainnet
}
+20 -5
View File
@@ -129,7 +129,10 @@ pub unsafe extern "C" fn salvium_wallet_transfer(
let priority = parse_priority(&params.priority);
let rt = crate::runtime();
rt.block_on(async { do_transfer(&wh.wallet, &dh.daemon, &params, priority).await })
rt.block_on(async {
let daemon = dh.pool.active_daemon().await;
do_transfer(&wh.wallet, &daemon, &params, priority).await
})
})
}
@@ -169,7 +172,10 @@ pub unsafe extern "C" fn salvium_wallet_stake(
let priority = parse_priority(&params.priority);
let rt = crate::runtime();
rt.block_on(async { do_stake(&wh.wallet, &dh.daemon, &params, priority, false).await })
rt.block_on(async {
let daemon = dh.pool.active_daemon().await;
do_stake(&wh.wallet, &daemon, &params, priority, false).await
})
})
}
@@ -202,7 +208,10 @@ pub unsafe extern "C" fn salvium_wallet_stake_dry_run(
let priority = parse_priority(&params.priority);
let rt = crate::runtime();
rt.block_on(async { do_stake(&wh.wallet, &dh.daemon, &params, priority, true).await })
rt.block_on(async {
let daemon = dh.pool.active_daemon().await;
do_stake(&wh.wallet, &daemon, &params, priority, true).await
})
})
}
@@ -244,7 +253,10 @@ pub unsafe extern "C" fn salvium_wallet_sweep(
let priority = parse_priority(&params.priority);
let rt = crate::runtime();
rt.block_on(async { do_sweep(&wh.wallet, &dh.daemon, &params, priority).await })
rt.block_on(async {
let daemon = dh.pool.active_daemon().await;
do_sweep(&wh.wallet, &daemon, &params, priority).await
})
})
}
@@ -281,7 +293,10 @@ pub unsafe extern "C" fn salvium_wallet_transfer_dry_run(
let priority = parse_priority(&params.priority);
let rt = crate::runtime();
rt.block_on(async { do_transfer(&wh.wallet, &dh.daemon, &params, priority).await })
rt.block_on(async {
let daemon = dh.pool.active_daemon().await;
do_transfer(&wh.wallet, &daemon, &params, priority).await
})
})
}
+2 -2
View File
@@ -402,7 +402,7 @@ pub unsafe extern "C" fn salvium_wallet_sync(
}
});
let result = handle.wallet.sync(&dh.daemon, Some(&tx), &handle.sync_cancel).await;
let result = handle.wallet.sync(&dh.pool, Some(&tx), &handle.sync_cancel).await;
drop(tx); // Close channel so forwarder exits.
let _ = forwarder.await;
@@ -410,7 +410,7 @@ pub unsafe extern "C" fn salvium_wallet_sync(
} else {
handle
.wallet
.sync(&dh.daemon, None, &handle.sync_cancel)
.sync(&dh.pool, None, &handle.sync_cancel)
.await
.map(|_| ())
.map_err(|e| e.to_string())
+6 -2
View File
@@ -45,7 +45,7 @@
//! cargo test -p salvium-ffi --test wallet_sync -- --ignored --nocapture
//! ```
use salvium_rpc::DaemonRpc;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_types::constants::Network;
use salvium_wallet::{SyncEvent, Wallet, WalletKeys};
use std::time::Instant;
@@ -178,7 +178,11 @@ async fn wallet_sync_and_balance_check() {
Wallet::open(keys, db_path.to_str().unwrap(), &db_key).expect("failed to open wallet");
// ── Connect to daemon ───────────────────────────────────────────────
let daemon = DaemonRpc::new(&daemon_url);
let daemon = NodePool::new(PoolConfig {
network,
primary_url: Some(daemon_url.clone()),
..Default::default()
});
let info = daemon.get_info().await.expect("failed to connect to daemon");
println!("\n Daemon height: {}", info.height);
println!(" Daemon synchronized: {}", info.synchronized);
+1
View File
@@ -13,6 +13,7 @@ serde_json = "1"
thiserror = "2"
hex = "0.4"
base64 = "0.22"
log.workspace = true
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
+2
View File
@@ -20,12 +20,14 @@
pub mod client;
pub mod daemon;
pub mod error;
pub mod pool;
pub mod portable_storage;
pub mod wallet_rpc;
pub use client::RpcClient;
pub use daemon::DaemonRpc;
pub use error::RpcError;
pub use pool::{DistributedBatchResult, NodePool, PoolConfig};
pub use wallet_rpc::WalletRpc;
/// Seed nodes per network.
File diff suppressed because it is too large Load Diff
+25 -11
View File
@@ -9,7 +9,7 @@
use clap::Parser;
use salvium_crypto::storage::{OutputQuery, TxQuery};
use salvium_rpc::DaemonRpc;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_types::constants::{self, Network};
use salvium_wallet::{SyncEvent, Wallet, WalletKeys, WalletType};
use std::time::Instant;
@@ -40,10 +40,14 @@ struct Args {
#[arg(long)]
spend_pub: Option<String>,
/// Daemon RPC URL
/// Daemon RPC URL (primary node)
#[arg(long)]
daemon: Option<String>,
/// Additional node URLs (comma-separated) for distributed fetch
#[arg(long, value_delimiter = ',')]
nodes: Vec<String>,
/// Network: mainnet, testnet, stagenet
#[arg(long, default_value = "mainnet")]
network: String,
@@ -130,10 +134,17 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
// ── 1. Determine wallet type and source label ───────────────────────
let source_label;
// ── 2. Connect to daemon ────────────────────────────────────────────
let daemon = DaemonRpc::new(daemon_url);
// ── 2. Connect to daemon (via NodePool) ──────────────────────────────
let pool = NodePool::new(PoolConfig {
network,
primary_url: Some(daemon_url.to_string()),
..Default::default()
});
for url in &args.nodes {
pool.add_node(url.trim()).await;
}
let info =
daemon.get_info().await.map_err(|e| format!("cannot reach daemon at {daemon_url}: {e}"))?;
pool.get_info().await.map_err(|e| format!("cannot reach daemon at {daemon_url}: {e}"))?;
let daemon_height = info.height;
let synchronized = info.synchronized;
@@ -211,8 +222,9 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
println!();
// ── 4. Sync with progress ───────────────────────────────────────────
let blocks_to_sync = daemon_height.saturating_sub(args.restore_height);
println!("Syncing {} blocks...", blocks_to_sync);
let sync_start_height = wallet.sync_height().unwrap_or(0);
let blocks_to_sync = daemon_height.saturating_sub(sync_start_height);
println!("Syncing {} blocks (from height {})...", blocks_to_sync, sync_start_height);
let (tx, mut rx) = mpsc::channel::<SyncEvent>(64);
let start = Instant::now();
@@ -246,7 +258,8 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
} else {
100.0
};
let bps = if elapsed > 0.0 { current_height as f64 / elapsed } else { 0.0 };
let blocks_synced = current_height.saturating_sub(sync_start_height);
let bps = if elapsed > 0.0 { blocks_synced as f64 / elapsed } else { 0.0 };
let err_suffix = if parse_errors > 0 {
format!(" | {} parse errors", parse_errors)
} else {
@@ -275,7 +288,8 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
}
SyncEvent::Complete { height } => {
let elapsed = start.elapsed().as_secs_f64();
let bps = if elapsed > 0.0 { height as f64 / elapsed } else { 0.0 };
let blocks_synced = height.saturating_sub(sync_start_height);
let bps = if elapsed > 0.0 { blocks_synced as f64 / elapsed } else { 0.0 };
println!(
" Height {:>6}/{} (100.0%) | sync complete | {:.1}s | {:.0} blocks/s",
height, height, elapsed, bps
@@ -292,7 +306,7 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
});
let no_cancel = std::sync::atomic::AtomicBool::new(false);
let sync_result = wallet.sync(&daemon, Some(&tx), &no_cancel).await;
let sync_result = wallet.sync(&pool, Some(&tx), &no_cancel).await;
drop(tx); // close channel so progress task finishes
let (final_parse_errors, final_empty_blobs) = progress_handle.await.unwrap_or((0, 0));
@@ -300,7 +314,7 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
let elapsed = start.elapsed();
// ── 5. Gather and print results ─────────────────────────────────────
let actual_blocks_synced = sync_height.saturating_sub(args.restore_height);
let actual_blocks_synced = sync_height.saturating_sub(sync_start_height);
let blocks_per_sec = if elapsed.as_secs_f64() > 0.0 {
actual_blocks_synced as f64 / elapsed.as_secs_f64()
} else {
+1
View File
@@ -7,6 +7,7 @@
use crate::keys::WalletKeys;
/// Keys and subaddress maps needed for output scanning.
#[derive(Clone)]
pub struct ScanContext {
// CryptoNote scanning.
pub cn_view_secret: [u8; 32],
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -204,13 +204,13 @@ impl Wallet {
#[cfg(not(target_arch = "wasm32"))]
pub async fn sync(
&mut self,
daemon: &salvium_rpc::DaemonRpc,
pool: &salvium_rpc::NodePool,
event_tx: Option<&tokio::sync::mpsc::Sender<SyncEvent>>,
cancel: &std::sync::atomic::AtomicBool,
) -> Result<u64, WalletError> {
let lock_period =
salvium_types::constants::network_config(self.network()).stake_lock_period;
SyncEngine::sync(daemon, &self.db, &mut self.scan_context, lock_period, event_tx, cancel)
SyncEngine::sync(pool, &self.db, &mut self.scan_context, lock_period, event_tx, cancel)
.await
}
+14 -9
View File
@@ -18,7 +18,8 @@
//! Resume from a specific fork:
//! RESUME_FROM_HF=6 cargo test -p salvium-wallet --test full_testnet -- --ignored --nocapture
use salvium_rpc::daemon::{DaemonRpc, OutputRequest};
use salvium_rpc::daemon::OutputRequest;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_tx::builder::{Destination, PreparedInput, TransactionBuilder};
use salvium_tx::decoy::{DecoySelector, DEFAULT_RING_SIZE};
use salvium_tx::fee::{self, FeePriority};
@@ -216,7 +217,7 @@ fn auto_resume_hf(daemon_height: u64) -> u8 {
resume
}
async fn get_daemon_height(daemon: &DaemonRpc) -> u64 {
async fn get_daemon_height(daemon: &NodePool) -> u64 {
let info = daemon.get_info().await.expect("failed to get daemon info");
info.height
}
@@ -249,7 +250,7 @@ struct MiningStats {
/// Mine blocks until the daemon reaches `target_height`.
async fn mine_to(
daemon: &DaemonRpc,
daemon: &NodePool,
target_height: u64,
address: &str,
daemon_url: &str,
@@ -397,7 +398,7 @@ impl TestFixture {
// ─── Test Transactor ─────────────────────────────────────────────────────────
struct TestTransactor<'a> {
daemon: &'a DaemonRpc,
daemon: &'a NodePool,
wallet: &'a Wallet,
}
@@ -409,7 +410,7 @@ struct TxResult {
}
impl<'a> TestTransactor<'a> {
fn new(daemon: &'a DaemonRpc, wallet: &'a Wallet) -> Self {
fn new(daemon: &'a NodePool, wallet: &'a Wallet) -> Self {
Self { daemon, wallet }
}
@@ -1035,7 +1036,7 @@ struct SyncStats {
outputs_found: usize,
}
async fn sync_wallet_checked(wallet: &mut Wallet, daemon: &DaemonRpc, label: &str) -> SyncStats {
async fn sync_wallet_checked(wallet: &mut Wallet, daemon: &NodePool, label: &str) -> SyncStats {
println!(" Syncing wallet {}...", label);
let (event_tx, mut event_rx) = tokio::sync::mpsc::channel::<SyncEvent>(256);
@@ -1438,7 +1439,7 @@ fn diagnose_carrot_scan(tx_bytes: &[u8], wallet: &Wallet, label: &str) {
/// Run full TX tests for era boundaries (HF2, HF6, HF10).
async fn run_full_tests(
daemon: &DaemonRpc,
daemon: &NodePool,
daemon_url: &str,
fixture: &mut TestFixture,
fork: &ForkSpec,
@@ -1672,7 +1673,7 @@ async fn run_full_tests(
/// Run lightweight transfer test at intermediate forks.
async fn run_lightweight_test(
daemon: &DaemonRpc,
daemon: &NodePool,
fixture: &mut TestFixture,
fork: &ForkSpec,
stats: &mut RunStats,
@@ -1702,7 +1703,11 @@ async fn full_testnet_hardfork_progression() {
let url = daemon_url();
let env_resume_hf = resume_from_hf();
let daemon = DaemonRpc::new(&url);
let daemon = NodePool::new(PoolConfig {
network: Network::Testnet,
primary_url: Some(url.clone()),
..Default::default()
});
let info = daemon.get_info().await.expect("cannot connect to daemon");
println!(" Daemon: {}", url);
+8 -3
View File
@@ -8,7 +8,8 @@
//!
//! Ported from: test/burn-integration.test.js + test/burn-transaction.test.js
use salvium_rpc::daemon::{DaemonRpc, OutputRequest};
use salvium_rpc::daemon::OutputRequest;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_tx::builder::{Destination, PreparedInput, TransactionBuilder};
use salvium_tx::decoy::{DecoySelector, DEFAULT_RING_SIZE};
use salvium_tx::fee::{self, FeePriority};
@@ -22,9 +23,13 @@ use std::path::PathBuf;
const DAEMON_URL: &str = "http://node12.whiskymine.io:29081";
const BURN_AMOUNT: u64 = 10_000_000; // 0.1 SAL
fn daemon() -> DaemonRpc {
fn daemon() -> NodePool {
let url = std::env::var("TESTNET_DAEMON_URL").unwrap_or_else(|_| DAEMON_URL.to_string());
DaemonRpc::new(&url)
NodePool::new(PoolConfig {
network: salvium_types::constants::Network::Testnet,
primary_url: Some(url),
..Default::default()
})
}
fn testnet_wallet_dir() -> PathBuf {
@@ -8,7 +8,8 @@
//!
//! Ported from: test/convert-integration.test.js + test/convert-transaction.test.js
use salvium_rpc::daemon::{DaemonRpc, OutputRequest};
use salvium_rpc::daemon::OutputRequest;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_tx::builder::{Destination, PreparedInput, TransactionBuilder};
use salvium_tx::decoy::{DecoySelector, DEFAULT_RING_SIZE};
use salvium_tx::fee::{self, FeePriority};
@@ -22,9 +23,13 @@ use std::path::PathBuf;
const DAEMON_URL: &str = "http://node12.whiskymine.io:29081";
const CONVERT_AMOUNT: u64 = 1_000_000_000; // 1 SAL
fn daemon() -> DaemonRpc {
fn daemon() -> NodePool {
let url = std::env::var("TESTNET_DAEMON_URL").unwrap_or_else(|_| DAEMON_URL.to_string());
DaemonRpc::new(&url)
NodePool::new(PoolConfig {
network: salvium_types::constants::Network::Testnet,
primary_url: Some(url),
..Default::default()
})
}
fn testnet_wallet_dir() -> PathBuf {
+8 -3
View File
@@ -8,7 +8,8 @@
//!
//! Ported from: test/stake-integration.test.js + test/stake-transaction.test.js
use salvium_rpc::daemon::{DaemonRpc, OutputRequest};
use salvium_rpc::daemon::OutputRequest;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_tx::builder::{Destination, PreparedInput, TransactionBuilder};
use salvium_tx::decoy::{DecoySelector, DEFAULT_RING_SIZE};
use salvium_tx::fee::{self, FeePriority};
@@ -22,9 +23,13 @@ use std::path::PathBuf;
const DAEMON_URL: &str = "http://node12.whiskymine.io:29081";
const STAKE_AMOUNT: u64 = 1_000_000_000; // 1 SAL
fn daemon() -> DaemonRpc {
fn daemon() -> NodePool {
let url = std::env::var("TESTNET_DAEMON_URL").unwrap_or_else(|_| DAEMON_URL.to_string());
DaemonRpc::new(&url)
NodePool::new(PoolConfig {
network: salvium_types::constants::Network::Testnet,
primary_url: Some(url),
..Default::default()
})
}
fn testnet_wallet_dir() -> PathBuf {
@@ -7,7 +7,7 @@
//!
//! Ported from: test/integration-subaddress.test.js
use salvium_rpc::daemon::DaemonRpc;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_tx::builder::{Destination, TransactionBuilder};
use salvium_tx::types::*;
use salvium_types::address::{create_address_raw, parse_address, to_integrated_address};
@@ -19,9 +19,13 @@ use std::path::PathBuf;
const DAEMON_URL: &str = "http://node12.whiskymine.io:29081";
fn daemon() -> DaemonRpc {
fn daemon() -> NodePool {
let url = std::env::var("TESTNET_DAEMON_URL").unwrap_or_else(|_| DAEMON_URL.to_string());
DaemonRpc::new(&url)
NodePool::new(PoolConfig {
network: Network::Testnet,
primary_url: Some(url),
..Default::default()
})
}
fn testnet_wallet_dir() -> PathBuf {
+7 -3
View File
@@ -7,7 +7,7 @@
//!
//! Ported from: test/integration-sync.test.js
use salvium_rpc::daemon::DaemonRpc;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_types::constants::Network;
use salvium_wallet::{decrypt_js_wallet, Wallet, WalletKeys};
@@ -15,9 +15,13 @@ use std::path::PathBuf;
const DAEMON_URL: &str = "http://node12.whiskymine.io:29081";
fn daemon() -> DaemonRpc {
fn daemon() -> NodePool {
let url = std::env::var("TESTNET_DAEMON_URL").unwrap_or_else(|_| DAEMON_URL.to_string());
DaemonRpc::new(&url)
NodePool::new(PoolConfig {
network: Network::Testnet,
primary_url: Some(url),
..Default::default()
})
}
fn testnet_wallet_dir() -> PathBuf {
@@ -9,7 +9,8 @@
//!
//! Run with: cargo test -p salvium-wallet --test testnet_transfer -- --ignored --nocapture
use salvium_rpc::daemon::{DaemonRpc, OutputRequest};
use salvium_rpc::daemon::OutputRequest;
use salvium_rpc::{NodePool, PoolConfig};
use salvium_tx::builder::{Destination, PreparedInput, TransactionBuilder};
use salvium_tx::decoy::{DecoySelector, DEFAULT_RING_SIZE};
use salvium_tx::fee::{self, FeePriority};
@@ -73,7 +74,11 @@ async fn test_real_testnet_transfer() {
// ── Step 3: Sync against daemon ─────────────────────────────────────────
println!("\n[3/8] Syncing wallet against {}...", DAEMON_URL);
let daemon = DaemonRpc::new(DAEMON_URL);
let daemon = NodePool::new(PoolConfig {
network: Network::Testnet,
primary_url: Some(DAEMON_URL.to_string()),
..Default::default()
});
let info = daemon.get_info().await.expect("cannot connect to daemon");
println!(" Daemon height: {}, synchronized: {}", info.height, info.synchronized);
+88 -4
View File
@@ -344,7 +344,91 @@ verify_rct_signatures_wasm(
): Uint8Array // Returns binary: [0x01] = valid, [0x00, idx_le_4bytes] = invalid at index, [0xFF] = error
```
## 6. Typical Explorer Workflow
## 6. Native FFI — Block & Transaction Fetching
The explorer can also use the native FFI (`libsalvium_ffi.so`) to fetch blocks and
transactions directly from daemon nodes, with automatic multi-node distribution.
### Setup
```c
// Create a pool with seed nodes
void* daemon = salvium_daemon_pool_create(0); // 0=Mainnet
// Add custom nodes (any number — the pool races all, uses fastest 4)
const char* urls = "[\"http://node1:19081\",\"http://node2:19081\"]";
salvium_daemon_add_nodes(daemon, (const uint8_t*)urls, strlen(urls));
// Probe all nodes for latency
salvium_daemon_force_race(daemon);
```
### Fetch Blocks by Height
```c
const char* heights = "[100, 101, 102, 103]";
uint8_t buf[1024 * 1024]; // 1MB output buffer
size_t n = salvium_daemon_get_blocks_by_height(
daemon,
(const uint8_t*)heights, strlen(heights),
buf, sizeof(buf)
);
// n = bytes written, 0 = error (check salvium_last_error())
```
**Output JSON:**
```json
[
{
"height": 100,
"block_blob": "hex-encoded raw block bytes",
"miner_tx_hash": "abc123...",
"tx_hashes": ["def456...", "789abc..."]
}
]
```
Contiguous height ranges are automatically distributed across multiple nodes.
### Fetch Transactions by Hash
```c
const char* hashes = "[\"abc123...\", \"def456...\"]";
uint8_t buf[1024 * 1024];
size_t n = salvium_daemon_get_transactions(
daemon,
(const uint8_t*)hashes, strlen(hashes),
buf, sizeof(buf)
);
```
**Output JSON:**
```json
[
{
"tx_hash": "abc123...",
"as_hex": "hex-encoded raw transaction"
}
]
```
### Multi-Node FFI Functions
| Function | Description |
|----------|-------------|
| `salvium_daemon_pool_create(network)` | Create pool with seed nodes |
| `salvium_daemon_add_node(handle, url)` | Add one node |
| `salvium_daemon_add_nodes(handle, json, len)` | Add multiple nodes (JSON array) |
| `salvium_daemon_force_race(handle)` | Probe all nodes for latency |
| `salvium_daemon_get_blocks_by_height(handle, heights, len, buf, buflen)` | Fetch blocks |
| `salvium_daemon_get_transactions(handle, hashes, len, buf, buflen)` | Fetch transactions |
| `salvium_daemon_close(handle)` | Close and free |
All buffer-based functions return bytes written (0 on error). Check `salvium_last_error()` for details.
## 7. Typical Explorer Workflow
### Block Page
@@ -424,7 +508,7 @@ if (wasm_is_valid_address(userAddress)) {
}
```
## 7. Data Types — All Inputs are Raw Bytes
## 8. Data Types — All Inputs are Raw Bytes
Every function that takes transaction or block data expects **raw binary** (`Uint8Array`), not hex strings. If your daemon returns hex, decode first:
@@ -443,7 +527,7 @@ const txBytes: Uint8Array = hexToBytes(txHex);
const result = parse_and_analyze_tx(txBytes);
```
## 8. Error Handling
## 9. Error Handling
All string-returning functions follow the same pattern:
@@ -462,7 +546,7 @@ For functions returning `Uint8Array`: an empty array (`length === 0`) indicates
For functions returning `boolean`: they return `false` on invalid input.
## 9. Memory / Performance Notes
## 10. Memory / Performance Notes
- The WASM module is ~4MB (uncompressed). Cloudflare Workers supports this.
- All functions are synchronous — no async/await needed after `initSync()`.
+43
View File
@@ -38,6 +38,49 @@ int synced = salvium_daemon_is_synchronized(daemon);
// 1 = synced, 0 = still syncing, -1 = error
```
### Multi-Node Setup (Distributed Fetch)
The daemon handle wraps a `NodePool` that automatically includes 3 seed nodes for the
detected network. You can add any number of custom nodes — the pool races all of them
but only uses the fastest 4 for block fetching.
```c
// Option A: Connect with a primary node (seeds included automatically)
void* daemon = salvium_daemon_connect("http://mynode:19081");
// Option B: Pool with seeds only (no primary)
void* daemon = salvium_daemon_pool_create(0); // 0=Mainnet, 1=Testnet, 2=Stagenet
// Add custom nodes (one at a time)
salvium_daemon_add_node(daemon, "http://fast-node:19081");
// Add custom nodes (batch — JSON array)
const char* urls = "[\"http://node1:19081\",\"http://node2:19081\",\"http://node3:19081\"]";
salvium_daemon_add_nodes(daemon, (const uint8_t*)urls, strlen(urls));
// Race all nodes to measure latency (call once after setup)
salvium_daemon_force_race(daemon);
// Now sync — block fetches are automatically distributed across the fastest 4 nodes
int rc = salvium_wallet_sync(wallet, daemon, callback);
```
**How it works:**
- `force_race()` probes every node in the pool via `get_info` and records latency
- During sync, each batch of blocks is split across up to 4 nodes proportional to their speed
- If a node fails its sub-range, the active node retries it automatically
- Periodic re-racing (every 60s) adapts to changing network conditions
- Single-node fallback when fewer than 2 nodes have latency data
**FFI functions:**
| Function | Description |
|----------|-------------|
| `salvium_daemon_add_node(handle, url)` | Add one node (returns 0/-1) |
| `salvium_daemon_add_nodes(handle, json, len)` | Add multiple nodes from JSON array (returns 0/-1) |
| `salvium_daemon_force_race(handle)` | Probe all nodes for latency (returns 0/-1) |
| `salvium_daemon_active_node(handle)` | Get URL of current fastest node (free with `salvium_string_free`) |
## 3. Create / Open Wallet
Three options — all return an opaque `void*` handle (null on error).