# partner-core/helpers_bff.py
#
# 90 lines
# 3.3 KiB
# Python

import os
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from typing import Optional

import jwt
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from sqlmodel import Session, create_engine, select

from bff_models import Token, TransactionStatus
from sql_models import (
    Account,
    AgentBalance,
    AgentTransaction,
    Company,
    PartnerTransaction,
    Ref,
    Sale,
    TgAgent,
)
# Configuration

# Database URL and the engine that get_db() opens its sessions on.
AUTH_DATABASE_ADDRESS = "sqlite:///partner.db"
# NOTE(review): echo=True asks the engine to log the SQL it emits -- confirm
# this is wanted outside of local debugging.
AUTH_DB_ENGINE = create_engine(AUTH_DATABASE_ADDRESS, echo=True)

# Key used to sign and verify JWTs. It is taken from the SECRET_KEY
# environment variable when that is set to a non-empty value, so the real key
# does not have to be committed to source control. The previous literal is
# kept as the fallback so behaviour is unchanged when the variable is absent.
# NOTE(review): the fallback is a placeholder value -- make sure SECRET_KEY is
# set in every non-development environment.
SECRET_KEY = os.environ.get("SECRET_KEY") or "supersecretkey"
ALGORITHM = "HS256"
# NOTE(review): not referenced in the visible part of this file;
# create_access_token() falls back to 15 minutes, not to this value.
ACCESS_TOKEN_EXPIRE_MINUTES = 60

# FastAPI dependency that extracts the bearer token for get_current_account().
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# Password hashing context used by verify_password() / get_password_hash().
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_tg_agent_by_tg_id(db: Session, tg_id: int) -> Optional[TgAgent]:
    """Return the first ``TgAgent`` whose ``tg_id`` equals *tg_id*.

    Args:
        db: Open session the query is executed on.
        tg_id: Value compared against ``TgAgent.tg_id``.

    Returns:
        The first matching row, or ``None`` when the query yields nothing.
    """
    matches = db.exec(select(TgAgent).where(TgAgent.tg_id == tg_id))
    return matches.first()
def get_db():
    """Yield a ``Session`` opened on ``AUTH_DB_ENGINE``.

    Written as a generator so it can be used with ``Depends``: the session is
    handed to the consumer at the ``yield`` and the ``with`` block exits once
    the generator is resumed or closed.
    """
    with Session(AUTH_DB_ENGINE) as db_session:
        yield db_session
def get_current_account(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve a bearer JWT to the account it names.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM``; its ``sub``
    claim is used as the login passed to ``get_account_by_login``.

    Args:
        token: Encoded JWT supplied by ``oauth2_scheme``.
        db: Session supplied by ``get_db``.

    Returns:
        The account returned by ``get_account_by_login`` for the ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token fails to decode, carries no ``sub`` claim, or no account is
            found for that login.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call can raise InvalidTokenError, so it alone is guarded.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise unauthorized

    login = claims.get("sub")
    if login is None:
        raise unauthorized

    account = get_account_by_login(db, login)
    if account is None:
        raise unauthorized
    return account
async def get_current_tg_agent(request: Request, db: Session = Depends(get_db)):
    """Resolve the ``Authorization: Bearer <hash>`` header to a ``TgAgent``.

    The credential after the ``Bearer `` scheme is compared against
    ``TgAgent.hash``.

    Args:
        request: Incoming request whose ``Authorization`` header is read.
        db: Session supplied by ``get_db``.

    Returns:
        The first ``TgAgent`` whose ``hash`` equals the supplied credential.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            header is missing, does not start with ``Bearer ``, carries an
            empty credential, or matches no agent.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    bearer_prefix = "Bearer "

    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith(bearer_prefix):
        raise credentials_exception

    # Drop only the leading scheme. str.replace() would also delete any later
    # "Bearer " substring inside the credential and so alter the value that is
    # looked up.
    hash_value = auth_header[len(bearer_prefix):].strip()
    if not hash_value:
        # An empty credential must never be able to match a row whose hash is
        # an empty string.
        raise credentials_exception

    tg_agent = db.exec(select(TgAgent).where(TgAgent.hash == hash_value)).first()
    if tg_agent is None:
        raise credentials_exception
    return tg_agent
def authenticate_tg_agent(engine, tg_id: int):
    """Look up a ``TgAgent`` by ``tg_id`` using a session opened on *engine*.

    Args:
        engine: Engine the temporary ``Session`` is created from.
        tg_id: Value forwarded to ``get_tg_agent_by_tg_id``.

    Returns:
        The agent found by ``get_tg_agent_by_tg_id``, or ``None`` when the
        lookup result is falsy.
    """
    # NOTE(review): the session is closed before the caller receives the
    # object -- confirm callers only need already-loaded attributes.
    with Session(engine) as session:
        return get_tg_agent_by_tg_id(session, tg_id) or None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* as a JWT signed with ``SECRET_KEY`` / ``ALGORITHM``.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Lifetime of the token. When omitted (``None``) the
            token expires 15 minutes from now. An explicit zero or negative
            delta is honoured as given.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    to_encode = data.copy()
    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated.
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_password(plain_password, hashed_password):
    """Check *plain_password* against *hashed_password* via ``pwd_context``.

    Returns:
        Whatever ``pwd_context.verify`` returns for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash *password* with ``pwd_context``.

    Returns:
        The value produced by ``pwd_context.hash`` for *password*.
    """
    hashed = pwd_context.hash(password)
    return hashed
def get_account_by_login(db: Session, login: str) -> Optional[Account]:
    """Return the first ``Account`` whose ``login`` equals *login*.

    Args:
        db: Open session the query is executed on.
        login: Value compared against ``Account.login``.

    Returns:
        The first matching row, or ``None`` when the query yields nothing.
    """
    query = select(Account).where(Account.login == login)
    result = db.exec(query)
    return result.first()