From 098f8a3ab92c2dc5b73785fa430b345f394b01bd Mon Sep 17 00:00:00 2001 From: Matt Hess Date: Fri, 27 Feb 2026 19:18:06 +0000 Subject: [PATCH] format fixes --- crates/salvium-ffi/src/daemon.rs | 220 ++++++++++++++---------------- crates/salvium-rpc/src/pool.rs | 103 +++++--------- crates/salvium-wallet/src/sync.rs | 37 ++--- 3 files changed, 156 insertions(+), 204 deletions(-) diff --git a/crates/salvium-ffi/src/daemon.rs b/crates/salvium-ffi/src/daemon.rs index 85d4209..ad378db 100644 --- a/crates/salvium-ffi/src/daemon.rs +++ b/crates/salvium-ffi/src/daemon.rs @@ -75,10 +75,7 @@ pub unsafe extern "C" fn salvium_daemon_pool_create(network: i32) -> *mut c_void )) } }; - let pool = NodePool::new(PoolConfig { - network: net, - ..Default::default() - }); + let pool = NodePool::new(PoolConfig { network: net, ..Default::default() }); Ok(DaemonHandle::new(pool)) }) } @@ -87,10 +84,7 @@ pub unsafe extern "C" fn salvium_daemon_pool_create(network: i32) -> *mut c_void /// /// Returns 0 on success, -1 on error. #[no_mangle] -pub unsafe extern "C" fn salvium_daemon_add_node( - handle: *mut c_void, - url: *const c_char, -) -> i32 { +pub unsafe extern "C" fn salvium_daemon_add_node(handle: *mut c_void, url: *const c_char) -> i32 { ffi_try(|| { let dh = unsafe { borrow_handle::(handle) }?; let url_str = unsafe { c_str_to_str(url) }?; @@ -280,82 +274,80 @@ pub unsafe extern "C" fn salvium_daemon_get_blocks_by_height( out_buf: *mut u8, out_len: usize, ) -> usize { - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result { - let dh = unsafe { borrow_handle::(handle) }?; - let json_slice = - unsafe { std::slice::from_raw_parts(heights_json, heights_json_len) }; - let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?; - let heights: Vec = - serde_json::from_str(json_str).map_err(|e| format!("invalid heights JSON: {e}"))?; + let result = + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result { + let dh = unsafe { borrow_handle::(handle) }?; + let json_slice = unsafe { std::slice::from_raw_parts(heights_json, heights_json_len) }; + let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?; + let heights: Vec = + serde_json::from_str(json_str).map_err(|e| format!("invalid heights JSON: {e}"))?; - if heights.is_empty() { - let out = b"[]"; - if out.len() > out_len { - return Err("output buffer too small".into()); + if heights.is_empty() { + let out = b"[]"; + if out.len() > out_len { + return Err("output buffer too small".into()); + } + unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) }; + return Ok(out.len()); } - unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) }; - return Ok(out.len()); - } - let rt = crate::runtime(); + let rt = crate::runtime(); - // Determine if heights form a contiguous range for distributed fetch. - let min_h = *heights.iter().min().unwrap(); - let max_h = *heights.iter().max().unwrap(); - let is_contiguous = (max_h - min_h + 1) as usize == heights.len(); + // Determine if heights form a contiguous range for distributed fetch. + let min_h = *heights.iter().min().unwrap(); + let max_h = *heights.iter().max().unwrap(); + let is_contiguous = (max_h - min_h + 1) as usize == heights.len(); - let (headers, bin_blocks) = if is_contiguous && heights.len() > 1 { - let result = rt - .block_on(dh.pool.fetch_batch_distributed(min_h, max_h)) - .map_err(|e| e.to_string())?; - (result.headers, result.bin_blocks) - } else { - let (h, b) = rt.block_on(async { - let h = dh.pool.get_block_headers_range(min_h, max_h).await; - let b = dh.pool.get_blocks_by_height_bin(&heights).await; - (h, b) - }); - (h.map_err(|e| e.to_string())?, b.map_err(|e| e.to_string())?) - }; - - // Build JSON output array. - let mut entries = Vec::with_capacity(heights.len()); - for (i, height) in heights.iter().enumerate() { - let header = headers.iter().find(|h| h.height == *height); - let block_blob_hex = if i < bin_blocks.len() { - hex::encode(&bin_blocks[i].block) + let (headers, bin_blocks) = if is_contiguous && heights.len() > 1 { + let result = rt + .block_on(dh.pool.fetch_batch_distributed(min_h, max_h)) + .map_err(|e| e.to_string())?; + (result.headers, result.bin_blocks) } else { - String::new() + let (h, b) = rt.block_on(async { + let h = dh.pool.get_block_headers_range(min_h, max_h).await; + let b = dh.pool.get_blocks_by_height_bin(&heights).await; + (h, b) + }); + (h.map_err(|e| e.to_string())?, b.map_err(|e| e.to_string())?) }; - let miner_tx_hash = header - .and_then(|h| h.miner_tx_hash.as_deref()) - .unwrap_or(""); - let tx_hashes: Vec<&str> = header - .and_then(|h| h.extra.get("tx_hashes")) - .and_then(|v| v.as_array()) - .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect()) - .unwrap_or_default(); - entries.push(serde_json::json!({ - "height": height, - "block_blob": block_blob_hex, - "miner_tx_hash": miner_tx_hash, - "tx_hashes": tx_hashes, - })); - } + // Build JSON output array. + let mut entries = Vec::with_capacity(heights.len()); + for (i, height) in heights.iter().enumerate() { + let header = headers.iter().find(|h| h.height == *height); + let block_blob_hex = if i < bin_blocks.len() { + hex::encode(&bin_blocks[i].block) + } else { + String::new() + }; + let miner_tx_hash = header.and_then(|h| h.miner_tx_hash.as_deref()).unwrap_or(""); + let tx_hashes: Vec<&str> = header + .and_then(|h| h.extra.get("tx_hashes")) + .and_then(|v| v.as_array()) + .map(|arr| arr.iter().filter_map(|v| v.as_str()).collect()) + .unwrap_or_default(); - let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?; - let bytes = json_out.as_bytes(); - if bytes.len() > out_len { - return Err(format!( - "output buffer too small: need {} bytes, have {}", - bytes.len(), - out_len - )); - } - unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) }; - Ok(bytes.len()) - })); + entries.push(serde_json::json!({ + "height": height, + "block_blob": block_blob_hex, + "miner_tx_hash": miner_tx_hash, + "tx_hashes": tx_hashes, + })); + } + + let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?; + let bytes = json_out.as_bytes(); + if bytes.len() > out_len { + return Err(format!( + "output buffer too small: need {} bytes, have {}", + bytes.len(), + out_len + )); + } + unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) }; + Ok(bytes.len()) + })); match result { Ok(Ok(n)) => n, @@ -386,51 +378,51 @@ pub unsafe extern "C" fn salvium_daemon_get_transactions( out_buf: *mut u8, out_len: usize, ) -> usize { - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result { - let dh = unsafe { borrow_handle::(handle) }?; - let json_slice = - unsafe { std::slice::from_raw_parts(hashes_json, hashes_json_len) }; - let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?; - let hashes: Vec = - serde_json::from_str(json_str).map_err(|e| format!("invalid hashes JSON: {e}"))?; + let result = + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| -> Result { + let dh = unsafe { borrow_handle::(handle) }?; + let json_slice = unsafe { std::slice::from_raw_parts(hashes_json, hashes_json_len) }; + let json_str = std::str::from_utf8(json_slice).map_err(|e| e.to_string())?; + let hashes: Vec = + serde_json::from_str(json_str).map_err(|e| format!("invalid hashes JSON: {e}"))?; - if hashes.is_empty() { - let out = b"[]"; - if out.len() > out_len { - return Err("output buffer too small".into()); + if hashes.is_empty() { + let out = b"[]"; + if out.len() > out_len { + return Err("output buffer too small".into()); + } + unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) }; + return Ok(out.len()); } - unsafe { std::ptr::copy_nonoverlapping(out.as_ptr(), out_buf, out.len()) }; - return Ok(out.len()); - } - let rt = crate::runtime(); - let hash_refs: Vec<&str> = hashes.iter().map(|s| s.as_str()).collect(); - let txs = rt - .block_on(dh.pool.get_transactions(&hash_refs, false)) - .map_err(|e| e.to_string())?; + let rt = crate::runtime(); + let hash_refs: Vec<&str> = hashes.iter().map(|s| s.as_str()).collect(); + let txs = rt + .block_on(dh.pool.get_transactions(&hash_refs, false)) + .map_err(|e| e.to_string())?; - let entries: Vec = txs - .iter() - .map(|tx| { - serde_json::json!({ - "tx_hash": tx.tx_hash, - "as_hex": tx.as_hex, + let entries: Vec = txs + .iter() + .map(|tx| { + serde_json::json!({ + "tx_hash": tx.tx_hash, + "as_hex": tx.as_hex, + }) }) - }) - .collect(); + .collect(); - let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?; - let bytes = json_out.as_bytes(); - if bytes.len() > out_len { - return Err(format!( - "output buffer too small: need {} bytes, have {}", - bytes.len(), - out_len - )); - } - unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) }; - Ok(bytes.len()) - })); + let json_out = serde_json::to_string(&entries).map_err(|e| e.to_string())?; + let bytes = json_out.as_bytes(); + if bytes.len() > out_len { + return Err(format!( + "output buffer too small: need {} bytes, have {}", + bytes.len(), + out_len + )); + } + unsafe { std::ptr::copy_nonoverlapping(bytes.as_ptr(), out_buf, bytes.len()) }; + Ok(bytes.len()) + })); match result { Ok(Ok(n)) => n, diff --git a/crates/salvium-rpc/src/pool.rs b/crates/salvium-rpc/src/pool.rs index 9f2b9fa..2d14340 100644 --- a/crates/salvium-rpc/src/pool.rs +++ b/crates/salvium-rpc/src/pool.rs @@ -154,8 +154,7 @@ impl NodePool { let mut inner = self.inner.write().await; let already = inner.nodes.iter().any(|n| n.url == url); if !already { - let daemon = - make_daemon(url, &inner.config.username, &inner.config.password); + let daemon = make_daemon(url, &inner.config.username, &inner.config.password); inner.nodes.push(NodeState { daemon, url: url.to_string(), @@ -218,12 +217,7 @@ impl NodePool { // Collect (index, daemon_clone) for all nodes. let node_daemons: Vec<(usize, DaemonRpc)> = { let inner = self.inner.read().await; - inner - .nodes - .iter() - .enumerate() - .map(|(i, n)| (i, n.daemon.clone())) - .collect() + inner.nodes.iter().enumerate().map(|(i, n)| (i, n.daemon.clone())).collect() }; // Results: (node_index, latency | None). @@ -355,16 +349,19 @@ impl NodePool { ) -> Result { let total = (end_height - start_height + 1) as usize; if total == 0 { - return Ok(DistributedBatchResult { - headers: Vec::new(), - bin_blocks: Vec::new(), - }); + return Ok(DistributedBatchResult { headers: Vec::new(), bin_blocks: Vec::new() }); } // Gather assignments under a read lock. let assignments = { let inner = self.inner.read().await; - compute_assignments(&inner.nodes, inner.active_index, inner.config.max_fetch_nodes, start_height, total) + compute_assignments( + &inner.nodes, + inner.active_index, + inner.config.max_fetch_nodes, + start_height, + total, + ) }; if assignments.len() <= 1 { @@ -379,16 +376,15 @@ impl NodePool { (Ok(_), Ok(_)) => self.report_success().await, _ => self.report_failure().await, } - return Ok(DistributedBatchResult { - headers: h?, - bin_blocks: b?, - }); + return Ok(DistributedBatchResult { headers: h?, bin_blocks: b? }); } // Multi-node path: spawn one task per sub-range. log::info!( "NodePool: distributed fetch [{}-{}] across {} nodes", - start_height, end_height, assignments.len() + start_height, + end_height, + assignments.len() ); let mut set = tokio::task::JoinSet::new(); @@ -413,18 +409,16 @@ impl NodePool { while let Some(join_result) = set.join_next().await { match join_result { - Ok((node_idx, ss, se, h_res, b_res)) => { - match (h_res, b_res) { - (Ok(h), Ok(b)) => { - self.report_success_for(node_idx).await; - results.push((ss, h, b)); - } - _ => { - self.report_failure_for(node_idx).await; - failed_ranges.push((ss, se)); - } + Ok((node_idx, ss, se, h_res, b_res)) => match (h_res, b_res) { + (Ok(h), Ok(b)) => { + self.report_success_for(node_idx).await; + results.push((ss, h, b)); } - } + _ => { + self.report_failure_for(node_idx).await; + failed_ranges.push((ss, se)); + } + }, Err(_) => { // JoinError (panic) — we don't know which node, just collect // the sub-range from assignments if we can figure it out. @@ -597,10 +591,7 @@ impl NodePool { result } - pub async fn get_block_header_by_height( - &self, - height: u64, - ) -> Result { + pub async fn get_block_header_by_height(&self, height: u64) -> Result { let daemon = self.active().await; let result = daemon.get_block_header_by_height(height).await; match &result { @@ -676,10 +667,7 @@ impl NodePool { result } - pub async fn get_fee_estimate( - &self, - grace_blocks: u64, - ) -> Result { + pub async fn get_fee_estimate(&self, grace_blocks: u64) -> Result { let daemon = self.active().await; let result = daemon.get_fee_estimate(grace_blocks).await; match &result { @@ -719,9 +707,7 @@ impl NodePool { result } - pub async fn network_type( - &self, - ) -> Result { + pub async fn network_type(&self) -> Result { let daemon = self.active().await; let result = daemon.network_type().await; match &result { @@ -777,10 +763,8 @@ fn compute_assignments( } // Speed weights: speed_i = 1.0 / latency_i (in seconds). - let speeds: Vec = candidates - .iter() - .map(|(_, lat)| 1.0 / lat.as_secs_f64().max(0.001)) - .collect(); + let speeds: Vec = + candidates.iter().map(|(_, lat)| 1.0 / lat.as_secs_f64().max(0.001)).collect(); let total_speed: f64 = speeds.iter().sum(); // Blocks per node (proportional), ensuring at least 1 each. @@ -793,12 +777,8 @@ fn compute_assignments( let sum: usize = block_counts.iter().sum(); if sum != total_blocks { // Find the node with the largest allocation. - let max_idx = block_counts - .iter() - .enumerate() - .max_by_key(|(_, c)| **c) - .map(|(i, _)| i) - .unwrap_or(0); + let max_idx = + block_counts.iter().enumerate().max_by_key(|(_, c)| **c).map(|(i, _)| i).unwrap_or(0); if sum > total_blocks { let excess = sum - total_blocks; block_counts[max_idx] = block_counts[max_idx].saturating_sub(excess).max(1); @@ -861,10 +841,7 @@ mod tests { #[tokio::test] async fn pool_creates_seed_nodes() { - let pool = NodePool::new(PoolConfig { - network: Network::Testnet, - ..Default::default() - }); + let pool = NodePool::new(PoolConfig { network: Network::Testnet, ..Default::default() }); let inner = pool.inner.read().await; assert_eq!(inner.nodes.len(), crate::seed_nodes::TESTNET.len()); assert!(inner.nodes.iter().all(|n| n.is_seed)); @@ -887,10 +864,7 @@ mod tests { #[tokio::test] async fn add_node_dedup() { - let pool = NodePool::new(PoolConfig { - network: Network::Testnet, - ..Default::default() - }); + let pool = NodePool::new(PoolConfig { network: Network::Testnet, ..Default::default() }); let initial_count = pool.inner.read().await.nodes.len(); pool.add_node("http://new-node:29081").await; @@ -923,23 +897,16 @@ mod tests { #[test] fn compute_assignments_single_node_fallback() { // Only 1 node with latency data → single-node path. - let nodes = vec![ - fake_node(Some(100), true), - fake_node(None, true), - fake_node(None, true), - ]; + let nodes = vec![fake_node(Some(100), true), fake_node(None, true), fake_node(None, true)]; let a = compute_assignments(&nodes, 0, 4, 1, 100); assert_eq!(a.len(), 1); - assert_eq!(a[0].1, 1); // sub_start + assert_eq!(a[0].1, 1); // sub_start assert_eq!(a[0].2, 100); // sub_end } #[test] fn compute_assignments_no_latency_data() { - let nodes = vec![ - fake_node(None, true), - fake_node(None, true), - ]; + let nodes = vec![fake_node(None, true), fake_node(None, true)]; let a = compute_assignments(&nodes, 0, 4, 1, 50); assert_eq!(a.len(), 1); } diff --git a/crates/salvium-wallet/src/sync.rs b/crates/salvium-wallet/src/sync.rs index 5facd52..d5e76b6 100644 --- a/crates/salvium-wallet/src/sync.rs +++ b/crates/salvium-wallet/src/sync.rs @@ -309,22 +309,17 @@ impl SyncEngine { let next_end = batch_end + next_size as u64; let pool_clone = pool.clone(); prefetch = Some(tokio::spawn(async move { - let result = pool_clone - .fetch_batch_distributed(next_start, next_end) - .await; - PrefetchResult { - batch_start: next_start, - batch_end: next_end, - result, - } + let result = pool_clone.fetch_batch_distributed(next_start, next_end).await; + PrefetchResult { batch_start: next_start, batch_end: next_end, result } })); } // ── 3. Parallel parse + sequential store ──────────────────── // Estimate batch bytes for throughput tracking. - let batch_bytes: usize = bin_blocks.iter().map(|e| { - e.block.len() + e.txs.iter().map(|t| t.len()).sum::() - }).sum(); + let batch_bytes: usize = bin_blocks + .iter() + .map(|e| e.block.len() + e.txs.iter().map(|t| t.len()).sum::()) + .sum(); // Phase 1: Parse + scan all blocks in parallel via spawn_blocking. let scan_ctx_clone = scan_ctx.clone(); @@ -492,10 +487,7 @@ impl SyncEngine { // Count outputs: coinbase/protocol + regular. total_outputs_found += pr.outputs.len() - + pr.regular_txs - .iter() - .map(|t| t.found_outputs.len()) - .sum::(); + + pr.regular_txs.iter().map(|t| t.found_outputs.len()).sum::(); // Update sync height per block for crash safety. if pr.height > 0 { @@ -670,8 +662,6 @@ fn parse_and_scan_block( block_height: height, tx_type: scan_data.tx_type, unlock_time: scan_data.unlock_time, - - }, )); } @@ -727,8 +717,7 @@ fn parse_and_scan_block( // CARROT/CN scanning for protocol TX. if let Some(ref ptx_hash) = protocol_tx_hash_opt { - if let Some(scan_data) = - parse_tx_for_scanning(protocol_tx_json, ptx_hash, height, true) + if let Some(scan_data) = parse_tx_for_scanning(protocol_tx_json, ptx_hash, height, true) { let found = scanner::scan_transaction(&scan_ctx, &scan_data); for fo in &found { @@ -742,8 +731,6 @@ fn parse_and_scan_block( block_height: height, tx_type: scan_data.tx_type, unlock_time: scan_data.unlock_time, - - }, )); } @@ -878,7 +865,13 @@ fn store_found_output_row( unlock_time: info.unlock_time, }; - store_found_outputs(db, scan_ctx, std::slice::from_ref(found), &scan_data, info.block_timestamp)?; + store_found_outputs( + db, + scan_ctx, + std::slice::from_ref(found), + &scan_data, + info.block_timestamp, + )?; Ok(()) }