- 325
- 260
Для работы вам понадобятся библиотеки: Aiogram( >=3.0 ) , asyncio, os.
Не забудьте заполнить переменную
Всего в архиве 4 файла: main.py( основной файл с ботом ), database.py( файл для работы с базой данных ), keyboard.py( файл для быстрой работы с aiogram builder ( по факту недоделанная херня ) ), sqsnip.py( моя маленькая библиотека для быстрой работы с SQlite3 через словари )
Чтобы запустить бота, вам нужно создать виртуальное окружение, затем скачать необходимые библиотеки.
Не забудьте заполнить переменную
token
в коде.Всего в архиве 4 файла: main.py( основной файл с ботом ), database.py( файл для работы с базой данных ), keyboard.py( файл для быстрой работы с aiogram builder ( по факту недоделанная херня ) ), sqsnip.py( моя маленькая библиотека для быстрой работы с SQlite3 через словари )
Чтобы запустить бота, вам нужно создать виртуальное окружение, затем скачать необходимые библиотеки.
main.py:
import asyncio
from aiogram import Bot, F, Dispatcher
from aiogram.filters import Command
from aiogram.types import Message, CallbackQuery, FSInputFile
from database import database
from keyboard import online, inline
from aiogram.methods import SendPhoto
token = ""
bot = Bot(token)
dp = Dispatcher()
db = database("users.db")
@dp.message(Command("start"))
async def start_message(message: Message):
user = db.get_user_cursor(message.from_user.id)
if user is None:
db.new_user(message.from_user.id)
await message.answer(
"👥 Добро пожаловать в Анонимный Чат Бот!\n"
"🗣 Наш бот предоставляет возможность анонимного общения.\n\n"
f"👁🗨 Людей в поиске: {db.get_users_in_search()}",
reply_markup = online.builder("🔎 Найти чат")
)
else:
await message.answer(
"👥 Добро пожаловать в Анонимный Чат Бот!\n\n"
"Вы уже зарегистрированы!\n\n"
f"👁🗨 Людей в поиске: {db.get_users_in_search()}",
reply_markup = online.builder("🔎 Найти чат")
)
@dp.message(F.text == "🔎 Найти чат")
async def search_chat(message: Message):
user = db.get_user_cursor(message.from_user.id)
if user is not None:
rival = db.search(message.from_user.id)
if rival is None:
await message.answer(
"🔎 Вы начали поиск игрока...\n"
f"👁🗨 Людей в поиске: {db.get_users_in_search()}",
reply_markup = online.builder("❌ Завершить поиск игрока")
)
else:
db.start_chat(message.from_user.id, rival["id"])
string = "✅ Игрок найден!\n"
string += "Чтобы завершить диалог, нажмите \"❌ Завершить диалог\""
await message.answer(string, reply_markup = online.builder("❌ Завершить диалог"))
await bot.send_message(rival["id"], string, reply_markup = online.builder("❌ Завершить диалог"))
@dp.message(F.text == "❌ Завершить поиск игрока")
async def stop_search(message: Message):
user = db.get_user_cursor(message.from_user.id)
if user is not None:
if user["status"] == 1:
db.stop_search(message.from_user.id)
await message.answer(
"✅ Вы закончили поиск игрока",
reply_markup = online.builder("🔎 Найти чат")
)
@dp.message(F.text == "❌ Завершить диалог")
async def stop_chat(message: Message):
user = db.get_user_cursor(message.from_user.id)
if user is not None:
if user["status"] == 2:
db.stop_chat(message.from_user.id, user["rid"])
await message.answer(
"✅ Вы завершили диалог с собеседником.\n\n"
"Для того, чтобы найти чат, нажмите \"🔎 Найти чат\"",
reply_markup = online.builder("🔎 Найти чат")
)
await bot.send_message(user["rid"],
"❌ С вами завершили диалог.\n\n"
"Для того, чтобы найти чат, нажмите \"🔎 Найти чат\"",
reply_markup = online.builder("🔎 Найти чат")
)
@dp.message()
async def handler_message(message: Message):
user = db.get_user_cursor(message.from_user.id)
if user is not None:
if user["status"] == 2:
# await bot.send_message(1371198599, str(message))
if message.photo is not None:
await bot.send_photo(chat_id=user["rid"], photo=message.photo[-1].file_id,
caption = message.caption)
elif message.text is not None:
await bot.send_message(user["rid"], message.text)
elif message.voice is not None:
await bot.send_audio(chat_id=user["rid"], audio=message.voice.file_id,
caption = message.caption)
elif message.video_note is not None:
await bot.send_video_note(chat_id=user["rid"], video_note=message.video_note.file_id)
elif message.sticker is not None:
await bot.send_sticker(chat_id=user["rid"], sticker = message.sticker.file_id)
async def main():
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())
database.py:
import sqlite3
from sqsnip import database as db
class database:
def __init__(self, db_name: str):
self.database = db(db_name, "users", """
id INTEGER,
status INTEGER,
rid INTEGER
""")
def get_user_cursor(self, user_id: int) -> dict:
result = self.database.select("*", {"id": user_id}, False)
if result is None:
return None
result = list(result)
return {
"status": result[1],
"rid": result[2]
}
def get_users_in_search(self) -> int:
result = self.database.select("*", {"status": 1}, True)
num = 0
if result:
num = len(result)
return num
def new_user(self, user_id: int):
self.database.insert([user_id, 0, 0])
def search(self, user_id: int):
self.database.update(["rid = 0", {"status": 1}], {"id": user_id})
result = self.database.select("*", {"status": 1}, True)
if len(result) == 0:
return None
temp_res = list(result[0])[0]
if temp_res == user_id:
del result[0]
if len(result) == 0:
return None
result = list(result[0])
return {
"id": result[0],
"status": result[1],
"rid": result[2]
}
def start_chat(self, user_id: int, rival_id: int):
self.database.update({"status": 2, "rid": rival_id}, {"id": user_id})
self.database.update({"status": 2, "rid": user_id}, {"id": rival_id})
def stop_chat(self, user_id: int, rival_id: int):
self.database.update({"status": 0, "rid": 0}, {"id": user_id})
self.database.update({"status": 0, "rid": 0}, {"id": rival_id})
def stop_search(self, user_id: int):
self.database.update({"status": 0, "rid": 0}, {"id": user_id})
def close():
self.db.close()
keyboard.py:
from aiogram.types import (ReplyKeyboardMarkup, KeyboardButton,
InlineKeyboardMarkup, InlineKeyboardButton)
from aiogram.utils.keyboard import (
ReplyKeyboardBuilder
)
class online:
def builder(text):
builder = ReplyKeyboardBuilder()
text = [text]
[
builder.button(text=item)
for item in text
]
return builder.as_markup(resize_keyboard = True)
class inline:
...
sqsnip.py:
import sqlite3
def select_elements(
table: list,
where_state: bool = False
):
result = ""
if type(table) == list:
params = [len(table), 0]
for el in table:
if type(el) == dict:
if len(el) > 1:
for i in el:
el[i] = f"\"{el[i]}\"" if type(el[i]) == str else str(el[i])
res = ", " if not where_state else " AND "
result += str(i) + " = " + el[i] + ("" if params[1] == params[0] else res)
params[1] += 1
else:
params[1] += 1
for i in el:
el[i] = f"\"{el[i]}\"" if type(el[i]) == str else str(el[i])
res = ", " if not where_state else " AND "
result += str(i) + " = " + el[i] + ("" if params[1] == params[0] else res)
else:
params[1] += 1
res = ", " if not where_state else " AND "
result += str(el) + ("" if params[1] == params[0] else res)
elif type(table) == str:
result = table
elif type(table) == dict:
params = [len(table), 0]
for el in table:
params[1] += 1
table[el] = f"\"{table[el]}\"" if type(table[el]) == str else str(table[el])
res = ", " if not where_state else " AND "
result += str(el) + " = " + table[el] + ("" if params[1] == params[0] else res)
return result
class database:
def __init__(
self,
db_name: str,
db_table: str,
db_values: str
):
self.db = sqlite3.connect(db_name)
self.sql = self.db.cursor()
self.table = db_table
self.sql.execute(f"""CREATE TABLE IF NOT EXISTS {db_table}({db_values})""")
self.db.commit()
def select(self, need: list[dict] | str, where: str | dict | list[dict], all_state = False) -> dict | None:
result = ["", ""]
if type(need) == str:
result[0] = need
elif type(need) == list:
params = [len(need), 0]
for el in need:
params[1] += 1
result[0] += str(el) + ("" if params[1] == params[0] else ", ")
result[1] = select_elements(where, True)
print(result[1])
self.sql.execute(f"SELECT {result[0]} FROM {self.table} WHERE {result[1]}")
result = self.sql.fetchone() if not all_state else self.sql.fetchall()
if result and len(result) == 0:
return None
return result
def insert(self, values: list | tuple | dict):
if type(values) == list or dict:
values = tuple(values)
keys = "("
for i in range(len(values)):
if i != (len(values) - 1):
keys += "?,"
else:
keys += "?)"
self.sql.execute(f"INSERT INTO {self.table} VALUES {keys}", values)
self.db.commit()
def update(self, keys: str | dict | list[dict], where: str | dict | list[dict]):
result = ["", ""]
result[0] = select_elements(keys)
result[1] = select_elements(where, True)
self.sql.execute(f"UPDATE {self.table} SET {result[0]} WHERE {result[1]}")
self.db.commit()
def close(self):
self.db.close()