Hmm... interesting. It appears I didn't know how to use the session that I opened in 'main', and only used the requests.post()
function instead of session.post()
, this is what made my script slower. So my script still used the old way of fetching data, while it had opened a session for faster communication. I timed my old script with your enhancements:
➜ time python arc7ic_fetch.py spswhale ALL
...
Fetching pools required 10.768758773803711 seconds
real 0m10.913s
user 0m0.196s
sys 0m0.017s
My current script, that I spruced up with your enhancements:
➜ time python fetch_liquidity_pools-add_session.py spswhale ALL
...
real 0m0.336s
user 0m0.150s
sys 0m0.018s
Oh, I noticed you had added a timer inside the main loop — nice touch. I was lazy and didn't bother, since I could 'time' it in the terminal. But yep, there were some repeated requests in the original script that made it way slower. Here's the current one:
# fetch_liquidity_pools.py
import json
import os
import argparse
import requests
# from time import sleep
import time
from random import choice # To randomly choose a node from the list
# Hive-Engine API Node
# HIVE_ENGINE_NODE = 'https://api2.hive-engine.com/rpc/contracts'
# Name of the JSON file holding the list of Hive-Engine RPC node URLs.
NODES_FILE = "nodes.json"
retry_delay = 5  # seconds to wait between retries
max_retries = 3  # Maximum number of retries
# Default values
DEFAULT_ACCOUNT_NAME = "hive-engine"  # Replace with your actual Hive account name
DEFAULT_FILTER_TOKEN = (
    "BTC"  # Replace with the desired token to filter, or use 'ALL' to list all tokens
)
# File to store token details with precision
TOKEN_CACHE_FILE = "token_details_cache.json"
# In-memory cache of token detail records keyed by symbol; persisted to TOKEN_CACHE_FILE.
cached_token_details = {}
# Populated by load_nodes() from NODES_FILE; empty means no nodes are available.
hive_engine_nodes = []
def load_nodes():
    """Populate the global `hive_engine_nodes` list from NODES_FILE, if present."""
    global hive_engine_nodes
    # Guard clause: no file means no nodes at all.
    if not os.path.exists(NODES_FILE):
        print(
            "Error: Hive-Engine nodes file not found. Please create 'nodes.json' with the list of nodes."
        )
        hive_engine_nodes = []  # Ensure nodes list is empty on error
        return
    try:
        with open(NODES_FILE, "r") as f:
            hive_engine_nodes = json.load(f)
        print("Loaded Hive-Engine nodes from file.")
    except (ValueError, IOError):
        print(
            "Error: Hive-Engine nodes file is corrupted or not readable. Please re-create 'nodes.json' with the list of nodes."
        )
def get_node():
    """Return a randomly chosen Hive-Engine node URL, or None when none are loaded."""
    if not hive_engine_nodes:
        print("Error: No Hive-Engine nodes available.")
        return None
    selected_node = choice(hive_engine_nodes)
    print(f"Using Hive-Engine node: {selected_node}")  # Print the current node
    return selected_node
def load_token_cache():
    """Load the persisted token-details cache into `cached_token_details`, if present."""
    global cached_token_details
    # Missing cache file is not an error: we simply start cold.
    if not os.path.exists(TOKEN_CACHE_FILE):
        return
    try:
        with open(TOKEN_CACHE_FILE, "r") as f:
            cached_token_details = json.load(f)
        print("Loaded cached token details from file.")
    except (ValueError, IOError):
        print(
            "Error: Failed to load token cache file. Starting with an empty cache."
        )
def save_token_cache():
    """Persist the in-memory token-details cache to TOKEN_CACHE_FILE."""
    try:
        with open(TOKEN_CACHE_FILE, "w") as f:
            json.dump(cached_token_details, f)
        print("Saved token details to cache file.")
    except IOError:
        # Best-effort persistence: a failed save only costs a re-fetch later.
        print("Error: Failed to save token cache file.")
def fetch_token_details(symbol, session):
    """Return the token record (including precision) for `symbol`.

    Looks in the in-memory cache first; on a miss, queries the 'tokens'
    contract via the given requests.Session, caches and persists the
    result. Returns {} on failure.

    Fix: the POST previously had no timeout and no exception handling, so
    a connection error or hung node crashed the whole run — now handled
    the same way as fetch_pool_details().
    """
    # Serve from cache to avoid a network round-trip per pool side.
    if symbol in cached_token_details:
        return cached_token_details[symbol]
    print(f"Fetching token details for {symbol}...")
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            return {}
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "tokens",
                "table": "tokens",
                "query": {"symbol": symbol},
                "limit": 1,
            },
        }
        try:
            response = session.post(url, json=payload, timeout=10)
            if response.status_code == 200:
                try:
                    data = response.json()
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return {}
                if "result" in data and data["result"]:
                    cached_token_details[symbol] = data["result"][0]  # Cache the token details
                    save_token_cache()  # Save cache after updating
                    return data["result"][0]
            # Reached on non-200 status or on a 200 with an empty result.
            print(
                f"Error: Failed to fetch token details for {symbol}. Status Code: {response.status_code}"
            )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {symbol} from {url}: {e}")
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {symbol}. Skipping.")
    return {}
