diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..77a5ba5
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,21 @@
+*.*~
+*.pyc
+*.iml
+*.txt
+*.yml
+!docker-compose.yml
+!requirements.txt
+data/*
+analyze/*
+bin/*
+.idea/*
+.handler-saves/*
+notify_failure.sh
+__pycache__/*
+venv
+.venv
+txt2speech-venv/*
+.vscode
+.DS_Store
+.python-version
+.env
diff --git a/.gitignore b/.gitignore
index 2adb59c..80b63e5 100755
--- a/.gitignore
+++ b/.gitignore
@@ -2,9 +2,20 @@
*.pyc
*.iml
*.txt
-data/
-analyze/
-bin/
-.idea/
-.handler-saves/
-notify_failure.sh
\ No newline at end of file
+*.yml
+!docker-compose.yml
+!requirements.txt
+data/*
+analyze/*
+bin/*
+.idea/*
+.handler-saves/*
+notify_failure.sh
+__pycache__/*
+venv
+.venv
+txt2speech-venv/*
+.vscode
+.DS_Store
+.python-version
+.env
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..0bcb1c5
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,20 @@
+FROM python:3.13-slim
+
+WORKDIR /usr/src/app
+
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends \
+ libmagic1 libmagic-dev \
+ ffmpeg \
+ postgresql-client \
+ default-mysql-client && \
+ rm -rf /var/lib/apt/lists/*
+
+COPY requirements.txt .
+
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+ENTRYPOINT ["./wait_for_db.sh"]
+CMD ["python", "-m", "txt2SpeechBot"]
\ No newline at end of file
diff --git a/README.md b/README.md
index 5040e51..1b4bf21 100755
--- a/README.md
+++ b/README.md
@@ -45,6 +45,195 @@ They will appear every time you type the bot's name without any additional text.
If you would want to add an audio to this menu, don't hesitate to contact me on telegram ([@supremoh](https://t.me/supremoh)).
I will decide if it is suitable for the bot.
-
+
-* This bot collects some data for its proper operation. If you use it, you're accepting this fact.
\ No newline at end of file
+* This bot collects some data for its proper operation. If you use it, you're accepting this fact.
+* You can delete any stored data with `/delete_my_data` command.
+* You can view what data is collected with `/privacy` command.
+
+---
+
+## Local Setup
+
+You can run this bot using Docker Compose with profiles or Docker-less.
+
+### 1. Clone the Repository
+
+```bash
+git clone https://github.com/gmm96/Txt2SpeechBot.git
+cd Txt2SpeechBot
+```
+
+### 2. Create and Configure `.env`
+
+Open .env and modify it with your variables.
+
+```bash
+cp sample.env .env
+```
+
+
+Configuration Variables
+
+### Bot Settings
+
+* `BOT_TOKEN`: Your Telegram Bot token (required).
+* `TTS_API`: Your Text-to-Speech API endpoint. (required).
+* `HOST_PERSISTENT_PATH`: Absolute path on your host for storing files persistently (e.g. `/home/user/txt2speech`).
+* `OWNER_ID`: Telegram user ID of the bot owner.
+* `ADMINS_IDS`: Comma-separated list of Telegram user IDs with admin access.
+* `ALLOWED_CHATS`: Comma-separated list of allowed chat or group IDs. Leave empty for global access.
+
+### Usage Limits
+
+* `MAX_AUDIO_SIZE`: Maximum audio file size in MB (default: 20).
+* `MAX_STORED_AUDIO`: Maximum number of audios stored per user (default: 50).
+* `MAX_CHARACTER_DESC`: Max characters allowed in audio description (default: 30).
+* `MAX_CHARACTER_QUERY`: Max characters allowed per TTS query (default: 200).
+
+### Logging & Error Handling
+
+* `FORWARD_ERRORS_TO_OWNER`: `bool` forward exceptions to `OWNER_ID`.
+* `SAVE_INPUT_MSG`: `bool` save incoming messages to `messages.log`.
+* `SAVE_INPUT_QUERIES`: `bool` save text queries to `queries.log`.
+* `SAVE_UNAUTHORIZED`: `bool` log unauthorized access attempts.
+
+### Database Settings
+
+* `DB_TYPE`: One of `postgresql`, `mysql`, `mariadb`, or `sqlite`.
+* `DB_NAME`, `DB_USERNAME`, `DB_PASSWORD`: Credentials used for the selected database.
+* `DB_PATH`: Hostname or file path depending on DB type (e.g. `postgresql-db` (for Docker) or `db.myhost.com` (for external) or `data/db.sqlite` for SQLite).
+* `DB_PORT`: Port used by the database (e.g. 5432 for PostgreSQL).
+* `DB_ROOT_PASSWORD`: Database root password (Only for MySQL/MariaDB).
+
+### Admin Interfaces
+
+* `DB_MANAGER_EXTERNAL_PORT`: WebUI port to access pgAdmin or phpMyAdmin (default: 8080).
+* `PG_USERNAME`, `PG_PASSWORD`: Credentials to access pgAdmin for PostgreSQL.
+
+
+
+
+### 3. Setup and Run
+
+
+Docker Compose Route
+
+Choose one supported database and optionally the corresponding manager.
+
+## Supported Compose Profiles
+
+| Database | Profile Name | Docker Hostname | WebUI Managers | Notes |
+| ---------- | ------------ | --------------- | -------------- | ------------------------- |
+| PostgreSQL | `postgresql` | `postgresql-db` | `pgadmin` | Recomended Setup |
+| MySQL | `mysql` | `mysql-db` | `phpadmin` | - |
+| MariaDB | `mariadb` | `mariadb-db` | `phpadmin` | - |
+| SQLite | *(none)* | - | *(none)* | Use this for testing
or with low userbase |
+
+---
+
+## Running Examples
+
+### - PostgreSQL + pgAdmin
+
+```bash
+docker compose --profile postgresql --profile pgadmin up
+```
+
+* Use `DB_TYPE=postgresql`
+* Use `DB_PATH=postgresql-db`
+* pgAdmin available at: [http://localhost:8080](http://localhost:8080)
+* You can specify a different port by changing `DB_MANAGER_EXTERNAL_PORT`
+
+---
+
+### - MySQL + phpMyAdmin
+
+```bash
+docker compose --profile mysql --profile phpadmin up
+```
+
+* Use `DB_TYPE=mysql`
+* Use `DB_PATH=mysql-db`
+* phpMyAdmin available at: [http://localhost:8080](http://localhost:8080)
+* You can specify a different port by changing `DB_MANAGER_EXTERNAL_PORT`
+
+---
+
+### - MariaDB + phpMyAdmin
+
+```bash
+docker compose --profile mariadb --profile phpadmin up
+```
+
+* Use `DB_TYPE=mariadb`
+* Use `DB_PATH=mariadb-db`
+* phpMyAdmin available at: [http://localhost:8080](http://localhost:8080)
+* You can specify a different port by changing `DB_MANAGER_EXTERNAL_PORT`
+
+---
+
+### - SQLite (No Compose Profile)
+
+SQLite works without any extra profile:
+
+```bash
+docker compose up
+```
+
+* Use `DB_TYPE=sqlite`
+* Use `DB_PATH=data/db.sqlite`
+
+---
+
+### Stop Services
+
+To shut everything down:
+
+```bash
+docker compose --profile "*" down
+```
+
+
+
+
+Docker-less Route
+
+If you want to run the bot directly on your machine without Docker, follow these steps:
+
+#### 1. Install system dependencies
+
+Install the following packages:
+
+* `libmagic1`
+* `libmagic-dev`
+* `ffmpeg`
+
+For example, on Debian/Ubuntu:
+
+```bash
+sudo apt update
+sudo apt install libmagic1 libmagic-dev ffmpeg
+```
+---
+#### 2. Create venv inside repo folder
+```bash
+python3 -m venv .venv
+```
+---
+#### 3. Activate venv
+```bash
+source .venv/bin/activate
+```
+---
+#### 4. Install Python dependencies
+
+```bash
+pip install -r requirements.txt
+```
+---
+#### 5. Run the bot
+
+```bash
+python -m txt2SpeechBot
+```
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..d221507
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,91 @@
+services:
+ telegram-bot:
+ container_name: txt2SpeechBot
+ build: .
+ restart: unless-stopped
+ env_file:
+ - .env
+ volumes:
+ - ${HOST_PERSISTENT_PATH}/txt2SpeechBot/data:/usr/src/app/data
+
+ postgresql-db:
+ image: postgres:17
+ profiles: ["postgresql"]
+ container_name: txt2speech_postgres
+ restart: unless-stopped
+ environment:
+ POSTGRES_USER: ${DB_USERNAME}
+ POSTGRES_PASSWORD: ${DB_PASSWORD}
+ POSTGRES_DB: ${DB_NAME}
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U ${DB_USERNAME} -d ${DB_NAME}"]
+ interval: 5s
+ timeout: 2s
+ retries: 10
+ volumes:
+ - ${HOST_PERSISTENT_PATH}/txt2SpeechBot/data/postgresql:/var/lib/postgresql/data
+
+ pgadmin:
+ image: dpage/pgadmin4:latest
+ profiles: ["pgadmin"]
+ container_name: txt2speech_pgadmin
+ restart: unless-stopped
+ depends_on:
+ postgresql-db:
+ condition: service_healthy
+ environment:
+ PGADMIN_DEFAULT_EMAIL: ${PG_USERNAME}
+ PGADMIN_DEFAULT_PASSWORD: ${PG_PASSWORD}
+ ports:
+ - "${DB_MANAGER_EXTERNAL_PORT}:80"
+ volumes:
+ - ${HOST_PERSISTENT_PATH}/txt2SpeechBot/data/pgadmin:/var/lib/pgadmin
+
+ mysql-db:
+ image: mysql:9
+ container_name: txt2speech_mysql
+ profiles: ["mysql"]
+ restart: unless-stopped
+ environment:
+ MYSQL_ROOT_PASSWORD: ${DB_ROOT_PASSWORD}
+ MYSQL_DATABASE: ${DB_NAME}
+ MYSQL_USER: ${DB_USERNAME}
+ MYSQL_PASSWORD: ${DB_PASSWORD}
+ healthcheck:
+ test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
+ interval: 5s
+ timeout: 2s
+ retries: 10
+ volumes:
+ - ${HOST_PERSISTENT_PATH}/txt2SpeechBot/data/mysql:/var/lib/mysql
+
+ mariadb-db:
+ image: mariadb:11
+ profiles: ["mariadb"]
+ container_name: txt2speech_mariadb
+ restart: unless-stopped
+ environment:
+ MARIADB_ROOT_PASSWORD: ${DB_ROOT_PASSWORD}
+ MARIADB_DATABASE: ${DB_NAME}
+ MARIADB_USER: ${DB_USERNAME}
+ MARIADB_PASSWORD: ${DB_PASSWORD}
+ healthcheck:
+ # idk why this healthcheck doesn't work for mariadb
+ test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]
+ interval: 5s
+ timeout: 2s
+ retries: 10
+ volumes:
+ - ${HOST_PERSISTENT_PATH}/txt2SpeechBot/data/mariadb:/var/lib/mysql
+
+ phpmyadmin:
+ image: phpmyadmin/phpmyadmin:latest
+ profiles: ["phpmyadmin"]
+ container_name: txt2speech_phpadmin
+ restart: unless-stopped
+ environment:
+ PMA_HOST: ${DB_PATH}
+ PMA_PORT: ${DB_PORT}
+ PMA_ARBITRARY: 1
+ ports:
+ - "${DB_MANAGER_EXTERNAL_PORT}:80"
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..54d8d1f
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+requests
+pyTelegramBotAPI
+python-magic
+pydub
+mysql-connector-python
+psycopg[binary,pool]
+python-dotenv
+audioop-lts; python_version>='3.13'
diff --git a/sample.env b/sample.env
new file mode 100644
index 0000000..bdb669a
--- /dev/null
+++ b/sample.env
@@ -0,0 +1,59 @@
+###############################################
+# === Bot Settings ===
+###############################################
+
+BOT_TOKEN="" # Your Telegram Bot token
+TTS_API="" # Your preferred TTS API endpoint
+HOST_PERSISTENT_PATH=/absolute/host/path # Used for persistent volumes (e.g. /home/user/txt2speech)
+
+OWNER_ID="" # Owner user ID
+ADMINS_IDS="" # Comma-separated admin user IDs
+ALLOWED_CHATS="" # Comma-separated user or group IDs (leave empty for open bot access)
+
+MAX_AUDIO_SIZE=20 # Max file size in MB (default: 20)
+MAX_STORED_AUDIO=50 # Max number of stored audios per user (default: 50)
+MAX_CHARACTER_DESC=30 # Max characters in audio description (default: 30)
+MAX_CHARACTER_QUERY=200 # Max characters per TTS query (default: 200)
+
+FORWARD_ERRORS_TO_OWNER=false # true/false — Send exception messages to OWNER_ID
+SAVE_INPUT_MSG=false # true/false — Save input messages to messages.log
+SAVE_INPUT_QUERIES=false # true/false — Save queries to queries.log
+SAVE_UNAUTHORIZED=false # true/false — Log unauthorized access
+
+###############################################
+# === Database Settings ===
+###############################################
+
+# DB_TYPE must be one of: postgresql, mysql, mariadb, sqlite
+DB_TYPE=postgresql
+
+# === Shared Settings (All DBs) ===
+DB_NAME=your_database
+DB_USERNAME=your_user
+DB_PASSWORD=your_pass
+
+# === External or Docker Database Host ===
+# Use external host (e.g. your-db-host.com) for remote DB
+# OR use Docker internal hostname (postgresql-db, mysql-db, mariadb-db) when using Compose --profile
+DB_PATH=postgresql-db # e.g. postgresql-db (for Docker) or db.myhost.com (for external)
+DB_PORT=5432 # e.g. 5432 (Postgres), 3306 (MySQL/MariaDB)
+
+# === SQLite only ===
+# If using sqlite, set DB_TYPE=sqlite and DB_PATH=data/db.sqlite
+# Leave all other DB_ values blank
+
+###############################################
+# === Root Password (MySQL/MariaDB only) ===
+###############################################
+DB_ROOT_PASSWORD=change_me
+
+###############################################
+# === Web UI Port for pgAdmin or phpMyAdmin ===
+###############################################
+DB_MANAGER_EXTERNAL_PORT=8080 # pgAdmin or phpMyAdmin will be available on localhost:8080
+
+###############################################
+# === pgAdmin admin account ===
+###############################################
+PG_USERNAME=admin@admin.com
+PG_PASSWORD=adminpass
diff --git a/txt2SpeechBot/__main__.py b/txt2SpeechBot/__main__.py
new file mode 100644
index 0000000..6d1a262
--- /dev/null
+++ b/txt2SpeechBot/__main__.py
@@ -0,0 +1,10 @@
+# !/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+"""
+Entrypoint.
+"""
+from txt2SpeechBot.core import start
+
+if __name__ == "__main__":
+ start()
diff --git a/txt2SpeechBot/audioStore/audio.py b/txt2SpeechBot/audioStore/audio.py
index 270b062..e8b1613 100644
--- a/txt2SpeechBot/audioStore/audio.py
+++ b/txt2SpeechBot/audioStore/audio.py
@@ -17,7 +17,7 @@ class Audio:
without the requirement of downloading the file in our filesystem.
"""
- def __init__(self, file_id: str = "", user_id: str = "", description: str = "", duration: int = 0,
+ def __init__(self, row_id: int = 0, file_id: str = "", user_id: str = "", description: str = "", file_type: str = "", duration: int = 0,
size: int = 0, user_audio_id: int = 0, callback_code: str = 0, times_used: int = 0) -> None:
"""
Creates a object that represent a cached audio in Telegram.
@@ -31,9 +31,11 @@ def __init__(self, file_id: str = "", user_id: str = "", description: str = "",
:param callback_code: Callback code to obtain the description.
:param times_used: Number of times the audio has been used.
"""
+ self.row_id = row_id
self.file_id = file_id
self.user_id = user_id
self.description = description
+ self.file_type = file_type
self.duration = duration
self.size = size
self.user_audio_id = user_audio_id
@@ -50,11 +52,13 @@ def get_audio_list_for_inline_results(cls, db_result: List[Tuple[str, str, int,
:rtype: List[Audio].
"""
return sorted([cls(
- file_id=audio_tuple[0],
- description=audio_tuple[1],
- user_audio_id=audio_tuple[2],
- callback_code=audio_tuple[3],
- times_used=audio_tuple[4]
+ row_id=audio_tuple[0],
+ file_id=audio_tuple[1],
+ description=audio_tuple[2],
+ file_type=audio_tuple[3],
+ user_audio_id=audio_tuple[4],
+ callback_code=audio_tuple[5],
+ times_used=audio_tuple[6]
) for audio_tuple in db_result],
key=attrgetter("times_used")
)
@@ -71,8 +75,9 @@ def get_audio_list_for_listing(cls, db_result: List[Tuple]) -> 'List[Audio]':
return [cls(
file_id=audio_tuple[0],
description=audio_tuple[1],
- duration=audio_tuple[2],
- size=audio_tuple[3]
+ file_type=audio_tuple[2],
+ duration=audio_tuple[3],
+ size=audio_tuple[4]
) for audio_tuple in db_result
]
diff --git a/txt2SpeechBot/audioStore/storedAudio.py b/txt2SpeechBot/audioStore/storedAudio.py
index e7737dc..d37e718 100644
--- a/txt2SpeechBot/audioStore/storedAudio.py
+++ b/txt2SpeechBot/audioStore/storedAudio.py
@@ -7,10 +7,10 @@
from telebot import types
from typing import List, Optional, Union, Tuple
-from helpers.constants import Constants
-from helpers.database import Database
-from helpers.utils import Utils
-from audioStore.audio import Audio
+from ..helpers.constants import Constants, DBStatements
+from ..helpers.database import Database
+from ..helpers.utils import Utils
+from ..audioStore.audio import Audio
class StoredAudio:
@@ -23,7 +23,7 @@ class StoredAudio:
to check the mime type or which kind of file is attached to a Telegram message.
"""
- SIZE_LIMIT = 20 * 1024 * 1024
+ SIZE_LIMIT = Constants.MAX_AUDIO_SIZE * 1024 * 1024
"""Limit in bytes for the file size."""
@staticmethod
@@ -37,29 +37,52 @@ def create_inline_results_stored_audio(query: types.InlineQuery) \
:rtype: List [types.InlineQueryResultCachedVoice or types.InlineQueryResultArticle]
"""
db_conn = Utils.create_db_conn()
- sql_read = Constants.DBStatements.AUDIOS_READ_FOR_QUERY_BUILD % str(query.from_user.id)
- result = db_conn.read_all(sql_read)
+ #sql_read = Constants.DBStatements.AUDIOS_READ_FOR_QUERY_BUILD % str(query.from_user.id)
+ result = db_conn.read_all(DBStatements.AUDIOS_READ_FOR_QUERY_BUILD, (str(query.from_user.id),))
if len(result) > 0:
return StoredAudio.__create_inline_results_with_audios(result)
else:
return StoredAudio.__create_inline_results_no_audios()
@staticmethod
- def __create_inline_results_with_audios(db_result: List[Tuple]) -> List[types.InlineQueryResultCachedVoice]:
+ def __create_inline_results_with_audios(db_result: List[Tuple]) -> List[Union[types.InlineQueryResultCachedVoice, types.InlineQueryResultCachedAudio]]:
"""
Returns inline results for an user with stored audios.
:return: Inline results to answer a query.
- :rtype: List[types.InlineQueryResultCachedVoice]
+ :rtype: List[types.InlineQueryResultCachedVoice] or List[types.InlineQueryResultCachedAudio]
"""
inline_results = []
audios = Audio.get_audio_list_for_inline_results(db_result)
for audio in audios:
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton("Description", callback_data=audio.callback_code))
- inline_results.append(types.InlineQueryResultCachedVoice(
- str(audio.user_audio_id), audio.file_id, audio.description, reply_markup=markup
- ))
+
+ if audio.file_type == 'voice':
+ result = types.InlineQueryResultCachedVoice(
+ id=str(audio.user_audio_id),
+ voice_file_id=audio.file_id,
+ title=audio.description,
+ reply_markup=markup
+ )
+ elif audio.file_type == 'video':
+ result = types.InlineQueryResultCachedVideo(
+ id=str(audio.user_audio_id),
+ title=audio.description,
+ video_file_id=audio.file_id,
+ reply_markup=markup
+ )
+ else:
+ # CachedDocument for everything else
+ # Flacs don't work with InlineQueryResultCachedAudio
+ result = types.InlineQueryResultCachedDocument(
+ id=str(audio.user_audio_id),
+ title=audio.description,
+ document_file_id=audio.file_id,
+ reply_markup=markup
+ )
+ inline_results.append(result)
+
return inline_results
@staticmethod
@@ -70,7 +93,7 @@ def __create_inline_results_no_audios() -> List[types.InlineQueryResultArticle]:
:return: Inline results to answer a query.
:rtype: List[types.InlineQueryResultArticle]
"""
- msg_if_clicked = types.InputTextMessageContent(Constants.HELP_MSG)
+ msg_if_clicked = types.InputTextMessageContent(Constants.BotAnswers.HELP_MSG.format(bot_username=Constants.BOT_INFO.username, max_audios=Constants.MAX_STORED_AUDIO, max_size=Utils.format_bytes(Constants.MAX_DOWNLOAD_BYTES)))
inline_results = [
types.InlineQueryResultArticle(1, "No entries for personal audios", msg_if_clicked),
types.InlineQueryResultArticle(2, "You can type to get a TextToSpeech audio", msg_if_clicked),
@@ -86,13 +109,12 @@ def update_chosen_results_stored_audio(chosen_result: types.ChosenInlineResult)
:param chosen_result: Telegram chosen inline result.
"""
db_conn = Utils.create_db_conn()
- sql_read = Constants.DBStatements.AUDIOS_READ_FOR_CHOSEN_RESULT % (str(chosen_result.from_user.id),
- int(chosen_result.result_id))
- result = db_conn.read_one(sql_read)
+ #sql_read = Constants.DBStatements.AUDIOS_READ_FOR_CHOSEN_RESULT % (str(chosen_result.from_user.id), int(chosen_result.result_id))
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_FOR_CHOSEN_RESULT, (str(chosen_result.from_user.id), int(chosen_result.result_id),))
if result:
audio = Audio(file_id=result[0], times_used=result[1])
- sql_update = Constants.DBStatements.AUDIOS_UPDATE_FOR_CHOSEN_RESULT % (audio.record_use(), audio.file_id)
- db_conn.write_all(sql_update)
+ #sql_update = Constants.DBStatements.AUDIOS_UPDATE_FOR_CHOSEN_RESULT % (audio.record_use(), audio.file_id)
+ db_conn.write_all(DBStatements.AUDIOS_UPDATE_FOR_CHOSEN_RESULT, (audio.record_use(), audio.file_id))
@staticmethod
def get_callback_query_stored_audio(callback_code: str) -> Optional[str]:
@@ -105,7 +127,7 @@ def get_callback_query_stored_audio(callback_code: str) -> Optional[str]:
"""
db_conn = Utils.create_db_conn()
audio = Audio(callback_code=callback_code)
- result = db_conn.read_one(Constants.DBStatements.AUDIOS_READ_FOR_CALLBACK_QUERY % callback_code)
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_FOR_CALLBACK_QUERY, (callback_code,))
if result:
audio.description = result[0]
return audio.description
@@ -135,14 +157,13 @@ def get_stored_audios_listing(user_id: str, db_conn: Database) -> Optional[str]:
:param db_conn: Database object
:return: Str if exists audios, None in other case.
"""
- result = db_conn.read_all(Constants.DBStatements.AUDIOS_READ_FOR_LISTING % user_id)
+ result = db_conn.read_all(DBStatements.AUDIOS_READ_FOR_LISTING, (str(user_id),))
if len(result) == 0:
return None
audios = Audio.get_audio_list_for_listing(result)
message = "These are your stored audios.\n\n"
for index, audio in enumerate(audios):
- message += "%i.- %s \t|\t %i s \t|\t %.2fKB\n" % (index + 1, audio.description,
- audio.duration, audio.size / 1024.0)
+ message += "%i.- %s \t|\t %s \t|\t %i s \t|\t %s\n" % (index + 1, audio.description, audio.file_type, audio.duration, Utils.format_bytes(audio.size))
return message
@staticmethod
@@ -155,6 +176,28 @@ def is_file_valid_telegram_voice(content_type: str) -> bool:
:rtype: Bool.
"""
return content_type == 'voice'
+
+ @staticmethod
+ def is_file_valid_audio(content_type: str) -> bool:
+ """
+ Checks if file is an audio file.
+
+ :param content_type: Type of content.
+ :return: True if it is an audio, False in other case.
+ :rtype: Bool.
+ """
+ return content_type == 'audio'
+
+ @staticmethod
+ def is_file_valid_video(content_type: str) -> bool:
+ """
+ Checks if file is a video file.
+
+ :param content_type: Type of content.
+ :return: True if it is a video, False in other case.
+ :rtype: Bool.
+ """
+ return content_type == 'video'
@staticmethod
def validate_multimedia_file(msg: types.Message) -> bool:
diff --git a/txt2SpeechBot/core.py b/txt2SpeechBot/core.py
new file mode 100644
index 0000000..26f4f52
--- /dev/null
+++ b/txt2SpeechBot/core.py
@@ -0,0 +1,26 @@
+# !/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+"""
+File containing bot startup logic.
+"""
+import telebot
+from .ttsbot import Text_To_Speech_Bot
+from .helpers.constants import Constants
+from .helpers.database import DatabaseStartup
+
+my_bot = telebot.TeleBot(Constants.TOKEN)
+tts = Text_To_Speech_Bot(my_bot)
+
+def start():
+ Constants.STA_LOG.logger.info("Starting bot...")
+
+ DatabaseStartup.startup_db(Constants.DB_TYPE, Constants.DB_CREDENTIALS)
+
+ Constants.BOT_INFO = tts.get_user_or_chat_info()
+
+ if Constants.FORWARD_ERRORS_TO_OWNER:
+ Constants.STA_LOG.add_telegram_handler(tts, Constants.FilePath.STA_LOG)
+
+ Constants.STA_LOG.logger.info("Bot started.")
+ tts.start_polling()
\ No newline at end of file
diff --git a/txt2SpeechBot/handlers.py b/txt2SpeechBot/handlers.py
new file mode 100644
index 0000000..3594949
--- /dev/null
+++ b/txt2SpeechBot/handlers.py
@@ -0,0 +1,557 @@
+# !/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+"""
+File containing bot handlers.
+"""
+import re
+import os
+import random
+from telebot import types
+
+from .core import my_bot, tts
+from .ttsbot import Text_To_Speech_Bot
+from .helpers.constants import Constants, DBStatements
+from .helpers.database import Database
+from .helpers.utils import Utils
+from .helpers.user import User
+from .textToSpeech.language import Language
+from .helpers.custom_decorators import Decorators
+
+
+# region Callback Mode
+
+@my_bot.callback_query_handler(func=lambda call: call.data.startswith("log|") and (Text_To_Speech_Bot.is_owner(call) and Text_To_Speech_Bot.is_private_chat(call)))
+@Decorators.catch_and_report
+def handle_log_file_request(call: types.CallbackQuery):
+ _, file_path = call.data.split("|", 1)
+ tts.send_document_if_exists(call.from_user.id, file_path)
+
+
+@my_bot.callback_query_handler(func=lambda call: call.data.startswith("tts|") and Text_To_Speech_Bot.is_allowed_chat(call))
+@Decorators.catch_and_report
+def handle_tts_callback(call: types.CallbackQuery):
+ parts = call.data.split("|")
+
+ _, lang_code, code_id, user_id = parts
+
+ if user_id != str(call.from_user.id):
+ tts.bot.answer_callback_query(call.id, random.choice(Constants.BotAnswers.random_callback_answers), show_alert=True)
+ return
+
+ text = tts.get_callback_query_tts_audio(code_id, tts.queries)
+ if not text:
+ tts.bot.answer_callback_query(call.id, "❌ Bot was restarted, try again.", show_alert=False)
+ tts.bot.delete_message(chat_id=call.message.chat.id, message_id=call.message.message_id,)
+ return
+
+ if lang_code == "cancel":
+ tts.bot.answer_callback_query(call.id, "❌ TTS request cancelled.", show_alert=False)
+ tts.bot.delete_message(chat_id=call.message.chat.id, message_id=call.message.message_id,)
+ return
+
+ lang_code = lang_code.lower()
+ supported_lang_codes = {
+ lang.code.replace('-', '_').lower()
+ for lang in Language.get_languages_sorted_alphabetically()
+ }
+
+ if lang_code not in supported_lang_codes:
+ supported_lang_codes_text = ', '.join(supported_lang_codes)
+ tts.bot.answer_callback_query(call.id, "Unsupported language.")
+ tts.bot.send_message(call.message.chat.id, Constants.BotAnswers.LANGUAGE_UNSUPPORTED.format(lang_code=lang_code, supported_text=supported_lang_codes_text), parse_mode="HTML")
+ return
+
+ voice_link, buttons = tts.process_single(lang_code, text, tts.queries)
+
+ tts.bot.send_voice(chat_id=call.message.chat.id, voice=voice_link, reply_markup=buttons)
+ tts.bot.answer_callback_query(callback_query_id=call.id, text=f"✅ Sent voice in {lang_code.upper()}", show_alert=False)
+ tts.bot.delete_message(chat_id=call.message.chat.id, message_id=call.message.message_id)
+
+ User.set_last_lang(call.from_user.id, lang_code)
+
+# endregion
+
+# region Inline Mode
+
+@my_bot.inline_handler(func=lambda query: 0 <= len(query.query) <= Constants.MAX_CHARACTER_QUERY and Text_To_Speech_Bot.is_allowed_chat(query))
+@Decorators.catch_and_report
+def query_handler(query: types.InlineQuery) -> None:
+ """
+ Answers with different purpose audios an inline query from an user.
+
+ :param query: Telegram query.
+ """
+ User.validate_user_from_telegram(query.from_user)
+ Utils.record_query(query)
+ if not query.query:
+ inline_results = tts.create_inline_results_stored_audio(query)
+ else:
+ inline_results = tts.create_inline_results_tts_audio(query, tts.queries)
+ try:
+ tts.bot.answer_inline_query(str(query.id), inline_results, cache_time=1)
+ except Exception as query_exc:
+ Constants.STA_LOG.logger.exception('Query: "' + query.query + '"', exc_info=True)
+
+
+@my_bot.chosen_inline_handler(func=Text_To_Speech_Bot.is_allowed_chat)
+@Decorators.catch_and_report
+def chosen_result_handler(chosen_inline_result: types.ChosenInlineResult) -> None:
+ """
+ Updates previous database record with the selected inline result.
+
+ :param chosen_inline_result: Telegram chosen inline result.
+ """
+ if len(chosen_inline_result.query) == 0:
+ tts.update_chosen_results_stored_audio(chosen_inline_result)
+ else:
+ tts.update_chosen_results_tts_audio(chosen_inline_result)
+
+
+@my_bot.callback_query_handler(func=Text_To_Speech_Bot.is_allowed_chat)
+@Decorators.catch_and_report
+def callback_query_handler(callback: types.CallbackQuery) -> None:
+ """
+ Provides the user a description of the sent audio.
+
+ :param callback: Telegram callback query.
+ """
+ text = tts.get_callback_query(callback)
+ if len(text) > 54:
+ tts.bot.answer_callback_query(callback.id, text, show_alert=True)
+ else:
+ tts.bot.answer_callback_query(callback.id, text)
+
+# endregion
+
+# region Owner Bot Commands
+
+@my_bot.message_handler(commands=[Constants.OwnerCommands.LOG], func=lambda update: (Text_To_Speech_Bot.is_owner(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.auto_delete_temp_message()
+@Decorators.catch_and_report
+def command_log(msg: types.Message) -> None:
+ """
+ Sends log file to owner or admins.
+
+ :param msg: Telegram message with /log command.
+ """
+ buttons = []
+
+ for attr_name in dir(Constants.FilePath):
+ if attr_name.endswith('_LOG'):
+ file_path = getattr(Constants.FilePath, attr_name)
+ file_name = os.path.basename(file_path)
+ button = types.InlineKeyboardButton(
+ text=f"📄 {file_name}",
+ callback_data=f"log|{file_path}"
+ )
+ buttons.append(button)
+
+ return tts.bot.reply_to(msg, "🗂 Available log files:", reply_markup=types.InlineKeyboardMarkup([buttons]))
+
+# endregion
+
+# region Admin Commands
+
+@my_bot.message_handler(commands=[Constants.AdminCommands.DELETE_USER], func=lambda update: (Text_To_Speech_Bot.is_owner(update) or Text_To_Speech_Bot.is_admin(update)))
+@Decorators.catch_and_report
+def delete_specific_user_data_command(msg: types.Message) -> None:
+ """
+ Deletes all user data of a specified user stored in the database.
+ Supports both replying to a user message or using: /delete_user
+
+ :param msg: Telegram message with /delete_user {user_id} command or a reply to the target user's message.
+ """
+ if msg.reply_to_message:
+ user_id = msg.reply_to_message.from_user.id
+ else:
+ parts = msg.text.split()
+ if len(parts) >= 2 and parts[1].isdigit():
+ user_id = int(parts[1])
+ else:
+ tts.bot.reply_to(msg, "❌ You must reply to a user message or pass a valid numeric user ID like: /delete_user ")
+ return
+
+ if Text_To_Speech_Bot.is_owner(user_id) or Text_To_Speech_Bot.is_admin(user_id):
+ tts.bot.reply_to(msg, "❌ You cannot delete admin or owner data. Use /delete_my_data instead")
+ return
+
+ db_conn = Utils.create_db_conn()
+ result = db_conn.read_one(DBStatements.USER_EXISTS, (str(user_id),))
+ if not result:
+ Constants.STA_LOG.logger.warning(Constants.ExceptionMessages.DB_USER_NOT_FOUND.format(user_id))
+ tts.bot.reply_to(msg, Constants.BotAnswers.USER_NOT_FOUND.format(user_id), parse_mode="HTML")
+ return
+
+ remove_result = db_conn.delete_one(DBStatements.DELETE_USER_DATA, (str(user_id),))
+ if remove_result:
+ Constants.STA_LOG.logger.info(f"User {user_id} deleted succesfully from database requested by {msg.from_user.id} Admin")
+ tts.bot.reply_to(msg, Constants.BotAnswers.DELETED_ALL_AUDIO.format(f'data from {user_id} was'), parse_mode="HTML")
+ else:
+ Constants.STA_LOG.logger.warning(Constants.ExceptionMessages.DB_USER_DELETE_ERROR.format(user_id, f"{msg.from_user.id} Admin"))
+ tts.bot.reply_to(msg, Constants.BotAnswers.UNABLE_TO_DELETE_USER.format(user_id), parse_mode="HTML")
+
+
+@my_bot.message_handler(commands=[Constants.AdminCommands.CMD_LIST], func=lambda update: (Text_To_Speech_Bot.is_owner(update) or Text_To_Speech_Bot.is_admin(update)) and Text_To_Speech_Bot.is_private_chat(update))
+@Decorators.auto_delete_temp_message()
+@Decorators.catch_and_report
+def admin_command_list(msg: types.Message) -> None:
+ """
+ Sends a list of usable commands by admins or owner
+ """
+ return tts.bot.reply_to(msg, Constants.BotAnswers.ADMIN_CMD_LIST, parse_mode="HTML")
+
+# endregion
+
+# region Bot Commands
+
+@my_bot.message_handler(commands=[Constants.BotCommands.START, Constants.BotCommands.HELP], func=Text_To_Speech_Bot.is_allowed_chat)
+@Decorators.auto_delete_temp_message(delay_seconds=360)
+@Decorators.catch_and_report
+def command_help(msg: types.Message) -> None: # TODO improve help message
+ """
+ Answers the user with a help message to help him to understand the purpose of this bot.
+
+ :param msg: Telegram message with /help command.
+ """
+ return tts.bot.reply_to(msg, Constants.BotAnswers.HELP_MSG.format(bot_username=Constants.BOT_INFO.username, max_audios=Constants.MAX_STORED_AUDIO, max_size=Utils.format_bytes(Constants.MAX_DOWNLOAD_BYTES)), parse_mode="HTML")
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.PRIVACY_INFO], func=lambda update: (Text_To_Speech_Bot.is_allowed_chat(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.catch_and_report
+def command_privacy(msg: types.Message) -> None:
+ """
+ Answers the user with a message containing what data is collected.
+
+ :param msg: Telegram message with /privacy command.
+ """
+ tts.bot.reply_to(msg, Constants.BotAnswers.PRIVACY_MSG, parse_mode="HTML")
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.DELETE_USER_DATA], func=lambda update: (Text_To_Speech_Bot.is_allowed_chat(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.catch_and_report
+def delete_user_data_command(msg: types.Message) -> None:
+ """
+ Delete all user data stored from db (1/2 Remove User Data).
+
+ :param msg: Telegram message with /delete_my_data command.
+ """
+ list_stored_audios(msg)
+ db_conn = Utils.create_db_conn()
+ result = db_conn.read_one(DBStatements.USER_EXISTS, (str(msg.from_user.id),))
+ if result:
+ tts.next_step(msg, Constants.BotAnswers.RM_ALL_AUDIO.format('data'), confirm_delete_user_data)
+ else:
+ Constants.STA_LOG.logger.warning(Constants.ExceptionMessages.DB_USER_NOT_FOUND.format(msg.from_user.id))
+
+def confirm_delete_user_data(msg: types.Message) -> None:
+ """
+ Delete all user data stored from db (2/2 Remove User Data).
+
+ :param msg: Telegram message with /delete_my_data command.
+ """
+ if msg.content_type == 'text' and msg.text and msg.text.strip() == 'CONFIRM':
+ user_id = msg.from_user.id
+ db_conn = Utils.create_db_conn()
+ remove_result = db_conn.delete_one(DBStatements.DELETE_USER_DATA, (str(user_id),))
+ if remove_result:
+ Constants.STA_LOG.logger.info(f"User {msg.from_user.id} deleted succesfully from database requested by {user_id}")
+ tts.bot.reply_to(msg, Constants.BotAnswers.DELETED_ALL_AUDIO.format('your data was'))
+ else:
+ Constants.STA_LOG.logger.warning(Constants.ExceptionMessages.DB_USER_DELETE_ERROR.format(user_id, user_id))
+ tts.bot.reply_to(msg, Constants.BotAnswers.UNABLE_TO_DELETE_USER.format(user_id), parse_mode="HTML")
+ else:
+ tts.bot.reply_to(msg, Constants.BotAnswers.RM_ALL_NOT_CONFIRM)
+
+
+# TODO: refractor this hanlder
+@my_bot.message_handler(func=lambda message: message.text and message.text.lower().startswith(f"/{Constants.BotCommands.TTS}") and Text_To_Speech_Bot.is_allowed_chat(message))
+@Decorators.auto_delete_temp_message()
+@Decorators.catch_and_report
+def handle_tts_command(message: types.Message) -> None:
+ # Try to match format: /tts_xx some text or /tts_xx@bot_username some text
+ bot_username = Constants.BOT_INFO.username
+ pattern = rf"^/{Constants.BotCommands.TTS}_([a-z]{{2}}(?:_[a-z]{{2}})?)(?:@{bot_username})?(?:\s+(.+))?"
+ match = re.match(pattern, message.text, re.IGNORECASE)
+
+ if match:
+ # Handle /tts_xx or /tts_xx some text
+ lang_code, text = match.groups()
+ lang_code = lang_code.lower()
+
+ if not text:
+ if message.quote and message.quote.is_manual:
+ text = message.quote.text
+ elif message.reply_to_message and message.reply_to_message.text:
+ text = message.reply_to_message.text.strip()
+ else:
+ tts.bot.reply_to(message, Constants.BotAnswers.NO_TTS_TEXT, parse_mode="HTML")
+ return
+
+ if not tts.enforce_max_length(message, text, Constants.MAX_CHARACTER_QUERY):
+ return
+
+ supported_lang_codes = {
+ lang.code.replace('-', '_').lower()
+ for lang in Language.get_languages_sorted_alphabetically()
+ }
+
+ if lang_code not in supported_lang_codes:
+ supported_text = ', '.join(supported_lang_codes)
+ tts.bot.reply_to(message, Constants.BotAnswers.LANGUAGE_UNSUPPORTED.format(lang_code=lang_code, supported_text=supported_text), parse_mode="HTML")
+ return
+
+ voice_link, buttons = tts.process_single(lang_code, text, tts.queries)
+
+ tts.bot.send_voice(chat_id=message.chat.id, voice=voice_link, reply_markup=buttons)
+
+ User.set_last_lang(message.from_user.id, lang_code)
+ return
+
+ # Else: it's a plain /tts command (or a reply to text)
+ parts = message.text.split(maxsplit=1)
+ text = parts[1] if len(parts) > 1 else ""
+
+ if not text:
+ if message.quote and message.quote.is_manual:
+ text = message.quote.text
+ elif message.reply_to_message and message.reply_to_message.text:
+ text = message.reply_to_message.text.strip()
+ else:
+ tts.bot.reply_to(message, Constants.BotAnswers.NO_TTS_TEXT, parse_mode="HTML")
+ return
+
+ if not tts.enforce_max_length(message, text, Constants.MAX_CHARACTER_QUERY):
+ return
+
+ user_id = str(message.from_user.id)
+ last_lang_code = User.get_last_lang(user_id)
+ code_id = tts.store_tts_query(text, tts.queries)
+
+ keyboard = types.InlineKeyboardMarkup()
+ buttons = []
+
+ for lang_display, lang_code in Constants.SORTED_LANGUAGES.items():
+ lang_code_safe = lang_code.replace("-", "_").lower()
+ buttons.append(types.InlineKeyboardButton(
+ text=lang_display,
+ callback_data=f"tts|{lang_code_safe}|{code_id}|{user_id}"
+ ))
+
+ # Add buttons in rows of 3
+ for i in range(0, len(buttons), 3):
+ keyboard.row(*buttons[i:i + 3])
+
+ # Bottom row: last used + cancel
+ last_row = []
+ if last_lang_code:
+ label = Constants.LANGUAGES.get(last_lang_code.lower(), f"Last used ({last_lang_code.upper()})")
+ last_row.append(types.InlineKeyboardButton(
+ text=f"🕘 {label}",
+ callback_data=f"tts|{last_lang_code.lower()}|{code_id}|{user_id}"
+ ))
+ last_row.append(types.InlineKeyboardButton(text="❌ Cancel", callback_data=f"tts|cancel|_|{user_id}"))
+ keyboard.row(*last_row)
+
+ return tts.bot.reply_to(message, "Choose a language for TTS:", reply_markup=keyboard)
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.TO_VOICE], func=Text_To_Speech_Bot.is_allowed_chat)
+@Decorators.catch_and_report
+def convert_to_voice_command(msg: types.Message) -> None:
+ """
+ Convert the replied message file into Telegram Voice Format
+
+ :param msg: Reply to Telegram message with /to_voice command.
+ """
+ if not msg.reply_to_message:
+ tts.bot.reply_to(msg, Constants.BotAnswers.REPLY_TO_MSG.format(Constants.BotCommands.TO_VOICE))
+ return
+
+ replied_msg = msg.reply_to_message
+
+ if tts.is_file_valid_telegram_voice(replied_msg.content_type):
+ tts.bot.reply_to(msg, Constants.BotAnswers.IS_ALREADY_VOICE)
+ return
+
+ if not tts.validate_multimedia_file(replied_msg):
+ tts.bot.reply_to(msg, Constants.BotAnswers.NOT_AUDIO.format(Constants.BotCommands.TO_VOICE))
+ return
+
+ tts.convert_to_voice(msg, replied_msg)
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.ADD_AUDIO], func=lambda update: (Text_To_Speech_Bot.is_allowed_chat(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.catch_and_report
+def add_audio_start(msg: types.Message) -> None:
+ """
+ Initializes the process of audio uploading for user (1/3 Add Audio).
+
+ :param msg: Telegram message with /addaudio command.
+ """
+ db_conn = Utils.create_db_conn()
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_COUNT, (str(msg.from_user.id),))
+
+ if not result or result[0] >= Constants.MAX_STORED_AUDIO:
+ tts.bot.reply_to(msg, Constants.BotAnswers.MAX_OWN_AUDIOS.format(Constants.MAX_STORED_AUDIO))
+ return
+
+ # support for replying with /addaudio directly to a file
+ reply = msg.reply_to_message
+ if reply and tts.validate_multimedia_file(reply):
+ user_id = str(msg.from_user.id)
+ file_link, file_type = tts.extract_file_link(reply)
+
+ duplicate = db_conn.read_one(DBStatements.AUDIOS_DUPLICATE_CHECK, (str(user_id), file_link.file_id))
+ if duplicate:
+ tts.bot.reply_to(reply, Constants.BotAnswers.AUDIO_ALREADY_SAVED)
+ return
+
+ tts.next_step_focused[str(msg.from_user.id)] = reply, file_link, file_type
+ tts.next_step(msg, Constants.BotAnswers.PROVIDE_DESC.format(Constants.MAX_CHARACTER_DESC), add_audio_description)
+ else:
+ tts.next_step(msg, Constants.BotAnswers.SEND_AUDIO, add_audio_file)
+ return
+
+
+def add_audio_file(msg: types.Message) -> None:
+ """
+ Validates file received from user (2/3 Add Audio).
+
+ :param msg: Telegram message with attached file.
+ """
+ if tts.validate_multimedia_file(msg):
+ message_context = msg
+ elif msg.reply_to_message and tts.validate_multimedia_file(msg.reply_to_message):
+ message_context = msg.reply_to_message
+ else:
+ tts.bot.reply_to(msg, Constants.BotAnswers.NOT_AUDIO.format(Constants.BotCommands.ADD_AUDIO))
+ return
+
+ db_conn = Utils.create_db_conn()
+ user_id = str(msg.from_user.id)
+
+ file_link, file_type = tts.extract_file_link(message_context)
+
+ duplicate = db_conn.read_one(DBStatements.AUDIOS_DUPLICATE_CHECK, (str(user_id), file_link.file_id))
+ if duplicate:
+ tts.bot.reply_to(message_context, Constants.BotAnswers.AUDIO_ALREADY_SAVED)
+ return
+
+ tts.next_step_focused[str(msg.from_user.id)] = message_context, file_link, file_type
+ tts.next_step(msg, Constants.BotAnswers.PROVIDE_DESC.format(Constants.MAX_CHARACTER_DESC), add_audio_description)
+
+
+def add_audio_description(msg: types.Message) -> None:
+ """
+ Downloads audio file and saves it with its respective description (3/3 Add Audio).
+
+ :param msg: Telegram message with audio description.
+ """
+ db_conn = Utils.create_db_conn()
+ description = Database.db_str(msg.text.strip())
+ if msg.content_type == 'text' and len(description) <= Constants.MAX_CHARACTER_DESC:
+ user_id = str(msg.from_user.id)
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_FOR_CHECKING, (str(user_id), description))
+ if result is None:
+ file_message, file_link, file_type = tts.next_step_focused[str(msg.from_user.id)]
+
+ db_return = db_conn.read_all(DBStatements.AUDIOS_READ_USER_IDS, (str(user_id),))
+ if len(db_return) > 0:
+ taken_ids = [audio_id[0] for audio_id in db_return]
+ user_audio_id = tts.get_stored_audio_free_id(taken_ids)
+ else:
+ user_audio_id = 1
+ callback_code = Utils.generate_unique_str()
+ db_conn.write_all(DBStatements.AUDIOS_INSERT, (file_link.file_id, str(user_id), description, file_type, file_link.duration, file_link.file_size, user_audio_id, callback_code))
+ tts.bot.reply_to(file_message, Constants.BotAnswers.SAVED.format(description))
+ else:
+ tts.next_step(msg, Constants.BotAnswers.USED_DESC, add_audio_description)
+ else:
+ tts.next_step(msg, Constants.BotAnswers.WRONG_DESC.format(Constants.MAX_CHARACTER_DESC), add_audio_description)
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.LST_AUDIO], func=lambda update: (Text_To_Speech_Bot.is_allowed_chat(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.catch_and_report
+def list_stored_audios(msg: types.Message) -> None:
+ """
+ Lists all the stored audios by a certain user and their details.
+
+ :param msg: Telegram message with /listaudios command.
+ """
+ db_conn = Utils.create_db_conn()
+ audio_str_list = tts.get_stored_audios_listing(str(msg.from_user.id), db_conn)
+ if audio_str_list:
+ tts.bot.reply_to(msg, audio_str_list)
+ else:
+ tts.bot.reply_to(msg, Constants.BotAnswers.LST_NONE_AUDIO)
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.RM_AUDIO], func=lambda update: (Text_To_Speech_Bot.is_allowed_chat(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.catch_and_report
+def rm_audio_start(msg: types.Message) -> None:
+ """
+ Lists all the stored audios by a certain user asks him to delete one of them
+ (1/2 Remove One Audio).
+
+ :param msg: Telegram message with /rmaudio command.
+ """
+ list_stored_audios(msg)
+ db_conn = Utils.create_db_conn()
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_FOR_REMOVING, (str(msg.from_user.id),))
+ if result:
+ tts.next_step(msg, Constants.BotAnswers.RM_AUDIO, rm_audio_select)
+
+
+def rm_audio_select(msg: types.Message) -> None:
+ """
+ Removes an uploaded audio by a determined user if description equals the received
+ message from that user (2/2 Remove One Audio).
+
+ :param msg: Telegram message with removing confirmation.
+ """
+ if msg.content_type == 'text' and msg.text:
+ db_conn = Utils.create_db_conn()
+ audio_to_rm = Database.db_str(msg.text.strip())
+ user_id = str(msg.from_user.id)
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_FOR_CHECKING, (str(user_id), audio_to_rm))
+ if result:
+ db_conn.write_all(DBStatements.AUDIOS_REMOVE, (str(user_id), audio_to_rm))
+ tts.bot.reply_to(msg, Constants.BotAnswers.DELETED_AUDIO)
+ else:
+ tts.bot.reply_to(msg, Constants.BotAnswers.RM_USED_DESC)
+ else:
+ tts.bot.reply_to(msg, Constants.BotAnswers.RM_DESC_NOT_TEXT)
+
+
+@my_bot.message_handler(commands=[Constants.BotCommands.RM_ALL_AUDIOS], func=lambda update: (Text_To_Speech_Bot.is_allowed_chat(update) and Text_To_Speech_Bot.is_private_chat(update)))
+@Decorators.catch_and_report
+def rm_all_audios(msg: types.Message) -> None:
+ """
+ Lists all the stored audios by a certain user asks him to delete all of them.
+ (1/2 Remove All Audios)
+
+ :param msg: Telegram message with /rmallaudios command.
+ """
+ list_stored_audios(msg)
+ db_conn = Utils.create_db_conn()
+ result = db_conn.read_one(DBStatements.AUDIOS_READ_FOR_REMOVING, (str(msg.from_user.id),))
+ if result:
+ tts.next_step(msg, Constants.BotAnswers.RM_ALL_AUDIO.format('audios'), confirm_rm_all_audios)
+
+
+def confirm_rm_all_audios(msg: types.Message) -> None:
+ """
+ Removes all previous uploaded audios by a determined user (2/2 Remove All Audios).
+
+ :param msg: Telegram message with removing confirmation.
+ """
+ if msg.content_type == 'text' and msg.text and msg.text.strip() == 'CONFIRM':
+ db_conn = Utils.create_db_conn()
+ remove_result = db_conn.write_all(DBStatements.AUDIOS_REMOVE_ALL, (str(msg.from_user.id),))
+ if remove_result:
+ tts.bot.reply_to(msg, Constants.BotAnswers.DELETED_ALL_AUDIO.format('your audios were'))
+ else:
+ tts.bot.reply_to(msg, Constants.BotAnswers.RM_ALL_NOT_CONFIRM)
+
+# endregion
\ No newline at end of file
diff --git a/txt2SpeechBot/helpers/constants.py b/txt2SpeechBot/helpers/constants.py
index f03d500..3922aba 100755
--- a/txt2SpeechBot/helpers/constants.py
+++ b/txt2SpeechBot/helpers/constants.py
@@ -4,10 +4,11 @@
"""
File containing Constants class.
"""
+from telebot import types
from typing import List
-from helpers.literalConstants import LiteralConstants
-from helpers.fileProcessing import FileProcessing
+from ..helpers.literalConstants import LiteralConstants
+from ..helpers.fileProcessing import FileProcessing
class Constants(LiteralConstants):
@@ -17,36 +18,86 @@ class Constants(LiteralConstants):
Class that contains the MySQL statements of this project and some constant values that needs to be
read from the disk. It inheritances from the LiteralConstants class, so all its values can be accessed.
"""
+ CONFIG: dict[str, str] = FileProcessing(LiteralConstants.FilePath.ENV_FILE, LiteralConstants.FileType.ENV).read_file()
+ TOKEN: str = CONFIG['BOT_TOKEN']
+ DB_TYPE: str = CONFIG['DB_TYPE']
+ TTS_STR: str = CONFIG['TTS_API']
+ DB_CREDENTIALS: dict[str, str] = {
+ "path": CONFIG['DB_PATH'],
+ "port": int(CONFIG.get('DB_PORT')) if CONFIG.get('DB_PORT') else '',
+ "username": CONFIG.get('DB_USERNAME'),
+ "password": CONFIG.get('DB_PASSWORD'),
+ "name": CONFIG.get('DB_NAME')
+ }
+
+ OWNER_ID: int|None = int(CONFIG.get('OWNER_ID')) if CONFIG.get('OWNER_ID') else None
+ FORWARD_ERRORS_TO_OWNER: bool = CONFIG.get('FORWARD_ERRORS_TO_OWNER', '').lower() == 'true'
+ ADMINS_IDS: set[int] = set(int(chat_id) for chat_id in CONFIG.get('ADMINS_IDS', '').split(",") if chat_id)
+ ALLOWED_CHATS: set[int] = set(int(chat_id) for chat_id in CONFIG.get('ALLOWED_CHATS', '').split(",") if chat_id)
+
+ MAX_AUDIO_SIZE: int = int(CONFIG.get('MAX_AUDIO_SIZE')) if CONFIG.get('MAX_AUDIO_SIZE') else 20
+ MAX_STORED_AUDIO: int = int(CONFIG.get('MAX_STORED_AUDIO')) if CONFIG.get('MAX_STORED_AUDIO') else 50
+ MAX_CHARACTER_DESC: int = int(CONFIG.get('MAX_CHARACTER_DESC')) if CONFIG.get('MAX_CHARACTER_DESC') else 30
+ MAX_CHARACTER_QUERY: int = int(CONFIG.get('MAX_CHARACTER_QUERY')) if CONFIG.get('MAX_CHARACTER_QUERY') else 200
+
+ SAVE_INPUT_MSG: bool = CONFIG.get('SAVE_INPUT_MSG', '').lower() == 'true'
+ SAVE_INPUT_QUERIES: bool = CONFIG.get('SAVE_INPUT_QUERIES', '').lower() == 'true'
+ SAVE_UNAUTHORIZED: bool = CONFIG.get('SAVE_UNAUTHORIZED', '').lower() == 'true'
+
+ # bot info for later user, get's filled in core.py
+ BOT_INFO: types.User
+
+
+class DBMeta(type):
+ def __init__(cls, name, bases, namespace):
+ super().__init__(name, bases, namespace)
+ cols = [cls._quote(lang) for lang in LiteralConstants.SORTED_LANGUAGES.values()]
+ cls.LAN_READ = f"SELECT {', '.join(cols)} FROM Lan_Results WHERE id = {cls.PLACEHOLDER}"
+
+class DBStatements(metaclass=DBMeta):
+ @classmethod
+ def _quote(cls, col: str) -> str:
+ if cls.IDENTIFIER_QUOTE:
+ return f"{cls.IDENTIFIER_QUOTE}{col}{cls.IDENTIFIER_QUOTE}"
+ return col
+
+ db_type = Constants.DB_TYPE.lower()
+
+ if db_type in ("mysql", "mariadb", "postgresql"):
+ PLACEHOLDER = "%s"
+ IDENTIFIER_QUOTE = '"' if db_type == "postgresql" else "`"
+ elif db_type == "sqlite":
+ PLACEHOLDER = "?"
+ IDENTIFIER_QUOTE = '"'
+ else:
+ raise ValueError(f"Unsupported DB_TYPE: {db_type} - Supported Databases: {Constants.SUPPORTED_DB}")
+
+ """ Table User_Info """
+ USER_READ = f"SELECT * FROM User_Info WHERE id = {PLACEHOLDER}"
+ USER_INSERT = f"INSERT INTO User_Info(id, username, first_name, last_name) VALUES ({PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER})"
+ USER_UPDATE = f"UPDATE User_Info SET username = {PLACEHOLDER}, first_name = {PLACEHOLDER}, last_name = {PLACEHOLDER} WHERE id = {PLACEHOLDER}"
+ USER_GET_LAST_LANG = f"SELECT last_lang FROM User_Info WHERE id = {PLACEHOLDER}"
+ USER_SET_LAST_LANG = f"UPDATE User_Info SET last_lang = {PLACEHOLDER} WHERE id = {PLACEHOLDER}"
+
+ """ Table Own_Audios """
+ AUDIOS_READ_FOR_QUERY_BUILD = f"""SELECT row_id, file_id, description, file_type, user_audio_id, callback_code, times_used FROM Own_Audios WHERE id = {PLACEHOLDER}"""
+ AUDIOS_READ_FOR_CHOSEN_RESULT = f"SELECT file_id, times_used FROM Own_Audios WHERE id = {PLACEHOLDER} AND user_audio_id = {PLACEHOLDER}"
+ AUDIOS_READ_FOR_CALLBACK_QUERY = f"SELECT description FROM Own_Audios WHERE callback_code = {PLACEHOLDER}"
+ AUDIOS_READ_COUNT = f"SELECT COUNT(file_id) FROM Own_Audios WHERE id = {PLACEHOLDER}"
+ AUDIOS_READ_FOR_CHECKING = f"SELECT file_id FROM Own_Audios WHERE id = {PLACEHOLDER} AND description = {PLACEHOLDER}"
+ AUDIOS_READ_FOR_LISTING = f"SELECT file_id, description, file_type, duration, size FROM Own_Audios WHERE id = {PLACEHOLDER}"
+ AUDIOS_READ_FOR_REMOVING = f"SELECT file_id, description FROM Own_Audios WHERE id = {PLACEHOLDER}"
+ AUDIOS_READ_USER_IDS = f"SELECT user_audio_id FROM Own_Audios WHERE id = {PLACEHOLDER}"
+ AUDIOS_INSERT = f"""INSERT INTO Own_Audios(file_id, id, description, file_type, duration, size, user_audio_id, callback_code) VALUES ({PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER}, {PLACEHOLDER})"""
+ AUDIOS_UPDATE_FOR_CHOSEN_RESULT = f"UPDATE Own_Audios SET times_used = {PLACEHOLDER} WHERE file_id = {PLACEHOLDER}"
+ AUDIOS_REMOVE = f"DELETE FROM Own_Audios WHERE id = {PLACEHOLDER} AND description = {PLACEHOLDER}"
+ AUDIOS_REMOVE_ALL = f"DELETE FROM Own_Audios WHERE id = {PLACEHOLDER}"
+ AUDIOS_DUPLICATE_CHECK = f"SELECT 1 FROM Own_Audios WHERE id = {PLACEHOLDER} AND file_id = {PLACEHOLDER}"
+ DELETE_USER_DATA = f"DELETE FROM User_Info WHERE id = {PLACEHOLDER}"
+ USER_EXISTS = f"SELECT 1 FROM User_Info WHERE id = {PLACEHOLDER}"
+
+ """ Table Lan_Results """
+ LAN_READ: str # filled by DBMeta
+ LAN_INSERT = f"INSERT INTO Lan_Results(id) VALUES({PLACEHOLDER})"
+ LAN_UPDATE_FOR_CHOSEN_RESULT = f"UPDATE Lan_Results SET {{}} = {PLACEHOLDER} WHERE id = {PLACEHOLDER}"
- TOKEN: str = FileProcessing(LiteralConstants.FilePath.TOKEN, LiteralConstants.FileType.REG).read_file().rstrip()
- DB_CREDENTIALS: List[str] = FileProcessing(LiteralConstants.FilePath.DB, LiteralConstants.FileType.JSON).read_file()
- TTS_STR: str = FileProcessing(LiteralConstants.FilePath.TTS, LiteralConstants.FileType.REG).read_file()
- HELP_MSG: str = FileProcessing(LiteralConstants.FilePath.HELP_MSG, LiteralConstants.FileType.REG).read_file()
-
- class DBStatements:
- """ MySQL statements to perform operations in Database system. """
-
- USER_READ: str = """SELECT * FROM User_Info WHERE id = '%s'"""
- USER_INSERT: str = """INSERT INTO User_Info(id, username, first_name, last_name) VALUES ('%s', '%s', '%s', '%s')"""
- USER_UPDATE: str = """UPDATE User_Info SET username = '%s', first_name = '%s', last_name = '%s' WHERE id = '%s'"""
- """ Table User_Info """
-
- AUDIOS_READ_FOR_QUERY_BUILD: str = """SELECT `file_id`, `description`, `user_audio_id`, `callback_code`, `times_used` FROM Own_Audios WHERE id = '%s'"""
- AUDIOS_READ_FOR_CHOSEN_RESULT: str = """SELECT `file_id`, `times_used` FROM Own_Audios WHERE id = '%s' AND user_audio_id = '%i'"""
- AUDIOS_READ_FOR_CALLBACK_QUERY: str = """SELECT `description` FROM Own_Audios WHERE callback_code = '%s'"""
- AUDIOS_READ_COUNT: str = """SELECT COUNT(`file_id`) FROM Own_Audios WHERE id = '%s'"""
- AUDIOS_READ_FOR_CHECKING: str = """SELECT `file_id` FROM Own_Audios WHERE id = '%s' AND description = '%s'"""
- AUDIOS_READ_FOR_LISTING: str = """SELECT `file_id`, `description`, `duration`, `size` FROM Own_Audios WHERE id = '%s'"""
- AUDIOS_READ_FOR_REMOVING: str = """SELECT `file_id`, `description` FROM Own_Audios WHERE id = '%s'"""
- AUDIOS_READ_USER_IDS: str = """SELECT `user_audio_id` FROM Own_Audios WHERE id = '%s'"""
- AUDIOS_INSERT: str = """INSERT INTO Own_Audios(file_id, id, description, duration, size, user_audio_id, callback_code) VALUES ('%s', '%s', '%s', '%i', '%i', '%i', '%s')"""
- AUDIOS_UPDATE_FOR_CHOSEN_RESULT: str = """UPDATE Own_Audios SET `times_used` = '%i' WHERE file_id = '%s'"""
- AUDIOS_REMOVE: str = """DELETE FROM Own_Audios WHERE id = '%s' AND description = '%s'"""
- AUDIOS_REMOVE_ALL: str = """DELETE FROM Own_Audios WHERE id = '%s'"""
- """ Table Own_Audios """
-
- LAN_READ: str = """SELECT `""" + """`, `""".join(LiteralConstants.SORTED_LANGUAGES.values()) \
- + """` FROM Lan_Results WHERE id = '%s'"""
- LAN_INSERT: str = """INSERT INTO Lan_Results(id) VALUES('%s')"""
- LAN_UPDATE_FOR_CHOSEN_RESULT: str = """UPDATE Lan_Results SET `%s` = '%d' WHERE id = '%s'"""
- """ Table Lan_Results """
diff --git a/txt2SpeechBot/helpers/custom_decorators.py b/txt2SpeechBot/helpers/custom_decorators.py
new file mode 100644
index 0000000..4905c56
--- /dev/null
+++ b/txt2SpeechBot/helpers/custom_decorators.py
@@ -0,0 +1,82 @@
+import sys, threading
+from functools import wraps
+from telebot import types
+
+from ..helpers.constants import Constants
+from ..core import tts
+
+class Decorators:
+ @staticmethod
+ def auto_delete_temp_message(delay_seconds: int = 60):
+ """
+ Decorator to auto delete returned message or messages from og func.
+ Default cooldown is 60 seconds.
+ """
+ def decorator(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ result = func(*args, **kwargs)
+
+ def schedule_deletion(message):
+ if hasattr(result, 'chat') and hasattr(result, 'message_id'):
+ chat_id = result.chat.id
+ message_id = result.message_id
+
+ def delete_later():
+ try:
+ tts.bot.delete_message(chat_id, message_id)
+ except Exception as e:
+ if "message to delete not found" not in str(e).lower():
+ Constants.STA_LOG.logger.warning(f"Auto-delete failed: {e}")
+
+ threading.Timer(delay_seconds, delete_later).start()
+
+ if hasattr(result, 'chat') and hasattr(result, 'message_id'):
+ schedule_deletion(result)
+ elif isinstance(result, (list, tuple, set)):
+ for msg in result:
+ schedule_deletion(msg)
+
+ return result
+ return wrapper
+ return decorator
+
+ @staticmethod
+ def catch_and_report(func):
+ """
+ Decorator to catch exceptions and send message to bot owner
+ """
+ def wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception:
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.UNEXPECTED_ERROR, exc_info=(exc_type, exc_value, exc_traceback))
+
+ # send generic error message to user
+ for arg in args:
+ if isinstance(arg, types.Message):
+ try:
+ tts.bot.reply_to(arg, Constants.BotAnswers.GENERIC_ERROR)
+ except:
+ pass
+ break
+
+ elif isinstance(arg, types.CallbackQuery):
+ try:
+ tts.bot.answer_callback_query(arg.id, Constants.BotAnswers.GENERIC_ERROR)
+ tts.bot.edit_message_text(Constants.BotAnswers.GENERIC_ERROR, chat_id=arg.message.chat.id, message_id=arg.message.message_id)
+ except:
+ pass
+ break
+
+ elif isinstance(arg, types.InlineQuery):
+ try:
+ tts.bot.answer_inline_query(arg.id, [], switch_pm_text=Constants.BotAnswers.GENERIC_ERROR, switch_pm_parameter="start")
+ except:
+ pass
+ break
+
+ else:
+ break
+ return wrapper
diff --git a/txt2SpeechBot/helpers/database.py b/txt2SpeechBot/helpers/database.py
index 754f8a0..e042aa6 100755
--- a/txt2SpeechBot/helpers/database.py
+++ b/txt2SpeechBot/helpers/database.py
@@ -4,17 +4,330 @@
"""
File containing Database class.
"""
-
+import os
import re
-import mysql.connector as mysqldb
-from mysql.connector.errors import Error as MySQLdbError
+
+import sqlite3
+from sqlite3 import Error as SQLiteError
+
+import mysql.connector
+from mysql.connector import Error as MySQLError
+
+import psycopg
+from psycopg import Error as PostgreSQLError
+
from typing import Optional, List, Tuple, Union
-from helpers.constants import Constants
+from ..helpers.constants import Constants
+
+
+class DatabaseStartup:
+ @staticmethod
+ def startup_db(db_type: str, db_credentials: dict[str, str]):
+ """
+ Startup db depensing on type
+ """
+ db_type = db_type.lower()
+
+ if db_type == 'sqlite':
+ db_path = db_credentials['path']
+ DatabaseStartup._initialize_sqlite_db(db_path)
+ elif db_type == 'postgresql':
+ DatabaseStartup._initialize_postgresql_db(db_credentials['path'], db_credentials['port'], db_credentials['username'], db_credentials['password'], db_credentials['name'])
+ elif db_type in ('mysql', 'mariadb'):
+ DatabaseStartup._initialize_mysql_db(db_credentials['path'], db_credentials['port'], db_credentials['username'], db_credentials['password'], db_credentials['name'])
+ else:
+ raise ValueError(f"{db_type} is not a supported Database! Supported Databases: {Constants.SUPPORTED_DB}")
+
+ @staticmethod
+ def _initialize_sqlite_db(path: str) -> None:
+ """
+ Initializes the SQLite database file and creates necessary tables if they don't exist.
+ """
+ is_new = not os.path.exists(path)
+ if is_new:
+ Constants.STA_LOG.logger.info(f"Creating new SQLite DB at {path}")
+ else:
+ Constants.STA_LOG.logger.info(f"Opening existing {Constants.DB_TYPE} DB at {path}, ensuring tables exist")
+
+ conn = sqlite3.connect(path)
+ cursor = conn.cursor()
+
+ try:
+ # Create tables
+ cursor.execute("PRAGMA foreign_keys = ON;")
+ cursor.executescript("""
+
+ CREATE TABLE IF NOT EXISTS User_Info (
+ row_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- Row number
+ id TEXT NOT NULL, -- User id
+ username TEXT DEFAULT NULL, -- Username
+ first_name TEXT DEFAULT NULL, -- First name
+ last_name TEXT DEFAULT NULL, -- Second name
+ last_lang TEXT DEFAULT NULL, -- Last Langage used by user
+ UNIQUE(row_id),
+ UNIQUE(id)
+ );
+
+ CREATE TABLE IF NOT EXISTS Lan_Results (
+ row_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- Row number
+ id TEXT NOT NULL, -- User id
+ "Ar" INTEGER NOT NULL DEFAULT 0, -- Arabic
+ "De-de" INTEGER NOT NULL DEFAULT 0, -- German
+ "En-uk" INTEGER NOT NULL DEFAULT 0, -- English UK
+ "En-us" INTEGER NOT NULL DEFAULT 0, -- English US
+ "Es-es" INTEGER NOT NULL DEFAULT 0, -- Español ES
+ "Es-mx" INTEGER NOT NULL DEFAULT 0, -- Español MX
+ "Fr-fr" INTEGER NOT NULL DEFAULT 0, -- French
+ "It-it" INTEGER NOT NULL DEFAULT 0, -- Italian
+ "Pt-pt" INTEGER NOT NULL DEFAULT 0, -- Portuguese
+ "El-gr" INTEGER NOT NULL DEFAULT 0, -- Greek
+ "Ru-ru" INTEGER NOT NULL DEFAULT 0, -- Russian
+ "Tr-tr" INTEGER NOT NULL DEFAULT 0, -- Turkish
+ "Zh-cn" INTEGER NOT NULL DEFAULT 0, -- Chinese
+ "Ja" INTEGER NOT NULL DEFAULT 0, -- Japanese
+ "Pl" INTEGER NOT NULL DEFAULT 0, -- Polish
+ UNIQUE(row_id),
+ UNIQUE(id),
+ FOREIGN KEY(id) REFERENCES User_Info(id) ON DELETE CASCADE
+ );
+
+ CREATE TABLE IF NOT EXISTS Own_Audios (
+ row_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- Row number
+ file_id TEXT NOT NULL, -- Telegram file id
+ id TEXT NOT NULL, -- User id
+ description TEXT NOT NULL, -- Short description of the audio
+ file_type TEXT NOT NULL, -- Audio File Type (.voice or .audio)
+ duration INTEGER NOT NULL, -- Audio duration in seconds
+ size INTEGER NOT NULL, -- File size in bytes
+ user_audio_id INTEGER NOT NULL, -- User audio identification
+ callback_code VARCHAR(36) NOT NULL, -- Description callback code
+ times_used INTEGER NOT NULL DEFAULT 0, -- Times that audio has been used by user
+ UNIQUE(row_id),
+ UNIQUE(id, file_id),
+ FOREIGN KEY(id) REFERENCES User_Info(id) ON DELETE CASCADE
+ );
+
+ """)
+
+ conn.commit()
+
+ except SQLiteError as err:
+ Constants.STA_LOG.logger.exception(f"SQLite Startup error: {err}")
+ raise
+ finally:
+ cursor.close()
+ conn.close()
+
+ @staticmethod
+ def _initialize_mysql_db(host: str, port: str, user: str, password: str, database: str) -> None:
+ """
+ Initializes the MySQL database and creates necessary tables if they don't exist.
+ """
+ try:
+ conn = mysql.connector.connect(
+ host=host,
+ port=port,
+ user=user,
+ password=password
+ )
+
+ cursor = conn.cursor()
+ cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{database}` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
+ cursor.execute(f"USE `{database}`")
+
+ Constants.STA_LOG.logger.info(f"MySQL database '{database}' initialized or already exists.")
+
+ cursor.execute("SET foreign_key_checks = 0;")
+
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS User_Info (
+ row_id INT(10) NOT NULL AUTO_INCREMENT COMMENT 'Row number',
+ id VARCHAR(15) NOT NULL COMMENT 'User id',
+ username VARCHAR(32) DEFAULT NULL COMMENT 'Username',
+ first_name VARCHAR(255) DEFAULT NULL,
+ last_name VARCHAR(255) DEFAULT NULL COMMENT 'Second name',
+ last_lang VARCHAR(10) DEFAULT NULL COMMENT 'Last Lang used by the user',
+ PRIMARY KEY (id),
+ UNIQUE KEY row_id (row_id)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Info about every user that uses at least once the bot';
+ """)
+
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS Lan_Results (
+ row_id INT(10) NOT NULL AUTO_INCREMENT COMMENT 'Row number',
+ id VARCHAR(15) NOT NULL COMMENT 'User id',
+ `Ar` INT(11) NOT NULL DEFAULT 0 COMMENT 'Arabic',
+ `De-de` INT(11) NOT NULL DEFAULT 0 COMMENT 'German',
+ `En-uk` INT(11) NOT NULL DEFAULT 0 COMMENT 'English UK',
+ `En-us` INT(11) NOT NULL DEFAULT 0 COMMENT 'English US',
+ `Es-es` INT(11) NOT NULL DEFAULT 0 COMMENT 'Español ES',
+ `Es-mx` INT(11) NOT NULL DEFAULT 0 COMMENT 'Español MX',
+ `Fr-fr` INT(11) NOT NULL DEFAULT 0 COMMENT 'French',
+ `It-it` INT(11) NOT NULL DEFAULT 0 COMMENT 'Italian',
+ `Pt-pt` INT(11) NOT NULL DEFAULT 0 COMMENT 'Portuguese',
+ `El-gr` INT(11) NOT NULL DEFAULT 0 COMMENT 'Greek',
+ `Ru-ru` INT(11) NOT NULL DEFAULT 0 COMMENT 'Russian',
+ `Tr-tr` INT(11) NOT NULL DEFAULT 0 COMMENT 'Turkish',
+ `Zh-cn` INT(11) NOT NULL DEFAULT 0 COMMENT 'Chinese',
+ `Ja` INT(11) NOT NULL DEFAULT 0 COMMENT 'Japanese',
+ `Pl` INT(11) NOT NULL DEFAULT 0 COMMENT 'Polish',
+ PRIMARY KEY (id),
+ UNIQUE KEY row_id (row_id),
+ FOREIGN KEY (id) REFERENCES User_Info(id) ON DELETE CASCADE
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Chosen languages by user';
+ """)
+
+ cursor.execute(f"""
+ CREATE TABLE IF NOT EXISTS Own_Audios (
+ row_id INT(10) NOT NULL AUTO_INCREMENT COMMENT 'Row number',
+ file_id VARCHAR(300) NOT NULL COMMENT 'Telegram file id',
+ id VARCHAR(15) NOT NULL COMMENT 'User id',
+ description VARCHAR({Constants.MAX_CHARACTER_DESC}) NOT NULL COMMENT 'Short description of the audio',
+ file_type VARCHAR(15) NOT NULL COMMENT 'Audio File Type (.voice or .audio)',
+ duration INT(11) NOT NULL COMMENT 'Audio duration in seconds',
+ size INT(11) NOT NULL COMMENT 'File size in bytes',
+ user_audio_id INT(11) NOT NULL COMMENT 'User audio identification',
+ callback_code VARCHAR(36) NOT NULL,
+ times_used INT(11) NOT NULL DEFAULT 0 COMMENT 'Times that audio has been used by user',
+ PRIMARY KEY (id, file_id),
+ UNIQUE KEY row_id (row_id),
+ FOREIGN KEY (id) REFERENCES User_Info(id) ON DELETE CASCADE
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Stores the information of own user audios';
+ """)
+
+ conn.commit()
+ Constants.STA_LOG.logger.info("MySQL tables created or confirmed to exist.")
+
+ except MySQLError as err:
+ Constants.STA_LOG.logger.exception(f"MySQL Startup error: {err}")
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+ if conn:
+ conn.close()
+
+ @staticmethod
+ def _initialize_postgresql_db(host: str, port: str, user: str, password: str, database: str) -> None:
+ """
+ Initializes the PostgreSQL database and creates necessary tables if they don't exist.
+ """
+ conn = None
+ cursor = None
+
+ try:
+ # 1) Connect to the 'postgres' maintenance DB to create the target DB if needed
+ conn = psycopg.connect(
+ host=host,
+ port=port,
+ user=user,
+ password=password,
+ dbname="postgres"
+ )
+ conn.autocommit = True
+ cursor = conn.cursor()
+
+ # Check if our database already exists
+ cursor.execute(
+ "SELECT 1 FROM pg_database WHERE datname = %s",
+ (database,)
+ )
+ if cursor.fetchone() is None:
+ # Safely create the database
+ cursor.execute(
+ psycopg.sql.SQL("CREATE DATABASE {} ENCODING 'UTF8'").format(
+ psycopg.sql.Identifier(database)
+ )
+ )
+ Constants.STA_LOG.logger.info(f"PostgreSQL database '{database}' created.")
+ else:
+ Constants.STA_LOG.logger.info(f"PostgreSQL database '{database}' already exists.")
+
+ cursor.close()
+ conn.close()
+
+ # 2) Reconnect, this time to our target database
+ conn = psycopg.connect(
+ host=host,
+ port=port,
+ user=user,
+ password=password,
+ dbname=database
+ )
+ conn.autocommit = True
+ cursor = conn.cursor()
+
+ Constants.STA_LOG.logger.info(f"Connected to PostgreSQL database '{database}' — ready to create tables.")
+
+ # Create tables
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS User_Info (
+ row_id SERIAL PRIMARY KEY,
+ id VARCHAR(15) NOT NULL UNIQUE,
+ username VARCHAR(32),
+ first_name VARCHAR(255),
+ last_name VARCHAR(255),
+ last_lang VARCHAR(10)
+ );
+ """)
+
+ cursor.execute("""
+ CREATE TABLE IF NOT EXISTS Lan_Results (
+ row_id SERIAL PRIMARY KEY,
+ id VARCHAR(15) NOT NULL UNIQUE,
+ "Ar" INTEGER NOT NULL DEFAULT 0,
+ "De-de" INTEGER NOT NULL DEFAULT 0,
+ "En-uk" INTEGER NOT NULL DEFAULT 0,
+ "En-us" INTEGER NOT NULL DEFAULT 0,
+ "Es-es" INTEGER NOT NULL DEFAULT 0,
+ "Es-mx" INTEGER NOT NULL DEFAULT 0,
+ "Fr-fr" INTEGER NOT NULL DEFAULT 0,
+ "It-it" INTEGER NOT NULL DEFAULT 0,
+ "Pt-pt" INTEGER NOT NULL DEFAULT 0,
+ "El-gr" INTEGER NOT NULL DEFAULT 0,
+ "Ru-ru" INTEGER NOT NULL DEFAULT 0,
+ "Tr-tr" INTEGER NOT NULL DEFAULT 0,
+ "Zh-cn" INTEGER NOT NULL DEFAULT 0,
+ "Ja" INTEGER NOT NULL DEFAULT 0,
+ "Pl" INTEGER NOT NULL DEFAULT 0,
+ FOREIGN KEY (id) REFERENCES User_Info(id) ON DELETE CASCADE
+ );
+ """)
+
+ cursor.execute(f"""
+ CREATE TABLE IF NOT EXISTS Own_Audios (
+ row_id SERIAL PRIMARY KEY,
+ file_id VARCHAR(300) NOT NULL,
+ id VARCHAR(15) NOT NULL,
+ description VARCHAR({Constants.MAX_CHARACTER_DESC}) NOT NULL,
+ file_type VARCHAR(15) NOT NULL,
+ duration INTEGER NOT NULL,
+ size INTEGER NOT NULL,
+ user_audio_id INTEGER NOT NULL,
+ callback_code VARCHAR(36) NOT NULL,
+ times_used INTEGER NOT NULL DEFAULT 0,
+ UNIQUE(id, file_id),
+ FOREIGN KEY (id) REFERENCES User_Info(id) ON DELETE CASCADE
+ );
+ """)
+
+ Constants.STA_LOG.logger.info("PostgreSQL tables created or confirmed to exist.")
+
+ except PostgreSQLError as err:
+ Constants.STA_LOG.logger.exception(f"PostgreSQL Startup error: {err}")
+ raise
+ finally:
+ if cursor:
+ cursor.close()
+ if conn:
+ conn.close()
class Database:
"""
- Python class to perform a different set of operations in a MySQL Database.
+ Database wrapper:
+ - For MySQL, MariaDB, PostgreSQL: persistent connection with health-check and reconnect
+ - For SQLite: short-lived connections per operation
This class provides a simpler way to interact with application database than the common python
MySQL connectors. The connection is done as soon as you create an instance, but cursors are
@@ -23,20 +336,14 @@ class Database:
and provides a method to prepare a string for database.
"""
- def __init__(self, host: str, user: str, password: str, database: str) -> None:
- """
- Opens a connection to a MySQL Database system prepared to work.
+ def __init__(self, db_type: str, db_credentials: Tuple[str, ...]):
+ self.db_type = db_type.lower()
+ self.db_credentials = db_credentials
+ self.connection = None
- :param host: Host address to Database machine.
- :param user: Username used for login in Database system.
- :param password: Password used for login in Database system.
- :param database: Database where the connection will point.
- """
- self.host: str = host
- self.user: str = user
- self.database: str = database
- self.connection: Optional[mysqldb.MySQLConnection] = None
- self.connect_to_db(password)
+ # Initialize persistent connection for server-based DBs
+ if self.db_type in ("mysql", "mariadb", "postgresql"):
+ self.connect_to_db()
def __del__(self) -> None:
"""
@@ -44,114 +351,175 @@ def __del__(self) -> None:
"""
self.disconnect_from_db()
- def connect_to_db(self, password: str) -> None:
+ @staticmethod
+ def db_str(text: str) -> str:
"""
- Tries to connect to a specific Database is connections is not already done.
+ Translates a string to make it compatible with the Database system.
+
+ :param text: string to be translated
+ :return: String prepared for Database manipulation.
+ :rtype: Str.
+ """
+ if text:
+ return re.sub(r'[^A-Za-z0-9ñ\s]+', '', text)
+ else:
+ return text or ''
- :param password: Password used for login in Database system.
+ def connect_to_db(self) -> None:
"""
+ Establishes a persistent connection if not already connected.
+ """
+ if self.connection:
+ return
try:
- if not self.connection or self.connection.is_closed():
- self.connection = mysqldb.connect(host=self.host, user=self.user,
- password=password, database=self.database)
- except MySQLdbError:
- Constants.STA_LOG.logger.error(Constants.ExceptionMessages.DB_UNCONNECTED, exc_info=True)
+ if self.db_type in ("mysql", "mariadb"):
+ self.connection = mysql.connector.connect(
+ host=self.db_credentials['path'],
+ port=self.db_credentials['port'],
+ user=self.db_credentials['username'],
+ password=self.db_credentials['password'],
+ database=self.db_credentials['name']
+ )
+ elif self.db_type == "postgresql":
+ self.connection = psycopg.connect(
+ host=self.db_credentials['path'],
+ port=self.db_credentials['port'],
+ user=self.db_credentials['username'],
+ password=self.db_credentials['password'],
+ dbname=self.db_credentials['name']
+ )
+ except (MySQLError, PostgreSQLError) as err:
+ Constants.STA_LOG.logger.exception(f"Error establishing connection: {err}")
+ self.connection = None
+ raise
def disconnect_from_db(self) -> None:
"""
- Closes a Database connection.
+ Closes the persistent connection if it exists.
"""
- self.connection.close()
+ try:
+ if self.connection:
+ self.connection.close()
+ except Exception:
+ pass
+ finally:
+ self.connection = None
- def read_all(self, read_query: str) -> List[Tuple]:
+ def test_connection_and_reconnect_if_necessary(self) -> None:
"""
- Executes a read operation and returns all rows of a query result set.
+ Tests persistent connection; reconnects if lost.
+ """
+ try:
+ if self.connection:
+ cursor = self.connection.cursor()
+ cursor.execute("SELECT 1")
+ cursor.close()
+ else:
+ self.connect_to_db()
+ except Exception:
+ self.disconnect_from_db()
+ self.connect_to_db()
- :param read_query: MySQL read statement.
- :return: Query result set from execution of a MySQL statement.
- :rtype: List[tuple] or empty list[].
+ def _get_connection(self):
"""
+ Returns a connection. For SQLite, opens a new one each call; for others, returns persistent.
+ """
+ if self.db_type == "sqlite":
+ conn = sqlite3.connect(self.db_credentials['path'])
+ conn.row_factory = sqlite3.Row
+ conn.execute("PRAGMA foreign_keys = ON;")
+ return conn
+
+ # For server-based, ensure health
self.test_connection_and_reconnect_if_necessary()
+ return self.connection
+
+ def read_all(self, read_query: str, params: Tuple = ()) -> List[Tuple]:
+ """
+ Executes a read and returns all rows.
+ """
+ conn = None
cursor = None
try:
- cursor = self.connection.cursor(buffered=True)
- cursor.execute(read_query)
- result = cursor.fetchall()
- return result
- except MySQLdbError:
+ conn = self._get_connection()
+ cursor = conn.cursor()
+ cursor.execute(read_query, params)
+ return cursor.fetchall()
+ except (SQLiteError, MySQLError, PostgreSQLError) as e:
Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.DB_READ + read_query, exc_info=True)
return []
finally:
- if cursor:
- cursor.close()
+ if self.db_type == "sqlite":
+ if cursor: cursor.close()
+ if conn: conn.close()
+ else:
+ if cursor: cursor.close()
- def read_one(self, read_query: str) -> Optional[Tuple]:
+ def read_one(self, read_query: str, params: Tuple = ()) -> Optional[Tuple]:
"""
- Executes a read operation and returns just one row of the query result set.
-
- :param read_query: MySQL read statement.
- :return: Query result row from execution of a MySQL statement.
- :rtype: Tuple[] or None.
+ Executes a read and returns a single row.
"""
- self.test_connection_and_reconnect_if_necessary()
+ conn = None
cursor = None
try:
- cursor = self.connection.cursor(buffered=True)
- cursor.execute(read_query)
- result = cursor.fetchone()
- return result
- except MySQLdbError:
+ conn = self._get_connection()
+ cursor = conn.cursor()
+ cursor.execute(read_query, params)
+ return cursor.fetchone()
+ except (SQLiteError, MySQLError, PostgreSQLError):
Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.DB_READ + read_query, exc_info=True)
return None
finally:
- if cursor:
- cursor.close()
+ if self.db_type == "sqlite":
+ if cursor: cursor.close()
+ if conn: conn.close()
+ else:
+ if cursor: cursor.close()
- def write_all(self, write_query: str) -> Union[int, bool]:
+ def delete_one(self, delete_query: str, params: Tuple = ()) -> Union[int, bool]:
"""
- Executes an insert or update operation and returns the number of modified rows or False if
- the operation does not change anything.
-
- :param write_query: MySQL insert or update statement.
- :return: Number of modified rows or False if no changes were done.
- :rtype: Int or bool.
+ Executes a delete and returns affected rowcount or False.
"""
- self.test_connection_and_reconnect_if_necessary()
+ conn = None
cursor = None
try:
- cursor = self.connection.cursor(buffered=True)
- cursor.execute(write_query)
- self.connection.commit()
+ conn = self._get_connection()
+ cursor = conn.cursor()
+ cursor.execute(delete_query, params)
+ conn.commit()
return cursor.rowcount
- except MySQLdbError:
- if self.connection.is_connected():
- self.connection.rollback()
- Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.DB_WRITE + write_query, exc_info=True)
+ except (SQLiteError, MySQLError, PostgreSQLError):
+ if conn:
+ conn.rollback()
+ Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.DB_DELETE + delete_query, exc_info=True)
return False
finally:
- if cursor:
- cursor.close()
+ if self.db_type == "sqlite":
+ if cursor: cursor.close()
+ if conn: conn.close()
+ else:
+ if cursor: cursor.close()
- def test_connection_and_reconnect_if_necessary(self) -> None:
+ def write_all(self, write_query: str, params: Tuple = ()) -> Union[int, bool]:
"""
- Tests if connection is available and reconnects if the connections has been lost.
+ Executes insert/update and returns affected rowcount or False.
"""
- if not self.connection or self.connection.is_closed():
- try:
- self.connection.ping(reconnect=True, attempts=5, delay=0)
- except MySQLdbError:
- Constants.STA_LOG.logger.error(Constants.ExceptionMessages.DB_UNCONNECTED, exc_info=True)
-
- @staticmethod
- def db_str(text: str) -> str:
- """
- Translates a string to make it compatible with the Database system.
-
- :param text: string to be translated
- :return: String prepared for Database manipulation.
- :rtype: Str.
- """
- if text:
- return re.sub('[^A-Za-z0-9ñ\s]+', '', text)
- else:
- return text or ''
+ conn = None
+ cursor = None
+ try:
+ conn = self._get_connection()
+ cursor = conn.cursor()
+ cursor.execute(write_query, params)
+ conn.commit()
+ return cursor.rowcount
+ except (SQLiteError, MySQLError, PostgreSQLError):
+ if conn:
+ conn.rollback()
+ Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.DB_WRITE + write_query, exc_info=True)
+ return False
+ finally:
+ if self.db_type == "sqlite":
+ if cursor: cursor.close()
+ if conn: conn.close()
+ else:
+ if cursor: cursor.close()
diff --git a/txt2SpeechBot/helpers/fileProcessing.py b/txt2SpeechBot/helpers/fileProcessing.py
index 721d348..fb890af 100755
--- a/txt2SpeechBot/helpers/fileProcessing.py
+++ b/txt2SpeechBot/helpers/fileProcessing.py
@@ -6,9 +6,11 @@
"""
import json
+import os
from typing import Optional, List, Tuple, Dict, Union
from pathlib import Path
-from helpers.literalConstants import LiteralConstants
+from ..helpers.literalConstants import LiteralConstants
+from dotenv import dotenv_values
class FileProcessing:
@@ -42,16 +44,29 @@ def read_file(self) -> Optional[Union[str, List, Tuple, Dict, bytes]]:
:rtype: Str or list or tuple or dict or bytes or None.
"""
try:
- file = open(self.path, 'r') if self.file_type != LiteralConstants.FileType.BYTES else open(self.path, 'rb')
- if self.file_type == LiteralConstants.FileType.REG or self.file_type == LiteralConstants.FileType.BYTES:
- return file.read()
- elif self.file_type == LiteralConstants.FileType.JSON:
- return json.load(file)
+ if self.file_type == LiteralConstants.FileType.ENV:
+ file_env = dotenv_values(self.path)
+
+ # Merge file values with os.environ, giving priority to env vars set in the environment
+ merged_env = {**file_env, **os.environ}
+
+ if file_env:
+ LiteralConstants.STA_LOG.logger.info(f"Config loaded from .env file at {self.path} with environment overrides.")
+ else:
+ LiteralConstants.STA_LOG.logger.info(f"No .env file found at {self.path}. Loaded config from environment variables only.")
+
+ return merged_env
+
+ with open(self.path, 'r' if self.file_type != LiteralConstants.FileType.BYTES else 'rb') as file:
+ if self.file_type == LiteralConstants.FileType.REG or self.file_type == LiteralConstants.FileType.BYTES:
+ return file.read()
+ elif self.file_type == LiteralConstants.FileType.JSON:
+ return json.load(file)
+
except EnvironmentError:
LiteralConstants.STA_LOG.logger.exception(LiteralConstants.ExceptionMessages.FILE_CANT_OPEN, exc_info=True)
return None
- else:
- file.close()
+
def write_file(self, data: Union[str, List, Tuple, Dict]) -> bool:
"""
@@ -63,15 +78,13 @@ def write_file(self, data: Union[str, List, Tuple, Dict]) -> bool:
:rtype: Bool.
"""
try:
- file = open(self.path, 'w') if self.file_type != LiteralConstants.FileType.BYTES else open(self.path, 'wb')
- if self.file_type == LiteralConstants.FileType.REG or self.file_type == LiteralConstants.FileType.BYTES:
- file.write(data)
- return True
- elif self.file_type == LiteralConstants.FileType.JSON:
- json.dump(data, file)
- return True
+ with open(self.path, 'w' if self.file_type != LiteralConstants.FileType.BYTES else 'wb') as file:
+ if self.file_type == LiteralConstants.FileType.REG or self.file_type == LiteralConstants.FileType.BYTES:
+ file.write(data)
+ return True
+ elif self.file_type == LiteralConstants.FileType.JSON:
+ json.dump(data, file)
+ return True
except EnvironmentError:
LiteralConstants.STA_LOG.logger.exception(LiteralConstants.ExceptionMessages.FILE_CANT_WRITE, exc_info=True)
return False
- else:
- file.close()
diff --git a/txt2SpeechBot/helpers/literalConstants.py b/txt2SpeechBot/helpers/literalConstants.py
index 65f4e34..87bc943 100755
--- a/txt2SpeechBot/helpers/literalConstants.py
+++ b/txt2SpeechBot/helpers/literalConstants.py
@@ -9,7 +9,7 @@
from collections import OrderedDict
from operator import itemgetter
from typing import Dict, List
-from helpers.logger import Logger
+from ..helpers.logger import Logger
class LiteralConstants:
@@ -24,6 +24,7 @@ class FileType(enum.Enum):
REG = 1
JSON = 2
BYTES = 3
+ ENV = 4
class ChatType:
""" Supported telegram chat types by the bot. """
@@ -34,35 +35,113 @@ class BotCommands:
""" Available commands for Txt2SpeechBot. """
HELP: str = "help"
START: str = "start"
+ TO_VOICE: str = "to_voice"
ADD_AUDIO: str = "addaudio"
LST_AUDIO: str = "listaudio"
RM_AUDIO: str = "rmaudio"
RM_ALL_AUDIOS: str = "rmallaudios"
+ TTS: str = "tts"
+ PRIVACY_INFO: str = "privacy"
+ DELETE_USER_DATA: str = "delete_my_data"
+
+ class AdminCommands:
+ """ Available commands for admins of Txt2SpeechBot. """
+ DELETE_USER: str = "delete_user"
+ CMD_LIST: str = "cmd"
+
+ class OwnerCommands:
+ """ Available commands for owner of Txt2SpeechBot. """
+ LOG: str = "log"
class BotAnswers:
""" Bot answers to user interaction. """
- SEND_AUDIO: str = "Send audio or voice note."
- MAX_OWN_AUDIOS: str = "Sorry, you reached maximun number of stored audios (50). Try removing some of them with /rmaudio command."
- PROVIDE_DESC: str = "Saved!\n\nProvide now a short description for the audio. 30 character allowed."
- NOT_AUDIO: str = "Audio file are not detected. Are you sure you've uploaded the correct file? Try it again with /addaudio command."
- WRONG_DESC: str = "Wrong input. Write a short description to save the audio. 30 characters maximum."
+ random_callback_answers = [
+ "❌ These buttons are not for you. Get your personal buttons with /tts.",
+ "❌ Must. Click. Button. Use /tts",
+ "❌ Sure you like buttons. How about /tts?",
+ "❌ Buttons stealer! Steal your own buttons with /tts",
+ ]
+ SEND_AUDIO: str = "Send or quote reply an audio or voice note "
+ MAX_OWN_AUDIOS: str = "Sorry, you reached maximun number of stored audios ({}). Try removing some of them with /rmaudio command."
+ PROVIDE_DESC: str = "Provide now a short description for the audio. {} character allowed."
+ NOT_AUDIO: str = "Audio file not detected. Are you sure it's a correct multimedia file? Try it again with /{} command."
+ WRONG_DESC: str = "Wrong input. Write a short description to save the audio. {} characters maximum."
USED_DESC: str = "Description is already in use. Please, write another one."
- SAVED: str = "Saved audio with description: \"%s\""
+ SAVED: str = "Saved audio with description: \"{}\""
LST_NONE_AUDIO: str = "Sorry, you don't have any uploaded audio... Try to upload one with /addaudio command."
RM_AUDIO: str = "Send the description of the audio you want to remove."
- RM_ALL_AUDIO: str = "Are you completely sure you want to delete all your audios? Answer 'CONFIRM' in uppercase to verify this action."
+ RM_ALL_AUDIO: str = "Are you completely sure you want to delete all your {}? Answer 'CONFIRM' in uppercase to verify this action."
RM_ALL_NOT_CONFIRM: str = "You should have answered 'CONFIRM' to validate the deletion. Canceling action."
RM_DESC_NOT_TEXT: str = "Wrong input. Send the description of the audio you want to remove. Try again /rmaudio."
RM_USED_DESC: str = "No audio with the provided description. Please, send the correct description. Try again /rmaudio."
DELETED_AUDIO: str = "The file was deleted from your audios."
- DELETED_ALL_AUDIO: str = "All your audios were deleted successfully."
+ DELETED_ALL_AUDIO: str = "All {} deleted successfully."
+ CONVERTING_TO_VOICE: str = "Parsing file to telegram voice format..."
+ FILE_SIZE_TOO_BIG: str = "Sorry, only up to 20MB files can be processed. Your file size is {}"
+ UNKNOWN_MEDIA_TYPE: str = "Unknown file format {}. Please, send another media file."
+ REPLY_TO_MSG: str = "The command /{} can only be used by replying to a message"
+ IS_ALREADY_VOICE: str = "The replied message is already a Telegram Voice file."
+ AUDIO_ALREADY_SAVED: str = "You already saved this file. View it with /listaudio or inline."
+ GENERIC_ERROR: str = "❌ Something went wrong. The developer has been notified."
+ USER_NOT_FOUND: str = "User {} not found in database."
+ UNABLE_TO_DELETE_USER: str = "Unable to delete User {} from database"
+ LANGUAGE_UNSUPPORTED: str = "❌ Unsupported language code: {lang_code}.\n\nChoose one of the following: {supported_text}"
+ NO_TTS_TEXT: str = "Please provide text to convert or reply to a text message.\nQuote & Reply feature is supported.\nExamples:\n/tts こんにちは\n/tts_ja こんにちは"
+ ADMIN_CMD_LIST: str = "/delete_user {user_id} > Manually delete a user from database\n/log > Send log files"
+ HELP_MSG: str = """
+Hello, thanks for using me.
+I'm @{bot_username}, the leading Text to Speech inline telegram bot. I can offer you audios in different languages from your own text.
+Write my username in any chat and the text you want in the audio. For example, try to write the next sequence on any chat:
+
+@{bot_username} Hello World
+
+Then, choose the language and you'll be able to enjoy your audio.
+
+‼‼️️ NEW FEATURE ‼️‼️
+
+@{bot_username} has a new feature! You can store up to {max_audios} audios in the bot so you can use them inline in any conversation.
+
+Open a private chat with the bot (@{bot_username}) and write /addaudio to store any audio.
+Send one audio in telegram voice note format (.ogg) or any other audio format, with no size limitation, and a description for that audio.
+For using that uploaded audio, just write my username in any chat like this:
+
+@{bot_username}
+
+and a list with your uploaded audios will appear, then choose one to send.
+
+You can manage your uploaded audios with /listaudio, /rmaudio and /rmallaudios commands in bot dm.
+
+Additional commands:
+- /to_voice > reply to a multimedia file (max size: {max_size}) to convert it to Telegram Voice.
+- /tts > write text or reply to a text file for non inline text2speech. Can also be used by specifying langcode: /tts_en_us
+
+- /privacy > see what data this bot collects for functioning in bot dm.
+- /delete_my_data > removes all stored data about you in database in bot dm.
+ """
+ PRIVACY_MSG: str = """
+You can delete all your stored data at any time with /delete_my_data
+
+This bot collects the following data:
+
+From Telegram (public info):
+ - User ID: to recognize your Telegram account
+ - Username, first name, last name: if publicly available
+
+Bot-related data:
+ - Last language used: to default to your recent preference
+ - Language usage statistics: a count of how often each language was used
+
+When using /addaudio:
+ - Telegram File ID: the unique reference used by Telegram to resend your file (files are not downloaded nor stored by the bot)
+ - Description: the name or label you assign to the file
+ - File Type, Duration, Size: metadata used to organize your audio files
+ - Total saved files: how many you've saved
+ - Usage count per file: how often each saved file is played or used
+ """
class FilePath:
""" File path to required project files. """
- TOKEN: str = "data/token.txt"
- DB: str = "data/db.json"
- TTS: str = "data/magic.txt"
- HELP_MSG: str = "data/help.txt"
+ ENV_FILE: str = ".env"
STA_LOG: str = "data/status.log"
MSG_LOG: str = "data/messages.log"
QRY_LOG: str = "data/queries.log"
@@ -74,10 +153,13 @@ class ExceptionMessages:
DB_DISCONNECTED: str = "DB | Disconnected from MySQL database server"
DB_READ: str = "DB | Unable to fetch data from database\nSQL query: "
DB_WRITE: str = "DB | Unable to write data in database\nSQL query: "
+ DB_DELETE: str = "DB | Unable to delete data in database\nSQL query: "
FILE_CANT_OPEN: str = "File | Unable to open requested file\n"
FILE_CANT_WRITE: str = "File | Unable to write provided data in this file\n"
AUDIO_ERROR: str = "AUDIO | Unable to open file with mimetype %s\n"
UNEXPECTED_ERROR: str = "Error | An unexpected error has occured\n"
+ DB_USER_NOT_FOUND: str = "DB | Unable to find User {} in database"
+ DB_USER_DELETE_ERROR: str = "Unable to delete User {} from database requested by {}"
STA_LOG: Logger = Logger("Status log", FilePath.STA_LOG)
MSG_LOG: Logger = Logger("Message logger", FilePath.MSG_LOG)
@@ -131,3 +213,8 @@ class ExceptionMessages:
CONTENT_TYPES: List[str] = ['audio', 'voice', 'video']
MIME_TYPES: List[str] = ['audio', 'video']
+
+ SUPPORTED_DB: List[str] = ['sqlite', 'mysql', 'mariadb', 'postgresql']
+
+ # max allowed download size depending on telegram bot library used, for PyTelegramBot it's 20MB
+ MAX_DOWNLOAD_BYTES: int = 20 * 1024 * 1024
diff --git a/txt2SpeechBot/helpers/logger.py b/txt2SpeechBot/helpers/logger.py
index 1e182fb..4578850 100755
--- a/txt2SpeechBot/helpers/logger.py
+++ b/txt2SpeechBot/helpers/logger.py
@@ -4,11 +4,13 @@
"""
File containing Logger class.
"""
-
-import logging
import os
import sys
-from helpers.loggerFormatter import LoggerFormatter
+import logging
+from logging.handlers import TimedRotatingFileHandler
+
+from ..helpers.loggerFormatter import LoggerFormatter
+from ..helpers.telegram_logger import TelegramExceptionHandler
class Logger:
@@ -24,23 +26,45 @@ class Logger:
BASE_PATH: str = os.getcwd() + "/"
""" Root project directory path. """
- def __init__(self, name: str, file: str, level: int = logging.INFO) -> None:
+ def __init__(self, name: str, file: str, level: int = logging.INFO, rotating_when: str = "W0", rotation_interval: int = 1, backup_count: int = 4, utc: bool = True) -> None:
"""
- Creates a logger channel ready to record different actions.
+ Creates a rotating logger channel ready to record different actions.
+ Default rotation is every Monday UTC with 4 backups (total 35d of history)
:param name: Name of the logger.
:param file: Path to logger file in disk.
:param level: Sets the threshold of the logger, defaults to logging.INFO.
+ :param when: When to rotate log ('midnight', 'D', 'W0', etc.).
+ :param interval: Number of time units between rotations.
+ :param backup_count: Number of backup logs to keep.
+ :param utc: Whether to use UTC time or local system time for rotation.
"""
self.filename: str = Logger.BASE_PATH + file
self.name: str = name
self.level: int = level
- self.handler: logging.FileHandler = logging.FileHandler(self.filename)
+
+ log_dir = os.path.dirname(self.filename)
+ os.makedirs(log_dir, exist_ok=True)
+
+ #self.handler: logging.FileHandler = logging.FileHandler(self.filename)
+ self.handler: TimedRotatingFileHandler = TimedRotatingFileHandler(
+ self.filename,
+ when=rotating_when,
+ interval=rotation_interval,
+ backupCount=backup_count,
+ encoding='utf-8',
+ utc=utc
+ )
self.handler.setFormatter(LoggerFormatter())
self.logger: logging.Logger = logging.getLogger(self.name)
self.logger.setLevel(self.level)
self.logger.addHandler(self.handler)
+ def add_telegram_handler(self, tts_instance, log_file_path):
+ telegram_handler = TelegramExceptionHandler(tts_instance, log_file_path)
+ telegram_handler.setLevel(logging.ERROR)
+ self.logger.addHandler(telegram_handler)
+
def log_also_to_stdout(self) -> None:
"""
Add a channel to the logger to display its messages also in standard output.
diff --git a/txt2SpeechBot/helpers/telegram_logger.py b/txt2SpeechBot/helpers/telegram_logger.py
new file mode 100644
index 0000000..d76f0fc
--- /dev/null
+++ b/txt2SpeechBot/helpers/telegram_logger.py
@@ -0,0 +1,37 @@
+import traceback, logging
+from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
+from telebot.formatting import escape_html
+
+
+class TelegramExceptionHandler(logging.Handler):
+ def __init__(self, tts_instance, log_file_path):
+ super().__init__()
+ self.tts = tts_instance
+ self.log_file_path = log_file_path
+
+ def emit(self, record):
+ try:
+ if record.exc_info:
+ exc_type, exc_value, exc_traceback = record.exc_info
+ full_traceback = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
+ escaped_traceback = escape_html(full_traceback)
+
+ log_msg = escape_html(record.getMessage())
+
+ error_summary = (
+ f"🚨 Exception: {exc_type.__name__} — {exc_value}\n"
+ f"📌 Context: {log_msg}\n"
+ f"——————————————\nTraceBack:{escaped_traceback}"
+ )
+
+ buttons = [
+ InlineKeyboardButton(
+ text="Send log file",
+ callback_data=f"log|{self.log_file_path}"
+ )
+ ]
+
+ self.tts.send_error_to_owner(error_summary, reply_markup=InlineKeyboardMarkup([buttons]), parse_mode='HTML')
+
+ except Exception as telegram_error:
+ logging.getLogger("Status log").warning(f"Could not notify owner: {telegram_error}")
\ No newline at end of file
diff --git a/txt2SpeechBot/helpers/user.py b/txt2SpeechBot/helpers/user.py
index a53cd99..4256260 100644
--- a/txt2SpeechBot/helpers/user.py
+++ b/txt2SpeechBot/helpers/user.py
@@ -7,9 +7,9 @@
from telebot import types
from typing import Optional
-from helpers.constants import Constants
-from helpers.database import Database
-from helpers.utils import Utils
+from ..helpers.constants import Constants, DBStatements
+from ..helpers.database import Database
+from ..helpers.utils import Utils
class User:
@@ -21,7 +21,7 @@ class User:
"""
def __init__(self, internal_id: str = "", user_id: str = "", username: str = "", first_name: str = "",
- last_name: str = "") -> None:
+ last_name: str = "", last_lang: Optional[str] = None) -> None:
"""
Creates a object that represent an user of the application.
@@ -33,9 +33,10 @@ def __init__(self, internal_id: str = "", user_id: str = "", username: str = "",
"""
self.internal_id = internal_id
self.user_id = user_id
- self.username = username
- self.first_name = first_name
- self.last_name = last_name
+ self.username = username or ""
+ self.first_name = first_name or ""
+ self.last_name = last_name or ""
+ self.last_lang = last_lang
def __eq__(self, other_user: 'User') -> bool:
"""
@@ -65,15 +66,32 @@ def get_user_from_db(cls, user_id: str) -> Optional['User']:
:rtype: User if exists, None in other case.
"""
db_conn = Utils.create_db_conn()
- sql_read = Constants.DBStatements.USER_READ % str(user_id)
- result = db_conn.read_one(sql_read)
+ #sql_read = Constants.DBStatements.USER_READ % str(user_id)
+ result = db_conn.read_one(DBStatements.USER_READ, (str(user_id),))
return cls(
internal_id=result[0],
user_id=result[1],
username=result[2],
first_name=result[3],
- last_name=result[4]
+ last_name=result[4],
+ last_lang=result[5]
) if result else None
+
+ @classmethod
+ def get_last_lang(cls, user_id: int) -> Optional[str]:
+ db = Utils.create_db_conn()
+ result = db.read_one(DBStatements.USER_GET_LAST_LANG, (str(user_id),))
+ return result[0] if result and result[0] else None
+
+ @classmethod
+ def set_last_lang(cls, user_id: int, lang_code: str) -> None:
+ db = Utils.create_db_conn()
+ db.write_all(DBStatements.USER_SET_LAST_LANG, (lang_code, str(user_id)))
+
+ def update_last_lang(self, lang_code: str) -> None:
+ self.last_lang = lang_code
+ db = Utils.create_db_conn()
+ db.write_all(DBStatements.USER_SET_LAST_LANG, (lang_code, str(self.user_id)))
@classmethod
def get_user_from_telegram_user(cls, user: types.User) -> 'User':
@@ -96,19 +114,18 @@ def store(self) -> None:
Initializes and stores an user in our bot application.
"""
db_conn = Utils.create_db_conn()
- sql_insert_user_info = Constants.DBStatements.USER_INSERT % (self.user_id, self.username,
- self.first_name, self.last_name)
- sql_insert_chosen_result = Constants.DBStatements.LAN_INSERT % self.user_id
- db_conn.write_all(sql_insert_user_info)
- db_conn.write_all(sql_insert_chosen_result)
+ #sql_insert_user_info = Constants.DBStatements.USER_INSERT % (self.user_id, self.username, self.first_name, self.last_name)
+ #sql_insert_chosen_result = Constants.DBStatements.LAN_INSERT % self.user_id
+ db_conn.write_all(DBStatements.USER_INSERT, (str(self.user_id), self.username, self.first_name, self.last_name))
+ db_conn.write_all(DBStatements.LAN_INSERT, (str(self.user_id),))
def update(self) -> None:
"""
Updates user's parameters in database.
"""
- sql_update = Constants.DBStatements.USER_UPDATE % (self.username, self.first_name, self.last_name, self.user_id)
+ #sql_update = Constants.DBStatements.USER_UPDATE % (self.username, self.first_name, self.last_name, self.user_id)
db_conn = Utils.create_db_conn()
- db_conn.write_all(sql_update)
+ db_conn.write_all(DBStatements.USER_UPDATE, (self.username, self.first_name, self.last_name, str(self.user_id)))
@staticmethod
def validate_user_from_telegram(user: types.User) -> None:
diff --git a/txt2SpeechBot/helpers/utils.py b/txt2SpeechBot/helpers/utils.py
index 17108a8..1193151 100755
--- a/txt2SpeechBot/helpers/utils.py
+++ b/txt2SpeechBot/helpers/utils.py
@@ -4,11 +4,11 @@
"""
File containing Utils class.
"""
-
import uuid
from telebot import types
-from helpers.constants import Constants
-from helpers.database import Database
+
+from ..helpers.constants import Constants
+from ..helpers.database import Database
class Utils:
@@ -24,8 +24,10 @@ def create_db_conn() -> Database:
:return: Database object connection.
:rtype: Database.
"""
- return Database(Constants.DB_CREDENTIALS[0], Constants.DB_CREDENTIALS[1],
- Constants.DB_CREDENTIALS[2], Constants.DB_CREDENTIALS[3])
+ db_type = Constants.DB_TYPE.lower()
+ db_credentials = Constants.DB_CREDENTIALS
+
+ return Database(db_type, db_credentials)
@staticmethod
def generate_unique_str() -> str:
@@ -44,12 +46,14 @@ def record_message(msg: types.Message) -> None:
:param msg: Telegram message.
"""
+ if not Constants.SAVE_INPUT_MSG:
+ return
+
if msg.content_type == 'text':
if msg.chat.type == Constants.ChatType.PRIVATE:
text = "%s (%s): %s" % (msg.from_user.username, str(msg.chat.id), msg.text)
else:
- text = "%s (%s) in %s [%s]: %s" % (msg.from_user.username, str(msg.from_user.id), msg.chat.title,
- str(msg.chat.id), msg.text)
+ text = "%s (%s) in %s [%s]: %s" % (msg.from_user.username, str(msg.from_user.id), msg.chat.title, str(msg.chat.id), msg.text)
Constants.MSG_LOG.logger.info(text)
@staticmethod
@@ -59,5 +63,26 @@ def record_query(query: types.InlineQuery) -> None:
:param query: Telegram inline query.
"""
+ if not Constants.SAVE_INPUT_QUERIES:
+ return
+
text = "%s (%s) %s" % (query.from_user.username, str(query.from_user.id), query.query)
Constants.QRY_LOG.logger.info(text)
+
+ @staticmethod
+ def format_bytes(size_bytes: int) -> str:
+ """
+ Converts a size in bytes to a human-readable string in KB, MB, or GB.
+
+ :param size_bytes: Size in bytes.
+ :return: Human-readable size string.
+ :rtype: str
+ """
+ if size_bytes < 1024:
+ return f"{size_bytes} B"
+ elif size_bytes < 1024 ** 2:
+ return f"{size_bytes / 1024:.2f} KB"
+ elif size_bytes < 1024 ** 3:
+ return f"{size_bytes / (1024 ** 2):.2f} MB"
+ else:
+ return f"{size_bytes / (1024 ** 3):.2f} GB"
diff --git a/txt2SpeechBot/textToSpeech/language.py b/txt2SpeechBot/textToSpeech/language.py
index c7c99ee..1ced3d8 100644
--- a/txt2SpeechBot/textToSpeech/language.py
+++ b/txt2SpeechBot/textToSpeech/language.py
@@ -7,7 +7,7 @@
from operator import attrgetter
from typing import List, Tuple
-from helpers.constants import Constants
+from ..helpers.constants import Constants
class Language:
diff --git a/txt2SpeechBot/textToSpeech/ttsAudio.py b/txt2SpeechBot/textToSpeech/ttsAudio.py
index 8cf55a4..0579d70 100644
--- a/txt2SpeechBot/textToSpeech/ttsAudio.py
+++ b/txt2SpeechBot/textToSpeech/ttsAudio.py
@@ -10,9 +10,10 @@
from telebot import types
from collections import OrderedDict
from typing import List, Optional, Tuple
-from helpers.constants import Constants
-from helpers.utils import Utils
-from textToSpeech.language import Language
+from ..helpers.constants import Constants, DBStatements
+from ..helpers.utils import Utils
+from ..helpers.user import User
+from ..textToSpeech.language import Language
class TTSAudio:
@@ -35,6 +36,13 @@ class TTSAudio:
REGEX_ARABIC: str = "[\u0600-\u06ff]|[\u0750-\u077f]|[\ufb50-\ufbc1]|[\ufbd3-\ufd3f]|" + \
"[\ufd50-\ufd8f]|[\ufd92-\ufdc7]|[\ufe70-\ufefc]|[\uFDF0-\uFDFD]"
"""Regex to find Arabic characters."""
+
+ @staticmethod
+ def process_single(lang_code: str, text: str, queries: OrderedDict) -> Tuple[str, types.InlineKeyboardButton]:
+ magic = TTSAudio.generate_voice_content(text) + lang_code
+ markup = types.InlineKeyboardMarkup()
+ markup.add(types.InlineKeyboardButton("Text", callback_data=TTSAudio.store_tts_query(text, queries)))
+ return magic, markup
@staticmethod
def create_inline_results_tts_audio(query: types.InlineQuery, queries: OrderedDict
@@ -52,8 +60,8 @@ def create_inline_results_tts_audio(query: types.InlineQuery, queries: OrderedDi
markup = types.InlineKeyboardMarkup()
magic = TTSAudio.generate_voice_content(query.query)
markup.add(types.InlineKeyboardButton("Text", callback_data=TTSAudio.store_tts_query(query.query, queries)))
- sql_read = Constants.DBStatements.LAN_READ % str(query.from_user.id)
- result = db_conn.read_one(sql_read)
+ #sql_read = Constants.DBStatements.LAN_READ % str(query.from_user.id)
+ result = db_conn.read_one(DBStatements.LAN_READ, (str(query.from_user.id),))
if result:
return TTSAudio.__create_inline_results_with_db_entry(query.query, magic, markup, result)
else:
@@ -113,7 +121,7 @@ def __build_all_inline_results(query: str, languages: List[Language], magic: str
else:
for index, language in enumerate(languages):
inline_results.append(types.InlineQueryResultVoice(
- str(index + 1),
+ f"tts:{language.code}",
magic + language.code,
language.title,
reply_markup=markup
@@ -149,15 +157,26 @@ def update_chosen_results_tts_audio(chosen_result: types.ChosenInlineResult) ->
:param chosen_result: Telegram chosen inline result.
"""
+ if not chosen_result.result_id.startswith("tts:"):
+ return
+
+ lang_code = chosen_result.result_id[4:]
+
db_conn = Utils.create_db_conn()
- sql_read = Constants.DBStatements.LAN_READ % str(chosen_result.from_user.id)
- result = db_conn.read_one(sql_read)
+ #sql_read = Constants.DBStatements.LAN_READ % str(chosen_result.from_user.id)
+ result = db_conn.read_one(DBStatements.LAN_READ, (str(chosen_result.from_user.id),))
+ times = 1
if result:
sorted_languages = Language.get_languages_sorted_for_user(result)
- lan = sorted_languages[int(chosen_result.result_id) - 1].code
- times = sorted_languages[int(chosen_result.result_id) - 1].record_use()
- sql_update = Constants.DBStatements.LAN_UPDATE_FOR_CHOSEN_RESULT % (lan, times, chosen_result.from_user.id)
- db_conn.write_all(sql_update)
+ for lang in sorted_languages:
+ if lang.code == lang_code:
+ times = lang.record_use()
+ break
+
+ #sql_update = Constants.DBStatements.LAN_UPDATE_FOR_CHOSEN_RESULT % (lan, times, chosen_result.from_user.id)
+ sql_update = DBStatements.LAN_UPDATE_FOR_CHOSEN_RESULT.format(DBStatements._quote(lang_code))
+ db_conn.write_all(sql_update, (times, str(chosen_result.from_user.id)))
+ User.set_last_lang(chosen_result.from_user.id, lang_code)
@staticmethod
def store_tts_query(text: str, queries: OrderedDict) -> str:
diff --git a/txt2SpeechBot/ttsbot.py b/txt2SpeechBot/ttsbot.py
index 8b285ce..04fe00d 100644
--- a/txt2SpeechBot/ttsbot.py
+++ b/txt2SpeechBot/ttsbot.py
@@ -4,22 +4,20 @@
"""
File containing Text_To_Speech_Bot class.
"""
-
-import requests
-import time
-import telebot
+import os
import magic
+import telebot
from telebot import types
-from typing import List, Callable, Dict, Optional
-from collections import OrderedDict
-from pydub import AudioSegment
from io import BytesIO
-from helpers.constants import Constants
-from helpers.database import Database
-from helpers.utils import Utils
-from helpers.user import User
-from textToSpeech.ttsAudio import TTSAudio
-from audioStore.storedAudio import StoredAudio
+from pydub import AudioSegment
+from collections import OrderedDict
+from typing import List, Callable, Dict, Optional, Any, Union
+
+from .helpers.constants import Constants
+from .helpers.utils import Utils
+from .helpers.user import User
+from .textToSpeech.ttsAudio import TTSAudio
+from .audioStore.storedAudio import StoredAudio
# region Bot Class
@@ -37,14 +35,149 @@ def __init__(self, bot: telebot.TeleBot):
"""
self.bot: telebot.TeleBot = bot
self.bot.set_update_listener(self.listener)
- self.bot.enable_save_next_step_handlers(delay=3)
- self.bot.load_next_step_handlers()
self.queries: OrderedDict = OrderedDict()
self.next_step_focused: Dict[str, types.Message] = {}
Constants.MSG_LOG.log_also_to_stdout()
Constants.QRY_LOG.log_also_to_stdout()
Constants.STA_LOG.log_also_to_stdout()
+ @staticmethod
+ def get_chatid(update_or_id) -> tuple[int|None, str]:
+ if hasattr(update_or_id, 'chat') and update_or_id.chat:
+ chat_id = update_or_id.chat.id
+ username = getattr(update_or_id.chat, 'username', 'Unknown')
+ else:
+ chat_id = None
+ username = 'Unknown'
+
+ return chat_id, username
+
+ @staticmethod
+ def get_userid(update_or_id) -> tuple[int|None, str]:
+ if hasattr(update_or_id, 'from_user') and update_or_id.from_user:
+ chat_id = update_or_id.from_user.id
+ username = getattr(update_or_id.from_user, 'username', 'Unknown')
+ else:
+ chat_id = None
+ username = 'Unknown'
+
+ return chat_id, username
+
+ @staticmethod
+ def is_owner(update_or_id: Union[Any, int], log_unauthorized: bool = Constants.SAVE_UNAUTHORIZED) -> bool:
+ if isinstance(update_or_id, int):
+ return update_or_id == Constants.OWNER_ID
+
+ chat_id, username = Text_To_Speech_Bot.get_userid(update_or_id)
+
+ is_owner = chat_id == Constants.OWNER_ID if Constants.OWNER_ID else False
+
+ if is_owner:
+ return True
+
+ if log_unauthorized:
+ Constants.STA_LOG.logger.info(f"Unauthorized owner access attempt from chat {username} {chat_id}")
+
+ return False
+
+ @staticmethod
+ def is_admin(update_or_id: Union[Any, int], log_unauthorized: bool = Constants.SAVE_UNAUTHORIZED) -> bool:
+ if isinstance(update_or_id, int):
+ return update_or_id in Constants.ADMINS_IDS
+
+ chat_id, username = Text_To_Speech_Bot.get_userid(update_or_id)
+ is_admin = chat_id in Constants.ADMINS_IDS if Constants.ADMINS_IDS else False
+
+ if is_admin:
+ return True
+
+ if log_unauthorized:
+ Constants.STA_LOG.logger.info(f"Unauthorized admin access attempt from chat {username} {chat_id}")
+
+ return False
+
+ @staticmethod
+ def is_allowed_chat(update_or_id: Union[Any, int], log_unauthorized: bool = Constants.SAVE_UNAUTHORIZED) -> bool:
+ if isinstance(update_or_id, int):
+ return update_or_id in Constants.ALLOWED_CHATS
+
+ chat_id, username = Text_To_Speech_Bot.get_chatid(update_or_id)
+ # callback query and inline doesn't return chat_id
+ if not chat_id:
+ chat_id, username = Text_To_Speech_Bot.get_userid(update_or_id)
+
+ user = getattr(update_or_id, "from_user", None)
+ user_id = user.id if user else None
+
+ if not Constants.ALLOWED_CHATS or (chat_id is not None and chat_id in Constants.ALLOWED_CHATS):
+ if Constants.ALLOWED_CHATS and chat_id in Constants.ALLOWED_CHATS and (user_id is not None and user_id not in Constants.ALLOWED_CHATS):
+ Constants.ALLOWED_CHATS.add(user_id)
+ Constants.STA_LOG.logger.info(f"Added User {user_id} in APPROVED_CHATS because it's a member of {chat_id}")
+ return True
+
+ if log_unauthorized:
+ Constants.STA_LOG.logger.info(f"Unauthorized access attempt from chat: {username} {chat_id}")
+
+ return False
+
+ @staticmethod
+ def is_private_chat(update) -> bool:
+ if hasattr(update, 'chat'):
+ return update.chat.type == 'private'
+
+ if hasattr(update, 'message'):
+ return update.message.chat.type == 'private'
+
+ return False
+
+ def start_polling(self) -> None:
+ """
+ Starts the polling loop.
+ """
+ from . import handlers
+ self.bot.enable_save_next_step_handlers(delay=3)
+ self.bot.load_next_step_handlers()
+ self.bot.infinity_polling()
+
+ def send_error_to_owner(self, error_text: str, **kargs):
+ if Constants.OWNER_ID:
+ return self.bot.send_message(Constants.OWNER_ID, error_text, **kargs)
+ return False
+
+ def get_user_or_chat_info(self, user_id: int|str = None) -> types.User|types.Chat:
+ """
+ Returns user info (Chat object) if user_id is provided,
+ otherwise returns the bot's own User object.
+
+ :param user_id: the user id or username of a Telegram user/chat
+ """
+ if user_id is None:
+ return self.bot.get_me()
+ return self.bot.get_chat(user_id)
+
+ def send_document_if_exists(self, chat_id, file_path, caption=None, reply_markup=None, parse_mode=None):
+ """
+ Sends a document if it exists, otherwise sends a warning message.
+
+ :param bot: The TeleBot instance.
+ :param chat_id: The chat ID to send the document to.
+ :param file_path: The path to the file to send.
+ :param caption: Optional caption for the file.
+ :param reply_markup: Optional inline keyboard markup.
+ :param parse_mode: Optional parse mode (e.g., 'HTML', 'Markdown').
+ """
+ if os.path.isfile(file_path) and os.path.getsize(file_path) > 0:
+ self.bot.send_document(
+ chat_id,
+ types.InputFile(file_path),
+ caption=caption,
+ reply_markup=reply_markup,
+ parse_mode=parse_mode
+ )
+ else:
+ reason = "File not found" if not os.path.isfile(file_path) else "File is empty"
+ self.bot.send_message(chat_id, f"⚠️ {reason}: {file_path}", parse_mode="HTML")
+
def listener(self, messages: List) -> None:
"""
Listens and deals with all Telegram messages processed by bot.
@@ -52,6 +185,9 @@ def listener(self, messages: List) -> None:
:param messages: List of Telegram messages.
"""
for msg in messages:
+ # filters out not allowed users
+ if not self.is_allowed_chat(msg, log_unauthorized=False):
+ continue
User.validate_user_from_telegram(msg.from_user)
Utils.record_message(msg)
@@ -82,6 +218,31 @@ def next_step(self, input_msg: types.Message, text_to_send: str, function: Calla
reply = self.bot.reply_to(input_msg, text_to_send, reply_markup=types.ForceReply(selective=False))
self.bot.register_next_step_handler(reply, function)
+ def extract_file_link(self, message: types.Message) -> tuple[Optional[types.File], str]:
+ """
+ Given a Telegram message that’s guaranteed to contain some multimedia
+ (voice, audio, or video), return a tuple (file_link, file_type).
+
+ file_type will be one of: 'voice', 'audio', or 'video'.
+
+ If it isn’t one of those, we’ll convert it to voice; if conversion
+ fails, returns (None, '').
+ """
+ if self.is_file_valid_telegram_voice(message.content_type):
+ return message.voice, 'voice'
+
+ if self.is_file_valid_audio(message.content_type):
+ return message.audio, 'audio'
+
+ if self.is_file_valid_video(message.content_type):
+ return message.video, 'video'
+
+ converted = self.convert_to_voice(message, message)
+ if converted:
+ return converted.voice, 'voice'
+
+ return None, ''
+
def convert_to_voice(self, desc_msg: types.Message, file_msg: types.Message) -> Optional[types.Message]:
"""
Converts any supported multimedia file in a Telegram compatible voice format.
@@ -90,8 +251,15 @@ def convert_to_voice(self, desc_msg: types.Message, file_msg: types.Message) ->
:param file_msg: Telegram message containing audio file.
"""
file_link = self.get_file_link(file_msg)
+
+ file_size = file_link.file_size
+ if file_size > Constants.MAX_DOWNLOAD_BYTES:
+ readable_size = Utils.format_bytes(file_size)
+ self.bot.reply_to(file_msg, Constants.BotAnswers.FILE_SIZE_TOO_BIG.format(readable_size))
+ return None
+
downloaded_file = self.bot.download_file(self.bot.get_file(file_link.file_id).file_path)
- self.bot.send_message(file_msg.from_user.id, "Parsing file to telegram voice format")
+ info_msg = self.bot.reply_to(file_msg, Constants.BotAnswers.CONVERTING_TO_VOICE)
try:
song = AudioSegment.from_file(BytesIO(downloaded_file))
io_file = BytesIO()
@@ -99,236 +267,34 @@ def convert_to_voice(self, desc_msg: types.Message, file_msg: types.Message) ->
except:
mimetype = magic.from_buffer(downloaded_file, mime=True)
Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.AUDIO_ERROR % mimetype, exc_info=True)
- self.bot.send_message(6216877, 'Error trying to parse file with mimetype %s.' % mimetype)
- self.bot.reply_to(file_msg, "Unknown file format %s. Please, send another media file." % mimetype)
+ self.send_error_to_owner('Error trying to parse file with mimetype %s.' % mimetype)
+ self.bot.reply_to(file_msg, Constants.BotAnswers.UNKNOWN_MEDIA_TYPE.format(mimetype))
return None
else:
- new_msg = self.bot.send_voice(file_msg.from_user.id, io_file.read())
+ new_msg = self.bot.send_voice(file_msg.chat.id, io_file.read())
self.next_step_focused[str(desc_msg.from_user.id)] = new_msg
return new_msg
+ finally:
+ self.bot.delete_message(file_msg.chat.id, info_msg.message_id)
-# endregion
-
-
-my_bot = telebot.TeleBot(Constants.TOKEN)
-tts = Text_To_Speech_Bot(my_bot)
-
-
-# region Inline Mode
-
-@my_bot.inline_handler(lambda query: 0 <= len(query.query) <= 201)
-def query_handler(query: types.InlineQuery) -> None:
- """
- Answers with different purpose audios an inline query from an user.
-
- :param query: Telegram query.
- """
- User.validate_user_from_telegram(query.from_user)
- Utils.record_query(query)
- if not query.query:
- inline_results = tts.create_inline_results_stored_audio(query)
- else:
- inline_results = tts.create_inline_results_tts_audio(query, tts.queries)
- try:
- tts.bot.answer_inline_query(str(query.id), inline_results, cache_time=1)
- except Exception as query_exc:
- Constants.STA_LOG.logger.exception('Query: "' + query.query + '"', exc_info=True)
- tts.bot.send_message(6216877, 'Query: "' + query.query + '"\n' + str(query_exc))
-
-
-@my_bot.chosen_inline_handler(func=lambda chosen_inline_result: True)
-def chosen_result_handler(chosen_inline_result: types.ChosenInlineResult) -> None:
- """
- Updates previous database record with the selected inline result.
-
- :param chosen_inline_result: Telegram chosen inline result.
- """
- if len(chosen_inline_result.query) == 0:
- tts.update_chosen_results_stored_audio(chosen_inline_result)
- else:
- tts.update_chosen_results_tts_audio(chosen_inline_result)
- # tts.bot.send_message(6216877, 'a' + str(chosen_inline_result))
-
-
-@my_bot.callback_query_handler(func=lambda call: True)
-def callback_query_handler(callback: types.CallbackQuery) -> None:
- """
- Provides the user a description of the sent audio.
-
- :param callback: Telegram callback query.
- """
- text = tts.get_callback_query(callback)
- if len(text) > 54:
- tts.bot.answer_callback_query(callback.id, text, show_alert=True)
- else:
- tts.bot.answer_callback_query(callback.id, text)
-
-# endregion
-
-
-# region Bot Commands
-
-@my_bot.message_handler(commands=[Constants.BotCommands.START, Constants.BotCommands.HELP])
-def command_help(msg: types.Message) -> None: # TODO improve help message
- """
- Answers the user with a help message to help him to understand the purpose of this bot.
-
- :param msg: Telegram message with /help command.
- """
- tts.bot.send_message(msg.from_user.id, Constants.HELP_MSG)
-
-
-@my_bot.message_handler(commands=[Constants.BotCommands.ADD_AUDIO])
-def add_audio_start(msg: types.Message) -> None:
- """
- Initializes the process of audio uploading for user (1/3 Add Audio).
-
- :param msg: Telegram message with /addaudio command.
- """
- db_conn = Utils.create_db_conn()
- result = db_conn.read_one(Constants.DBStatements.AUDIOS_READ_COUNT % str(msg.from_user.id))
- if result and result[0] < 50:
- tts.next_step(msg, Constants.BotAnswers.SEND_AUDIO, add_audio_file)
- else:
- tts.bot.reply_to(msg, Constants.BotAnswers.MAX_OWN_AUDIOS)
-
-
-def add_audio_file(msg: types.Message) -> None:
- """
- Validates file received from user (2/3 Add Audio).
-
- :param msg: Telegram message with attached file.
- """
- if tts.validate_multimedia_file(msg):
- tts.next_step_focused[str(msg.from_user.id)] = msg
- tts.next_step(msg, Constants.BotAnswers.PROVIDE_DESC, add_audio_description)
- else:
- tts.bot.reply_to(msg, Constants.BotAnswers.NOT_AUDIO)
-
-
-def add_audio_description(msg: types.Message) -> None:
- """
- Downloads audio file and saves it with its respective description (3/3 Add Audio).
-
- :param msg: Telegram message with audio description.
- """
- db_conn = Utils.create_db_conn()
- description = Database.db_str(msg.text.strip())
- if msg.content_type == 'text' and len(description) <= 30:
- user_id = str(msg.from_user.id)
- result = db_conn.read_one(Constants.DBStatements.AUDIOS_READ_FOR_CHECKING % (user_id, description))
- if result is None:
- file_message = tts.next_step_focused[str(msg.from_user.id)]
- if not tts.is_file_valid_telegram_voice(file_message.content_type):
- file_message = tts.convert_to_voice(msg, file_message)
- if not file_message:
- return
-
- file_link = file_message.voice
- db_return = db_conn.read_all(Constants.DBStatements.AUDIOS_READ_USER_IDS % user_id)
- if len(db_return) > 0:
- taken_ids = [audio_id[0] for audio_id in db_return]
- user_audio_id = tts.get_stored_audio_free_id(taken_ids)
- else:
- user_audio_id = 1
- callback_code = Utils.generate_unique_str()
- db_conn.write_all(Constants.DBStatements.AUDIOS_INSERT % (file_link.file_id, user_id,
- description, file_link.duration,
- file_link.file_size, user_audio_id,
- callback_code))
- tts.bot.reply_to(file_message, Constants.BotAnswers.SAVED % description)
- else:
- tts.next_step(msg, Constants.BotAnswers.USED_DESC, add_audio_description)
- else:
- tts.next_step(msg, Constants.BotAnswers.WRONG_DESC, add_audio_description)
-
-
-@my_bot.message_handler(commands=[Constants.BotCommands.LST_AUDIO])
-def list_stored_audios(msg: types.Message) -> None:
- """
- Lists all the stored audios by a certain user and their details.
-
- :param msg: Telegram message with /listaudios command.
- """
- db_conn = Utils.create_db_conn()
- audio_str_list = tts.get_stored_audios_listing(str(msg.from_user.id), db_conn)
- if audio_str_list:
- tts.bot.reply_to(msg, audio_str_list)
- else:
- tts.bot.reply_to(msg, Constants.BotAnswers.LST_NONE_AUDIO)
-
-
-@my_bot.message_handler(commands=[Constants.BotCommands.RM_AUDIO])
-def rm_audio_start(msg: types.Message) -> None:
- """
- Lists all the stored audios by a certain user asks him to delete one of them
- (1/2 Remove One Audio).
-
- :param msg: Telegram message with /rmaudio command.
- """
- list_stored_audios(msg)
- db_conn = Utils.create_db_conn()
- result = db_conn.read_one(Constants.DBStatements.AUDIOS_READ_FOR_REMOVING % str(msg.from_user.id))
- if result:
- tts.next_step(msg, Constants.BotAnswers.RM_AUDIO, rm_audio_select)
-
-
-def rm_audio_select(msg: types.Message) -> None:
- """
- Removes an uploaded audio by a determined user if description equals the received
- message from that user (2/2 Remove One Audio).
-
- :param msg: Telegram message with removing confirmation.
- """
- if msg.content_type == 'text' and msg.text:
- db_conn = Utils.create_db_conn()
- audio_to_rm = Database.db_str(msg.text.strip())
- user_id = str(msg.from_user.id)
- result = db_conn.read_one(Constants.DBStatements.AUDIOS_READ_FOR_CHECKING % (user_id, audio_to_rm))
- if result:
- db_conn.write_all(Constants.DBStatements.AUDIOS_REMOVE % (user_id, audio_to_rm))
- tts.bot.reply_to(msg, Constants.BotAnswers.DELETED_AUDIO)
- else:
- tts.bot.reply_to(msg, Constants.BotAnswers.RM_USED_DESC)
- else:
- tts.bot.reply_to(msg, Constants.BotAnswers.RM_DESC_NOT_TEXT)
-
-
-@my_bot.message_handler(commands=[Constants.BotCommands.RM_ALL_AUDIOS])
-def rm_all_audios(msg: types.Message) -> None:
- """
- Lists all the stored audios by a certain user asks him to delete all of them.
- (2/2 Remove All Audios)
-
- :param msg: Telegram message with /rmallaudios command.
- """
- list_stored_audios(msg)
- db_conn = Utils.create_db_conn()
- result = db_conn.read_one(Constants.DBStatements.AUDIOS_READ_FOR_REMOVING % str(msg.from_user.id))
- if result:
- tts.next_step(msg, Constants.BotAnswers.RM_ALL_AUDIO, confirm_rm_all_audios)
-
-
-def confirm_rm_all_audios(msg: types.Message) -> None:
- """
- Removes all previous uploaded audios by a determined user (2/2 Remove All Audios).
-
- :param msg: Telegram message with removing confirmation.
- """
- if msg.content_type == 'text' and msg.text and msg.text.strip() == 'CONFIRM':
- db_conn = Utils.create_db_conn()
- remove_result = db_conn.write_all(Constants.DBStatements.AUDIOS_REMOVE_ALL % str(msg.from_user.id))
- if remove_result:
- tts.bot.reply_to(msg, Constants.BotAnswers.DELETED_ALL_AUDIO)
- else:
- tts.bot.reply_to(msg, Constants.BotAnswers.RM_ALL_NOT_CONFIRM)
-
-# endregion
-
-
-while True:
- try:
- tts.bot.polling(none_stop=True)
- except requests.exceptions.ConnectionError as requests_exc:
- Constants.STA_LOG.logger.exception(Constants.ExceptionMessages.UNEXPECTED_ERROR, exc_info=True)
- time.sleep(10)
+ def enforce_max_length(self, message, text, max_chars: int) -> bool:
+ """
+ Checks that `text` is no longer than `max_chars`.
+ If it is too long, replies to the user and returns False; otherwise returns True.
+
+ :param message: The incoming Message object to reply to
+ :param text: The text to check
+ :param max_chars: Maximum allowed length
+ :return: True if text is OK; False if too long (and a reply was sent)
+ """
+ if text and len(text) > max_chars:
+ self.bot.reply_to(
+ message,
+ f"❌ Text too long ({len(text)}/{max_chars} characters).\n"
+ "Please send a shorter text.",
+ parse_mode="HTML"
+ )
+ return False
+ return True
+
+# endregion
\ No newline at end of file
diff --git a/wait_for_db.sh b/wait_for_db.sh
new file mode 100755
index 0000000..c607e0a
--- /dev/null
+++ b/wait_for_db.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+set -e
+
+echo "⏳ Waiting for DB ($DB_TYPE) to be ready…"
+
+case "$DB_TYPE" in
+ postgresql)
+ # wait for pg_isready
+ until pg_isready -h "$DB_PATH" -U "$DB_USERNAME"; do
+ echo " → Postgres not ready, retrying in 1s..."
+ sleep 1
+ done
+ ;;
+ mysql|mariadb)
+ # wait for mysqladmin
+ until mysqladmin ping -h "$DB_PATH" -u"$DB_USERNAME" -p"$DB_PASSWORD"; do
+ echo " → MySQL/MariaDB not ready, retrying in 1s..."
+ sleep 1
+ done
+ ;;
+ *)
+ # sqlite or unknown: no waiting
+ echo " → No DB container to wait for (using $DB_TYPE)"
+ ;;
+esac
+
+echo "✅ DB is up — launching bot"
+exec "$@"