format fixes

This commit is contained in:
Matt Hess
2026-02-27 19:18:06 +00:00
parent aa30b213d2
commit 098f8a3ab9
3 changed files with 156 additions and 204 deletions
+106 -114
View File
@@ -75,10 +75,7 @@ pub unsafe extern "C" fn salvium_daemon_pool_create(network: i32) -> *mut c_void
))
}
};
let pool = NodePool::new(PoolConfig {
network: net,
..Default::default()
});
let pool = NodePool::new(PoolConfig { network: net, ..Default::default() });
Ok(DaemonHandle::new(pool))
})
}
@@ -87,10 +84,7 @@ pub unsafe extern "C" fn salvium_daemon_pool_create(network: i32) -> *mut c_void
///
/// Returns 0 on success, -1 on error.
#[no_mangle]
pub unsafe extern "C" fn salvium_daemon_add_node(
handle: *mut c_void,
url: *const c_char,
) -> i32 {
pub unsafe extern "C" fn salvium_daemon_add_node(handle: *mut c_void, url: *const c_char) -> i32 {
ffi_try(|| {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let url_str = unsafe { c_str_to_str(url) }?;
@@ -280,82 +274,80 @@ pub unsafe extern "C" fn salvium_daemon_get_blocks_by_height(
out_buf: *mut u8,
out_len: usize,
) -> usize {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result<usize, String> {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice =
unsafe { std::slice::from_raw_parts(heights_json, heights_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let heights: Vec<u64> =
serde_json::from_str(json_str).map_err(|e| format!("invalid heights JSON: {e}"))?;
let result =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result<usize, String> {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice = unsafe { std::slice::from_raw_parts(heights_json, heights_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let heights: Vec<u64> =
serde_json::from_str(json_str).map_err(|e| format!("invalid heights JSON: {e}"))?;
if heights.is_empty() {
let out = b"[]";
if out.len() > out_len {
return Err("output buffer too small".into());
if heights.is_empty() {
let out = b"[]";
if out.len() > out_len {
return Err("output buffer too small".into());
}
unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) };
return Ok(out.len());
}
unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) };
return Ok(out.len());
}
let rt = crate::runtime();
let rt = crate::runtime();
// Determine if heights form a contiguous range for distributed fetch.
let min_h = *heights.iter().min().unwrap();
let max_h = *heights.iter().max().unwrap();
let is_contiguous = (max_h - min_h + 1) as usize == heights.len();
// Determine if heights form a contiguous range for distributed fetch.
let min_h = *heights.iter().min().unwrap();
let max_h = *heights.iter().max().unwrap();
let is_contiguous = (max_h - min_h + 1) as usize == heights.len();
let (headers, bin_blocks) = if is_contiguous && heights.len() > 1 {
let result = rt
.block_on(dh.pool.fetch_batch_distributed(min_h, max_h))
.map_err(|e| e.to_string())?;
(result.headers, result.bin_blocks)
} else {
let (h, b) = rt.block_on(async {
let h = dh.pool.get_block_headers_range(min_h, max_h).await;
let b = dh.pool.get_blocks_by_height_bin(&heights).await;
(h, b)
});
(h.map_err(|e| e.to_string())?, b.map_err(|e| e.to_string())?)
};
// Build JSON output array.
let mut entries = Vec::with_capacity(heights.len());
for (i, height) in heights.iter().enumerate() {
let header = headers.iter().find(|h| h.height == *height);
let block_blob_hex = if i < bin_blocks.len() {
hex::encode(&bin_blocks[i].block)
let (headers, bin_blocks) = if is_contiguous && heights.len() > 1 {
let result = rt
.block_on(dh.pool.fetch_batch_distributed(min_h, max_h))
.map_err(|e| e.to_string())?;
(result.headers, result.bin_blocks)
} else {
String::new()
let (h, b) = rt.block_on(async {
let h = dh.pool.get_block_headers_range(min_h, max_h).await;
let b = dh.pool.get_blocks_by_height_bin(&heights).await;
(h, b)
});
(h.map_err(|e| e.to_string())?, b.map_err(|e| e.to_string())?)
};
let miner_tx_hash = header
.and_then(|h| h.miner_tx_hash.as_deref())
.unwrap_or("");
let tx_hashes: Vec<&str> = header
.and_then(|h| h.extra.get("tx_hashes"))
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
.unwrap_or_default();
entries.push(serde_json::json!({
"height": height,
"block_blob": block_blob_hex,
"miner_tx_hash": miner_tx_hash,
"tx_hashes": tx_hashes,
}));
}
// Build JSON output array.
let mut entries = Vec::with_capacity(heights.len());
for (i, height) in heights.iter().enumerate() {
let header = headers.iter().find(|h| h.height == *height);
let block_blob_hex = if i < bin_blocks.len() {
hex::encode(&bin_blocks[i].block)
} else {
String::new()
};
let miner_tx_hash = header.and_then(|h| h.miner_tx_hash.as_deref()).unwrap_or("");
let tx_hashes: Vec<&str> = header
.and_then(|h| h.extra.get("tx_hashes"))
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str()).collect())
.unwrap_or_default();
let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?;
let bytes = json_out.as_bytes();
if bytes.len() > out_len {
return Err(format!(
"output buffer too small: need {} bytes, have {}",
bytes.len(),
out_len
));
}
unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) };
Ok(bytes.len())
}));
entries.push(serde_json::json!({
"height": height,
"block_blob": block_blob_hex,
"miner_tx_hash": miner_tx_hash,
"tx_hashes": tx_hashes,
}));
}
let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?;
let bytes = json_out.as_bytes();
if bytes.len() > out_len {
return Err(format!(
"output buffer too small: need {} bytes, have {}",
bytes.len(),
out_len
));
}
unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) };
Ok(bytes.len())
}));
match result {
Ok(Ok(n)) => n,
@@ -386,51 +378,51 @@ pub unsafe extern "C" fn salvium_daemon_get_transactions(
out_buf: *mut u8,
out_len: usize,
) -> usize {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result<usize, String> {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice =
unsafe { std::slice::from_raw_parts(hashes_json, hashes_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let hashes: Vec<String> =
serde_json::from_str(json_str).map_err(|e| format!("invalid hashes JSON: {e}"))?;
let result =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result<usize, String> {
let dh = unsafe { borrow_handle::<DaemonHandle>(handle) }?;
let json_slice = unsafe { std::slice::from_raw_parts(hashes_json, hashes_json_len) };
let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?;
let hashes: Vec<String> =
serde_json::from_str(json_str).map_err(|e| format!("invalid hashes JSON: {e}"))?;
if hashes.is_empty() {
let out = b"[]";
if out.len() > out_len {
return Err("output buffer too small".into());
if hashes.is_empty() {
let out = b"[]";
if out.len() > out_len {
return Err("output buffer too small".into());
}
unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) };
return Ok(out.len());
}
unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) };
return Ok(out.len());
}
let rt = crate::runtime();
let hash_refs: Vec<&str> = hashes.iter().map(|s| s.as_str()).collect();
let txs = rt
.block_on(dh.pool.get_transactions(&hash_refs, false))
.map_err(|e| e.to_string())?;
let rt = crate::runtime();
let hash_refs: Vec<&str> = hashes.iter().map(|s| s.as_str()).collect();
let txs = rt
.block_on(dh.pool.get_transactions(&hash_refs, false))
.map_err(|e| e.to_string())?;
let entries: Vec<serde_json::Value> = txs
.iter()
.map(|tx| {
serde_json::json!({
"tx_hash": tx.tx_hash,
"as_hex": tx.as_hex,
let entries: Vec<serde_json::Value> = txs
.iter()
.map(|tx| {
serde_json::json!({
"tx_hash": tx.tx_hash,
"as_hex": tx.as_hex,
})
})
})
.collect();
.collect();
let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?;
let bytes = json_out.as_bytes();
if bytes.len() > out_len {
return Err(format!(
"output buffer too small: need {} bytes, have {}",
bytes.len(),
out_len
));
}
unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) };
Ok(bytes.len())
}));
let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?;
let bytes = json_out.as_bytes();
if bytes.len() > out_len {
return Err(format!(
"output buffer too small: need {} bytes, have {}",
bytes.len(),
out_len
));
}
unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) };
Ok(bytes.len())
}));
match result {
Ok(Ok(n)) => n,
+35 -68
View File
@@ -154,8 +154,7 @@ impl NodePool {
let mut inner = self.inner.write().await;
let already = inner.nodes.iter().any(|n| n.url == url);
if !already {
let daemon =
make_daemon(url, &inner.config.username, &inner.config.password);
let daemon = make_daemon(url, &inner.config.username, &inner.config.password);
inner.nodes.push(NodeState {
daemon,
url: url.to_string(),
@@ -218,12 +217,7 @@ impl NodePool {
// Collect (index, daemon_clone) for all nodes.
let node_daemons: Vec<(usize, DaemonRpc)> = {
let inner = self.inner.read().await;
inner
.nodes
.iter()
.enumerate()
.map(|(i, n)| (i, n.daemon.clone()))
.collect()
inner.nodes.iter().enumerate().map(|(i, n)| (i, n.daemon.clone())).collect()
};
// Results: (node_index, latency | None).
@@ -355,16 +349,19 @@ impl NodePool {
) -> Result<DistributedBatchResult, RpcError> {
let total = (end_height - start_height + 1) as usize;
if total == 0 {
return Ok(DistributedBatchResult {
headers: Vec::new(),
bin_blocks: Vec::new(),
});
return Ok(DistributedBatchResult { headers: Vec::new(), bin_blocks: Vec::new() });
}
// Gather assignments under a read lock.
let assignments = {
let inner = self.inner.read().await;
compute_assignments(&inner.nodes, inner.active_index, inner.config.max_fetch_nodes, start_height, total)
compute_assignments(
&inner.nodes,
inner.active_index,
inner.config.max_fetch_nodes,
start_height,
total,
)
};
if assignments.len() <= 1 {
@@ -379,16 +376,15 @@ impl NodePool {
(Ok(_), Ok(_)) => self.report_success().await,
_ => self.report_failure().await,
}
return Ok(DistributedBatchResult {
headers: h?,
bin_blocks: b?,
});
return Ok(DistributedBatchResult { headers: h?, bin_blocks: b? });
}
// Multi-node path: spawn one task per sub-range.
log::info!(
"NodePool: distributed fetch [{}-{}] across {} nodes",
start_height, end_height, assignments.len()
start_height,
end_height,
assignments.len()
);
let mut set = tokio::task::JoinSet::new();
@@ -413,18 +409,16 @@ impl NodePool {
while let Some(join_result) = set.join_next().await {
match join_result {
Ok((node_idx, ss, se, h_res, b_res)) => {
match (h_res, b_res) {
(Ok(h), Ok(b)) => {
self.report_success_for(node_idx).await;
results.push((ss, h, b));
}
_ => {
self.report_failure_for(node_idx).await;
failed_ranges.push((ss, se));
}
Ok((node_idx, ss, se, h_res, b_res)) => match (h_res, b_res) {
(Ok(h), Ok(b)) => {
self.report_success_for(node_idx).await;
results.push((ss, h, b));
}
}
_ => {
self.report_failure_for(node_idx).await;
failed_ranges.push((ss, se));
}
},
Err(_) => {
// JoinError (panic) — we don't know which node, just collect
// the sub-range from assignments if we can figure it out.
@@ -597,10 +591,7 @@ impl NodePool {
result
}
pub async fn get_block_header_by_height(
&self,
height: u64,
) -> Result<BlockHeader, RpcError> {
pub async fn get_block_header_by_height(&self, height: u64) -> Result<BlockHeader, RpcError> {
let daemon = self.active().await;
let result = daemon.get_block_header_by_height(height).await;
match &result {
@@ -676,10 +667,7 @@ impl NodePool {
result
}
pub async fn get_fee_estimate(
&self,
grace_blocks: u64,
) -> Result<FeeEstimate, RpcError> {
pub async fn get_fee_estimate(&self, grace_blocks: u64) -> Result<FeeEstimate, RpcError> {
let daemon = self.active().await;
let result = daemon.get_fee_estimate(grace_blocks).await;
match &result {
@@ -719,9 +707,7 @@ impl NodePool {
result
}
pub async fn network_type(
&self,
) -> Result<salvium_types::constants::Network, RpcError> {
pub async fn network_type(&self) -> Result<salvium_types::constants::Network, RpcError> {
let daemon = self.active().await;
let result = daemon.network_type().await;
match &result {
@@ -777,10 +763,8 @@ fn compute_assignments(
}
// Speed weights: speed_i = 1.0 / latency_i (in seconds).
let speeds: Vec<f64> = candidates
.iter()
.map(|(_, lat)| 1.0 / lat.as_secs_f64().max(0.001))
.collect();
let speeds: Vec<f64> =
candidates.iter().map(|(_, lat)| 1.0 / lat.as_secs_f64().max(0.001)).collect();
let total_speed: f64 = speeds.iter().sum();
// Blocks per node (proportional), ensuring at least 1 each.
@@ -793,12 +777,8 @@ fn compute_assignments(
let sum: usize = block_counts.iter().sum();
if sum != total_blocks {
// Find the node with the largest allocation.
let max_idx = block_counts
.iter()
.enumerate()
.max_by_key(|(_, c)| **c)
.map(|(i, _)| i)
.unwrap_or(0);
let max_idx =
block_counts.iter().enumerate().max_by_key(|(_, c)| **c).map(|(i, _)| i).unwrap_or(0);
if sum > total_blocks {
let excess = sum - total_blocks;
block_counts[max_idx] = block_counts[max_idx].saturating_sub(excess).max(1);
@@ -861,10 +841,7 @@ mod tests {
#[tokio::test]
async fn pool_creates_seed_nodes() {
let pool = NodePool::new(PoolConfig {
network: Network::Testnet,
..Default::default()
});
let pool = NodePool::new(PoolConfig { network: Network::Testnet, ..Default::default() });
let inner = pool.inner.read().await;
assert_eq!(inner.nodes.len(), crate::seed_nodes::TESTNET.len());
assert!(inner.nodes.iter().all(|n| n.is_seed));
@@ -887,10 +864,7 @@ mod tests {
#[tokio::test]
async fn add_node_dedup() {
let pool = NodePool::new(PoolConfig {
network: Network::Testnet,
..Default::default()
});
let pool = NodePool::new(PoolConfig { network: Network::Testnet, ..Default::default() });
let initial_count = pool.inner.read().await.nodes.len();
pool.add_node("http://new-node:29081").await;
@@ -923,23 +897,16 @@ mod tests {
#[test]
fn compute_assignments_single_node_fallback() {
// Only 1 node with latency data → single-node path.
let nodes = vec![
fake_node(Some(100), true),
fake_node(None, true),
fake_node(None, true),
];
let nodes = vec![fake_node(Some(100), true), fake_node(None, true), fake_node(None, true)];
let a = compute_assignments(&nodes, 0, 4, 1, 100);
assert_eq!(a.len(), 1);
assert_eq!(a[0].1, 1); // sub_start
assert_eq!(a[0].1, 1); // sub_start
assert_eq!(a[0].2, 100); // sub_end
}
#[test]
fn compute_assignments_no_latency_data() {
let nodes = vec![
fake_node(None, true),
fake_node(None, true),
];
let nodes = vec![fake_node(None, true), fake_node(None, true)];
let a = compute_assignments(&nodes, 0, 4, 1, 50);
assert_eq!(a.len(), 1);
}
+15 -22
View File
@@ -309,22 +309,17 @@ impl SyncEngine {
let next_end = batch_end + next_size as u64;
let pool_clone = pool.clone();
prefetch = Some(tokio::spawn(async move {
let result = pool_clone
.fetch_batch_distributed(next_start, next_end)
.await;
PrefetchResult {
batch_start: next_start,
batch_end: next_end,
result,
}
let result = pool_clone.fetch_batch_distributed(next_start, next_end).await;
PrefetchResult { batch_start: next_start, batch_end: next_end, result }
}));
}
// ── 3. Parallel parse + sequential store ────────────────────
// Estimate batch bytes for throughput tracking.
let batch_bytes: usize = bin_blocks.iter().map(|e| {
e.block.len() + e.txs.iter().map(|t| t.len()).sum::<usize>()
}).sum();
let batch_bytes: usize = bin_blocks
.iter()
.map(|e| e.block.len() + e.txs.iter().map(|t| t.len()).sum::<usize>())
.sum();
// Phase 1: Parse + scan all blocks in parallel via spawn_blocking.
let scan_ctx_clone = scan_ctx.clone();
@@ -492,10 +487,7 @@ impl SyncEngine {
// Count outputs: coinbase/protocol + regular.
total_outputs_found += pr.outputs.len()
+ pr.regular_txs
.iter()
.map(|t| t.found_outputs.len())
.sum::<usize>();
+ pr.regular_txs.iter().map(|t| t.found_outputs.len()).sum::<usize>();
// Update sync height per block for crash safety.
if pr.height > 0 {
@@ -670,8 +662,6 @@ fn parse_and_scan_block(
block_height: height,
tx_type: scan_data.tx_type,
unlock_time: scan_data.unlock_time,
},
));
}
@@ -727,8 +717,7 @@ fn parse_and_scan_block(
// CARROT/CN scanning for protocol TX.
if let Some(ref ptx_hash) = protocol_tx_hash_opt {
if let Some(scan_data) =
parse_tx_for_scanning(protocol_tx_json, ptx_hash, height, true)
if let Some(scan_data) = parse_tx_for_scanning(protocol_tx_json, ptx_hash, height, true)
{
let found = scanner::scan_transaction(&scan_ctx, &scan_data);
for fo in &found {
@@ -742,8 +731,6 @@ fn parse_and_scan_block(
block_height: height,
tx_type: scan_data.tx_type,
unlock_time: scan_data.unlock_time,
},
));
}
@@ -878,7 +865,13 @@ fn store_found_output_row(
unlock_time: info.unlock_time,
};
store_found_outputs(db, scan_ctx, std::slice::from_ref(found), &scan_data, info.block_timestamp)?;
store_found_outputs(
db,
scan_ctx,
std::slice::from_ref(found),
&scan_data,
info.block_timestamp,
)?;
Ok(())
}