Optimize wallet sync: binary bulk fetch, parallel fallback, adaptive batching
Replace serial per-block RPC calls with binary bulk fetch via getBlocksByHeight (2 RPCs per batch regardless of size). Falls back to parallel JSON fetch with bulk tx retrieval when binary endpoint unavailable. Adaptive batch sizing tracks ms/block trend across batches — doubles when fast, halves when slow, nudges up when stable. Starts at 10 blocks, ramps to 500.
This commit is contained in:
+1
-1
@@ -439,7 +439,7 @@ import {
|
||||
bytesToScalar,
|
||||
scalarToBytes,
|
||||
bytesToPoint,
|
||||
hashToPoint as bpHashToPoint,
|
||||
hashToPointMonero as bpHashToPoint,
|
||||
hashToScalar as bpHashToScalar,
|
||||
initGenerators,
|
||||
initTranscript,
|
||||
|
||||
+406
-82
@@ -13,7 +13,7 @@
|
||||
import { WalletOutput, WalletTransaction } from './wallet-store.js';
|
||||
import { cnSubaddressSecretKey, carrotIndexExtensionGenerator, carrotSubaddressScalar } from './subaddress.js';
|
||||
import { scanCarrotOutput, makeInputContext, makeInputContextCoinbase, generateCarrotKeyImage } from './carrot-scanning.js';
|
||||
import { parseTransaction, parseBlock, extractTxPubKey, extractPaymentId, extractAdditionalPubKeys } from './transaction.js';
|
||||
import { parseTransaction, parseBlock, extractTxPubKey, extractPaymentId, extractAdditionalPubKeys, serializeTxPrefix } from './transaction.js';
|
||||
import { bytesToHex, hexToBytes } from './address.js';
|
||||
import { TX_TYPE } from './wallet.js';
|
||||
import {
|
||||
@@ -31,27 +31,22 @@ import {
|
||||
/**
|
||||
* Default number of blocks to fetch per batch
|
||||
*/
|
||||
export const DEFAULT_BATCH_SIZE = 100;
|
||||
export const DEFAULT_BATCH_SIZE = 10;
|
||||
|
||||
/**
|
||||
* Minimum batch size (floor)
|
||||
*/
|
||||
export const MIN_BATCH_SIZE = 5;
|
||||
export const MIN_BATCH_SIZE = 2;
|
||||
|
||||
/**
|
||||
* Maximum batch size (ceiling) - prevent memory/timeout issues
|
||||
*/
|
||||
export const MAX_BATCH_SIZE = 400;
|
||||
export const MAX_BATCH_SIZE = 500;
|
||||
|
||||
/**
|
||||
* Target time per batch cycle in milliseconds (~2 seconds)
|
||||
* Maximum concurrent RPC calls for parallel block fetching
|
||||
*/
|
||||
export const TARGET_BATCH_TIME = 2000;
|
||||
|
||||
/**
|
||||
* Batch adjustment factor (20% per cycle)
|
||||
*/
|
||||
export const BATCH_ADJUST_FACTOR = 0.2;
|
||||
export const FETCH_CONCURRENCY = 20;
|
||||
|
||||
/**
|
||||
* Default confirmations required for unlock
|
||||
@@ -145,6 +140,10 @@ export class WalletSync {
|
||||
this.startHeight = 0;
|
||||
this.error = null;
|
||||
|
||||
// Adaptive batch sizing state
|
||||
this._lastMsPerBlock = 0; // ms per block from previous batch
|
||||
this._lastBatchBlocks = 0; // blocks processed in previous batch
|
||||
|
||||
// Control
|
||||
this._stopRequested = false;
|
||||
this._listeners = [];
|
||||
@@ -379,7 +378,13 @@ export class WalletSync {
|
||||
// ===========================================================================
|
||||
|
||||
/**
|
||||
* Sync a batch of blocks
|
||||
* Sync a batch of blocks.
|
||||
*
|
||||
* Uses binary bulk fetch (2 RPCs total per batch):
|
||||
* 1. getBlockHeadersRange — headers in bulk
|
||||
* 2. getBlocksByHeight — all block blobs + embedded tx blobs
|
||||
*
|
||||
* Falls back to parallel JSON fetch if binary endpoint unavailable.
|
||||
* @private
|
||||
*/
|
||||
async _syncBatch() {
|
||||
@@ -390,7 +395,7 @@ export class WalletSync {
|
||||
this.targetHeight
|
||||
);
|
||||
|
||||
// Fetch block headers for the range
|
||||
// ── Phase 1: Fetch block headers in bulk (1 RPC) ──────────────────────
|
||||
const headersResponse = await this.daemon.getBlockHeadersRange(
|
||||
this.currentHeight,
|
||||
endHeight - 1
|
||||
@@ -401,86 +406,333 @@ export class WalletSync {
|
||||
}
|
||||
|
||||
const headers = headersResponse.result.headers || [];
|
||||
if (headers.length === 0) return;
|
||||
|
||||
for (let idx = 0; idx < headers.length; idx++) {
|
||||
const header = headers[idx];
|
||||
if (this._stopRequested) break;
|
||||
await this._processBlock(header);
|
||||
await this.storage.putBlockHash(header.height, header.hash);
|
||||
this.currentHeight = header.height + 1;
|
||||
this._emit('syncProgress', this.getProgress());
|
||||
// ── Phase 2: Try binary bulk fetch (1 RPC for all blocks) ─────────────
|
||||
const heights = headers.map(h => h.height);
|
||||
let usedBinaryPath = false;
|
||||
|
||||
// Yield to event loop every 20 blocks to allow GC and prevent OOM
|
||||
if (idx % 20 === 19) {
|
||||
await new Promise(r => setTimeout(r, 0));
|
||||
if (this.daemon.getBlocksByHeight) {
|
||||
try {
|
||||
const binResp = await this.daemon.getBlocksByHeight(heights);
|
||||
if (binResp.success && binResp.result.blocks?.length === headers.length) {
|
||||
await this._processBinaryBatch(headers, binResp.result.blocks);
|
||||
usedBinaryPath = true;
|
||||
}
|
||||
} catch (e) {
|
||||
// Binary endpoint failed — fall through to JSON path
|
||||
}
|
||||
}
|
||||
|
||||
// ── Phase 2b: Fallback — parallel JSON fetch ──────────────────────────
|
||||
if (!usedBinaryPath) {
|
||||
await this._syncBatchJsonFallback(headers);
|
||||
}
|
||||
|
||||
// Save sync height
|
||||
await this.storage.setSyncHeight(this.currentHeight);
|
||||
|
||||
// Adaptive batch sizing: adjust based on elapsed time
|
||||
this._adjustBatchSize(batchStartTime);
|
||||
// Adaptive batch sizing based on throughput trend
|
||||
this._adjustBatchSize(batchStartTime, headers.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adjust batch size based on elapsed time for last batch
|
||||
* Process a batch of blocks from binary bulk fetch.
|
||||
* Block blobs contain miner_tx, protocol_tx, and tx_hashes inline.
|
||||
* Regular tx blobs are included in the response.
|
||||
*
|
||||
* Total RPCs for this path: 0 (all data already fetched).
|
||||
* @private
|
||||
* @param {number} batchStartTime - Start time of the batch (from Date.now())
|
||||
*/
|
||||
_adjustBatchSize(batchStartTime) {
|
||||
async _processBinaryBatch(headers, binaryBlocks) {
|
||||
for (let idx = 0; idx < headers.length; idx++) {
|
||||
if (this._stopRequested) break;
|
||||
const header = headers[idx];
|
||||
const binBlock = binaryBlocks[idx];
|
||||
|
||||
try {
|
||||
// Parse the block blob → { minerTx, protocolTx, txHashes, header: {...} }
|
||||
const blockBlob = binBlock.block instanceof Uint8Array
|
||||
? binBlock.block
|
||||
: new Uint8Array(binBlock.block);
|
||||
const parsed = parseBlock(blockBlob);
|
||||
|
||||
// Compute miner_tx and protocol_tx hashes from the raw blob
|
||||
const minerTxHash = this._computeTxHashFromBlob(blockBlob, parsed.minerTx);
|
||||
const protocolTxHash = this._computeTxHashFromBlob(blockBlob, parsed.protocolTx);
|
||||
|
||||
// Process miner_tx (coinbase)
|
||||
if (parsed.minerTx && minerTxHash) {
|
||||
await this._processParsedTransaction(
|
||||
parsed.minerTx, minerTxHash, header,
|
||||
{ isMinerTx: true, isProtocolTx: false }
|
||||
);
|
||||
}
|
||||
|
||||
// Process protocol_tx
|
||||
if (parsed.protocolTx && protocolTxHash) {
|
||||
await this._processParsedTransaction(
|
||||
parsed.protocolTx, protocolTxHash, header,
|
||||
{ isMinerTx: false, isProtocolTx: true }
|
||||
);
|
||||
}
|
||||
|
||||
// Process regular transactions (blobs included in binary response)
|
||||
const txBlobs = binBlock.txs || [];
|
||||
for (let ti = 0; ti < txBlobs.length; ti++) {
|
||||
const txBlobBytes = txBlobs[ti] instanceof Uint8Array
|
||||
? txBlobs[ti]
|
||||
: new Uint8Array(txBlobs[ti]);
|
||||
const tx = parseTransaction(txBlobBytes);
|
||||
// tx_hashes from the block tell us the hash for each regular tx
|
||||
const txHashBytes = parsed.txHashes[ti];
|
||||
const txHash = txHashBytes ? bytesToHex(txHashBytes) : null;
|
||||
if (tx && txHash) {
|
||||
await this._processParsedTransaction(
|
||||
tx, txHash, header,
|
||||
{ isMinerTx: false, isProtocolTx: false }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Emit new block event
|
||||
this._emit('newBlock', {
|
||||
height: header.height,
|
||||
hash: header.hash,
|
||||
timestamp: header.timestamp,
|
||||
txCount: txBlobs.length,
|
||||
hasMinerTx: !!parsed.minerTx,
|
||||
hasProtocolTx: !!parsed.protocolTx
|
||||
});
|
||||
|
||||
} catch (e) {
|
||||
// If binary parsing fails for a block, log and continue
|
||||
console.error(`Binary parse failed at height ${header.height}: ${e.message}`);
|
||||
}
|
||||
|
||||
await this.storage.putBlockHash(header.height, header.hash);
|
||||
this.currentHeight = header.height + 1;
|
||||
this._emit('syncProgress', this.getProgress());
|
||||
|
||||
if (idx % 20 === 19) {
|
||||
await new Promise(r => setTimeout(r, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute transaction hash from a parsed tx that came from a binary blob.
|
||||
*
|
||||
* For v2+ RCTTypeNull (coinbase/protocol): uses the 3-hash scheme
|
||||
* hash = keccak256(prefix_hash || rct_base_hash || prunable_hash)
|
||||
* where rct_base_hash and prunable_hash are constants for type null.
|
||||
*
|
||||
* For v1: hash = keccak256(full blob)
|
||||
* @private
|
||||
*/
|
||||
_computeTxHashFromBlob(blockBlob, parsedTx) {
|
||||
if (!parsedTx?._bytesRead) return null;
|
||||
|
||||
const version = parsedTx.prefix?.version ?? 1;
|
||||
const rctType = parsedTx.rct?.type ?? 0;
|
||||
|
||||
// For v1 transactions: hash = keccak256(blob)
|
||||
if (version < 2) {
|
||||
// Can't easily extract the raw blob from blockBlob without offset tracking
|
||||
// Fall back — this case is extremely rare on Salvium
|
||||
return null;
|
||||
}
|
||||
|
||||
// For v2+ with RCTTypeNull (coinbase, protocol):
|
||||
// The blob is: prefix_bytes || 0x00 (rct type null)
|
||||
// prefix_hash = keccak256(prefix_bytes)
|
||||
// rct_base_hash = keccak256([0x00]) = constant
|
||||
// prunable_hash = keccak256([]) = constant
|
||||
if (rctType === 0) {
|
||||
// We need prefix bytes. parseTransaction gives _bytesRead for the entire tx.
|
||||
// For RCTTypeNull, the prefix ends 1 byte before the end (the 0x00 rct type).
|
||||
// But we don't have the raw tx blob slice here directly.
|
||||
// Use the prefix serialization approach instead.
|
||||
try {
|
||||
const prefixHash = this._hashTxPrefix(parsedTx);
|
||||
if (!prefixHash) return null;
|
||||
|
||||
// keccak256([0x00]) — rct_base hash for type null
|
||||
const rctBaseHash = cnFastHash(new Uint8Array([0x00]));
|
||||
// keccak256([]) — prunable hash (empty)
|
||||
const prunableHash = cnFastHash(new Uint8Array(0));
|
||||
|
||||
// Final: hash = keccak256(prefix_hash || rct_base_hash || prunable_hash)
|
||||
const combined = new Uint8Array(96);
|
||||
combined.set(prefixHash instanceof Uint8Array ? prefixHash : hexToBytes(prefixHash), 0);
|
||||
combined.set(rctBaseHash instanceof Uint8Array ? rctBaseHash : hexToBytes(rctBaseHash), 32);
|
||||
combined.set(prunableHash instanceof Uint8Array ? prunableHash : hexToBytes(prunableHash), 64);
|
||||
const hash = cnFastHash(combined);
|
||||
return bytesToHex(hash instanceof Uint8Array ? hash : hexToBytes(hash));
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// For non-null RCT types, we'd need to split rct_base and rct_prunable.
|
||||
// Skip — these won't appear in miner_tx/protocol_tx.
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hash a transaction prefix by re-serializing it.
|
||||
* @private
|
||||
*/
|
||||
_hashTxPrefix(parsedTx) {
|
||||
try {
|
||||
const prefixBytes = serializeTxPrefix(parsedTx);
|
||||
return cnFastHash(prefixBytes);
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fallback: parallel JSON fetch + bulk tx fetch.
|
||||
* Used when binary endpoint is unavailable.
|
||||
* @private
|
||||
*/
|
||||
async _syncBatchJsonFallback(headers) {
|
||||
// Parallel-fetch all block data
|
||||
const blockDataArr = new Array(headers.length);
|
||||
for (let i = 0; i < headers.length; i += FETCH_CONCURRENCY) {
|
||||
const chunk = headers.slice(i, Math.min(i + FETCH_CONCURRENCY, headers.length));
|
||||
const results = await Promise.all(
|
||||
chunk.map(h => this.daemon.getBlock({ height: h.height }))
|
||||
);
|
||||
for (let j = 0; j < results.length; j++) {
|
||||
blockDataArr[i + j] = results[j];
|
||||
}
|
||||
}
|
||||
|
||||
// Parse block JSONs, collect all regular tx hashes
|
||||
const parsedBlocks = new Array(headers.length);
|
||||
const allTxHashes = [];
|
||||
|
||||
for (let idx = 0; idx < headers.length; idx++) {
|
||||
const blockResp = blockDataArr[idx];
|
||||
if (!blockResp?.success) {
|
||||
parsedBlocks[idx] = null;
|
||||
continue;
|
||||
}
|
||||
const block = blockResp.result;
|
||||
let blockJson = null;
|
||||
if (block.json) {
|
||||
try { blockJson = JSON.parse(block.json); } catch (e) { /* skip */ }
|
||||
}
|
||||
parsedBlocks[idx] = { block, blockJson };
|
||||
|
||||
const txHashes = blockJson?.tx_hashes || [];
|
||||
for (const hash of txHashes) {
|
||||
allTxHashes.push(hash);
|
||||
}
|
||||
}
|
||||
|
||||
// Bulk-fetch all regular transactions in one call
|
||||
const txDataMap = new Map();
|
||||
if (allTxHashes.length > 0) {
|
||||
const txsResponse = await this.daemon.getTransactions(allTxHashes, {
|
||||
decode_as_json: true
|
||||
});
|
||||
if (txsResponse.success && txsResponse.result.txs) {
|
||||
for (const txData of txsResponse.result.txs) {
|
||||
txDataMap.set(txData.tx_hash, txData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process blocks sequentially with pre-fetched data
|
||||
for (let idx = 0; idx < headers.length; idx++) {
|
||||
if (this._stopRequested) break;
|
||||
const header = headers[idx];
|
||||
const parsed = parsedBlocks[idx];
|
||||
|
||||
if (!parsed) {
|
||||
throw new Error(`Failed to get block ${header.height}`);
|
||||
}
|
||||
|
||||
await this._processBlockPrefetched(header, parsed.block, parsed.blockJson, txDataMap);
|
||||
await this.storage.putBlockHash(header.height, header.hash);
|
||||
this.currentHeight = header.height + 1;
|
||||
this._emit('syncProgress', this.getProgress());
|
||||
|
||||
if (idx % 20 === 19) {
|
||||
await new Promise(r => setTimeout(r, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adjust batch size based on per-block throughput trend.
|
||||
*
|
||||
* Tracks ms/block across batches:
|
||||
* - If per-block time dropped (faster): scale up aggressively
|
||||
* - If per-block time more than doubled (slower): scale back ~30%
|
||||
* - If per-block time rose modestly: scale back gently
|
||||
* - If roughly stable: nudge up slightly
|
||||
*
|
||||
* @private
|
||||
*/
|
||||
_adjustBatchSize(batchStartTime, blocksProcessed) {
|
||||
const elapsed = Date.now() - batchStartTime;
|
||||
const msPerBlock = blocksProcessed > 0 ? elapsed / blocksProcessed : elapsed;
|
||||
|
||||
// Too fast (< 60% of target) → increase by 20%
|
||||
if (elapsed < TARGET_BATCH_TIME * 0.6) {
|
||||
this.batchSize = Math.min(
|
||||
Math.round(this.batchSize * (1 + BATCH_ADJUST_FACTOR)),
|
||||
MAX_BATCH_SIZE
|
||||
);
|
||||
}
|
||||
// Too slow (> 150% of target) → decrease by 20%
|
||||
else if (elapsed > TARGET_BATCH_TIME * 1.5) {
|
||||
this.batchSize = Math.max(
|
||||
Math.round(this.batchSize * (1 - BATCH_ADJUST_FACTOR)),
|
||||
MIN_BATCH_SIZE
|
||||
);
|
||||
}
|
||||
// else: in the sweet spot, keep same
|
||||
const prev = this._lastMsPerBlock;
|
||||
let newSize = this.batchSize;
|
||||
|
||||
if (prev > 0) {
|
||||
const ratio = msPerBlock / prev;
|
||||
|
||||
if (ratio > 2.0) {
|
||||
// Processing time more than doubled — scale back hard
|
||||
newSize = Math.round(this.batchSize * 0.5);
|
||||
} else if (ratio > 1.3) {
|
||||
// Slowed down moderately — scale back gently
|
||||
newSize = Math.round(this.batchSize * 0.75);
|
||||
} else if (ratio < 0.5) {
|
||||
// Processing time dropped by more than half — scale up aggressively
|
||||
newSize = Math.round(this.batchSize * 2.0);
|
||||
} else if (ratio < 0.8) {
|
||||
// Getting faster — scale up
|
||||
newSize = Math.round(this.batchSize * 1.5);
|
||||
} else {
|
||||
// Stable — nudge up 10%
|
||||
newSize = Math.round(this.batchSize * 1.1);
|
||||
}
|
||||
} else {
|
||||
// First batch — if it was fast, double; otherwise keep
|
||||
if (msPerBlock < 50) {
|
||||
newSize = Math.round(this.batchSize * 2.0);
|
||||
}
|
||||
}
|
||||
|
||||
this.batchSize = Math.max(MIN_BATCH_SIZE, Math.min(MAX_BATCH_SIZE, newSize));
|
||||
this._lastMsPerBlock = msPerBlock;
|
||||
this._lastBatchBlocks = blocksProcessed;
|
||||
|
||||
// Emit batch timing info for debugging/monitoring
|
||||
this._emit('batchComplete', {
|
||||
elapsed,
|
||||
batchSize: this.batchSize,
|
||||
blocksPerSec: this.batchSize / (elapsed / 1000)
|
||||
blocksProcessed,
|
||||
msPerBlock: Math.round(msPerBlock),
|
||||
blocksPerSec: blocksProcessed / (elapsed / 1000)
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single block
|
||||
* Process a single block using pre-fetched data (no RPC calls).
|
||||
* @private
|
||||
* @param {Object} header - Block header
|
||||
* @param {Object} block - Pre-fetched block RPC result
|
||||
* @param {Object|null} blockJson - Pre-parsed block JSON
|
||||
* @param {Map} txDataMap - Map of txHash → txData (pre-fetched regular txs)
|
||||
*/
|
||||
async _processBlock(header) {
|
||||
|
||||
// Get full block data - includes miner_tx and protocol_tx in JSON
|
||||
const blockResponse = await this.daemon.getBlock({ height: header.height });
|
||||
if (!blockResponse.success) {
|
||||
throw new Error(`Failed to get block ${header.height}`);
|
||||
}
|
||||
|
||||
const block = blockResponse.result;
|
||||
|
||||
// Parse block JSON to get miner_tx and protocol_tx directly
|
||||
// (they can't be fetched via getTransactions - that returns empty as_hex)
|
||||
let blockJson = null;
|
||||
if (block.json) {
|
||||
try {
|
||||
blockJson = JSON.parse(block.json);
|
||||
} catch (e) {
|
||||
console.error(`Failed to parse block JSON at height ${header.height}:`, e.message);
|
||||
}
|
||||
}
|
||||
|
||||
async _processBlockPrefetched(header, block, blockJson, txDataMap) {
|
||||
|
||||
// Process miner_tx (coinbase - block reward)
|
||||
if (blockJson?.miner_tx && block.miner_tx_hash) {
|
||||
@@ -502,23 +754,17 @@ export class WalletSync {
|
||||
);
|
||||
}
|
||||
|
||||
// Fetch and process regular transactions
|
||||
// Process regular transactions from pre-fetched map
|
||||
const txHashes = blockJson?.tx_hashes || [];
|
||||
if (txHashes.length > 0) {
|
||||
const txsResponse = await this.daemon.getTransactions(txHashes, {
|
||||
decode_as_json: true
|
||||
});
|
||||
for (const txHash of txHashes) {
|
||||
const txData = txDataMap.get(txHash);
|
||||
if (!txData) continue;
|
||||
|
||||
if (txsResponse.success && txsResponse.result.txs) {
|
||||
for (const txData of txsResponse.result.txs) {
|
||||
if (txData.as_hex) {
|
||||
await this._processTransaction(txData, header, { isMinerTx: false, isProtocolTx: false });
|
||||
} else if (txData.as_json) {
|
||||
// Fallback for testnet/in-memory nodes that don't have binary serialization
|
||||
const txJson = typeof txData.as_json === 'string' ? JSON.parse(txData.as_json) : txData.as_json;
|
||||
await this._processEmbeddedTransaction(txJson, txData.tx_hash, header, { isMinerTx: false, isProtocolTx: false });
|
||||
}
|
||||
}
|
||||
if (txData.as_hex) {
|
||||
await this._processTransaction(txData, header, { isMinerTx: false, isProtocolTx: false });
|
||||
} else if (txData.as_json) {
|
||||
const txJson = typeof txData.as_json === 'string' ? JSON.parse(txData.as_json) : txData.as_json;
|
||||
await this._processEmbeddedTransaction(txJson, txData.tx_hash, header, { isMinerTx: false, isProtocolTx: false });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -533,6 +779,84 @@ export class WalletSync {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a transaction already in parsed format (from parseTransaction).
|
||||
* Used by the binary fetch path — no JSON conversion needed.
|
||||
* @private
|
||||
* @param {Object} tx - Parsed transaction (from parseTransaction or parseBlock)
|
||||
* @param {string} txHash - Transaction hash (hex)
|
||||
* @param {Object} header - Block header
|
||||
* @param {Object} options - { isMinerTx, isProtocolTx }
|
||||
*/
|
||||
async _processParsedTransaction(tx, txHash, header, options = {}) {
|
||||
const { isMinerTx = false, isProtocolTx = false } = options;
|
||||
|
||||
const existing = await this.storage.getTransaction(txHash);
|
||||
if (existing && existing.isConfirmed) return;
|
||||
|
||||
try {
|
||||
const txPubKey = extractTxPubKey(tx);
|
||||
const paymentId = extractPaymentId(tx);
|
||||
const txType = isMinerTx ? 'miner' : (isProtocolTx ? 'protocol' : this._getTxType(tx));
|
||||
|
||||
const ownedOutputs = await this._scanOutputs(tx, txHash, txPubKey, header, txType);
|
||||
const spentOutputs = isProtocolTx ? await this._checkSpentOutputs(tx, txHash, header) :
|
||||
(!isMinerTx ? await this._checkSpentOutputs(tx, txHash, header) : []);
|
||||
|
||||
if (ownedOutputs.length === 0 && spentOutputs.length === 0) return;
|
||||
|
||||
let incomingAmount = 0n;
|
||||
let outgoingAmount = 0n;
|
||||
for (const output of ownedOutputs) incomingAmount += output.amount;
|
||||
for (const spent of spentOutputs) outgoingAmount += spent.amount;
|
||||
|
||||
const walletTx = new WalletTransaction({
|
||||
txHash,
|
||||
txPubKey: txPubKey ? bytesToHex(txPubKey) : null,
|
||||
blockHeight: header.height,
|
||||
blockTimestamp: header.timestamp,
|
||||
isConfirmed: true,
|
||||
inPool: false,
|
||||
isIncoming: incomingAmount > 0n,
|
||||
isOutgoing: outgoingAmount > 0n,
|
||||
incomingAmount,
|
||||
outgoingAmount,
|
||||
fee: (isMinerTx || isProtocolTx) ? 0n : this._safeBigInt(tx.rct?.txnFee),
|
||||
paymentId,
|
||||
unlockTime: tx.prefix?.unlockTime || 0n,
|
||||
txType,
|
||||
isMinerTx,
|
||||
isProtocolTx,
|
||||
transfers: [
|
||||
...ownedOutputs.map(o => ({
|
||||
type: 'incoming',
|
||||
amount: o.amount.toString(),
|
||||
subaddressIndex: o.subaddressIndex
|
||||
})),
|
||||
...spentOutputs.map(o => ({
|
||||
type: 'outgoing',
|
||||
amount: o.amount.toString()
|
||||
}))
|
||||
]
|
||||
});
|
||||
|
||||
await this.storage.putTransaction(walletTx);
|
||||
|
||||
if (ownedOutputs.length > 0) {
|
||||
this._emit('outputReceived', {
|
||||
txHash, outputs: ownedOutputs, blockHeight: header.height
|
||||
});
|
||||
}
|
||||
if (spentOutputs.length > 0) {
|
||||
this._emit('outputSpent', {
|
||||
txHash, outputs: spentOutputs, blockHeight: header.height
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Error processing tx ${txHash}: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process an embedded transaction (miner_tx or protocol_tx from block JSON)
|
||||
* @private
|
||||
|
||||
Reference in New Issue
Block a user