partner-tg/main.py

439 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
from aiogram import Bot, Dispatcher, types, F
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.filters import CommandStart
from dotenv import load_dotenv
import json
import aiohttp
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.fsm.state import State, StatesGroup
from hashlib import sha256
load_dotenv()
API_TOKEN = os.getenv('TG_BOT_TOKEN')
API_URL = os.getenv('API_URL')
COMPANY_KEY = os.getenv('COMPANY_KEY')
bot = Bot(token=API_TOKEN)
dp = Dispatcher()
storage = MemoryStorage()
dp.fsm.storage = storage
# Текст приветствия для главного экрана
WELCOME_TEXT = (
'👋 Добро пожаловать!\n\n'
'Это официальный бот для партнерской программы.\n'
'Здесь вы найдете свои партнерские ссылки, промо материалы, статистику и сможете задать вопросы.'
)
# Количество ссылок/рефералов на одной странице (для пагинации)
LINKS_PER_PAGE = 10
PROMO_URL = 'https://telegra.ph/Test-05-24-363'
PARTNER_AGREEMENT_URL = 'https://telegra.ph/Test-05-24-363'
# Клавиатура главного экрана
main_keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text='Мои ссылки', callback_data='links'),
InlineKeyboardButton(text='Промо материалы', url=PROMO_URL)
],
[
InlineKeyboardButton(text='Партнерское соглашение', url=PARTNER_AGREEMENT_URL),
InlineKeyboardButton(text='Задать вопрос', callback_data='question')
],
[
InlineKeyboardButton(text='Моя статистика', callback_data='stats'),
InlineKeyboardButton(text='Создать заявку на вывод средств', callback_data='withdraw')
]
])
# Вместо загрузки из файла будем хранить кэш токенов пользователей
user_tokens = {}
# =====================
# Формат my_links_mock_data.json:
# [
# {
# "ref": "<uuid>", # ссылка с уникальным идентификатором
# "description": "Описание/название ссылки" # описание/название ссылки
# },
# ...
# ]
# =====================
async def get_links_from_api(token: str):
headers = {"Authorization": f"Bearer {token}"}
async with aiohttp.ClientSession() as session:
async with session.get(f'{API_URL}/ref', headers=headers) as resp:
if resp.status == 200:
return await resp.json()
return []
async def render_links_page(page: int = 0, token: str = None):
"""
Формирует текст и клавиатуру для страницы с партнерскими ссылками.
Получает данные с API.
"""
links = await get_links_from_api(token)
total = len(links)
start = page * LINKS_PER_PAGE
end = start + LINKS_PER_PAGE
page_links = links[start:end]
header = f"{'ref':<36} | {'promocode':<10} | description"
sep = '-' * 36 + '-|-' + '-' * 10 + '-|-' + '-' * 30
rows = []
for item in page_links:
ref = item['ref']
promocode = item.get('promocode', '')
description = item['description']
url = f"https://{item['ref']}"
rows.append(f"{ref:<36} | {promocode:<10} | <a href=\"{url}\">{description}</a>")
if not rows:
table = 'Нет ссылок.'
else:
table = '\n'.join([header, sep] + rows)
text = f"🔗 Ваши партнерские ссылки (стр. {page+1}):\n\n<pre>{table}</pre>"
nav_buttons = []
if start > 0:
nav_buttons.append(InlineKeyboardButton(text='⬅️ Предыдущая', callback_data=f'links_page_{page-1}'))
if end < total:
nav_buttons.append(InlineKeyboardButton(text='Следующая ➡️', callback_data=f'links_page_{page+1}'))
keyboard_rows = []
if nav_buttons:
keyboard_rows.append(nav_buttons)
keyboard_rows.append([InlineKeyboardButton(text='Создать новую', callback_data='create_link')])
keyboard_rows.append([InlineKeyboardButton(text='Назад', callback_data='back_to_main')])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
return text, keyboard
@dp.message(CommandStart())
async def send_welcome(message: types.Message):
"""Обрабатывает команду /start, авторизует пользователя и показывает главный экран."""
tg_id = message.from_user.id
chat_id = message.chat.id
name = message.from_user.full_name
login = message.from_user.username
token = None
error = None
hash_value = sha256(f"{tg_id}sold".encode()).hexdigest()
company_key = COMPANY_KEY
async with aiohttp.ClientSession() as session:
try:
# 1. Пробуем авторизоваться по hash
async with session.post(f'{API_URL}/tg_auth', json={'hash': hash_value}) as auth_resp:
if auth_resp.status == 200:
token = hash_value
else:
# 2. Если не найден — регистрируем
async with session.post(f'{API_URL}/register', json={'tg_id': tg_id, 'chat_id': chat_id, 'name': name, 'login': login, 'company_key': company_key}) as reg_resp:
if reg_resp.status == 200:
# После регистрации снова пробуем авторизоваться
async with session.post(f'{API_URL}/tg_auth', json={'hash': hash_value}) as auth_resp2:
if auth_resp2.status == 200:
token = hash_value
else:
error = 'Ошибка авторизации после регистрации'
else:
error = 'Ошибка регистрации пользователя'
except Exception as e:
error = f'Ошибка соединения с сервером авторизации: {e}'
if token:
user_tokens[tg_id] = token
await message.answer(WELCOME_TEXT, reply_markup=main_keyboard)
else:
await message.answer(f'Ошибка авторизации: {error}', reply_markup=None)
@dp.callback_query(F.data == 'links')
async def show_links(callback: types.CallbackQuery):
"""Показывает страницу с партнерскими ссылками (первая страница)."""
tg_id = callback.from_user.id
token = user_tokens.get(tg_id)
text, keyboard = await render_links_page(0, token)
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML', disable_web_page_preview=True)
await callback.answer()
@dp.callback_query(lambda c: c.data and c.data.startswith('links_page_'))
async def paginate_links(callback: types.CallbackQuery):
"""Пагинация по партнерским ссылкам."""
page = int(callback.data.split('_')[-1])
tg_id = callback.from_user.id
token = user_tokens.get(tg_id)
text, keyboard = await render_links_page(page, token)
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML', disable_web_page_preview=True)
await callback.answer()
@dp.callback_query(F.data == 'back_to_main')
async def back_to_main(callback: types.CallbackQuery):
"""Возврат на главный экран."""
await callback.message.edit_text(WELCOME_TEXT, reply_markup=main_keyboard)
await callback.answer()
# Состояния для FSM
class LinkStates(StatesGroup):
waiting_for_description = State()
link_created = State()
class WithdrawStates(StatesGroup):
waiting_for_amount = State()
@dp.callback_query(F.data == 'create_link')
async def create_link(callback: types.CallbackQuery, state: FSMContext):
sent = await callback.message.edit_text(
'Введите описание новой ссылке:',
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[[InlineKeyboardButton(text='Назад', callback_data='back_to_links')]]
)
)
# Сохраняем id сообщения, чтобы потом убрать у него кнопки
await state.set_state(LinkStates.waiting_for_description)
await state.update_data(desc_msg_id=sent.message_id)
await callback.answer()
# Обработка кнопки "Назад" из состояния создания ссылки
@dp.callback_query(F.data == 'back_to_links', LinkStates.waiting_for_description)
async def back_to_links_from_create(callback: types.CallbackQuery, state: FSMContext):
tg_id = callback.from_user.id
token = user_tokens.get(tg_id)
text, keyboard = await render_links_page(0, token)
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML', disable_web_page_preview=True)
await state.clear()
await callback.answer()
# Обработка ввода описания новой ссылки
@dp.message(LinkStates.waiting_for_description)
async def process_new_link_description(message: types.Message, state: FSMContext):
description = message.text.strip()
tg_id = message.from_user.id
token = user_tokens.get(tg_id)
headers = {"Authorization": f"Bearer {token}"}
data = await state.get_data()
desc_msg_id = data.get('desc_msg_id')
if desc_msg_id:
try:
await message.bot.edit_message_reply_markup(
chat_id=message.chat.id,
message_id=desc_msg_id,
reply_markup=None
)
except Exception:
pass
async with aiohttp.ClientSession() as session:
async with session.post(f'{API_URL}/ref/add', json={"description": description}, headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
ref = data.get('ref')
promocode = data.get('promocode')
text = f"Новая ссылка успешно создана!\n\n<pre>ref: {ref}\npromocode: {promocode}\ndescription: {description}</pre>"
keyboard = InlineKeyboardMarkup(
inline_keyboard=[[InlineKeyboardButton(text='Назад', callback_data='back_to_links_from_success')]]
)
await message.answer(text, reply_markup=keyboard, parse_mode='HTML')
await state.set_state(LinkStates.link_created)
else:
await message.answer('Ошибка при создании ссылки. Попробуйте еще раз.')
# Обработка кнопки "Назад" после успешного создания ссылки
@dp.callback_query(F.data == 'back_to_links_from_success', LinkStates.link_created)
async def back_to_links_from_success(callback: types.CallbackQuery, state: FSMContext):
tg_id = callback.from_user.id
token = user_tokens.get(tg_id)
text, keyboard = await render_links_page(0, token)
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML', disable_web_page_preview=True)
await state.clear()
await callback.answer()
@dp.callback_query(F.data == 'withdraw')
async def withdraw_funds(callback: types.CallbackQuery, state: FSMContext):
"""
Запрашивает сумму для вывода средств.
"""
sent = await callback.message.edit_text(
'Введите сумму для вывода:',
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[[InlineKeyboardButton(text='Назад', callback_data='back_to_main_from_withdraw')]]
)
)
await state.set_state(WithdrawStates.waiting_for_amount)
await state.update_data(withdraw_msg_id=sent.message_id)
await callback.answer()
@dp.callback_query(F.data == 'back_to_main_from_withdraw', WithdrawStates.waiting_for_amount)
async def back_to_main_from_withdraw(callback: types.CallbackQuery, state: FSMContext):
await callback.message.edit_text(WELCOME_TEXT, reply_markup=main_keyboard)
await state.clear()
await callback.answer()
@dp.message(WithdrawStates.waiting_for_amount)
async def process_withdraw_amount(message: types.Message, state: FSMContext):
try:
amount = float(message.text.strip())
if amount <= 0:
await message.answer('Сумма для вывода должна быть положительной.')
return
except ValueError:
await message.answer('Пожалуйста, введите корректное число для суммы.')
return
tg_id = message.from_user.id
token = user_tokens.get(tg_id)
if not token:
await message.answer('Ошибка авторизации. Пожалуйста, перезапустите бота командой /start.')
await state.clear()
return
headers = {"Authorization": f"Bearer {token}"}
data = await state.get_data()
withdraw_msg_id = data.get('withdraw_msg_id')
if withdraw_msg_id:
try:
await message.bot.edit_message_reply_markup(
chat_id=message.chat.id,
message_id=withdraw_msg_id,
reply_markup=None
)
except Exception:
pass
async with aiohttp.ClientSession() as session:
try:
async with session.post(f'{API_URL}/withdraw', json={'tg_id': tg_id, 'amount': amount}, headers=headers) as resp:
if resp.status == 200:
response_data = await resp.json()
transaction_id = response_data.get('transaction_id')
await message.answer(
f'Запрос на вывод средств успешно создан!\nСумма: {amount:.2f}\nID транзакции: {transaction_id}',
reply_markup=InlineKeyboardMarkup(
inline_keyboard=[[InlineKeyboardButton(text='Назад', callback_data='back_to_main')]]
)
)
else:
error_detail = 'Неизвестная ошибка'
try:
error_json = await resp.json()
if 'detail' in error_json:
error_detail = error_json['detail']
except Exception:
pass
await message.answer(f'Ошибка при создании заявки на вывод средств: {error_detail}')
except aiohttp.ClientConnectorError:
await message.answer('Не удалось подключиться к серверу API. Пожалуйста, попробуйте позже.')
except Exception as e:
await message.answer(f'Произошла непредвиденная ошибка: {e}')
await state.clear()
@dp.callback_query(F.data == 'question')
async def not_implemented_question(callback: types.CallbackQuery):
await callback.answer('Функция "Задать вопрос" пока не реализована.', show_alert=True)
# =====================
# Формат my_ststs_mock_data.json:
# {
# "totalSales": int, # общее количество продаж
# "totalIncome": float, # общий доход
# "availableWithdrawal": float, # доступно к выводу
# "refData": [ # список данных по рефералам
# {
# "name": str, # название реферальной ссылки
# "sales": int, # количество продаж по ссылке
# "income": float # доход по ссылке
# },
# ...
# ]
# }
# =====================
async def get_stat_from_api(token: str):
headers = {"Authorization": f"Bearer {token}"}
async with aiohttp.ClientSession() as session:
async with session.get(f'{API_URL}/stat', headers=headers) as resp:
if resp.status == 200:
return await resp.json()
return None
async def get_ref_stat_from_api(token: str):
headers = {"Authorization": f"Bearer {token}"}
async with aiohttp.ClientSession() as session:
async with session.get(f'{API_URL}/ref/stat', headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
return data.get('refData', [])
return []
async def render_stats_page(page: int = 0, token: str = None):
"""
Формирует текст и клавиатуру для страницы со статистикой.
Показывает общую статистику и таблицу по рефералам (по 10 на страницу).
"""
stats = await get_stat_from_api(token)
ref_data = await get_ref_stat_from_api(token)
total = len(ref_data)
start = page * LINKS_PER_PAGE
end = start + LINKS_PER_PAGE
page_refs = ref_data[start:end]
# Общая статистика
if stats:
stat_text = (
f"<b>Ваша статистика</b>\n"
f"Всего продаж: <b>{stats.get('totalSales', 0)}</b>\n"
f"Всего заработано: <b>{stats.get('totalIncome', 0):.1f}</b>\n"
f"Доступно к выводу: <b>{stats.get('availableWithdrawal', 0):.1f}</b>\n"
)
else:
stat_text = "<b>Ваша статистика</b>\nОшибка получения данных."
# Таблица по рефералам
header = f"{'description':<30} | {'sales':<5} | income"
sep = '-' * 30 + '-|-' + '-' * 5 + '-|-' + '-' * 10
rows = []
for item in page_refs:
description = item.get('description', '')[:30]
sales = item.get('sales', 0)
income = item.get('income', 0)
rows.append(f"{description:<30} | {sales:<5} | {income:.1f}")
if not rows:
table = 'Нет данных.'
else:
table = '\n'.join([header, sep] + rows)
text = stat_text + '\n<pre>' + table + '</pre>'
# Кнопки навигации и перехода
nav_buttons = []
if start > 0:
nav_buttons.append(InlineKeyboardButton(text='⬅️ Предыдущая', callback_data=f'stats_page_{page-1}'))
if end < total:
nav_buttons.append(InlineKeyboardButton(text='Следующая ➡️', callback_data=f'stats_page_{page+1}'))
keyboard_rows = []
if nav_buttons:
keyboard_rows.append(nav_buttons)
keyboard_rows.append([InlineKeyboardButton(text='Мои ссылки', callback_data='links')])
keyboard_rows.append([InlineKeyboardButton(text='Назад', callback_data='back_to_main')])
keyboard = InlineKeyboardMarkup(inline_keyboard=keyboard_rows)
return text, keyboard
@dp.callback_query(F.data == 'stats')
async def show_stats(callback: types.CallbackQuery):
tg_id = callback.from_user.id
token = user_tokens.get(tg_id)
text, keyboard = await render_stats_page(0, token)
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML', disable_web_page_preview=True)
await callback.answer()
@dp.callback_query(lambda c: c.data and c.data.startswith('stats_page_'))
async def paginate_stats(callback: types.CallbackQuery):
page = int(callback.data.split('_')[-1])
tg_id = callback.from_user.id
token = user_tokens.get(tg_id)
text, keyboard = await render_stats_page(page, token)
await callback.message.edit_text(text, reply_markup=keyboard, parse_mode='HTML', disable_web_page_preview=True)
await callback.answer()
async def main():
await dp.start_polling(bot)
if __name__ == '__main__':
asyncio.run(main())