def fetch_pool_details(token_pair, session=None):
    """Fetch details of the liquidity pool identified by `token_pair`.

    Args:
        token_pair: pool identifier such as "SWAP.HIVE:BTC".
        session: optional requests.Session for connection reuse. Falls back
            to plain `requests` calls when omitted.

    Returns:
        The pool record dict, or {} after all retries fail.

    Fix: the body referenced a global `session` that only exists when the
    file is run as a script (NameError on import) — it is now a
    backward-compatible optional parameter.
    """
    http = session if session is not None else requests
    for attempt in range(max_retries):
        url = get_node()
        if not url:
            print("Error: No node URL available, exiting fetch_pool_details.")
            return {}
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "find",
            "params": {
                "contract": "marketpools",
                "table": "pools",
                "query": {"tokenPair": token_pair},
                "limit": 1,
            },
        }
        print(
            f"Attempt {attempt + 1} to fetch pool details for {token_pair} from {url}..."
        )  # Debugging statement
        try:
            response = http.post(
                url, json=payload, timeout=10
            )  # Set a timeout for the request
            if response.status_code == 200:
                try:
                    data = response.json()
                    print(
                        f"Data received for {token_pair}: {data}"
                    )  # Debugging the received data
                    if "result" in data and data["result"]:
                        print(f"Successfully fetched pool details for {token_pair}")
                        return data["result"][0]
                    else:
                        print(
                            f"Unexpected response format or empty result for {token_pair} from {url}: {data}"
                        )
                except ValueError as e:
                    print(f"Error: Failed to parse JSON response: {e}.")
            else:
                print(
                    f"Error: Failed to fetch pool details for {token_pair}. Status Code: {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            print(f"Request exception occurred for {token_pair} from {url}: {e}")
        # Handle retries
        if attempt < max_retries - 1:
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            print(f"Max retries exceeded for {token_pair}. Skipping to next.")
    print(f"Returning empty details for {token_pair} after all attempts.")
    return {}
def fetch_all_pools(session):
    """Fetch every marketpools pool in a single bulk request.

    Returns the list of pool records, or [] on any failure.

    Fix: a 200 response with a non-JSON body previously raised out of this
    function instead of returning the documented [] — parse errors are now
    handled explicitly, matching the sibling fetch functions.
    """
    url = get_node()
    if not url:
        return []
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "pools",
            "query": {},
            "limit": 1000,  # Adjust limit based on how many pools exist
        },
    }
    try:
        response = session.post(url, json=payload, timeout=10)
        if response.status_code == 200:
            try:
                data = response.json()
            except ValueError:
                print("Error: Failed to parse JSON response.")
                return []
            return data.get("result", [])
        else:
            print(
                f"Error: Failed to fetch all pools. Status Code: {response.status_code}"
            )
            return []
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")
        return []
def fetch_liquidity_positions(account_name, retries=5, backoff_factor=5, session=None):
    """Fetch the liquidity positions held by `account_name`.

    Args:
        account_name: Hive account whose positions are queried.
        retries: number of attempts before giving up.
        backoff_factor: seconds to sleep between retried attempts.
        session: optional requests.Session for connection reuse; falls back
            to plain `requests` calls when omitted.

    Returns:
        A list of position dicts, or [] on failure.

    Fixes: (1) the body referenced a global `session` that only exists when
    run as a script; (2) the node was chosen once before the retry loop, so
    every retry hammered the same possibly-dead node — it is now re-chosen
    per attempt; (3) the no-node path returned {} where callers expect a list.
    """
    http = session if session is not None else requests
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "find",
        "params": {
            "contract": "marketpools",
            "table": "liquidityPositions",
            "query": {"account": account_name},
            "limit": 1000,
        },
    }
    for attempt in range(retries):
        url = get_node()  # Re-pick a node each attempt.
        if not url:
            return []
        try:
            response = http.post(url, json=payload, timeout=10)
            if response.status_code == 200:
                try:
                    data = response.json()
                    return data.get("result", [])
                except ValueError:
                    print("Error: Failed to parse JSON response.")
                    return []
            else:
                print(
                    f"Error: Failed to fetch data. Status Code: {response.status_code}"
                )
                return []
        except requests.exceptions.ConnectionError as e:
            print(
                f"Attempt {attempt + 1}: Connection error: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.Timeout as e:
            print(
                f"Attempt {attempt + 1}: Request timed out: {e}, retrying in {backoff_factor} seconds..."
            )
            time.sleep(backoff_factor)
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}: An error occurred: {e}")
            return []
    print(f"Max retries exceeded. Could not fetch data for account: {account_name}")
    return []
