199 lines
6.5 KiB
Python
199 lines
6.5 KiB
Python
import sqlite3
|
|
import mysql.connector
|
|
import subprocess
|
|
import string
|
|
import random
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
class Database():
    """Driver-agnostic base class that assembles simple SQL statements.

    Subclasses provide the actual connection by overriding ``close`` and
    ``query``. The ``select``/``insert``/``update``/``delete`` helpers only
    build a statement string and pass it, with ``values``, to ``query``.

    Table names, column names and comparison operators are interpolated
    directly into the SQL text, so they must come from trusted code. Only the
    entries of ``values`` are sent as bound parameters, using "?" (qmark)
    placeholders.
    """

    def __init__(self):
        """Nothing to set up in the base class."""
        pass

    def close(self):
        """Release the connection; a no-op in the base class."""
        pass

    def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]:
        """Execute ``query`` with optional bound ``parameters``.

        Subclasses must override this. The helpers in this class rely on the
        result being a 2-tuple: ``select`` returns element 1 and
        ``insert``/``update``/``delete`` return element 0.

        Raises:
            NotImplementedError: always, in the base class. Previously this
                returned ``None``, which made every helper fail with an
                unrelated ``TypeError``.
        """
        raise NotImplementedError("Database subclasses must implement query()")

    def select(self, table: str, columns: list[str]=None, where: list[dict]=None, values: list=None) -> list[tuple]:
        """Run a SELECT and return element 1 of the ``query`` result.

        Args:
            table: Table name, inserted verbatim.
            columns: Column names to fetch; ``None`` selects ``*``.
            where: Optional conditions, see ``__generate_basic_where_clause``.
            values: Bound parameters, one per placeholder in ``where``.
        """
        column_sql = "*" if columns is None else ",".join(columns)
        statement = "SELECT {columns} FROM {table}{where}".format(
            columns=column_sql,
            table=table,
            where=self.__generate_basic_where_clause(where),
        )

        return self.query(statement, values)[1]

    def insert(self, table: str, columns: list[str], values: list) -> int:
        """Run an INSERT and return element 0 of the ``query`` result.

        One placeholder is emitted per entry in ``values``.
        """
        statement = "INSERT INTO {table} ({columns}) VALUES ({values})".format(
            table=table,
            columns=",".join(columns),
            values=",".join("?" * len(values)),
        )

        return self.query(statement, values)[0]

    def update(self, table: str, columns: list[str], values: list, where: list[dict]=None) -> int:
        """Run an UPDATE and return element 0 of the ``query`` result.

        ``values`` must hold the new column values first and the WHERE values
        after them, because that is the order the placeholders appear in the
        generated statement.
        """
        statement = "UPDATE {table} SET {set_rules}{where}".format(
            table=table,
            set_rules=",".join(column + "=?" for column in columns),
            where=self.__generate_basic_where_clause(where),
        )

        return self.query(statement, values)[0]

    def delete(self, table: str, values: list, where: list[dict]=None) -> int:
        """Run a DELETE and return element 0 of the ``query`` result.

        Without ``where`` the statement has no WHERE clause at all.
        """
        statement = "DELETE FROM {table}{where}".format(
            table=table,
            where=self.__generate_basic_where_clause(where),
        )

        return self.query(statement, values)[0]

    # TODO: the clause uses "?" placeholders; the original author suspected
    # this breaks with MySQL. NOTE: the double-underscore name is mangled per
    # class, so a subclass cannot override this method by reusing the name.
    def __generate_basic_where_clause(self, where: list[dict]):
        """Build a `` WHERE ...`` fragment, or ``""`` when there are no conditions.

        Each condition is a dict with the keys ``"name"`` and ``"compare"``
        and, optionally, ``"boolean_op"``, which is appended after the
        condition to join it to the next one.
        """
        # None and [] both mean "no filter"; an empty list used to produce a
        # dangling " WHERE " and therefore invalid SQL.
        if not where:
            return ""

        fragments = []
        for condition in where:
            fragment = condition["name"] + " " + condition["compare"] + " ?"
            if "boolean_op" in condition:
                fragment += " " + condition["boolean_op"] + " "
            fragments.append(fragment)

        return " WHERE {clauses}".format(clauses="".join(fragments))
