[ENG/ITA] Python & Hive: My Scripts are Ready! My First Project is Completed :)

cover


La versione italiana si trova sotto quella inglese

The Italian version is under the English one


Python & Hive: My Scripts are Ready! My First Project is Completed :)

After almost 30 days, my project is finally finished!

Both scripts I was working on are ready, polished and improved thanks to the precious advice I received from @felixxx, whose knowledge was crucial in rewriting most of the code I had originally written.

Thanks to his help, I have greatly reduced the use of the BEEM library, which is no longer supported and therefore not very reliable anymore, at least in its more complex functions.

The first of the two scripts I created - the one that reads the information on the chain and picks out what I am interested in, i.e. posts written in Italian, with a specific tag and a minimum length - has at its heart the custom client written by @felixxx, which I have slightly adapted to my needs.
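
To make this concrete, "reading the information on the chain" just means sending plain JSON-RPC requests to a public Hive API node. Here is a minimal, standalone sketch of the call the script is built around (same node as below; the block number is only an arbitrary example):

import requests

url = "https://api.deathwing.me"
payload = {
    "jsonrpc": "2.0",
    "method": "condenser_api.get_ops_in_block",
    "params": [80000000, False],  # block number, only_virtual
    "id": 1,
}

response = requests.post(url, json=payload, timeout=30)
ops = response.json()["result"]

# each entry wraps one operation; "comment" ops with an empty parent_author
# are top-level posts rather than replies
for op in ops:
    op_type, op_data = op["op"]
    if op_type == "comment" and op_data["parent_author"] == "":
        print(op_data["author"], op_data["permlink"])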

The second script - whose task is to comment on and/or upvote the selected posts, as well as to publish a summary post - still relies on BEEM, but it now uses only the Hive module to sign the transactions, whereas before it used three or four of them... a big difference that makes the code not only much more readable but also more robust, because there are fewer places where something could go wrong.
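
In practice, the whole signing layer now boils down to something like this (a minimal sketch; the account name, posting key and post identifier are placeholders):

from beem import Hive

hive = Hive(node=["https://api.deathwing.me"], keys=["<posting-key>"])

# the vote weight is a percentage and the identifier is "author/permlink"
hive.vote(weight=50, account="<my-account>", identifier="some-author/some-permlink")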


And now?

Now that the scripts are ready, I can consider my project completed, at least in terms of ‘core’ features.

It would be possible to go even further and create, for example, a small interface that allows one to interact with it without having any programming knowledge.

The code itself could be tweaked to suit the needs of a real user.

In short, if I wanted to, there would still be plenty of things I could do, but since the code will presumably never be used by anyone other than myself, and considering that I can only work on it occasionally - so my brain is a bit melted from all this ‘start-stop-start-try to remember what I was doing-I have no idea-stop-start again’ and so on 😂 - I'd say it's finished, for now.

As for testing, I must admit I haven't done much - I don't know how to set up or connect to a testnet, so every time I have to upvote, comment on and publish random posts, which isn't great - but everything seems to be working, and that is already a huge win for me!
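
A possible workaround I still have to try: beem's Hive client accepts a nobroadcast flag, which builds the transactions but never sends them to the chain, so the whole flow could be dry-run without upvoting or publishing anything for real. A rough sketch (account and key are placeholders):

from beem import Hive

dry_run_hive = Hive(
    node=["https://api.deathwing.me"],
    keys=["<posting-key>"],
    nobroadcast=True,  # build the transaction but do not broadcast it
)

# nothing reaches the chain, but the code path is exercised end to end
dry_run_hive.vote(weight=1, account="<my-account>", identifier="some-author/some-permlink")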


Learn, learn, learn!

Working on this project has taught me a lot, even though my code is definitely very basic, so a real developer will look at it, feel disgusted and think I have just murdered Python... but you have to start somewhere, and building something gives me much, much more satisfaction than endlessly repeating exercises with no real use case.

Browsing Hive, I am also starting to get to know users who are much more experienced than I am, whose scripts and suggestions are invaluable to me and give me hints to improve my code, or ideas for trying to create something new.

The end of this small project is therefore only the beginning of a new challenge :)


Finally, here are both scripts!

FIRST SCRIPT


import requests
import time
import datetime
import csv
import os
import re
import json
import markdown
from bs4 import BeautifulSoup
from langdetect import detect_langs, LangDetectException as lang_e
import logging
from logging.handlers import TimedRotatingFileHandler

# logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = TimedRotatingFileHandler("main.log", when="D", interval=1)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)


# Check if the target language is among the top detected languages
def text_language(text):
    try:
        languages = detect_langs(text)
    except lang_e as e:
        logger.error(f"Language error: {e}")
        return False, 0

    num_languages = len(languages)
    languages_sorted = sorted(languages, key=lambda x: x.prob, reverse=True)
    top_languages = (
        languages_sorted[:2] if len(languages_sorted) > 1 else languages_sorted
    )

    contains_target_lang = any(lang.lang == "it" for lang in top_languages)
    return contains_target_lang, num_languages


# Convert text from markdown to HTML and count words
def convert_and_count_words(md_text):
    html = markdown.markdown(md_text)

    soup = BeautifulSoup(html, "html.parser")
    text = soup.get_text()

    words = re.findall(r"\b\w+\b", text)
    return len(words)


# Send request to HIVE API and return response
def get_response(data, url, session: requests.Session):
    request = requests.Request("POST", url=url, data=data).prepare()
    response = session.send(request, allow_redirects=False)
    return response


# Get properties and find last block
def get_properties(url, session: requests.Session):
    data = '{"jsonrpc":"2.0", "method":"database_api.get_dynamic_global_properties", "id":1}'
    response = get_response(data, url, session)
    properties = response.json()["result"]
    return properties


def get_ops_in_block(num, url, session: requests.Session):
    data = f'{{"jsonrpc":"2.0", "method":"condenser_api.get_ops_in_block", "params":[{num},false], "id":1}}'
    response = get_response(data, url, session)
    ops_in_block = response.json()["result"]
    return ops_in_block


def get_post(ops):
    comment_list = []
    for op in ops:
        if (
            op["op"][0] == "comment" and op["op"][1]["parent_author"] == ""
        ):  # Posts, not comments

            try:
                json_metadata = json.loads(op["op"][1]["json_metadata"])
            except (json.JSONDecodeError, KeyError) as e:
                logger.error(f"JSON decode error or missing key: {e}")
                continue

            # Check if there's the tag we are looking for
            if "ita" not in json_metadata.get("tags", []):
                continue

            # Check post language
            valid_language, lang_num = text_language(op["op"][1]["body"])

            if not valid_language:
                continue

            # Check post length: at least 400 words if a single language was
            # detected, at least 800 if the post is bilingual (e.g. ITA + ENG)
            word_count = convert_and_count_words(op["op"][1]["body"])

            if (lang_num == 1 and word_count < 400) or (
                lang_num > 1 and word_count < 800
            ):
                continue

            author = op["op"][1]["author"]
            permlink = op["op"][1]["permlink"]
            link = f"https://peakd.com/@{author}/{permlink}"
            comment_list.append(link)
            logger.info(f"Found eligible post: {link}")
    return comment_list


def load_last_block():
    if os.path.exists("last_block.txt"):
        with open("last_block.txt", "r") as file:
            return int(file.read())
    return None


def save_last_block(block_num):
    with open("last_block.txt", "w") as file:
        file.write(str(block_num))


# return the eligible posts in a list
def get_post_list(url):
    with requests.Session() as session:
        last_hive_block_num = get_properties(url, session)[
            "last_irreversible_block_num"
        ]
        last_block_num = load_last_block()
        if last_block_num is None:
            last_block_num = last_hive_block_num
        if int(last_block_num) >= int(last_hive_block_num):
            time.sleep(60)  # always stay behind the last irreversible Hive block
        ops = get_ops_in_block(last_block_num, url, session)
        post_list = get_post(ops)
        save_last_block(int(last_block_num) + 1)
        return post_list


# Get date and generate csv file name
def get_filename():
    current_date = datetime.datetime.now().strftime("%Y-%m-%d")
    return f"urls_{current_date}.csv"


def main():
    url = "https://api.deathwing.me"

    filename = get_filename()
    last_filename = filename
    i = 1

    if not os.path.exists(filename):
        with open(filename, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["ID", "URL", "Upvote_Value"])

    logger.info(f"Current file: {filename}")

    while True:
        post_list = get_post_list(url)

        current_filename = get_filename()

        if current_filename != last_filename:
            filename = current_filename
            last_filename = current_filename
            i = 1  # Reset counter
            logger.info(f"Started writing to a new file: {filename}")

        # Add posts to the current file
        with open(filename, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            for post in post_list:
                writer.writerow([i, post, ""])
                i += 1

        post_list.clear()


if __name__ == "__main__":

    main()
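
For clarity, the daily CSV produced by this first script looks like this (the rows are placeholders): the Upvote_Value column is left empty on purpose and is meant to be filled in by hand before handing the file to the second script.

ID,URL,Upvote_Value
1,https://peakd.com/@some-author/some-permlink,
2,https://peakd.com/@another-author/another-permlink,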


SECOND SCRIPT


#!/usr/bin/env python3
"""A script to upvote and comment posts from a .csv list"""
import os
import shutil
import jinja2
import configparser
import time
import re
import json  # needed for json.dumps() in post()
import logging
from logging.handlers import TimedRotatingFileHandler
import pandas as pd
from beem import Hive, exceptions as beem_e
from beemapi import exceptions as beemapi_e


# Global configuration
config = configparser.ConfigParser()
config.read("config")

ENABLE_COMMENTS = config["Global"]["ENABLE_COMMENTS"] == "True"
ENABLE_UPVOTES = config["Global"]["ENABLE_UPVOTES"] == "True"
ENABLE_POST = config["Global"]["ENABLE_POST"] == "True"

ACCOUNT_NAME = config["Global"]["ACCOUNT_NAME"]
ACCOUNT_POSTING_KEY = config["Global"]["ACCOUNT_POSTING_KEY"]
HIVE_API_NODE = config["Global"]["HIVE_API_NODE"]
# Summary post settings (assumed to be extra keys in the same [Global] section)
TITLE = config["Global"]["TITLE"]
TAGS = config["Global"]["TAGS"].split(",")  # comma-separated tags
CATEGORY = config["Global"]["CATEGORY"]
hive = Hive(node=[HIVE_API_NODE], keys=[ACCOUNT_POSTING_KEY])

# Logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = TimedRotatingFileHandler("main.log", when="D", interval=1)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)

logger.info("Configuration loaded:")
for section in config.keys():
    for key in config[section].keys():
        if "_key" in key:
            continue  # don't log posting keys
        logger.info(f"{section}, {key}, {config[section][key]}")


# Markdown templates for the comment and the summary post
comment_curation_template = jinja2.Template(
    open(os.path.join("template", "comment_curation.template"), "r").read()
)
post_template = jinja2.Template(
    open(os.path.join("template", "post.template"), "r").read()
)


def give_upvote(vote_weight, voter, authorperm):
    print(f"Upvoting with weight {vote_weight}!")
    try:
        hive.vote(weight=vote_weight, account=voter, identifier=authorperm)
    except beem_e.VotingInvalidOnArchivedPost:
        logger.error("Post is too old to be upvoted")
    except beemapi_e.UnhandledRPCError:
        logger.error("Vote changed too many times")
    time.sleep(3)  # sleep 3s


def post_comment(author, replier, authorperm):
    print("Commenting!")
    comment_body = comment_curation_template.render(
        target_account=author, replier=replier
    )
    hive.post(
        title="",
        body=comment_body,
        author=replier,
        reply_identifier=authorperm,
    )
    time.sleep(3)  # sleep 3s


def post(posts):
    title = TITLE
    post_body = post_template.render(author=ACCOUNT_NAME, posts=posts)
    author = ACCOUNT_NAME
    tags = TAGS
    category = CATEGORY

    hive.post(
        title=title,
        body=post_body,
        author=author,
        permlink=None,
        json_metadata=json.dumps({"app": "leothreads/0.3", "tags": tags}),
        category=category,
    )

    logger.info(f"Post published with title: {title}")


def process_file(file_to_process):
    posts = []  # defined before the try so the finally block can always return it
    try:
        df = pd.read_csv(file_to_process)

        for _, row in df.iterrows():
            url = row["URL"]
            vote_weight = row["Upvote_Value"]
            print(f"Work in progress on {url}...")

            if pd.isna(vote_weight):
                print(f"No upvote value for {url}, skipping...")
                continue

            try:
                vote_weight = int(vote_weight)
            except ValueError:
                print(f"Invalid vote weight: {vote_weight}")
                continue

            if (vote_weight < 1) or (vote_weight > 100):
                print(f"Invalid vote weight: {vote_weight}%")
                continue

            # data of the post to be upvoted and/or replied
            permlink = re.search(r".+@([\w.-]+)/([\w-]+)", url)
            if not permlink:
                logger.error(f"Invalid URL format: {url}")
                continue
            post_author = permlink.group(1)
            permlink = permlink.group(2)
            post_url = f"{post_author}/{permlink}"
            logger.info(f"{post_author} is getting a {vote_weight}% upvote!")

            posts.append(
                {
                    "author": post_author,
                    "upvote_value": vote_weight,
                    "post_link": post_url,
                }
            )

            # leave an upvote
            if ENABLE_UPVOTES:
                give_upvote(vote_weight, ACCOUNT_NAME, post_url)
            else:
                print("Upvoting is disabled")

            # leave a comment
            if ENABLE_COMMENTS:
                post_comment(post_author, ACCOUNT_NAME, post_url)
            else:
                print("Commenting is disabled")

    except pd.errors.EmptyDataError:
        logger.error(f"File {file_to_process} is empty. Skipping...")

    finally:
        # Once done, move file in the directory "posts_done"
        directory_done = "posts_done"
        destination = os.path.join(directory_done, os.path.basename(file_to_process))
        shutil.move(file_to_process, destination)
        logger.info(
            f"File {os.path.basename(file_to_process)} moved to '{directory_done}' directory."
        )
        return posts


def main():

    directory_to_do = "posts_to_do"

    file_to_process = None
    posts = []  # stays empty if there is no file to process

    for filename in os.listdir(directory_to_do):
        if filename.endswith(".csv"):  # Only look for csv files
            file_to_process = os.path.join(directory_to_do, filename)
            break  # One file at a time

    if file_to_process:
        posts = process_file(file_to_process)
    else:
        logger.info("No files found in the 'urls_to_do' directory.")

    if posts and ENABLE_POST:
        post(posts)


if __name__ == "__main__":

    main()
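
For completeness, this is roughly what the config file read at startup could look like (all values are placeholders; TITLE, TAGS and CATEGORY are the extra keys assumed above for the summary post):

[Global]
# the script compares these against the exact string "True"
ENABLE_COMMENTS = False
ENABLE_UPVOTES = False
ENABLE_POST = False
ACCOUNT_NAME = <your-hive-account>
ACCOUNT_POSTING_KEY = <your-posting-key>
HIVE_API_NODE = https://api.deathwing.me
# extra keys for the summary post: title, comma-separated tags, category
TITLE = <title of the summary post>
TAGS = <tag1>,<tag2>
CATEGORY = <first tag or community>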


I tag @gamer00 because he was curious to see what I was working on... and so now he can be horrified 🤣

Someday I'll be able to write decent, neat and readable code... but this is not the day! ahahah

I'm also tagging @slobberchops because he encouraged me to move away from BEEM, giving me the initial push I needed to commit to studying how the Hive API works.

I have already tagged @felixxx several times... but I'm tagging him once more because the heart of my project rests on his work, and I feel it's only right to say it again :)


cover made with Bing AI and edited with GIMP

to support the #OliodiBalena community, @balaenoptera is 3% beneficiary of this post


If you've read this far, thank you! If you want to leave an upvote, a reblog, a follow, a comment... well, any sign of life is really much appreciated!


Versione italiana

Italian version


cover

Python & Hive: i Miei Scripts sono Pronti! Il Mio Primo Progetto è Completo :)

Dopo quasi 30 giorni finalmente il mio progetto può considerarsi terminato!

Entrambi gli scripts su cui stavo lavorando sono pronti, rifiniti e migliorati grazie ai preziosi insegnamenti ricevuti da @felixxx, le cui conoscenze sono state fondamentali per riscrivere buona parte del codice originario che avevo creato.

Grazie al suo aiuto ho ridotto tantissimo l'utilizzo della libreria BEEM, non più supportata e quindi sempre meno affidabile, almeno nelle sue funzioni più complesse.

Il primo dei due scripts che ho creato, ossia quello che si occupa di leggere le informazioni presenti sulla chain ed individuare ciò che mi interessa - posts scritti in lingua italiana, dotati di un tag specifico ed aventi una lunghezza minima -, ha al suo cuore il client custom scritto da @felixxx, che ho leggermente adattato alle mie esigenze.

Il secondo script - il cui compito consiste nel commentare e/o upvotare i posts selezionati, nonchè pubblicare un post di riepilogo - si appoggia invece ancora a BEEM, ma adesso utilizza solamente il modulo Hive per firmare le transazioni, mentre prima ne utilizzava 3/4... una bella differenza che rende il codice oltre che molto più leggibile, anche più robusto, perchè sono meno i punti in cui qualcosa potrebbe andare storto.


E adesso?

Adesso che gli scripts sono pronti posso considerare concluso il mio progetto, almeno dal punto di vista delle funzionalità "core".

Volendo sarebbe possibile fare ancora di più e creare, ad esempio, una piccola interfaccia che consenta di interagire con lo stesso anche senza avere nessuna conoscenza in ambito di programmazione.

Il codice stesso potrebbe essere ritoccato per adeguarsi alle esigenze di un reale utilizzatore.

Insomma, volendo le cose che potrei fare sarebbero ancora tante ma, dato che presumibilmente il codice non sarà mai utilizzato da nessuno di diverso da me, e considerato anche che dovendoci lavorare quando posso ho ormai il cervello un po' fuso e comincio a non poterlo vedere più 😂 , per ora direi di considerarlo terminato.

Anche a livello di test ammetto di non averne fatti di particolarmente approfonditi - non so come avviare o connettermi ad una testnet, per cui ogni volta devo upvotare, commentare e pubblicare post a caso, il che non è il massimo - ma di base mi sembra tutto funzionante e questo per me è già un'enorme vittoria!


Imparare, imparare, imparare!

Lavorare a questo progetto mi ha fatto imparare tanto, anche se il mio codice è sicuramente di livello bassissimo, per cui un vero sviluppatore guardandolo avrà il voltastomaco e penserà che ho appena assassinato Python... ma da qualche parte bisogna pur iniziare e creare qualcosa mi dà molta, molta più soddisfazione che ripetere allo sfinimento esercizi completamente fini a se stessi.

Cercando su Hive sto poi cominciando a conoscere utenti molto più esperti di me, i cui scripts e suggerimenti sono per me preziosissimi e mi forniscono spunti per migliorare il mio codice o idee per provare a creare qualcosa di nuovo.

La fine di questo piccolo progetto rappresenta perciò solamente l'inizio di una nuova sfida :)


Per concludere, ecco entrambi gli scripts!

PRIMO SCRIPT


import requests
import time
import datetime
import csv
import os
import re
import json
import markdown
from bs4 import BeautifulSoup
from langdetect import detect_langs, LangDetectException as lang_e
import logging
from logging.handlers import TimedRotatingFileHandler

# logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = TimedRotatingFileHandler("main.log", when="D", interval=1)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)


# Check if the target language is among the top detected languages
def text_language(text):
    try:
        languages = detect_langs(text)
    except lang_e as e:
        logger.error(f"Language error: {e}")
        return False, 0

    num_languages = len(languages)
    languages_sorted = sorted(languages, key=lambda x: x.prob, reverse=True)
    top_languages = (
        languages_sorted[:2] if len(languages_sorted) > 1 else languages_sorted
    )

    contains_target_lang = any(lang.lang == "it" for lang in top_languages)
    return contains_target_lang, num_languages


# Convert text from markdown to HTML and count words
def convert_and_count_words(md_text):
    html = markdown.markdown(md_text)

    soup = BeautifulSoup(html, "html.parser")
    text = soup.get_text()

    words = re.findall(r"\b\w+\b", text)
    return len(words)


# Send request to HIVE API and return response
def get_response(data, url, session: requests.Session):
    request = requests.Request("POST", url=url, data=data).prepare()
    response = session.send(request, allow_redirects=False)
    return response


# Get properties and find last block
def get_properties(url, session: requests.Session):
    data = '{"jsonrpc":"2.0", "method":"database_api.get_dynamic_global_properties", "id":1}'
    response = get_response(data, url, session)
    properties = response.json()["result"]
    return properties


def get_ops_in_block(num, url, session: requests.Session):
    data = f'{{"jsonrpc":"2.0", "method":"condenser_api.get_ops_in_block", "params":[{num},false], "id":1}}'
    response = get_response(data, url, session)
    ops_in_block = response.json()["result"]
    return ops_in_block


def get_post(ops):
    comment_list = []
    for op in ops:
        if (
            op["op"][0] == "comment" and op["op"][1]["parent_author"] == ""
        ):  # Posts, not comments

            try:
                json_metadata = json.loads(op["op"][1]["json_metadata"])
            except (json.JSONDecodeError, KeyError) as e:
                logger.error(f"JSON decode error or missing key: {e}")
                continue

            # Check if there's the tag we are looking for
            if "ita" not in json_metadata.get("tags", []):
                continue

            # Check post language
            valid_language, lang_num = text_language(op["op"][1]["body"])

            if not valid_language:
                continue

            # Check post length: at least 400 words if a single language was
            # detected, at least 800 if the post is bilingual (e.g. ITA + ENG)
            word_count = convert_and_count_words(op["op"][1]["body"])

            if (lang_num == 1 and word_count < 400) or (
                lang_num > 1 and word_count < 800
            ):
                continue

            author = op["op"][1]["author"]
            permlink = op["op"][1]["permlink"]
            link = f"https://peakd.com/@{author}/{permlink}"
            comment_list.append(link)
            logger.info(f"Found eligible post: {link}")
    return comment_list


def load_last_block():
    if os.path.exists("last_block.txt"):
        with open("last_block.txt", "r") as file:
            return int(file.read())
    return None


def save_last_block(block_num):
    with open("last_block.txt", "w") as file:
        file.write(str(block_num))


# return the eligible posts in a list
def get_post_list(url):
    with requests.Session() as session:
        last_hive_block_num = get_properties(url, session)[
            "last_irreversible_block_num"
        ]
        last_block_num = load_last_block()
        if last_block_num is None:
            last_block_num = last_hive_block_num
        if int(last_block_num) >= int(last_hive_block_num):
            time.sleep(60)  # always stay behind the last irreversible Hive block
        ops = get_ops_in_block(last_block_num, url, session)
        post_list = get_post(ops)
        save_last_block(int(last_block_num) + 1)
        return post_list


# Get date and generate csv file name
def get_filename():
    current_date = datetime.datetime.now().strftime("%Y-%m-%d")
    return f"urls_{current_date}.csv"


def main():
    url = "https://api.deathwing.me"

    filename = get_filename()
    last_filename = filename
    i = 1

    if not os.path.exists(filename):
        with open(filename, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(["ID", "URL", "Upvote_Value"])

    logger.info(f"Current file: {filename}")

    while True:
        post_list = get_post_list(url)

        current_filename = get_filename()

        if current_filename != last_filename:
            filename = current_filename
            last_filename = current_filename
            i = 1  # Reset counter
            logger.info(f"Started writing to a new file: {filename}")

        # Add posts to the current file
        with open(filename, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            for post in post_list:
                writer.writerow([i, post, ""])
                i += 1

        post_list.clear()


if __name__ == "__main__":

    main()


SECONDO SCRIPT


#!/usr/bin/env python3
"""A script to upvote and comment posts from a .csv list"""
import os
import shutil
import jinja2
import configparser
import time
import re
import json  # needed for json.dumps() in post()
import logging
from logging.handlers import TimedRotatingFileHandler
import pandas as pd
from beem import Hive, exceptions as beem_e
from beemapi import exceptions as beemapi_e


# Global configuration
config = configparser.ConfigParser()
config.read("config")

ENABLE_COMMENTS = config["Global"]["ENABLE_COMMENTS"] == "True"
ENABLE_UPVOTES = config["Global"]["ENABLE_UPVOTES"] == "True"
ENABLE_POST = config["Global"]["ENABLE_POST"] == "True"

ACCOUNT_NAME = config["Global"]["ACCOUNT_NAME"]
ACCOUNT_POSTING_KEY = config["Global"]["ACCOUNT_POSTING_KEY"]
HIVE_API_NODE = config["Global"]["HIVE_API_NODE"]
# Summary post settings (assumed to be extra keys in the same [Global] section)
TITLE = config["Global"]["TITLE"]
TAGS = config["Global"]["TAGS"].split(",")  # comma-separated tags
CATEGORY = config["Global"]["CATEGORY"]
hive = Hive(node=[HIVE_API_NODE], keys=[ACCOUNT_POSTING_KEY])

# Logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = TimedRotatingFileHandler("main.log", when="D", interval=1)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

logger.addHandler(handler)

logger.info("Configuration loaded:")
for section in config.keys():
    for key in config[section].keys():
        if "_key" in key:
            continue  # don't log posting keys
        logger.info(f"{section}, {key}, {config[section][key]}")


# Markdown templates for the comment and the summary post
comment_curation_template = jinja2.Template(
    open(os.path.join("template", "comment_curation.template"), "r").read()
)
post_template = jinja2.Template(
    open(os.path.join("template", "post.template"), "r").read()
)


def give_upvote(vote_weight, voter, authorperm):
    print(f"Upvoting with weight {vote_weight}!")
    try:
        hive.vote(weight=vote_weight, account=voter, identifier=authorperm)
    except beem_e.VotingInvalidOnArchivedPost:
        logger.error("Post is too old to be upvoted")
    except beemapi_e.UnhandledRPCError:
        logger.error("Vote changed too many times")
    time.sleep(3)  # sleep 3s


def post_comment(author, replier, authorperm):
    print("Commenting!")
    comment_body = comment_curation_template.render(
        target_account=author, replier=replier
    )
    hive.post(
        title="",
        body=comment_body,
        author=replier,
        reply_identifier=authorperm,
    )
    time.sleep(3)  # sleep 3s


def post(posts):
    title = TITLE
    post_body = post_template.render(author=ACCOUNT_NAME, posts=posts)
    author = ACCOUNT_NAME
    tags = TAGS
    category = CATEGORY

    hive.post(
        title=title,
        body=post_body,
        author=author,
        permlink=None,
        json_metadata=json.dumps({"app": "leothreads/0.3", "tags": tags}),
        category=category,
    )

    logger.info(f"Post published with title: {title}")


def process_file(file_to_process):
    posts = []  # defined before the try so the finally block can always return it
    try:
        df = pd.read_csv(file_to_process)

        for _, row in df.iterrows():
            url = row["URL"]
            vote_weight = row["Upvote_Value"]
            print(f"Work in progress on {url}...")

            if pd.isna(vote_weight):
                print(f"No upvote value for {url}, skipping...")
                continue

            try:
                vote_weight = int(vote_weight)
            except ValueError:
                print(f"Invalid vote weight: {vote_weight}")
                continue

            if (vote_weight < 1) or (vote_weight > 100):
                print(f"Invalid vote weight: {vote_weight}%")
                continue

            # data of the post to be upvoted and/or replied
            permlink = re.search(r".+@([\w.-]+)/([\w-]+)", url)
            if not permlink:
                logger.error(f"Invalid URL format: {url}")
                continue
            post_author = permlink.group(1)
            permlink = permlink.group(2)
            post_url = f"{post_author}/{permlink}"
            logger.info(f"{post_author} is getting a {vote_weight}% upvote!")

            posts.append(
                {
                    "author": post_author,
                    "upvote_value": vote_weight,
                    "post_link": post_url,
                }
            )

            # leave an upvote
            if ENABLE_UPVOTES:
                give_upvote(vote_weight, ACCOUNT_NAME, post_url)
            else:
                print("Upvoting is disabled")

            # leave a comment
            if ENABLE_COMMENTS:
                post_comment(post_author, ACCOUNT_NAME, post_url)
            else:
                print("Commenting is disabled")

    except pd.errors.EmptyDataError:
        logger.error(f"File {file_to_process} is empty. Skipping...")

    finally:
        # Once done, move file in the directory "posts_done"
        directory_done = "posts_done"
        destination = os.path.join(directory_done, os.path.basename(file_to_process))
        shutil.move(file_to_process, destination)
        logger.info(
            f"File {os.path.basename(file_to_process)} moved to '{directory_done}' directory."
        )
        return posts


def main():

    directory_to_do = "posts_to_do"

    file_to_process = None
    posts = []  # stays empty if there is no file to process

    for filename in os.listdir(directory_to_do):
        if filename.endswith(".csv"):  # Only look for csv files
            file_to_process = os.path.join(directory_to_do, filename)
            break  # One file at a time

    if file_to_process:
        posts = process_file(file_to_process)
    else:
        logger.info("No files found in the 'urls_to_do' directory.")

    if posts and ENABLE_POST:
        post(posts)


if __name__ == "__main__":

    main()


Taggo @gamer00 perchè era curioso di vedere a cosa stavo lavorando... e così ora può restarne traumatizzato 🤣

Un giorno riuscirò a scrivere del codice decente, ordinato e leggibile... ma non è questo il giorno! ahahah

Taggo anche @slobberchops perchè mi ha esortato ad affrancarmi da BEEM, dandomi la spinta iniziale di cui avevo bisogno per decidermi a cercare di studiare meglio il funzionamento delle API di Hive.

@felixxx l'ho già taggato varie volte... ma lo taggo una volta in più perchè il cuore del mio progetto poggia sul suo lavoro, e mi sembra giusto ribadirlo :)


cover realizzata con Bing AI ed editata con GIMP

a supporto della community #OliodiBalena, il 3% delle ricompense di questo post va a @balaenoptera

Se sei arrivato a leggere fin qui, grazie! Se hai voglia di lasciare un upvote, un reblog, un follow, un commento... be', un qualsiasi segnale di vita, in realtà, è molto apprezzato!
