import sqlite3 import mysql.connector import subprocess import string import random from pathlib import Path from datetime import datetime class Database(): def __init__(self): pass def close(self): pass def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]: pass def select(self, table: str, columns: list[str]=None, where: list[dict]=None, values: list=None) -> list[tuple]: query_string = "SELECT {columns} FROM {table}{where}".format( columns = "*" if columns is None else ",".join(columns), table = table, where = self.__generate_basic_where_clause(where) if not where is None else "" ) return self.query(query_string, values)[1] def insert(self, table: str, columns: list[str], values: list) -> int: query_string = "INSERT INTO {table} ({columns}) VALUES ({values})".format( table = table, columns = ",".join(columns), values = ("?," * len(values))[:-1] ) return self.query(query_string, values)[0] def update(self, table: str, columns: list[str], values: list, where: list[dict]=None) -> int: query_string = "UPDATE {table} SET {set_rules}{where}".format( table = table, set_rules = ",".join([element + "=?" for element in columns]), where = self.__generate_basic_where_clause(where) if not where is None else "" ) return self.query(query_string, values)[0] def delete(self, table: str, values: list, where: list[dict]=None) -> int: query_string = "DELETE FROM {table}{where}".format( table = table, where = self.__generate_basic_where_clause(where) if not where is None else "" ) return self.query(query_string, values)[0] # TODO This probably breaks with MySQL, because question mark bad, maybe just have MySQL class override this def __generate_basic_where_clause(self, where: list[dict]): return " WHERE {clauses}".format( clauses = "".join([ element["name"] + " " + element["compare"] + " ?" + (" " + element["boolean_op"] + " " if "boolean_op" in element else "") for element in where ]) ) class SQLite(Database): def __init__(self, db_name: str): super(Database, self).__init__() self.db = sqlite3.connect(db_name) self.db.autocommit = True def close(self): self.db.close() def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]: if parameters is None: cursor = self.db.execute(query) else: cursor = self.db.execute(query, parameters) if query.casefold().startswith("SELECT".casefold()): rows = list(cursor) return (len(rows), rows) elif query.casefold().startswith("INSERT".casefold()): return (cursor.lastrowid, None) else: return (cursor.rowcount, None) class MySQL(Database): def __init__(self, host: str, user: str, password: str, db_name: str): super(Database, self).__init__() self.db = mysql.connector.connect( host = host, user = user, password = password, database = db_name ) self.db.autocommit = True def close(self): self.db.close() def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]: with self.db.cursor() as cursor: if parameters is None: cursor.execute(query) else: cursor.execute(query, parameters) if query.casefold().startswith("SELECT".casefold()): rows = cursor.fetchall() return (len(rows), rows) elif query.casefold().startswith("INSERT".casefold()): return (cursor.lastrowid, None) else: return (cursor.rowcount, None) # Data class (effective struct) because complex dictionary access is uggo. class TranscriptableFile(): def __init__(self, file_path: str, real_date: datetime, milliseconds_from_start: int, user_snowflake: str=None): self.file_path = file_path self.real_date = real_date self.milliseconds_from_start = milliseconds_from_start self.user_snowflake = user_snowflake def mix_audio_with_ffmpeg(files: list[TranscriptableFile], media_folder_path: str, call_id: int, is_final_pass: bool) -> TranscriptableFile: filter_list = [ "[{input_id}]adelay={delay}|{delay}[a{input_id}]".format( input_id = index, delay = files[index].milliseconds_from_start ) for index in range(len(files)) ] command_list = ["ffmpeg"] for file in files: command_list.append("-i") command_list.append(file.file_path) command_list.append("-filter_complex") filter_string = "\"" + ";".join(filter_list) + ";" filter_string = filter_string + "".join([ "[a{input_id}]".format( input_id = index ) for index in range(len(files)) ]) filter_string = filter_string + "amix=inputs={input_count}:normalize=0[a]".format( input_count = len(files) ) if is_final_pass: filter_string = filter_string + ";[a]volume=3[boosted]\"" else: filter_string = filter_string + "\"" command_list.append(filter_string) command_list.append("-map") if is_final_pass: command_list.append("\"[boosted]\"") else: command_list.append("\"[a]\"") output_file_name = Path( media_folder_path, call_id, "output.mp3" if is_final_pass else "intermediate-" + "".join(random.choices(string.ascii_uppercase + string.digits, k=10)) + ".mp3" ) command_list.append(output_file_name) # TODO shell = True isn't great, I don't remember the reason why it has to be this way # I *think* it had something to do with me not using ffmpeg's absolute path ffmpeg_process = subprocess.Popen( ' '.join(command_list), stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell = True ) stdout, stderr = ffmpeg_process.communicate() if ffmpeg_process.returncode != 0: print("An FFMPEG process failed") print(stdout) print(stderr) raise Exception("An FFMPEG process broke spectacularly") return TranscriptableFile(output_file_name, files[0].real_date, files[0].milliseconds_from_start)