# partner-core/main.py
# 449 lines, 17 KiB, Python

import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from uuid import uuid4

from fastapi import FastAPI, Depends, HTTPException, status, Query
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from sqlalchemy import func
from sqlalchemy.exc import IntegrityError
from sqlmodel import SQLModel, Field, create_engine, Session, select

from models import RefAddRequest, RefResponse, RegisterRequest, Token, TokenRequest
# Configuration: database URL handed to create_engine() below (a local SQLite file).
AUTH_DATABASE_ADDRESS = "sqlite:///partner.db"
# SQLModel table definitions
class TgAgent(SQLModel, table=True):
    """Registered agent; looked up by ``tg_id`` during authentication."""

    # Surrogate primary key, assigned by the database on insert.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Unique, indexed identifier; presumably the Telegram user id -- TODO confirm.
    tg_id: int = Field(index=True, unique=True)
    chat_id: Optional[int] = None
    phone: Optional[str] = None
    name: Optional[str] = None
    login: Optional[str] = None
    # Naive UTC timestamps taken when the object is created. Nothing in the
    # visible code changes update_dttm afterwards.
    create_dttm: datetime = Field(default_factory=datetime.utcnow)
    update_dttm: datetime = Field(default_factory=datetime.utcnow)
class Ref(SQLModel, table=True):
    """Referral link owned by one agent."""

    # Surrogate primary key, assigned by the database on insert.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Owning agent (TgAgent.id).
    tg_agent_id: int = Field(foreign_key="tgagent.id")
    # Referral code; the /ref/add endpoint fills it with a uuid4 string.
    ref: str
    description: Optional[str] = None
    # Naive UTC timestamps taken when the object is created.
    create_dttm: datetime = Field(default_factory=datetime.utcnow)
    update_dttm: datetime = Field(default_factory=datetime.utcnow)
class Sale(SQLModel, table=True):
    """Sale attributed to a referral link."""

    # Surrogate primary key, assigned by the database on insert.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Sale amount; summed as revenue by the dashboard endpoints.
    cost: float
    crediting: float  # amount credited for the sale
    # Referral link the sale came through (Ref.id, not the referral code string).
    ref: int = Field(foreign_key="ref.id")
    # Presumably the sale's identifier in an external system -- TODO confirm.
    sale_id: str
    # Naive UTC timestamps taken when the object is created.
    create_dttm: datetime = Field(default_factory=datetime.utcnow)
    update_dttm: datetime = Field(default_factory=datetime.utcnow)
class Transaction(SQLModel, table=True):
    """Payout transaction for an agent; its ``sum`` is subtracted from credited income."""

    # Surrogate primary key, assigned by the database on insert.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Public identifier: a uuid4 string generated per row, unique and indexed.
    transaction_id: str = Field(default_factory=lambda: str(uuid4()), index=True, unique=True)
    sum: float
    # Agent the transaction belongs to (TgAgent.id).
    tg_agent_id: int = Field(foreign_key="tgagent.id")
    status: str  # 'process' || 'done' || 'error' || 'waiting'
    # Naive UTC timestamps taken when the object is created.
    create_dttm: datetime = Field(default_factory=datetime.utcnow)
    update_dttm: datetime = Field(default_factory=datetime.utcnow)
# Create the database engine; echo=True makes SQLAlchemy log every SQL statement.
AUTH_DB_ENGINE = create_engine(AUTH_DATABASE_ADDRESS, echo=True)
# Create any tables that do not exist yet; this runs at import time.
SQLModel.metadata.create_all(AUTH_DB_ENGINE)
# Extracts the bearer token from requests; tokens are issued by POST /token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# FastAPI app
app = FastAPI()
# CRUD helpers
def get_tg_agent_by_tg_id(db: Session, tg_id: int) -> Optional[TgAgent]:
    """Return the agent whose ``tg_id`` matches, or ``None`` when there is none."""
    return db.exec(select(TgAgent).where(TgAgent.tg_id == tg_id)).first()
# Dependency
def get_db():
    """Yield a session bound to ``AUTH_DB_ENGINE``; it is closed once the caller is done."""
    with Session(AUTH_DB_ENGINE) as db_session:
        yield db_session
