- 9
- 2
Вряд ли конечно кому то может понадобиться этот код, но может как нибудь, каким нибудь саком, он попадет к кому то в руки, он станет миллиардером, он вспомнит меня и займет мне денег на годовую подписку амбреллы
Очень коленный код, который я кровью и потом писал 7 часов.
Это первый код который я когда либо писал и разочаровался я в самый конец, когда исправил все ошибки, узнал, что интеграция гемини платная...
Python:
# python
import telegram
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
import vertexai
from vertexai.generative_models import GenerativeModel
import os
import json
import asyncio
import threading
import google.auth.transport.requests
import google.oauth2.service_account
# --- Настройки ---
TELEGRAM_BOT_TOKEN = "ТОКЕН БОТА"
CREDENTIALS_FILE = "ФАЙЛ СКАЧАННЫЙ С ГУГЛ КЛАУДА"
PROJECT_ID = "АЙДИ ПРОЕКТА В КЛАУДЕ"
LOCATION = "ЛОКАЦИЯ (лучше ставить us-central1)"
# --- Инициализация Google Cloud AI ---
model = None
try:
credentials = google.oauth2.service_account.Credentials.from_service_account_file(CREDENTIALS_FILE)
vertexai.init(project=PROJECT_ID, location=LOCATION, credentials=credentials)
model = GenerativeModel(model_name="gemini-pro")
print("Google Cloud AI успешно инициализирован с явными учетными данными.")
except Exception as e:
print(f"Ошибка инициализации Google Cloud AI: {e}")
# --- ОбраБОТчеке команд ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text('Привет! Я бот, работающий на Gemini.')
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_message = update.message.text
if model:
try:
response = await model.generate_content(user_message)
bot_response = response.text
await update.message.reply_text(bot_response)
except Exception as e:
print(f"Ошибка при взаимодействии с Gemini: {e}")
await update.message.reply_text('Произошла ошибка при обработке вашего запроса.')
else:
await update.message.reply_text('В данный момент Gemini недоступен. бля')
async def error(update: Update, context: ContextTypes.DEFAULT_TYPE):
print(f"Update {update} вызвал ошибку {context.error}")
# --- Фу ---
def run_bot(application):
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(application.run_polling())
finally:
loop.close()
# --- Запуск бота ---
async def main():
application = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
application.add_error_handler(error)
print("Бот готовится к запуску в отдельном потоке. фу")
thread = threading.Thread(target=run_bot, args=(application,))
thread.start()
print("Бот запущен в отдельном потоке. Нажмите Ctrl+C для остановки основного потока.фу ")
# Попа
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("Основной поток остановлен. Попытка остановить бота...")
# Попытка корректно остановить приложение (может не сработать из-за проблем с циклом)
await application.shutdown()
thread.join()
except Exception as e:
print(f"Произошла непредвиденная ошибка в основном потоке: {e}")
if __name__ == '__main__':
asyncio.run(main())
Если у кого то есть деньги, если у какого то безумца вообще появилось этим воспользоваться - я готов полностью помочь(