from sqlmodel import Session, select, create_engine from passlib.context import CryptContext from typing import Optional from datetime import datetime, timedelta from bff_models import Token, TransactionStatus from sql_models import Company, TgAgent, Account, AgentBalance, AgentTransaction, PartnerTransaction, Sale, Ref, IntegrationToken, CompanyBalance from hashlib import sha256 import jwt from jwt.exceptions import InvalidTokenError from fastapi import HTTPException, status, Depends, Request, Header from fastapi.security import OAuth2PasswordBearer import hashlib # Конфигурация AUTH_DATABASE_ADDRESS = "sqlite:///partner.db" AUTH_DB_ENGINE = create_engine(AUTH_DATABASE_ADDRESS, echo=True) SECRET_KEY = "supersecretkey" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 # JWT Configuration for Integration API INTEGRATION_SECRET_KEY = "your-super-secret-jwt-key" # TODO: Замените на реальный секретный ключ из переменных окружения INTEGRATION_ALGORITHM = "HS256" INTEGRATION_ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # Токен действителен 7 дней oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def get_integration_db(): with Session(AUTH_DB_ENGINE) as session: yield session def create_integration_jwt_token(company_id: int): expires = datetime.utcnow() + timedelta(minutes=INTEGRATION_ACCESS_TOKEN_EXPIRE_MINUTES) payload = { "sub": str(company_id), "exp": expires, "type": "access" } return jwt.encode(payload, INTEGRATION_SECRET_KEY, algorithm=INTEGRATION_ALGORITHM) async def get_current_company_from_jwt( token: str = Depends(OAuth2PasswordBearer(tokenUrl="/token")), db: Session = Depends(get_integration_db) ): """ Зависимость для получения текущей компании на основе JWT токена для Integration API. """ try: payload = jwt.decode(token, INTEGRATION_SECRET_KEY, algorithms=[INTEGRATION_ALGORITHM]) company_id: int = int(payload.get("sub")) if company_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Недействительная полезная нагрузка токена", headers={"WWW-Authenticate": "Bearer"}, ) company = db.exec(select(Company).where(Company.id == company_id)).first() if not company: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Компания не найдена") return company except InvalidTokenError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Недействительный токен", headers={"WWW-Authenticate": "Bearer"}, ) except jwt.ExpiredSignatureError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Срок действия токена истек", headers={"WWW-Authenticate": "Bearer"}, ) def get_tg_agent_by_tg_id(db: Session, tg_id: int) -> Optional[TgAgent]: statement = select(TgAgent).where(TgAgent.tg_id == tg_id) return db.exec(statement).first() def get_db(): with Session(AUTH_DB_ENGINE) as session: yield session def get_current_account(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) login: str = payload.get("sub") if login is None: raise credentials_exception except InvalidTokenError: raise credentials_exception account = get_account_by_login(db, login) if account is None: raise credentials_exception return account async def get_current_tg_agent(request: Request, db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): raise credentials_exception hash_value = auth_header.replace("Bearer ", "").strip() tg_agent = db.exec(select(TgAgent).where(TgAgent.hash == hash_value)).first() if tg_agent is None: raise credentials_exception return tg_agent def authenticate_tg_agent(engine, tg_id: int): with Session(engine) as db: tg_agent = get_tg_agent_by_tg_id(db, tg_id) if not tg_agent: return None return tg_agent def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_account_by_login(db: Session, login: str) -> Optional[Account]: statement = select(Account).where(Account.login == login) return db.exec(statement).first()