# Authorization
async def get_current_tg_agent(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token to the ``TgAgent`` it refers to.

    The token must be the literal prefix ``session_for_`` followed by an
    integer ``tg_id``.

    Raises:
        HTTPException: 401 when the token is malformed or no agent has that
            ``tg_id``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # NOTE(review): the token carries nothing but the tg_id, so anyone who
    # knows an agent's tg_id can construct a valid token. Consider issuing a
    # signed or random token instead.
    token_prefix = "session_for_"
    if not token.startswith(token_prefix):
        raise credentials_exception
    # Strip only the leading prefix. str.replace() removes every occurrence
    # and would accept tokens such as "session_for_session_for_1".
    try:
        tg_id = int(token[len(token_prefix):])
    except ValueError:
        raise credentials_exception from None
    tg_agent = get_tg_agent_by_tg_id(db, tg_id)
    if tg_agent is None:
        raise credentials_exception
    return tg_agent
# Registration
@app.post("/register", tags=["partner-tg"])
def register(req: RegisterRequest, db: Session = Depends(get_db)):
    """Register a new agent from the request's ``tg_id``, ``chat_id`` and ``phone``.

    ``name`` and ``login`` are copied from the request when it has them.

    Raises:
        HTTPException: 400 when an agent with this ``tg_id`` already exists.
    """
    tg_id = req.tg_id
    chat_id = req.chat_id
    phone = req.phone
    name = getattr(req, 'name', None)
    login = getattr(req, 'login', None)
    # Log identifiers only; phone, name and login are personal data and are
    # deliberately kept out of the logs.
    logging.getLogger(__name__).info(
        "register request: tg_id=%s chat_id=%s", tg_id, chat_id
    )
    if get_tg_agent_by_tg_id(db, tg_id):
        raise HTTPException(status_code=400, detail="tg_id already registered")
    db.add(TgAgent(tg_id=tg_id, chat_id=chat_id, phone=phone, name=name, login=login))
    try:
        db.commit()
    except IntegrityError:
        # Two concurrent requests can both pass the existence check above; the
        # unique constraint on tg_id then rejects the second insert.
        db.rollback()
        raise HTTPException(status_code=400, detail="tg_id already registered") from None
    return {"msg": "TgAgent registered successfully"}
def authenticate_tg_agent(engine, tg_id: int):
    """Look up the agent with ``tg_id`` in a short-lived session on ``engine``.

    Returns the agent, or ``None`` when no such agent exists.
    """
    with Session(engine) as session:
        agent = get_tg_agent_by_tg_id(session, tg_id)
        return agent if agent else None
# Protected endpoint
@app.get("/protected", tags=["partner-tg"])
def protected_route(current_tg_agent: TgAgent = Depends(get_current_tg_agent)):
    """Greet the authenticated agent; reachable only with a valid bearer token."""
    greeting = f"Hello, {current_tg_agent.tg_id}! This is a protected route."
    return {"msg": greeting}
# Token issuance
@app.post("/token", response_model=Token, tags=["partner-tg"])
async def login_for_access_token(req: TokenRequest):
    """Issue a bearer token for a registered ``tg_id``; respond 401 for an unknown one."""
    agent = authenticate_tg_agent(AUTH_DB_ENGINE, req.tg_id)
    if agent:
        return Token(access_token=f"session_for_{agent.tg_id}", token_type="bearer")
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect tg_id",
        headers={"WWW-Authenticate": "Bearer"},
    )
@app.get("/ref", response_model=list[RefResponse], tags=["partner-tg"])
def get_refs(current_tg_agent: TgAgent = Depends(get_current_tg_agent), db: Session = Depends(get_db)):
    """List the authenticated agent's referral links; a missing description becomes ""."""
    own_refs_query = select(Ref).where(Ref.tg_agent_id == current_tg_agent.id)
    own_refs = db.exec(own_refs_query).all()
    return [
        RefResponse(ref=item.ref, description=item.description or "")
        for item in own_refs
    ]
@app.post("/ref/add", tags=["partner-tg"])
def add_ref(req: RefAddRequest, current_tg_agent: TgAgent = Depends(get_current_tg_agent), db: Session = Depends(get_db)):
    """Create a referral link with a fresh uuid4 code for the authenticated agent."""
    ref_row = Ref(tg_agent_id=current_tg_agent.id, ref=str(uuid4()), description=req.description)
    db.add(ref_row)
    db.commit()
    # Reload the row after the commit before reading the stored code back.
    db.refresh(ref_row)
    return {"ref": ref_row.ref}
