Quick Update: Making My Hive-Engine Script Faster

in #coding3 months ago

Hey folks,

Just a quick update on what I’ve been tinkering with for the past few days. I’ve been streamlining my Hive-Engine liquidity pools script, and I’m happy to report that it’s now significantly faster. Where it used to take 5-20 seconds to process accounts with lots of liquidity pools, it now zips through in way under a second — basically, it’s gone from sluggish to speedy!


An unrelated photo of a Wood Pigeon in a tree, taken by Yours Truly.

What made the difference? Well, I cut down on repetitive requests by fetching pool details once per run instead of making multiple fetches. Sounds simple, right? Turns out it made a world of difference.

I got some great ideas from @Arc7icWolf for this one. I hadn't learned, and didn't quite know about the Sessions capability of the requests module he suddenly mentioned out of the blue, so I checked out the functionality, and it was an eyeopener. I could now open a connection to an API server, and "stream" everything within the connection, without having to make individual queries, opening and closing the connection between each of them. This would mean it's both a time- and resource-saver in the same package!

Turns out that since my script currently makes one query only, using a session here will actually slow things down, about one tenth of a second.

However, I’ve kept the session functionality in the script, even though it’s not strictly necessary for this particular version. Why? Because I’ve got bigger plans! The main script I’m working on will rely on session-based requests to keep things fast and smooth. This is just a stepping stone.

More news...

In the process of all this, I’ve picked up some new skills, including messing around with Python more seriously. I’ve even started writing a simple text editor in Python — just another tool to add to my growing collection of side projects.

If you’re curious, here’s the streamlined script I’ve been working on:

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests

# from time import sleep
import time
from random import choice  # To randomly choose a node from the list

# Hive-Engine API Node
# HIVE_ENGINE_NODE = 'https://api2.hive-engine.com/rpc/contracts'
NODES_FILE = "nodes.json"
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = "hive-engine"  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = (
    "BTC"  # Replace with the desired token to filter, or use 'ALL' to list all tokens
)

# File to store token details with precision
TOKEN_CACHE_FILE = "token_details_cache.json"
cached_token_details = {}
hive_engine_nodes = []


def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, "r") as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print(
                "Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes."
            )
    else:
        print(
            "Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes."
        )
        hive_engine_nodes = []  # Ensure nodes list is empty on error


def get_node():
    # Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}")  # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, "r") as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print(
                "Error: Failed to load token cache file. Starting with an empty cache."
            )


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, "w") as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        # print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print(f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1,
            },
        }

        response = session.post(url, json=payload)

        # print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if "result" in data and data["result"]:
                cached_token_details[symbol] = data["result"][
                    0
                ]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data["result"][0]

        print(
            f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}"
        )
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1,
            },
        }

        print(
            f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}..."
        )  # Debugging statement

        try:
            response = session.post(
                url, json=payload, timeout=10
            )  # Set a timeout for the request
            # print(
            #     f"Received response status code: {response.status_code} for {token_pair} from {url}"
            # )

            if response.status_code == 200:
                try:
                    data = response.json()
                    print(
                        f"Data received for {token_pair}: {data}"
                    )  # Debugging the received data
                    if "result" in data and data["result"]:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data["result"][0]
                    else:
                        print(
                            f"Unexpected response format or empty result for {token_pair} from {url}: {data}"
                        )
                except ValueError as e:
                    print(f"Error: Failed to parse JSON response: {e}.")
                    # print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(
                    f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_all_pools(session):
    url = get_node()
    if not url:
        return []

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "pools",
            "query": {},
            "limit": 1000,  # Adjust limit based on how many pools exist
        },
    }

    try:
        response = session.post(url, json=payload, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return data.get("result", [])
        else:
            print(
                f"Error: Failed to fetch all pools. Status Code: {response.status_code}"
            )
            return []
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []


def fetch_liquidity_positions(account_name, retries=5, backoff_factor=5):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000,
        },
    }

    for attempt in range(retries):
        try:
            response = session.post(url, json=payload, timeout=10)
            # print("Response Status Code: ", response.status_code)

            # Print the entire raw response text for debugging purposes
            # print("Raw response text: ", response.text)

            if response.status_code == 200:
                try:
                    data = response.json()
                    return data.get("result", [])
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return []
            else:
                print(
                    f"Error: Failed to fetch data. Status Code: {response.status_code}"
                )
                return []

        except requests.exceptions.ConnectionError as e:
            print(
                f"Attempt {attempt + 1}: Connection error: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.Timeout as e:
            print(
                f"Attempt {attempt + 1}: Request timed out: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}: An error occurred: {e}")
            return []

    print(f"Max retries exceeded. Could not fetch data for account: {account_name}")
    return []


def get_filtered_pools(account_name, filter_token, session):
    # Fetch all pools in one go
    all_pools = fetch_all_pools(session)

    # Check if pools were fetched succesfully
    if not all_pools:
        print("Error: Failed to fetch all pools.")
        return []

    pool_dict = {pool["tokenPair"]: pool for pool in all_pools}

    # Get and filter pools by the specified token
    positions = fetch_liquidity_positions(account_name)

    # Debug: Check fetched positions
    print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
    # print("Test print of all the fetched positions:")
    # print(json.dumps(positions, indent=4))  # Pretty-print the positions

    if not positions:
        print("No liquidity positions found for this account.")
        return []

    filtered_pools = []

    for position in positions:
        token_pair = position.get("tokenPair", "Unknown")

        # Debug: Print each position being processed
        # print(f"Processing position: {position}")

        # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
        if (
            filter_token.upper() != "ALL"
            and filter_token.upper() not in token_pair.upper()
        ):
            # print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
            continue

        # Additional debug to see which positions pass the filter
        # print(
        #    f"Including position {token_pair} with filter token {filter_token.upper()}"
        # )

        # Fetch the pool details from the all_pools dictionary
        pool_details = pool_dict.get(token_pair)
        if not pool_details:
            print(f"Warning: No pool details found for {token_pair}")
            continue

        # Calculate user balances
        shares = float(position.get("shares", "0"))
        base_quantity = float(pool_details.get("baseQuantity", "0"))
        quote_quantity = float(pool_details.get("quoteQuantity", "0"))
        total_shares = float(pool_details.get("totalShares", "0"))

        if total_shares == 0:
            print(f"Skipping position {token_pair} due to total shares being 0.")
            continue

        # Calculate user balances
        user_base_balance = (shares / total_shares) * base_quantity
        user_quote_balance = (shares / total_shares) * quote_quantity

        if ":" in token_pair:
            base_symbol, quote_symbol = token_pair.split(":")
        else:
            base_symbol, quote_symbol = "Unknown", "Unknown"

        # Fetch token details to get precision
        base_token_details = fetch_token_details(base_symbol, session)
        quote_token_details = fetch_token_details(quote_symbol, session)
        base_precision = base_token_details.get("precision", 0)
        quote_precision = quote_token_details.get("precision", 0)

        filtered_pools.append(
            {
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision,
            }
        )

    # Debug: Print the number of filtered pools
    print(f"Number of filtered pools: {len(filtered_pools)}")

    return filtered_pools


def main(account_name, filter_token):
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Create a session object
    with requests.Session() as session:
        # Fetch and print filtered pools
        pools = get_filtered_pools(account_name, filter_token, session)
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
        for pool in pools:
            print(
                f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
                f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}"
            )

        # Debug: If no pools were printed
        if not pools:
            print("No matching liquidity pools found for the given filter.")