|
|
|
|
class SQLite(Database):
    """``Database`` implementation backed by a ``sqlite3`` connection."""

    def __init__(self, db_name: str):
        """Open ``db_name`` with ``sqlite3.connect`` and enable autocommit."""
        # super(Database, self) would start the lookup *after* Database and
        # skip Database.__init__; the zero-argument form runs it.
        super().__init__()

        self.db = sqlite3.connect(db_name)

        if hasattr(self.db, "autocommit"):
            self.db.autocommit = True
        else:
            # Connection.autocommit was added in Python 3.12; on older
            # interpreters isolation_level=None is the autocommit switch.
            self.db.isolation_level = None

    def close(self):
        """Close the underlying connection."""
        self.db.close()

    def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]:
        """Execute ``query`` and return a ``(number, rows)`` pair.

        The result depends on how the statement starts (case-insensitive,
        leading whitespace ignored):

        * ``SELECT``: ``(row count, list of rows)``
        * ``INSERT``: ``(cursor.lastrowid, None)``
        * anything else: ``(cursor.rowcount, None)``
        """
        if parameters is None:
            cursor = self.db.execute(query)
        else:
            cursor = self.db.execute(query, parameters)

        # Strip leading whitespace so an indented or multi-line statement is
        # still recognised by its first keyword.
        statement = query.lstrip().casefold()

        if statement.startswith("select"):
            rows = cursor.fetchall()
            return (len(rows), rows)

        if statement.startswith("insert"):
            return (cursor.lastrowid, None)

        return (cursor.rowcount, None)
|
|
|
|
class MySQL(Database):
    """``Database`` implementation backed by ``mysql.connector``."""

    def __init__(self, host: str, user: str, password: str, db_name: str):
        """Connect to ``db_name`` on ``host`` and enable autocommit."""
        # super(Database, self) would start the lookup *after* Database and
        # skip Database.__init__; the zero-argument form runs it.
        super().__init__()

        self.db = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=db_name,
        )
        self.db.autocommit = True

    def close(self):
        """Close the underlying connection."""
        self.db.close()

    @staticmethod
    def _qmark_to_format(query: str) -> str:
        """Return ``query`` with every unquoted "?" rewritten as "%s".

        Text inside single quotes, double quotes or backticks is copied
        unchanged, and a backslash inside quoted text protects the character
        that follows it.
        """
        translated = []
        open_quote = None
        skip_next = False

        for char in query:
            if skip_next:
                skip_next = False
            elif open_quote is not None:
                if char == "\\":
                    skip_next = True
                elif char == open_quote:
                    open_quote = None
            elif char in "'\"`":
                open_quote = char
            elif char == "?":
                char = "%s"
            translated.append(char)

        return "".join(translated)

    def query(self, query: str, parameters: list=None) -> tuple[int, list[tuple]]:
        """Execute ``query`` and return a ``(number, rows)`` pair.

        The result depends on how the statement starts (case-insensitive,
        leading whitespace ignored):

        * ``SELECT``: ``(row count, list of rows)``
        * ``INSERT``: ``(cursor.lastrowid, None)``
        * anything else: ``(cursor.rowcount, None)``

        When ``parameters`` is given, "?" placeholders are rewritten to the
        "%s" markers mysql.connector substitutes, so statements built by the
        ``Database`` helpers work here. Statements already written with "%s"
        pass through unchanged.
        """
        if parameters is not None:
            query = self._qmark_to_format(query)

        # Strip leading whitespace so an indented or multi-line statement is
        # still recognised by its first keyword.
        statement = query.lstrip().casefold()

        with self.db.cursor() as cursor:
            if parameters is None:
                cursor.execute(query)
            else:
                cursor.execute(query, parameters)

            if statement.startswith("select"):
                rows = cursor.fetchall()
                return (len(rows), rows)

            if statement.startswith("insert"):
                return (cursor.lastrowid, None)

            return (cursor.rowcount, None)