@app.get("/ref/stat", tags=["partner-tg"])
def get_ref_stat(current_tg_agent: TgAgent = Depends(get_current_tg_agent), db: Session = Depends(get_db)):
    """Return, per referral link of the authenticated agent, its sales count and credited income."""
    owner_id = current_tg_agent.id
    # 1. All referral links of the agent.
    refs = db.exec(select(Ref).where(Ref.tg_agent_id == owner_id)).all()
    # 2. All sales made through those links, fetched with one joined query
    #    rather than one query per link.
    sales_query = (
        select(Sale)
        .join(Ref, Sale.ref == Ref.id)
        .where(Ref.tg_agent_id == owner_id)
    )
    sales_by_ref = defaultdict(list)
    for sale in db.exec(sales_query).all():
        sales_by_ref[sale.ref].append(sale)

    ref_data = []
    for ref in refs:
        ref_sales = sales_by_ref.get(ref.id, [])
        ref_data.append({
            "description": ref.description or "",
            "sales": len(ref_sales),
            "income": sum(sale.crediting for sale in ref_sales),
        })
    return {"refData": ref_data}
@app.get("/stat", tags=["partner-tg"])
def get_stat(current_tg_agent: TgAgent = Depends(get_current_tg_agent), db: Session = Depends(get_db)):
    """Return the authenticated agent's sales count, credited income and remaining balance."""
    agent_id = current_tg_agent.id
    # 1. Ids of every referral link the agent owns.
    own_refs = db.exec(select(Ref).where(Ref.tg_agent_id == agent_id)).all()
    own_ref_ids = [item.id for item in own_refs]
    # 2. Sales made through any of those links.
    sales = db.exec(select(Sale).where(Sale.ref.in_(own_ref_ids))).all()
    credited = sum(sale.crediting for sale in sales)
    # 3. Everything recorded as a transaction for the agent is subtracted.
    # NOTE(review): transactions of every status are subtracted, including
    # 'error' -- confirm that failed payouts should reduce the balance.
    transactions = db.exec(
        select(Transaction).where(Transaction.tg_agent_id == agent_id)
    ).all()
    withdrawn = sum(t.sum for t in transactions)
    return {
        "totalSales": len(sales),
        "totalIncome": credited,
        "availableWithdrawal": credited - withdrawn,
    }
@app.get("/dashboard/cards", tags=["bff"])
def get_dashboard_cards(db: Session = Depends(get_db)):
    """Return the global totals shown on the dashboard cards."""
    sales = db.exec(select(Sale)).all()
    # 1. Total revenue: sum of every Sale.cost.
    revenue = sum(item.cost for item in sales)
    # 2. Total payouts: sum of every Sale.crediting.
    credited = sum(item.crediting for item in sales)
    # 3. Active referrals: number of distinct TgAgent.tg_id values.
    agent_tg_ids = db.exec(select(TgAgent.tg_id)).all()
    # 4. Pending payouts: credited amount minus the sum of every Transaction.sum.
    transactions = db.exec(select(Transaction)).all()
    paid_out = sum(tx.sum for tx in transactions)
    # 5. Number of sales is the length of the sales list.
    return {
        "totalRevenue": revenue,
        "totalPayouts": credited,
        "activeReferrals": len(set(agent_tg_ids)),
        "pendingPayouts": credited - paid_out,
        "totalSales": len(sales),
    }
@app.get("/dashboard/chart/total", tags=["bff"])
def get_dashboard_chart_total(db: Session = Depends(get_db)):
    """Return revenue and number of sales per calendar day, ordered by day."""
    # Sales are bucketed by the day part of their creation timestamp.
    sale_day = func.strftime('%Y-%m-%d', Sale.create_dttm)
    query = (
        select(
            sale_day.label('date'),
            func.sum(Sale.cost).label('revenue'),
            func.count(Sale.id).label('sales'),
        )
        .group_by(sale_day)
        .order_by(sale_day)
    )
    # Convert the rows to the response shape, replacing empty aggregates with 0.
    payload = []
    for day, revenue, sales in db.exec(query).all():
        payload.append({"date": day, "revenue": revenue or 0, "sales": sales or 0})
    return JSONResponse(content=payload)
@app.get("/dashboard/chart/agent", tags=["bff"])
def get_dashboard_chart_agent(db: Session = Depends(get_db)):
    """Return, for every agent, the number and total cost of sales made through its referral links."""
    # Three full-table reads replace the two extra queries per agent that a
    # per-agent lookup would need.
    agents = db.exec(select(TgAgent)).all()
    refs = db.exec(select(Ref)).all()
    sales = db.exec(select(Sale)).all()

    owner_of_ref = {ref.id: ref.tg_agent_id for ref in refs}
    sales_by_agent = defaultdict(list)
    for sale in sales:
        owner_id = owner_of_ref.get(sale.ref)
        # Sales whose referral row is missing belong to no agent.
        if owner_id is not None:
            sales_by_agent[owner_id].append(sale)

    result = []
    for agent in agents:
        agent_sales = sales_by_agent.get(agent.id, [])
        result.append({
            "name": agent.name or f"Агент {agent.id}",
            "count": len(agent_sales),
            # Start at 0.0 so the total is a float even when there are no sales.
            "sum": sum((sale.cost for sale in agent_sales), 0.0),
        })
    return JSONResponse(content=result)