def get_filtered_pools(account_name, filter_token, session):
    """Return the account's liquidity positions, optionally filtered by token.

    Each returned entry combines the user's proportional share of a pool's
    two sides with each token's precision, so balances can be printed with
    the correct number of decimals.
    """
    # One bulk request for every pool, then index by token pair for O(1) lookups.
    all_pools = fetch_all_pools(session)
    if not all_pools:
        print("Error: Failed to fetch all pools.")
        return []
    pool_dict = {pool["tokenPair"]: pool for pool in all_pools}

    positions = fetch_liquidity_positions(account_name)
    print(f"Fetched {len(positions)} liquidity positions for account {account_name}.")
    if not positions:
        print("No liquidity positions found for this account.")
        return []

    wanted = filter_token.upper()  # 'ALL' disables filtering entirely.
    filtered_pools = []
    for position in positions:
        token_pair = position.get("tokenPair", "Unknown")
        if wanted != "ALL" and wanted not in token_pair.upper():
            continue

        pool_details = pool_dict.get(token_pair)
        if not pool_details:
            print(f"Warning: No pool details found for {token_pair}")
            continue

        shares = float(position.get("shares", "0"))
        base_quantity = float(pool_details.get("baseQuantity", "0"))
        quote_quantity = float(pool_details.get("quoteQuantity", "0"))
        total_shares = float(pool_details.get("totalShares", "0"))
        if total_shares == 0:
            print(f"Skipping position {token_pair} due to total shares being 0.")
            continue

        # The user's cut of each side is proportional to their share count.
        user_base_balance = (shares / total_shares) * base_quantity
        user_quote_balance = (shares / total_shares) * quote_quantity

        if ":" in token_pair:
            base_symbol, quote_symbol = token_pair.split(":")
        else:
            base_symbol, quote_symbol = "Unknown", "Unknown"

        # Precision (decimal places) comes from the cached token metadata.
        base_token_details = fetch_token_details(base_symbol, session)
        quote_token_details = fetch_token_details(quote_symbol, session)

        filtered_pools.append(
            {
                "token_pair": token_pair,
                "base_symbol": base_symbol,
                "quote_symbol": quote_symbol,
                "base_balance": user_base_balance,
                "quote_balance": user_quote_balance,
                "base_precision": base_token_details.get("precision", 0),
                "quote_precision": quote_token_details.get("precision", 0),
            }
        )

    print(f"Number of filtered pools: {len(filtered_pools)}")
    return filtered_pools
def main(account_name, filter_token):
    """Load configuration and caches, then fetch and print the account's pools."""
    load_nodes()  # Node list from the external file
    load_token_cache()  # Previously fetched token precisions
    # A single session lets the API calls share one TCP connection.
    with requests.Session() as session:
        pools = get_filtered_pools(account_name, filter_token, session)
        print(f"\nLiquidity Pool Positions with {filter_token.upper()} token:")
        for pool in pools:
            print(
                f"Token Pair: {pool['token_pair']} | Base Balance: {pool['base_balance']:.{pool['base_precision']}f} {pool['base_symbol']} | "
                f"Quote Balance: {pool['quote_balance']:.{pool['quote_precision']}f} {pool['quote_symbol']}"
            )
        if not pools:
            print("No matching liquidity pools found for the given filter.")
if __name__ == "__main__":
    # When run as a standalone script
    # NOTE(review): this module-level `session` is the global that
    # fetch_pool_details() and fetch_liquidity_positions() reference; main()
    # opens its own separate session. Removing this line would break those
    # functions until they take `session` as a parameter.
    session = requests.Session()
    try:
        parser = argparse.ArgumentParser(
            description="Fetch Hive-Engine liquidity pools."
        )
        # Both CLI arguments are optional and fall back to the module defaults.
        parser.add_argument(
            "account_name",
            nargs="?",
            default=DEFAULT_ACCOUNT_NAME,
            help="Hive account name to fetch liquidity pools for.",
        )
        parser.add_argument(
            "filter_token",
            nargs="?",
            default=DEFAULT_FILTER_TOKEN,
            help="Token to filter by, or 'ALL' to list all tokens.",
        )
        args = parser.parse_args()
        main(args.account_name, args.filter_token)
    finally:
        # Always release pooled connections, even on error or Ctrl-C.
        session.close()
Edit: I noticed that I still hadn't added 'session' to the arguments of the functions. So I did, and updated the script here to reflect that. The problem is that now it's slower again. The 'broken' script was a tenth of a second faster, but this one correctly establishes the session. According to ChatGPT, I might as well not use 'session' anymore, since I only make one request per run — a session only makes things faster if you make multiple calls to the API. But anyway, it was a fun experiment.
Oh, and I forgot to comment about the main script I'm working on. It's actually a balancer for liquidity pairs. I might write about it as soon as I iron out the wrinkles.
RE: [ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)