if __name__ == "__main__":
    # When run as a standalone script
    session = requests.Session()
    try:
        parser = argparse.ArgumentParser(
            description="Fetch Hive-Engine liquidity pools."
        )
        parser.add_argument(
            "account_name",
            nargs="?",
            default=DEFAULT_ACCOUNT_NAME,
            help="Hive account name to fetch liquidity pools for.",
        )
        parser.add_argument(
            "filter_token",
            nargs="?",
            default=DEFAULT_FILTER_TOKEN,
            help="Token to filter by, or 'ALL' to list all tokens.",
        )

        args = parser.parse_args()

        main(args.account_name, args.filter_token)
    finally:
        session.close()

That’s all for now! I’ll keep you posted as I make more headway on the main project. Until then, happy coding!

Sort:  

I could now open a connection to an API server, and "stream" everything within the connection, without having to make individual queries, opening and closing the connection between each of them.

I've also seen your reply and I'm happy to see that I had roughly understood the idea behind this tool! Basically, if I have 3 calls I need to send to the API, instead of doing "open-do_A-close", "open-do_B-close" and "open-do_C-close" (9 steps total), I can do "open-do_A-do_B-do_C-close", reducing the steps to 5, right?

Btw, I'm happy this addition might help you in the future :)

Yes I believe that's exactly how it works. Oh, and with requests.Session() as session: opens and keeps the connection up as long as you use it, and then automatically closes it when the operation is finished, so you don't need to close it yourself at the end of your program with session.close().

As soon as I had submitted my last post about the new and "improved" liquidity fetching script, I realized it was riddled with new bugs, and some old ones somehow raised their ugly heads, so I had to do some bug hunting again. I think I am finally done with the scripts, and I think I might release both of them in the coming days. I just have to do some clean-up, and maybe prettify their output a little. Also need to write some instructions on how to use them, so...

Keep it up! Your are almost there :) can't wait to read and analize it again and see if I can learn even more new stuff! Meanwhile I've also started working again on my script too and I'm trying to make it more useful... and, if I can, at least a bit better than what it used to be!

Wow! You have been very busy!
Nice work for the community.
Have a nice day!

Thanks! I just noticed that even the current script was buggy. I will still need to fiddle with it, because it just doesn't cut it for the job I originally wanted and created it for.

You have a nice day too! Has the flood subsided yet?

!PIZZA !WINE

Luckily, the flood has subsided. People are in the phase of cleaning up all the mud. Then, within a week we could observe all the damages around the house and garden.

I’m so relieved and happy to hear that the flood has subsided. I hope the damage isn’t too severe. Water truly is a force of nature... It can erode and destroy, yet also sustain life. It really puts things in perspective and makes many other worries feel tiny in comparison. We wish you the very best.

!WINE

Thanks so much for your good wishes.

Hope you’ll be able to get a lot of things done and post more photos of your pets.

PIZZA!

$PIZZA slices delivered:
@gamer00(2/15) tipped @kaminchan