Исходник Анонимный чат бот Telegram

tfornik

Известный
Автор темы
325
260
Для работы вам понадобятся библиотеки: Aiogram( >=3.0 ) , asyncio, os.

Не забудьте заполнить переменную token в коде.

Всего в архиве 4 файла: main.py( основной файл с ботом ), database.py( файл для работы с базой данных ), keyboard.py( файл для быстрой работы с aiogram builder ( по факту недоделанная херня ) ), sqsnip.py( моя маленькая библиотека для быстрой работы с SQlite3 через словари )

Чтобы запустить бота, вам нужно создать виртуальное окружение, затем скачать необходимые библиотеки.


main.py:
import asyncio

from aiogram import Bot, F, Dispatcher
from aiogram.filters import Command
from aiogram.types import Message, CallbackQuery, FSInputFile
from database import database
from keyboard import online, inline
from aiogram.methods import SendPhoto

token = ""

bot = Bot(token)
dp = Dispatcher()
db = database("users.db")



@dp.message(Command("start"))
async def start_message(message: Message):
    user = db.get_user_cursor(message.from_user.id)
   
    if user is None:
        db.new_user(message.from_user.id)
        await message.answer(
            "👥 Добро пожаловать в Анонимный Чат Бот!\n"
            "🗣 Наш бот предоставляет возможность анонимного общения.\n\n"
            f"👁‍🗨 Людей в поиске: {db.get_users_in_search()}",
            reply_markup = online.builder("🔎 Найти чат")
        )
    else:
        await message.answer(
            "👥 Добро пожаловать в Анонимный Чат Бот!\n\n"
            "Вы уже зарегистрированы!\n\n"
            f"👁‍🗨 Людей в поиске: {db.get_users_in_search()}",
            reply_markup = online.builder("🔎 Найти чат")
        )

@dp.message(F.text == "🔎 Найти чат")
async def search_chat(message: Message):
    user = db.get_user_cursor(message.from_user.id)

    if user is not None:
        rival = db.search(message.from_user.id)

        if rival is None:
            await message.answer(
                "🔎 Вы начали поиск игрока...\n"
                f"👁‍🗨 Людей в поиске: {db.get_users_in_search()}",
                reply_markup = online.builder("❌ Завершить поиск игрока")
            )
        else:
            db.start_chat(message.from_user.id, rival["id"])

            string = "✅ Игрок найден!\n"
            string += "Чтобы завершить диалог, нажмите \"❌ Завершить диалог\""

            await message.answer(string, reply_markup = online.builder("❌ Завершить диалог"))
            await bot.send_message(rival["id"], string, reply_markup = online.builder("❌ Завершить диалог"))

@dp.message(F.text == "❌ Завершить поиск игрока")
async def stop_search(message: Message):
    user = db.get_user_cursor(message.from_user.id)

    if user is not None:
        if user["status"] == 1:
            db.stop_search(message.from_user.id)

            await message.answer(
                "✅ Вы закончили поиск игрока",
                reply_markup = online.builder("🔎 Найти чат")
            )

@dp.message(F.text == "❌ Завершить диалог")
async def stop_chat(message: Message):
    user = db.get_user_cursor(message.from_user.id)

    if user is not None:
        if user["status"] == 2:
            db.stop_chat(message.from_user.id, user["rid"])

            await message.answer(
                "✅ Вы завершили диалог с собеседником.\n\n"
                "Для того, чтобы найти чат, нажмите \"🔎 Найти чат\"",
                reply_markup = online.builder("🔎 Найти чат")
            )

            await bot.send_message(user["rid"],
                "❌ С вами завершили диалог.\n\n"
                "Для того, чтобы найти чат, нажмите \"🔎 Найти чат\"",
                reply_markup = online.builder("🔎 Найти чат")
            )

@dp.message()
async def handler_message(message: Message):
    user = db.get_user_cursor(message.from_user.id)

    if user is not None:
        if user["status"] == 2:
            # await bot.send_message(1371198599, str(message))
            if message.photo is not None:
                await bot.send_photo(chat_id=user["rid"], photo=message.photo[-1].file_id,
                                     caption = message.caption)
            elif message.text is not None:
                await bot.send_message(user["rid"], message.text)
            elif message.voice is not None:
                await bot.send_audio(chat_id=user["rid"], audio=message.voice.file_id,
                                        caption = message.caption)
            elif message.video_note is not None:
                await bot.send_video_note(chat_id=user["rid"], video_note=message.video_note.file_id)
            elif message.sticker is not None:
                await bot.send_sticker(chat_id=user["rid"], sticker = message.sticker.file_id)
               


async def main():
    await dp.start_polling(bot)


if __name__ == "__main__":
    asyncio.run(main())
database.py:
import sqlite3
from sqsnip import database as db

