from sqlmodel import Session, select, create_engine from passlib.context import CryptContext from typing import Optional from datetime import datetime, timedelta from bff_models import Token, TransactionStatus from sql_models import Company, TgAgent, Account, AgentBalance, AgentTransaction, PartnerTransaction, Sale, Ref from hashlib import sha256 import jwt from jwt.exceptions import InvalidTokenError from fastapi import HTTPException, status, Depends, Request from fastapi.security import OAuth2PasswordBearer # Конфигурация AUTH_DATABASE_ADDRESS = "sqlite:///partner.db" AUTH_DB_ENGINE = create_engine(AUTH_DATABASE_ADDRESS, echo=True) SECRET_KEY = "supersecretkey" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def get_tg_agent_by_tg_id(db: Session, tg_id: int) -> Optional[TgAgent]: statement = select(TgAgent).where(TgAgent.tg_id == tg_id) return db.exec(statement).first() def get_db(): with Session(AUTH_DB_ENGINE) as session: yield session def get_current_account(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) login: str = payload.get("sub") if login is None: raise credentials_exception except InvalidTokenError: raise credentials_exception account = get_account_by_login(db, login) if account is None: raise credentials_exception return account async def get_current_tg_agent(request: Request, db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): raise credentials_exception hash_value = auth_header.replace("Bearer ", "").strip() tg_agent = db.exec(select(TgAgent).where(TgAgent.hash == hash_value)).first() if tg_agent is None: raise credentials_exception return tg_agent def authenticate_tg_agent(engine, tg_id: int): with Session(engine) as db: tg_agent = get_tg_agent_by_tg_id(db, tg_id) if not tg_agent: return None return tg_agent def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_account_by_login(db: Session, login: str) -> Optional[Account]: statement = select(Account).where(Account.login == login) return db.exec(statement).first()