RE: RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)
You are viewing a single comment's thread from:

RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)

RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)

:)))))

I'd be extremely happy if it will help you at least a tiny bit, as it would be my first contributione ever to someone's else script! Just a very, very, very, veeeeeery small one, but that would still be something :)

That is my main script

Is this the script you shared in your latest post? Or an even bigger and more complex one?

Btw, here's your original code with the "Session" included, just to prove that I really experimented with it 🤣 (I don't know how to highlight my edits, but I only opened a session, added it as a parameter to the API calls and "prepared" the calls with the slightly different sintax of a session... but I have no idea if what I just said makes sense for you or for anyone at all!)

# fetch_liquidity_pools.py
import json
import os
import argparse
import requests
from time import sleep, time
from random import choice

# Hive-Engine API Nodes
NODES_FILE = 'nodes.json'
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries

# Default values
DEFAULT_ACCOUNT_NAME = 'hive-engine'  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = 'BTC'  # Replace with the desired default token to filter, or use 'ALL' to list all tokens

# File to store token details with precision
TOKEN_CACHE_FILE = 'token_details_cache.json'
cached_token_details = {}
hive_engine_nodes = []

def load_nodes():
    global hive_engine_nodes
    # Check if the nodes file exists
    if os.path.exists(NODES_FILE):
        try:
            with open(NODES_FILE, 'r') as f:
                hive_engine_nodes = json.load(f)
                print("Loaded Hive-Engine nodes from file.")
        except (ValueError, IOError):
            print("Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes.")
    else:
        print("Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes.")
        hive_engine_nodes = [] # Ensure nodes list is empty on error


def get_node():
# Choose a random node from the list
    if hive_engine_nodes:
        selected_node = choice(hive_engine_nodes)
        print(f"Using Hive-Engine node: {selected_node}") # Print the current node
        return selected_node
    else:
        print("Error: No Hive-Engine nodes available.")
        return None


def load_token_cache():
    global cached_token_details
    # Check if the token cache file exists
    if os.path.exists(TOKEN_CACHE_FILE):
        try:
            with open(TOKEN_CACHE_FILE, 'r') as f:
                cached_token_details = json.load(f)
                print("Loaded cached token details from file.")
        except (ValueError, IOError):
            print("Error: Failed to load token cache file. Starting with an empty cache.")


def save_token_cache():
    # Save the current token details cache to a file
    try:
        with open(TOKEN_CACHE_FILE, 'w') as f:
            json.dump(cached_token_details, f)
            print("Saved token details to cache file.")
    except IOError:
        print("Error: Failed to save token cache file.")


def fetch_token_details(symbol, session: requests.Session):
    # Check if token details are already cached
    if symbol in cached_token_details:
        print(f"Token details for {symbol} found in cache.")
        return cached_token_details[symbol]

    print (f"Fetching token details for {symbol}...")
    # Fetch token details for the given symbol
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1
            }
        }

        request = requests.Request("POST", url=url, json=payload).prepare()
        response = session.send(request, allow_redirects=False)

        print(f"Attempt {attempt+1}: Status Code: {response.status_code}, Response: {response.text}")

        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return {}

            if 'result' in data and data['result']:
                cached_token_details[symbol] = data['result'][0]  # Cache the token details
                save_token_cache()  # Save cache after updating
                return data['result'][0]

        print(f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}")
        if attempt < max_retries - 1:
            sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")

    return {}


def fetch_pool_details(token_pair, session: requests.Session):
    # Fetch details of the specified liquidity pool
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}

        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1
            }
        }

        # print(f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}...") # Debugging statement

        try:
            request = requests.Request("POST", url=url, json=payload).prepare()
            response = session.send(request, timeout=10, allow_redirects=False)
            print(f"Received response status code: {response.status_code} for {token_pair} from {url}")

            if response.status_code == 200:
                try:
                    data = response.json()
                    # print(f"Data received for {token_pair}: {data}") # Debugging the received data
                    if 'result' in data and data['result']:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data['result'][0]
                    else:
                        print(f"Unexpected response format or empty result for {token_pair} from {url}: {data}")
                except ValueError as e:
                    print("Error: Failed to parse JSON response.")
                    print(f"Response content: {response.text}") # Print the actual response content
            else:
                print(f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")

        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")

    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}