|
|
|
|
# Data class (effective struct) because complex dictionary access is uggo.
|
|
class TranscriptableFile():
    """Plain attribute holder for one file and its timing information.

    The constructor stores its four arguments unchanged under attributes of
    the same names; nothing is validated or converted.
    """

    def __init__(self, file_path: str, real_date: datetime, milliseconds_from_start: int, user_snowflake: str=None):
        # Where the file lives, and the date associated with it.
        self.file_path, self.real_date = file_path, real_date
        # Offset in milliseconds, and an optional user id (None when omitted).
        self.milliseconds_from_start, self.user_snowflake = milliseconds_from_start, user_snowflake
|
|
|
|
def mix_audio_with_ffmpeg(files: list[TranscriptableFile], media_folder_path: str, call_id: int, is_final_pass: bool) -> TranscriptableFile:
    """Mix ``files`` into a single MP3 by running ffmpeg.

    Each input is delayed by its ``milliseconds_from_start`` with ``adelay``
    and the delayed streams are combined with ``amix`` (``normalize=0``). On
    the final pass a ``volume=3`` filter is applied to the mix as well.

    Args:
        files: Inputs to mix; must not be empty.
        media_folder_path: Folder that contains one sub-folder per call.
        call_id: Name of the sub-folder the output is written to.
        is_final_pass: When true the output is ``output.mp3``; otherwise it
            is ``intermediate-<10 random characters>.mp3``.

    Returns:
        A ``TranscriptableFile`` whose ``file_path`` is the output ``Path``
        and whose date and offset are copied from ``files[0]``.

    Raises:
        ValueError: if ``files`` is empty.
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    if not files:
        # Fail before launching ffmpeg; the result needs files[0] anyway.
        raise ValueError("mix_audio_with_ffmpeg needs at least one input file")

    command_list = ["ffmpeg"]

    for file in files:
        # str() because file_path may be a Path (this function returns one).
        command_list.extend(["-i", str(file.file_path)])

    delay_filters = [
        "[{input_id}]adelay={delay}|{delay}[a{input_id}]".format(
            input_id=index,
            delay=file.milliseconds_from_start,
        )
        for index, file in enumerate(files)
    ]
    delayed_labels = "".join(
        "[a{input_id}]".format(input_id=index) for index in range(len(files))
    )

    filter_string = ";".join(delay_filters) + ";" + delayed_labels
    filter_string += "amix=inputs={input_count}:normalize=0[a]".format(
        input_count=len(files)
    )

    output_label = "[a]"

    if is_final_pass:
        filter_string += ";[a]volume=3[boosted]"
        output_label = "[boosted]"

    # No manual quoting: each list item reaches ffmpeg as exactly one
    # argument because no shell is involved.
    command_list.extend(["-filter_complex", filter_string, "-map", output_label])

    if is_final_pass:
        output_name = "output.mp3"
    else:
        suffix = "".join(random.choices(string.ascii_uppercase + string.digits, k=10))
        output_name = "intermediate-" + suffix + ".mp3"

    output_file_name = Path(media_folder_path, str(call_id), output_name)
    command_list.append(str(output_file_name))

    # The argument list is passed directly (shell=False), so paths containing
    # spaces or shell metacharacters are neither split nor interpreted.
    # stdin is detached so ffmpeg cannot block waiting for keyboard input.
    ffmpeg_process = subprocess.run(
        command_list,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    if ffmpeg_process.returncode != 0:
        print("An FFMPEG process failed")
        print(ffmpeg_process.stdout)
        print(ffmpeg_process.stderr)
        raise RuntimeError("An FFMPEG process broke spectacularly")

    return TranscriptableFile(output_file_name, files[0].real_date, files[0].milliseconds_from_start)
|
|
|
|
|
|
|