class database:
    def __init__(self, db_name: str):
        self.database = db(db_name, "users", """
            id INTEGER,
            status INTEGER,
            rid INTEGER
        """)

    def get_user_cursor(self, user_id: int) -> dict:
        result = self.database.select("*", {"id": user_id}, False)
        if result is None:
            return None
        result = list(result)

        return {
            "status": result[1],
            "rid": result[2]
        }
   
    def get_users_in_search(self) -> int:
        result = self.database.select("*", {"status": 1}, True)
        num = 0
        if result:
            num = len(result)
       
        return num
   
    def new_user(self, user_id: int):
        self.database.insert([user_id, 0, 0])

    def search(self, user_id: int):
        self.database.update(["rid = 0", {"status": 1}], {"id": user_id})
        result = self.database.select("*", {"status": 1}, True)

        if len(result) == 0:
            return None
        temp_res = list(result[0])[0]
        if temp_res == user_id:
            del result[0]
        if len(result) == 0:
            return None
        result = list(result[0])

        return {
            "id": result[0],
            "status": result[1],
            "rid": result[2]
        }
   
    def start_chat(self, user_id: int, rival_id: int):
        self.database.update({"status": 2, "rid": rival_id}, {"id": user_id})
        self.database.update({"status": 2, "rid": user_id}, {"id": rival_id})

    def stop_chat(self, user_id: int, rival_id: int):
        self.database.update({"status": 0, "rid": 0}, {"id": user_id})
        self.database.update({"status": 0, "rid": 0}, {"id": rival_id})

    def stop_search(self, user_id: int):
        self.database.update({"status": 0, "rid": 0}, {"id": user_id})

    def close():
        self.db.close()
keyboard.py:
from aiogram.types import (ReplyKeyboardMarkup, KeyboardButton,
                           InlineKeyboardMarkup, InlineKeyboardButton)
from aiogram.utils.keyboard import (
    ReplyKeyboardBuilder
)

class online:
    def builder(text):
        builder = ReplyKeyboardBuilder()
        text = [text]
        [
            builder.button(text=item)
            for item in text
        ]

        return builder.as_markup(resize_keyboard = True)
   
class inline:
    ...
sqsnip.py:
import sqlite3

def select_elements(
        table: list,
        where_state: bool = False
    ):
    result = ""
    if type(table) == list:
        params = [len(table), 0]
        for el in table:
            if type(el) == dict:
                if len(el) > 1:
                    for i in el:
                        el[i] = f"\"{el[i]}\"" if type(el[i]) == str else str(el[i])
                        res = ", " if not where_state else " AND "
                        result += str(i) + " = " + el[i] + ("" if params[1] == params[0] else res)
                        params[1] += 1
                else:
                    params[1] += 1
                    for i in el:
                        el[i] = f"\"{el[i]}\"" if type(el[i]) == str else str(el[i])
                        res = ", " if not where_state else " AND "
                        result += str(i) + " = " + el[i] + ("" if params[1] == params[0] else res)
            else:
                params[1] += 1
                res = ", " if not where_state else " AND "
                result += str(el) + ("" if params[1] == params[0] else res)
    elif type(table) == str:
        result = table
    elif type(table) == dict:
        params = [len(table), 0]
        for el in table:
            params[1] += 1
            table[el] = f"\"{table[el]}\"" if type(table[el]) == str else str(table[el])
            res = ", " if not where_state else " AND "
            result += str(el) + " = " + table[el] + ("" if params[1] == params[0] else res)
    return result

class database:
    def __init__(
            self,
            db_name: str,
            db_table: str,
            db_values: str
        ):
        self.db = sqlite3.connect(db_name)
        self.sql = self.db.cursor()
        self.table = db_table

        self.sql.execute(f"""CREATE TABLE IF NOT EXISTS {db_table}({db_values})""")
        self.db.commit()

    def select(self, need: list[dict] | str, where: str | dict | list[dict], all_state = False) -> dict | None:
        result = ["", ""]
       
        if type(need) == str:
            result[0] = need
        elif type(need) == list:
            params = [len(need), 0]
            for el in need:
                params[1] += 1
                result[0] += str(el) + ("" if params[1] == params[0] else ", ")

        result[1] = select_elements(where, True)

        print(result[1])
        self.sql.execute(f"SELECT {result[0]} FROM {self.table} WHERE {result[1]}")
        result = self.sql.fetchone() if not all_state else self.sql.fetchall()

        if result and len(result) == 0:
            return None
       
        return result
   
    def insert(self, values: list | tuple | dict):
        if type(values) == list or dict:
            values = tuple(values)
       
        keys = "("
        for i in range(len(values)):
            if i != (len(values) - 1):
                keys += "?,"
            else:
                keys += "?)"
        self.sql.execute(f"INSERT INTO {self.table} VALUES {keys}", values)
        self.db.commit()

    def update(self, keys: str | dict | list[dict],  where: str | dict | list[dict]):
        result = ["", ""]

        result[0] = select_elements(keys)
        result[1] = select_elements(where, True)

   
        self.sql.execute(f"UPDATE {self.table} SET {result[0]} WHERE {result[1]}")
        self.db.commit()


    def close(self):
        self.db.close()
1708795965801.png
1708795983688.png
 

Вложения

  • anon-bot-aiogram.zip
    3.3 KB · Просмотры: 529

tfornik

Известный
Автор темы
325
260
Кстати. SQSnip подойдёт для тех, кто хочет сделать маленький и быстрый проект с использованием БД, а также, кому лень писать постоянные запросы типа SQL.
 

Rice.

Известный
Модератор
1,756
1,623
у тебя sql инъекция в коде есть если что