@app.get("/stat/agents", tags=["bff"])
def get_agents_stat(
    db: Session = Depends(get_db),
    date_start: str = Query(None),
    date_end: str = Query(None),
):
    """Return referral and sales totals for agents created within the optional date range.

    ``date_start`` and ``date_end`` bound ``TgAgent.create_dttm``; either may
    be omitted.
    """
    # NOTE(review): the bounds are compared as the raw query strings, so the
    # result depends on the exact format the client sends -- confirm.
    created_filters = []
    if date_start:
        created_filters.append(TgAgent.create_dttm >= date_start)
    if date_end:
        created_filters.append(TgAgent.create_dttm <= date_end)

    # Three queries in total, instead of two extra queries per agent.
    agents = db.exec(select(TgAgent).where(*created_filters)).all()
    refs = db.exec(
        select(Ref)
        .join(TgAgent, Ref.tg_agent_id == TgAgent.id)
        .where(*created_filters)
    ).all()
    sales = db.exec(
        select(Sale)
        .join(Ref, Sale.ref == Ref.id)
        .join(TgAgent, Ref.tg_agent_id == TgAgent.id)
        .where(*created_filters)
    ).all()

    owner_of_ref = {}
    ref_counts = defaultdict(int)
    for ref in refs:
        owner_of_ref[ref.id] = ref.tg_agent_id
        ref_counts[ref.tg_agent_id] += 1

    sales_by_agent = defaultdict(list)
    for sale in sales:
        owner_id = owner_of_ref.get(sale.ref)
        # A referral inserted between the two queries is not in the map yet.
        if owner_id is not None:
            sales_by_agent[owner_id].append(sale)

    result = []
    for agent in agents:
        agent_sales = sales_by_agent.get(agent.id, [])
        result.append({
            "name": agent.name or f"Агент {agent.id}",
            "refCount": ref_counts.get(agent.id, 0),
            "salesCount": len(agent_sales),
            # Start at 0.0 so both totals are floats even without sales.
            "salesSum": sum((sale.cost for sale in agent_sales), 0.0),
            "crediting": sum((sale.crediting for sale in agent_sales), 0.0),
        })
    return JSONResponse(content=result)
@app.get("/stat/referrals", tags=["bff"])
def get_referrals_stat(
    db: Session = Depends(get_db),
    date_start: str = Query(None),
    date_end: str = Query(None),
):
    """Return sales totals for referral links created within the optional date range.

    ``date_start`` and ``date_end`` bound ``Ref.create_dttm``; either may be
    omitted.
    """
    # NOTE(review): the bounds are compared as the raw query strings, so the
    # result depends on the exact format the client sends -- confirm.
    created_filters = []
    if date_start:
        created_filters.append(Ref.create_dttm >= date_start)
    if date_end:
        created_filters.append(Ref.create_dttm <= date_end)

    # One outer join yields each referral together with its owning agent
    # (None when the agent row is missing), instead of one lookup per referral.
    ref_rows = db.exec(
        select(Ref, TgAgent)
        .join(TgAgent, Ref.tg_agent_id == TgAgent.id, isouter=True)
        .where(*created_filters)
    ).all()
    # One query for the sales of all selected referrals.
    sales = db.exec(
        select(Sale)
        .join(Ref, Sale.ref == Ref.id)
        .where(*created_filters)
    ).all()
    sales_by_ref = defaultdict(list)
    for sale in sales:
        sales_by_ref[sale.ref].append(sale)

    result = []
    for ref, agent in ref_rows:
        ref_sales = sales_by_ref.get(ref.id, [])
        result.append({
            "ref": ref.ref,
            "agent": agent.name if agent and agent.name else f"Агент {ref.tg_agent_id}",
            "description": ref.description or "",
            "salesSum": sum(sale.cost for sale in ref_sales),
            "salesCount": len(ref_sales),
        })
    return JSONResponse(content=result)
