Some profanity prevention stuff, breadmixer rewrite

This commit is contained in:
2025-12-27 20:47:34 -05:00
parent 6e93022cb3
commit c4187c1590
7 changed files with 402 additions and 2 deletions

198
bin/breadbot_common.py Normal file
View File

@@ -0,0 +1,198 @@
import sqlite3
import mysql.connector
import subprocess
import string
import random
from pathlib import Path
from datetime import datetime
class Database():
def __init__(self):
pass
def close(self):
pass
def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]:
pass
def select(self, table: str, columns: list[str]=None, where: list[dict]=None, values: list=None) -> list[tuple]:
query_string = "SELECT {columns} FROM {table}{where}".format(
columns = "*" if columns is None else ",".join(columns),
table = table,
where = self.__generate_basic_where_clause(where) if not where is None else ""
)
return self.query(query_string, [element["value"] for element in where] if not where is None else None, values)[1]
def insert(self, table: str, columns: list[str], values: list) -> int:
query_string = "INSERT INTO {table} ({columns}) VALUES ({values})".format(
table = table,
columns = ",".join(columns),
values = ("?," * len(values))[:-1]
)
return self.query(query_string, values)[0]
def update(self, table: str, columns: list[str], values: list, where: list[dict]=None) -> int:
query_string = "UPDATE {table} SET {set_rules}{where}".format(
table = table,
set_rules = ",".join([element + "=?" for element in columns]),
where = self.__generate_basic_where_clause(where) if not where is None else ""
)
return self.query(query_string, values)[0]
def delete(self, table: str, values: list, where: list[dict]=None) -> int:
query_string = "DELETE FROM {table}{where}".format(
table = table,
where = self.__generate_basic_where_clause(where) if not where is None else ""
)
return self.query(query_string, values)[0]
# TODO This probably breaks with MySQL, because question mark bad, maybe just have MySQL class override this
def __generate_basic_where_clause(self, where: list[dict]):
return " WHERE {clauses}".format(
clauses = "".join([
element["name"] + " " + element["compare"] + " ?" + (" " + element["boolean_op"] + " " if "boolean_op" in element else "")
for element in where
])
)
class SQLite(Database):
def __init__(self, db_name: str):
super(Database, self).__init__()
self.db = sqlite3.connect(db_name)
self.db.autocommit = True
def close(self):
self.db.close()
def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]:
if parameters is None:
cursor = self.db.execute(query)
else:
cursor = self.db.execute(query, parameters)
if query.casefold().startswith("SELECT".casefold()):
rows = list(cursor)
return (len(rows), rows)
elif query.casefold().startswith("INSERT".casefold()):
return (cursor.lastrowid, None)
else:
return (cursor.rowcount, None)
class MySQL(Database):
def __init__(self, host: str, user: str, password: str, db_name: str):
super(Database, self).__init__()
self.db = mysql.connector.connect(
host = host,
user = user,
password = password,
database = db_name
)
self.db.autocommit = True
def close(self):
self.db.close()
def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]:
with self.db.cursor() as cursor:
if parameters is None:
cursor.execute(query)
else:
cursor.execute(query, parameters)
if query.casefold().startswith("SELECT".casefold()):
rows = cursor.fetchall()
return (len(rows), rows)
elif query.casefold().startswith("INSERT".casefold()):
return (cursor.lastrowid, None)
else:
return (cursor.rowcount, None)
# Data class (effective struct) because complex dictionary access is uggo.
class TranscriptableFile():
def __init__(self, file_path: str, real_date: datetime, milliseconds_from_start: int, user_snowflake: str=None):
self.file_path = file_path
self.real_date = real_date
self.milliseconds_from_start = milliseconds_from_start
self.user_snowflake = user_snowflake
def mix_audio_with_ffmpeg(files: list[TranscriptableFile], media_folder_path: str, call_id: int, is_final_pass: bool) -> TranscriptableFile:
filter_list = [
"[{input_id}]adelay={delay}|{delay}[a{input_id}]".format(
input_id = index,
delay = files[index].milliseconds_from_start
)
for index in range(len(files))
]
command_list = ["ffmpeg"]
for file in files:
command_list.append("-i")
command_list.append(file.file_path)
command_list.append("-filter_complex")
filter_string = "\"" + ";".join(filter_list) + ";"
filter_string = filter_string + "".join([
"[a{input_id}]".format(
input_id = index
)
for index in range(len(files))
])
filter_string = filter_string + "amix=inputs={input_count}:normalize=0[a]".format(
input_count = len(files)
)
if is_final_pass:
filter_string = filter_string + ";[a]volume=3[boosted]\""
else:
filter_string = filter_string + "\""
command_list.append(filter_string)
command_list.append("-map")
if is_final_pass:
command_list.append("\"[boosted]\"")
else:
command_list.append("\"[a]\"")
output_file_name = Path(
media_folder_path,
call_id,
"output.mp3" if is_final_pass else "intermediate-" + "".join(random.choices(string.ascii_uppercase + string.digits, k=10)) + ".mp3"
)
command_list.append(output_file_name)
# TODO shell = True isn't great, I don't remember the reason why it has to be this way
# I *think* it had something to do with me not using ffmpeg's absolute path
ffmpeg_process = subprocess.Popen(
' '.join(command_list),
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
shell = True
)
stdout, stderr = ffmpeg_process.communicate()
if ffmpeg_process.returncode != 0:
print("An FFMPEG process failed")
print(stdout)
print(stderr)
raise Exception("An FFMPEG process broke spectacularly")
return TranscriptableFile(output_file_name, files[0].real_date, files[0].milliseconds_from_start)