def fetch_liquidity_positions(account_name, session: requests.Session):
    # Fetch liquidity positions for the given account
    url = get_node()
    if not url:
        return {}

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000
        }
    }

    request = requests.Request("POST", url=url, json=payload).prepare()
    response = session.send(request, allow_redirects=False)

    # print("Response Status Code: ", response.status_code)
    # print("Response Content: ", response.text) # Debug

    if response.status_code != 200:
        print(f"Error: Failed to fetch data. Status Code: {response.status_code}")
        return []

    try:
        data = response.json()
    except ValueError:
        print("Error: Failed to parse JSON response.")
        return []

    return data.get('result', [])


def get_filtered_pools(account_name, filter_token):
    with requests.Session() as session:
        # Get and filter pools by the specified token
        positions = fetch_liquidity_positions(account_name, session)

        # Debug: Check fetched positions
        print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")

        if not positions:
            print("No liquidity positions found for this account.")
            return []

        filtered_pools = []

        for position in positions:
            token_pair = position.get('tokenPair', 'Unknown')

            # Debug: Print each position being processed
            print(f"Processing position: {position}")

            # If filter_token is 'ALL', skip filtering; otherwise, check for the token in the pair
            if filter_token.upper() != 'ALL' and filter_token.upper() not in token_pair.upper():
                print(f"Skipping position {token_pair} as it does not match filter token {filter_token.upper()}")
                continue

            # Additional debug to see which positions pass the filter
            print(f"Including position {token_pair} with filter token {filter_token.upper()}")

            # Fetch balances and calculate user share
            shares = float(position.get('shares', '0'))
            pool_details = fetch_pool_details(token_pair, session)
            if not pool_details:
                continue

            total_shares = float(pool_details.get('totalShares', '0'))
            base_quantity = float(pool_details.get('baseQuantity', '0'))
            quote_quantity = float(pool_details.get('quoteQuantity', '0'))

            if total_shares == 0:
                print(f"Skipping position {token_pair} due to total shares being 0.")
                continue

            # Calculate user balances
            user_base_balance = (shares / total_shares) * base_quantity
            user_quote_balance = (shares / total_shares) * quote_quantity

            if ':' in token_pair:
                base_symbol, quote_symbol = token_pair.split(':')
            else:
                base_symbol, quote_symbol = "Unknown", "Unknown"

            # Fetch token details to get precision
            base_token_details = fetch_token_details(base_symbol, session)
            quote_token_details = fetch_token_details(quote_symbol, session)
            base_precision = base_token_details.get('precision', 0)
            quote_precision = quote_token_details.get('precision', 0)

            filtered_pools.append({
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_precision,
                "quote_precision": quote_precision
            })

        # Debug: Print the number of filtered pools
        # print(f"Number of filtered pools: {len(filtered_pools)}")

        return filtered_pools


def main(account_name, filter_token):
    start = time()
    # Load nodes from the external file
    load_nodes()

    # Load cached token details
    load_token_cache()

    # Fetch and print filtered pools
    pools = get_filtered_pools(account_name, filter_token)
    if pools:
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
    for pool in pools:
        print(f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
              f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}")

    # Debug: If no pools were printed
    # if not pools:
    #    print("No matching liquidity pools found for the given filter.")
    end = time()
    elapsed_time = end - start
    print(f"Fetching pools required {elapsed_time} seconds")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Fetch Hive-Engine liquidity pools.')
    parser.add_argument('account_name', nargs='?', default=DEFAULT_ACCOUNT_NAME, help='Hive account name to fetch liquidity pools for.')
    parser.add_argument('filter_token', nargs='?', default=DEFAULT_FILTER_TOKEN, help="Token to filter by, or 'ALL' to list all tokens.")

    args = parser.parse_args()

    main(args.account_name, args.filter_token)

H2
H3
H4
3 columns
2 columns
1 column
Join the conversation now
Logo
Center