From 424464a8b73a81dea3a98cb9354df7a79e33b580 Mon Sep 17 00:00:00 2001 From: lomasson <97454276+lomasson@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:15:56 +0200 Subject: [PATCH] feat: optimize script fetching data (#222) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Maciej Kamiński --- .gitignore | 5 +- scripts/data/client.sh | 23 +-- scripts/data/generate_data.py | 211 ++++++++++++++++++++---- scripts/data/generate_timestamp_data.py | 20 +-- scripts/data/generate_utxo_data.py | 61 ++++--- scripts/data/regenerate_tests.sh | 2 +- 6 files changed, 245 insertions(+), 77 deletions(-) mode change 100644 => 100755 scripts/data/client.sh diff --git a/.gitignore b/.gitignore index 2fb090f2..efec7b40 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,7 @@ Cargo.lock .python-version __pycache__ -.client_cache/ \ No newline at end of file +.client_cache/ +.utxo_data/ +.timestamps_data/ + diff --git a/scripts/data/client.sh b/scripts/data/client.sh old mode 100644 new mode 100755 index 5b39d1a2..7296f3c9 --- a/scripts/data/client.sh +++ b/scripts/data/client.sh @@ -6,12 +6,13 @@ base_dir=".client_cache" start=${1:-0} -end=${2:-100} +no_of_blocks=${2:-100} +end=$(($start+$no_of_blocks)) step=${3:-1} mode=${4:-"light"} strategy=${5:-"sequential"} -mkdir $base_dir || true +mkdir -p $base_dir run_client() { local initial_height=$1 @@ -19,21 +20,23 @@ run_client() { first=$((initial_height+1)) second=$((initial_height+num_blocks)) - echo -n "Running $mode client on blocks $first — $second ..." batch_file=${base_dir}/${mode}_${initial_height}_${num_blocks}.json + arguments_file=${base_dir}/arguments-${mode}_${initial_height}_${num_blocks}.json if [ ! -f "$batch_file" ]; then - python ../../scripts/data/generate_data.py $mode $initial_height $num_blocks true $batch_file + python ../../scripts/data/generate_data.py --fast --mode $mode --height $initial_height --num_blocks $num_blocks --include_expected --output_file $batch_file fi - - arguments=$(python ../../scripts/data/format_args.py $batch_file) - output=$(scarb cairo-run --no-build --package client --function test "$arguments") - if [[ "$output" == *"FAIL"* ]]; then - echo " fail" + + echo -n "Running $mode client on blocks $first - $second " + python ../../scripts/data/format_args.py $batch_file > $arguments_file + output=$(scarb cairo-run --no-build --package client --function test --arguments-file $arguments_file) + if [[ $? -ne 0 || "$output" == *"FAIL"* || "$output" == *error* || "$output" == *panicked* ]]; then + echo "fail" echo $output exit 1 else - echo " ok" + echo "ok" + echo $output fi } diff --git a/scripts/data/generate_data.py b/scripts/data/generate_data.py index c837405e..a483fec1 100755 --- a/scripts/data/generate_data.py +++ b/scripts/data/generate_data.py @@ -4,8 +4,12 @@ import os import json import requests +import argparse from pathlib import Path from decimal import Decimal, getcontext +from generate_timestamp_data import get_timestamp_data +from generate_utxo_data import get_utxo_set +from tqdm import tqdm getcontext().prec = 16 @@ -13,6 +17,8 @@ USERPWD = os.getenv("USERPWD") DEFAULT_URL = "https://bitcoin-mainnet.public.blastapi.io" +FAST = False + def request_rpc(method: str, params: list): """Makes a JSON-RPC call to a Bitcoin API endpoint. @@ -32,6 +38,26 @@ def request_rpc(method: str, params: list): raise ConnectionError(f"Unexpected RPC response:\n{res.text}") +def fetch_chain_state_fast(block_height: int): + """Fetches chain state at the end of a specific block with given height. + Chain state is a just a block header extended with extra fields: + - prev_timestamps + - epoch_start_time + """ + # Chain state at height H is the state after applying block H + block_hash = request_rpc("getblockhash", [block_height]) + head = request_rpc("getblockheader", [block_hash]) + + # If block is downloaded take it locally + data = get_timestamp_data(block_height)[str(block_height)] + head["prev_timestamps"] = [int(t) for t in data["previous_timestamps"]] + if block_height < 2016: + head["epoch_start_time"] = 1231006505 + else: + head["epoch_start_time"] = int(data["epoch_start_time"]) + return head + + def fetch_chain_state(block_height: int): """Fetches chain state at the end of a specific block with given height. Chain state is a just a block header extended with extra fields: @@ -44,7 +70,7 @@ def fetch_chain_state(block_height: int): # In order to init prev_timestamps we need to query 10 previous headers prev_header = head - prev_timestamps = [head["time"]] + prev_timestamps = [int(head["time"])] for _ in range(10): if prev_header["height"] == 0: prev_timestamps.insert(0, 0) @@ -52,7 +78,7 @@ def fetch_chain_state(block_height: int): prev_header = request_rpc( "getblockheader", [prev_header["previousblockhash"]] ) - prev_timestamps.insert(0, prev_header["time"]) + prev_timestamps.insert(0, int(prev_header["time"])) head["prev_timestamps"] = prev_timestamps # In order to init epoch start we need to query block header at epoch start @@ -120,16 +146,24 @@ def bits_to_target(bits: str) -> int: return mantissa << (8 * (exponent - 3)) -def fetch_block(block_hash: str, include_utreexo_data: bool): +def fetch_block(block_height: int, block_hash: str, include_utreexo_data: bool, fast): """Downloads block with transactions (and referred UTXOs) from RPC given the block hash.""" block = request_rpc("getblock", [block_hash, 2]) + + previous_outputs = ( + {(o["txid"], int(o["vout"])): o for o in get_utxo_set(block_height + 1)} + if fast + else None + ) + block["data"] = { - tx["txid"]: resolve_transaction(tx, include_utreexo_data) for tx in block["tx"] + tx["txid"]: resolve_transaction(tx, include_utreexo_data, previous_outputs) + for tx in tqdm(block["tx"], "Resolving transactions") } return block -def resolve_transaction(transaction: dict, include_utreexo_data: bool): +def resolve_transaction(transaction: dict, include_utreexo_data, previous_outputs): """Resolves transaction inputs and formats the content according to the Cairo type.""" if include_utreexo_data: return { @@ -137,7 +171,9 @@ def resolve_transaction(transaction: dict, include_utreexo_data: bool): "txid": transaction["txid"], # Skip the first 4 bytes (version) and take the next 4 bytes (marker + flag) "is_segwit": transaction["hex"][8:12] == "0001", - "inputs": [resolve_input(input) for input in transaction["vin"]], + "inputs": [ + resolve_input(input, previous_outputs) for input in transaction["vin"] + ], "outputs": [format_output(output) for output in transaction["vout"]], "lock_time": transaction["locktime"], } @@ -146,23 +182,52 @@ def resolve_transaction(transaction: dict, include_utreexo_data: bool): "version": transaction["version"], # Skip the first 4 bytes (version) and take the next 4 bytes (marker + flag) "is_segwit": transaction["hex"][8:12] == "0001", - "inputs": [resolve_input(input) for input in transaction["vin"]], + "inputs": [ + resolve_input(input, previous_outputs) for input in transaction["vin"] + ], "outputs": [format_output(output) for output in transaction["vout"]], "lock_time": transaction["locktime"], } -def resolve_input(input: dict): +def resolve_input(input: dict, previous_outputs): """Resolves referenced UTXO and formats the transaction inputs according to the Cairo type.""" if input.get("coinbase"): return format_coinbase_input(input) else: - return { - "script": f'0x{input["scriptSig"]["hex"]}', - "sequence": input["sequence"], - "previous_output": resolve_outpoint(input), - "witness": [f"0x{item}" for item in input.get("txinwitness", [])], - } + if previous_outputs: + previous_output = previous_outputs.get((input["txid"], input["vout"])) + return { + "script": f'0x{input["scriptSig"]["hex"]}', + "sequence": input["sequence"], + "previous_output": format_outpoint(previous_output), + "witness": [f"0x{item}" for item in input.get("txinwitness", [])], + } + else: + return { + "script": f'0x{input["scriptSig"]["hex"]}', + "sequence": input["sequence"], + "previous_output": resolve_outpoint(input), + "witness": [f"0x{item}" for item in input.get("txinwitness", [])], + } + + +def format_outpoint(previous_output): + """Formats output according to the Cairo type.""" + + return { + "txid": previous_output["txid"], + "vout": int(previous_output["vout"]), + "data": { + "value": int(previous_output["value"]), + "pk_script": f'0x{previous_output["pk_script"]}', + "cached": False, + }, + "block_hash": previous_output["block_hash"], + "block_height": int(previous_output["block_height"]), + "block_time": int(previous_output["block_time"]), + "is_coinbase": previous_output["is_coinbase"], + } def resolve_outpoint(input: dict): @@ -175,7 +240,7 @@ def resolve_outpoint(input: dict): "txid": input["txid"], "vout": input["vout"], "data": format_output(tx["vout"][input["vout"]]), - "block_hash": block["hash"], + "block_hash": tx["blockhash"], "block_height": block["height"], "block_time": block["time"], "is_coinbase": tx["vin"][0].get("coinbase") is not None, @@ -258,17 +323,31 @@ def generate_data( num_blocks: int, include_expected: bool, include_utreexo_data: bool, + fast: bool, ): """Generates arguments for Raito program in a human readable form and the expected result. :param mode: Validation mode: "light" — generate block headers with Merkle root only - "full" — generate full blocks with transactions (and referenced UTXOs) + "full, full_fast" — generate full blocks with transactions (and referenced UTXOs) :param initial_height: The block height of the initial chain state (0 means the state after genesis) :param num_blocks: The number of blocks to apply on top of it (has to be at least 1) :return: tuple (arguments, expected output) """ - chain_state = fetch_chain_state(initial_height) + + if fast: + print("Fetching chain state (fast)...") + else: + print("Fetching chain state...") + + print(f"blocks: {initial_height} - {initial_height + num_blocks - 1}") + + chain_state = ( + fetch_chain_state_fast(initial_height) + if fast + else fetch_chain_state(initial_height) + ) + next_block_hash = chain_state["nextblockhash"] blocks = [] @@ -276,8 +355,10 @@ def generate_data( if include_utreexo_data: blocks.append( fetch_block( + 0, "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f", include_utreexo_data, + fast, ) ) @@ -285,15 +366,14 @@ def generate_data( # UTXO defined to track unspent output in the same block to define cached output. utxo_set = {} + for i in range(num_blocks): + print(f"Fetching block {initial_height}| {i+1}/{num_blocks}") if mode == "light": block = fetch_block_header(next_block_hash) elif mode == "full": - print( - f"\rFetching block {initial_height + i}/{initial_height + num_blocks}", - end="", - flush=True, + block = fetch_block( + initial_height + i, next_block_hash, include_utreexo_data, fast ) - block = fetch_block(next_block_hash, include_utreexo_data) # Build UTXO set and mark outputs spent within the same block (span). # Also set "cached" flag for the inputs that spend those UTXOs. for txid, tx in block["data"].items(): @@ -303,7 +383,8 @@ def generate_data( tx_input["previous_output"]["vout"], ) if outpoint in utxo_set: - tx_input["previous_output"]["cached"] = True + # TODO: Check if this is correct + tx_input["previous_output"]["data"]["cached"] = True utxo_set[outpoint]["cached"] = True for idx, output in enumerate(tx["outputs"]): @@ -314,7 +395,7 @@ def generate_data( next_block_hash = block["nextblockhash"] blocks.append(block) - if mode == "full": + if mode == "full" or mode == "full_fast": # Do another pass to mark UTXOs spent within the same block (span) with "cached" flag. for block in blocks: for txid, tx in block["data"].items(): @@ -337,18 +418,84 @@ def generate_data( return result +def str2bool(value): + if isinstance(value, bool): + return value + if value.lower() in ("yes", "true", "t", "y", "1"): + return True + elif value.lower() in ("no", "false", "f", "n", "0"): + return False + else: + raise argparse.ArgumentTypeError("Boolean value expected.") + + # Usage: generate_data.py MODE INITIAL_HEIGHT NUM_BLOCKS INCLUDE_EXPECTED OUTPUT_FILE # Example: generate_data.py 'light' 0 10 false light_0_10.json if __name__ == "__main__": - if len(sys.argv) != 7: - raise TypeError("Expected six arguments") + + parser = argparse.ArgumentParser(description="Process UTXO files.") + parser.add_argument( + "--mode", + dest="mode", + default="full", + choices=["light", "full"], + help="Mode", + ) + + parser.add_argument( + "--height", + dest="height", + required=True, + type=int, + help="The block height of the initial chain state", + ) + + parser.add_argument( + "--num_blocks", + dest="num_blocks", + required=True, + type=int, + help="The number of blocks", + ) + + parser.add_argument( + "--include_expected", + dest="include_expected", + action="store_true", + help="Include expected output", + ) + + parser.add_argument( + "--include_utreexo_data", + dest="include_utreexo_data", + action="store_true", + help="Include utreexo data", + ) + + parser.add_argument( + "--output_file", + dest="output_file", + required=True, + type=str, + help="Output file", + ) + + parser.add_argument( + "--fast", + dest="fast", + action="store_true", + help="Fast mode", + ) + + args = parser.parse_args() data = generate_data( - mode=sys.argv[1], - initial_height=int(sys.argv[2]), - num_blocks=int(sys.argv[3]), - include_expected=sys.argv[4].lower() == "true", - include_utreexo_data=sys.argv[5].lower() == "true", + mode=args.mode, + initial_height=args.height, + num_blocks=args.num_blocks, + include_expected=args.include_expected, + include_utreexo_data=args.include_utreexo_data, + fast=args.fast, ) - Path(sys.argv[6]).write_text(json.dumps(data, indent=2)) + Path(args.output_file).write_text(json.dumps(data, indent=2)) diff --git a/scripts/data/generate_timestamp_data.py b/scripts/data/generate_timestamp_data.py index 1f019488..71b74719 100644 --- a/scripts/data/generate_timestamp_data.py +++ b/scripts/data/generate_timestamp_data.py @@ -10,7 +10,7 @@ INDEX_SIZE = 30000 -BASE_DIR = "timestamps_data" +BASE_DIR = ".timestamps_data" GCS_BUCKET_NAME = "shinigami-consensus" GCS_FOLDER_NAME = "timestamps" @@ -29,7 +29,7 @@ def download_timestamp(file_name: str): response = requests.get(url) if response.status_code != 200: - raise Exception(f"Failed to download {file_name}") + raise Exception(f"Failed to download {file_name}", response) with open(file_path, "wb") as f: f.write(response.content) @@ -47,11 +47,13 @@ def create_index(folder_path): return index -def list_files_in_gcs(bucket_name: str, prefix: str): +def list_files_in_gcs(): """List all files in a GCS bucket under a specific folder (prefix).""" - client = storage.Client() - bucket = client.get_bucket(bucket_name) - blobs = bucket.list_blobs(prefix=prefix) + print(f"Getting file list from GCS...") + client = storage.Client.create_anonymous_client() + bucket = client.get_bucket(GCS_BUCKET_NAME) + blobs = bucket.list_blobs(prefix=GCS_FOLDER_NAME) + return [ os.path.basename(blob.name) for blob in blobs if blob.name.endswith(".json") ] @@ -85,15 +87,13 @@ def load_index(file_name): def get_timestamp_data(block_number): """Get the timestamp data for a given block number.""" - print(int(block_number) // INDEX_SIZE) file_name = index_file_name(int(block_number) // INDEX_SIZE) - print(file_name) index = load_index(file_name) - return index[block_number] + return index if __name__ == "__main__": - file_names = list_files_in_gcs(GCS_BUCKET_NAME, GCS_FOLDER_NAME) + file_names = list_files_in_gcs() for file_name in tqdm(file_names, "Downloading files"): download_timestamp(file_name) diff --git a/scripts/data/generate_utxo_data.py b/scripts/data/generate_utxo_data.py index a5b4bae8..456cfa33 100644 --- a/scripts/data/generate_utxo_data.py +++ b/scripts/data/generate_utxo_data.py @@ -7,16 +7,32 @@ import subprocess from typing import Dict, Any import argparse +from google.cloud import storage from tqdm import tqdm from functools import lru_cache from collections import defaultdict # Constants -GCS_BASE_URL = "https://storage.googleapis.com/shinigami-consensus/utxos/" -BASE_DIR = "utxo_data" +BASE_DIR = ".utxo_data" CHUNK_SIZE = 10 INDEX_SIZE = 50000 +GCS_BUCKET_NAME = "shinigami-consensus" +GCS_FOLDER_NAME = "utxos" +GCS_BASE_URL = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}/{GCS_FOLDER_NAME}/" + + +def list_files_in_gcs(): + """List all files in a GCS bucket under a specific folder (prefix).""" + print(f"Getting file list from GCS...") + client = storage.Client.create_anonymous_client() + bucket = client.get_bucket(GCS_BUCKET_NAME) + blobs = bucket.list_blobs(prefix=GCS_FOLDER_NAME) + + return [ + os.path.basename(blob.name) for blob in blobs if blob.name.endswith(".json") + ] + def download_and_split(file_name: str): """Download a file from GCS and split it into chunks.""" @@ -107,33 +123,38 @@ def load_index(file_name): def get_utxo_set(block_number: int) -> Dict[str, Any]: - index = load_index(index_file_name(int(block_number) // INDEX_SIZE)) + index_file = index_file_name(int(block_number) // INDEX_SIZE) + index = load_index(index_file) # Find chunk file chunk_file = index.get(str(block_number)) if not chunk_file: - raise Exception(f"Block number {block_number} not found in index") + return [] + # raise Exception(f"Block number {block_number} not found in index file: {index_file}") # Find and return data for the block with open(BASE_DIR + "/" + chunk_file, "r") as f: for line in f: - data = json.loads(line.strip()) - if data["block_number"] == block_number: - return data + # data = json.loads(line.strip()) + # if data["block_number"] == str(block_number): + if line.startswith(f'{{"block_number":"{block_number}"'): + data = json.loads(line.strip()) + return data["outputs"] raise Exception(f"Block {block_number} not found in chunk file {chunk_file}") -def process_file_range(start_file: str, end_file: str): +def process_files(num_files: int): """Process a range of files from start_file to end_file.""" os.makedirs(BASE_DIR, exist_ok=True) - start_num = int(start_file.split(".")[0]) - end_num = int(end_file.split(".")[0]) + files = list_files_in_gcs() - for file_num in tqdm(range(start_num, end_num + 1), desc="Processing files"): - file_name = f"{file_num:012d}.json" + if num_files: + files = files[:num_files] + + for file_name in tqdm(files, desc="Downloading files"): # print(f"\nProcessing file: {file_name}") download_and_split(file_name) @@ -144,20 +165,14 @@ def process_file_range(start_file: str, end_file: str): if __name__ == "__main__": parser = argparse.ArgumentParser(description="Process UTXO files.") parser.add_argument( - "--from", - dest="start_file", - required=True, - help="Starting file number (e.g., 000000000001)", - ) - parser.add_argument( - "--to", - dest="end_file", - required=True, - help="Ending file number (e.g., 000000000050)", + "--num_files", + dest="num_files", + type=int, + help="Number of files to process, all if not specified", ) args = parser.parse_args() - process_file_range(args.start_file, args.end_file) + process_files(args.num_files) print("All files processed successfully.") diff --git a/scripts/data/regenerate_tests.sh b/scripts/data/regenerate_tests.sh index 9a04780f..42c1fb76 100755 --- a/scripts/data/regenerate_tests.sh +++ b/scripts/data/regenerate_tests.sh @@ -62,5 +62,5 @@ done for test_case in "${full_test_cases[@]}"; do echo "Generating test data: full mode, chain state @ $test_case, single block" - generate_test "full" $test_case + generate_test "full_fast" $test_case done \ No newline at end of file