@app.get("/stat/sales", tags=["bff"])
def get_sales_stat(
    db: Session = Depends(get_db),
    date_start: str = Query(None),
    date_end: str = Query(None),
):
    """List sales created within the optional date range with their referral code and agent name."""
    # NOTE(review): the bounds are compared as the raw query strings, so the
    # result depends on the exact format the client sends -- confirm.
    query = select(Sale)
    if date_start:
        query = query.where(Sale.create_dttm >= date_start)
    if date_end:
        query = query.where(Sale.create_dttm <= date_end)
    sales = db.exec(query).all()

    # Load the referenced referral rows in one query, keyed by id.
    ref_by_id = {}
    ref_ids = list({sale.ref for sale in sales})
    if ref_ids:
        for ref_row in db.exec(select(Ref).where(Ref.id.in_(ref_ids))).all():
            ref_by_id[ref_row.id] = ref_row

    # Load the owning agents of those referrals in one query, keyed by id.
    agent_by_id = {}
    agent_ids = list({ref_row.tg_agent_id for ref_row in ref_by_id.values()})
    if agent_ids:
        for agent_row in db.exec(select(TgAgent).where(TgAgent.id.in_(agent_ids))).all():
            agent_by_id[agent_row.id] = agent_row

    rows = []
    for sale in sales:
        ref_code = None
        agent_name = None
        ref_obj = ref_by_id.get(sale.ref)
        if ref_obj:
            ref_code = ref_obj.ref
            agent_obj = agent_by_id.get(ref_obj.tg_agent_id)
            if agent_obj:
                agent_name = agent_obj.name
        rows.append({
            "saleId": sale.sale_id,
            "cost": sale.cost,
            "crediting": sale.crediting,
            "ref": ref_code,
            "name": agent_name,
        })
    return JSONResponse(content=rows)
@app.get("/billing/cards", tags=["bff"])
def get_billing_cards(db: Session = Depends(get_db)):
    """Return the global billing totals: sale cost, credited amount and pending payouts."""
    all_sales = db.exec(select(Sale)).all()
    # 1. cost: sum of every Sale.cost.
    cost_total = sum(item.cost for item in all_sales)
    # 2. crediting: sum of every Sale.crediting.
    credited_total = sum(item.crediting for item in all_sales)
    # 3. pendingPayouts: credited amount minus the sum of every Transaction.sum.
    all_transactions = db.exec(select(Transaction)).all()
    paid_total = sum(tx.sum for tx in all_transactions)
    return {
        "cost": cost_total,
        "crediting": credited_total,
        "pendingPayouts": credited_total - paid_total,
    }
@app.get("/billing/payouts/transactions", tags=["bff"])
def get_billing_payouts_transactions(
    db: Session = Depends(get_db),
    date_start: str = Query(None),
    date_end: str = Query(None),
):
    """List transactions created within the optional date range with the owning agent's name.

    ``date_start`` and ``date_end`` bound ``Transaction.create_dttm``; either
    may be omitted.
    """
    # One outer join yields each transaction together with its agent (None
    # when the agent row is missing), instead of one lookup per transaction.
    query = select(Transaction, TgAgent).join(
        TgAgent, Transaction.tg_agent_id == TgAgent.id, isouter=True
    )
    # NOTE(review): the bounds are compared as the raw query strings, so the
    # result depends on the exact format the client sends -- confirm.
    if date_start:
        query = query.where(Transaction.create_dttm >= date_start)
    if date_end:
        query = query.where(Transaction.create_dttm <= date_end)

    return [
        {
            "id": transaction.transaction_id,
            "sum": transaction.sum,
            "agent": agent.name if agent else None,
            "status": transaction.status,
            "create_dttm": transaction.create_dttm,
            "update_dttm": transaction.update_dttm,
        }
        for transaction, agent in db.exec(query).all()
    ]
@app.get("/billing/chart/stat", tags=["bff"])
def get_billing_chart_stat(db: Session = Depends(get_db)):
    """Return the number of transactions per calendar day and status, ordered by day then status."""
    # Transactions are bucketed by the day part of their creation timestamp.
    transaction_day = func.strftime('%Y-%m-%d', Transaction.create_dttm)
    query = (
        select(
            transaction_day.label('date'),
            Transaction.status.label('status'),
            func.count(Transaction.id).label('count'),
        )
        .group_by(transaction_day, Transaction.status)
        .order_by(transaction_day, Transaction.status)
    )
    payload = []
    for day, state, total in db.exec(query).all():
        payload.append({"date": day, "status": state, "count": total})
    return JSONResponse(content=payload)
@app.get("/billing/chart/pie", tags=["bff"])
def get_billing_chart_pie(db: Session = Depends(get_db)):
    """Return the number of transactions for each status."""
    query = select(
        Transaction.status.label('status'),
        func.count(Transaction.id).label('count'),
    ).group_by(Transaction.status)
    payload = []
    for state, total in db.exec(query).all():
        payload.append({"status": state, "count": total})
    return JSONResponse(content=payload)