Eliminate per-tx DB queries in detect_spent_outputs with in-memory caches
Store phase was still the bottleneck (~700-1800ms/batch) despite the pipeline, because detect_spent_outputs issued ~51k individual DB queries per batch (key image + ring member lookups for every on-chain tx). Load all owned key images and global indices into HashSets at batch start, check in-memory first, and only hit the DB on the rare cache hit (~0.01%). Also adds PRAGMA synchronous=NORMAL and hoists get_stakes out of the per-block loop. Result: store times drop from 700-1800ms to 11-167ms, overall sync throughput improves from ~490 to ~2073 blocks/s (4x).
This commit is contained in:
@@ -27,7 +27,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- run: rustup component add clippy
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
@@ -52,7 +52,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: Build all crates
|
||||
@@ -67,7 +67,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: salvium-types
|
||||
@@ -81,7 +81,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: salvium-crypto (unit)
|
||||
@@ -95,7 +95,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: salvium-wallet (unit + integration compile)
|
||||
@@ -111,7 +111,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: salvium-miner
|
||||
@@ -127,7 +127,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container: rust:alpine
|
||||
steps:
|
||||
- run: apk add --no-cache musl-dev perl make git
|
||||
- run: apk add --no-cache musl-dev perl make git g++
|
||||
- uses: actions/checkout@v4
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: Doc tests
|
||||
|
||||
@@ -789,6 +789,40 @@ impl WalletDb {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load all key images into a set for fast in-memory lookup.
|
||||
///
|
||||
/// Returns a `HashSet<String>` of all key images stored in the outputs table.
|
||||
/// Used by the sync engine to avoid per-transaction DB queries during
|
||||
/// spent-output detection (the vast majority of on-chain key images don't
|
||||
/// match any wallet output).
|
||||
pub fn get_all_key_images(&self) -> Result<std::collections::HashSet<String>, rusqlite::Error> {
|
||||
let mut stmt = self.conn.prepare("SELECT key_image FROM outputs WHERE key_image IS NOT NULL")?;
|
||||
let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
|
||||
let mut set = std::collections::HashSet::new();
|
||||
for row in rows {
|
||||
set.insert(row?);
|
||||
}
|
||||
Ok(set)
|
||||
}
|
||||
|
||||
/// Load all (asset_type, global_index) pairs into a set for fast ring member lookup.
|
||||
///
|
||||
/// Used by the sync engine's output tracker cache to check if any of our
|
||||
/// outputs appear as ring members, without per-index DB queries.
|
||||
pub fn get_all_global_indices(&self) -> Result<std::collections::HashSet<(String, i64)>, rusqlite::Error> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
"SELECT asset_type, global_index FROM outputs WHERE global_index IS NOT NULL"
|
||||
)?;
|
||||
let rows = stmt.query_map([], |r| {
|
||||
Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?))
|
||||
})?;
|
||||
let mut set = std::collections::HashSet::new();
|
||||
for row in rows {
|
||||
set.insert(row?);
|
||||
}
|
||||
Ok(set)
|
||||
}
|
||||
|
||||
/// Get outputs that need global_index resolution (global_index IS NULL).
|
||||
/// Returns (key_image, tx_hash, output_index) tuples.
|
||||
pub fn get_outputs_needing_global_index(
|
||||
|
||||
@@ -854,6 +854,14 @@ fn execute_store_batch(
|
||||
let mut empty_blobs = 0usize;
|
||||
let mut block_hashes = Vec::new();
|
||||
|
||||
// Load in-memory caches for detect_spent_outputs to avoid ~51k DB queries per batch.
|
||||
let mut ki_cache = db
|
||||
.get_all_key_images()
|
||||
.map_err(|e| WalletError::Storage(e.to_string()))?;
|
||||
let gi_cache = db
|
||||
.get_all_global_indices()
|
||||
.map_err(|e| WalletError::Storage(e.to_string()))?;
|
||||
|
||||
for pr in parse_results {
|
||||
if pr.empty_blob {
|
||||
empty_blobs += 1;
|
||||
@@ -868,6 +876,9 @@ fn execute_store_batch(
|
||||
// Store coinbase/protocol found outputs.
|
||||
for (found_output, scan_data_info) in &pr.outputs {
|
||||
store_found_output_row(db, scan_ctx, found_output, scan_data_info)?;
|
||||
if let Some(ki) = &found_output.key_image {
|
||||
ki_cache.insert(hex::encode(ki));
|
||||
}
|
||||
}
|
||||
|
||||
// Store coinbase/protocol transaction rows.
|
||||
@@ -879,10 +890,19 @@ fn execute_store_batch(
|
||||
for tx_data in &pr.regular_txs {
|
||||
for (found_output, info) in &tx_data.found_outputs {
|
||||
store_found_output_row(db, scan_ctx, found_output, info)?;
|
||||
if let Some(ki) = &found_output.key_image {
|
||||
ki_cache.insert(hex::encode(ki));
|
||||
}
|
||||
}
|
||||
|
||||
let spent_info =
|
||||
detect_spent_outputs(db, &tx_data.tx_json, &tx_data.tx_hash_hex, pr.height)?;
|
||||
let spent_info = detect_spent_outputs(
|
||||
db,
|
||||
&tx_data.tx_json,
|
||||
&tx_data.tx_hash_hex,
|
||||
pr.height,
|
||||
&ki_cache,
|
||||
&gi_cache,
|
||||
)?;
|
||||
|
||||
let found_outputs: Vec<&FoundOutput> =
|
||||
tx_data.found_outputs.iter().map(|(fo, _)| fo).collect();
|
||||
@@ -1754,6 +1774,8 @@ fn detect_spent_outputs(
|
||||
tx_json: &serde_json::Value,
|
||||
tx_hash_hex: &str,
|
||||
block_height: u64,
|
||||
ki_cache: &std::collections::HashSet<String>,
|
||||
gi_cache: &std::collections::HashSet<(String, i64)>,
|
||||
) -> Result<SpentInfo, WalletError> {
|
||||
let prefix = tx_json.get("prefix").unwrap_or(tx_json);
|
||||
let key_images = extract_all_key_images(prefix);
|
||||
@@ -1772,14 +1794,16 @@ fn detect_spent_outputs(
|
||||
let mut spent_info = SpentInfo::default();
|
||||
|
||||
// ── Pass 1: Key image matching (primary mechanism) ──────────────────
|
||||
// Check the in-memory cache first to avoid DB queries for the ~99.9%
|
||||
// of on-chain key images that don't belong to this wallet.
|
||||
let mut matched_key_images = std::collections::HashSet::new();
|
||||
for ki_hex in &key_images {
|
||||
// Check if this key image belongs to one of our outputs.
|
||||
if !ki_cache.contains(ki_hex.as_str()) {
|
||||
continue;
|
||||
}
|
||||
// Cache hit — query DB for the full output row.
|
||||
let output = db.get_output(ki_hex).map_err(|e| WalletError::Storage(e.to_string()))?;
|
||||
if let Some(row) = output {
|
||||
// Count all outputs that belong to us, even if already marked spent
|
||||
// (e.g. by mark_inputs_spent after TX submission). This ensures
|
||||
// stake recording still triggers when the block is later synced.
|
||||
spent_info.count += 1;
|
||||
spent_info.total_amount += row.amount.parse::<u64>().unwrap_or(0);
|
||||
if spent_info.asset_type.is_none() {
|
||||
@@ -1802,30 +1826,26 @@ fn detect_spent_outputs(
|
||||
// on ring membership alone would cause false positives (~15/16 of the
|
||||
// time for a standard ring size of 16).
|
||||
//
|
||||
// For view-only wallets, CN outputs will always show a balance >=
|
||||
// the real balance. This matches C++ behavior: the tracker just sets
|
||||
// `recognized_owned_possibly_spent_enote = true` and caches the TX
|
||||
// for potential re-processing when the spend key becomes available.
|
||||
let inputs = extract_inputs_with_offsets(prefix);
|
||||
for input in &inputs {
|
||||
if matched_key_images.contains(&input.key_image) {
|
||||
continue; // Already matched by key image in Pass 1
|
||||
}
|
||||
// Uses in-memory global index cache to avoid DB queries. Only logs
|
||||
// at debug level when a match is found.
|
||||
if !gi_cache.is_empty() {
|
||||
let inputs = extract_inputs_with_offsets(prefix);
|
||||
for input in &inputs {
|
||||
if matched_key_images.contains(&input.key_image) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let asset_type = input.asset_type.as_deref().unwrap_or("SAL");
|
||||
let asset_type = input.asset_type.as_deref().unwrap_or("SAL");
|
||||
|
||||
for &global_idx in &input.ring_member_indices {
|
||||
let output = db
|
||||
.get_output_by_global_index(asset_type, global_idx as i64)
|
||||
.map_err(|e| WalletError::Storage(e.to_string()))?;
|
||||
if let Some(row) = output {
|
||||
log::debug!(
|
||||
"output tracker: our output global_idx={} asset={} seen as ring member (ki={}, tx={})",
|
||||
global_idx, asset_type,
|
||||
row.key_image.as_deref().unwrap_or("none"),
|
||||
&tx_hash_hex[..16]
|
||||
);
|
||||
break;
|
||||
for &global_idx in &input.ring_member_indices {
|
||||
if gi_cache.contains(&(asset_type.to_string(), global_idx as i64)) {
|
||||
log::debug!(
|
||||
"output tracker: our output global_idx={} asset={} seen as ring member (tx={})",
|
||||
global_idx, asset_type,
|
||||
&tx_hash_hex[..16]
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user