partner-core/helpers_bff.py

142 lines
5.6 KiB
Python

from sqlmodel import Session, select, create_engine
from passlib.context import CryptContext
from typing import Optional
from datetime import datetime, timedelta
from bff_models import Token, TransactionStatus
from sql_models import Company, TgAgent, Account, AgentBalance, AgentTransaction, PartnerTransaction, Sale, Ref, IntegrationToken, CompanyBalance
from hashlib import sha256
import jwt
from jwt.exceptions import InvalidTokenError
from fastapi import HTTPException, status, Depends, Request, Header
from fastapi.security import OAuth2PasswordBearer
import hashlib
# Конфигурация
AUTH_DATABASE_ADDRESS = "sqlite:///partner.db"
AUTH_DB_ENGINE = create_engine(AUTH_DATABASE_ADDRESS, echo=True)
SECRET_KEY = "supersecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# JWT Configuration for Integration API
INTEGRATION_SECRET_KEY = "your-super-secret-jwt-key" # TODO: Замените на реальный секретный ключ из переменных окружения
INTEGRATION_ALGORITHM = "HS256"
INTEGRATION_ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # Токен действителен 7 дней
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_integration_db():
with Session(AUTH_DB_ENGINE) as session:
yield session
def create_integration_jwt_token(company_id: int):
expires = datetime.utcnow() + timedelta(minutes=INTEGRATION_ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": str(company_id),
"exp": expires,
"type": "access"
}
return jwt.encode(payload, INTEGRATION_SECRET_KEY, algorithm=INTEGRATION_ALGORITHM)
async def get_current_company_from_jwt(
token: str = Depends(OAuth2PasswordBearer(tokenUrl="/token")),
db: Session = Depends(get_integration_db)
):
"""
Зависимость для получения текущей компании на основе JWT токена для Integration API.
"""
try:
payload = jwt.decode(token, INTEGRATION_SECRET_KEY, algorithms=[INTEGRATION_ALGORITHM])
company_id: int = int(payload.get("sub"))
if company_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Недействительная полезная нагрузка токена",
headers={"WWW-Authenticate": "Bearer"},
)
company = db.exec(select(Company).where(Company.id == company_id)).first()
if not company:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Компания не найдена")
return company
except InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Недействительный токен",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Срок действия токена истек",
headers={"WWW-Authenticate": "Bearer"},
)
def get_tg_agent_by_tg_id(db: Session, tg_id: int) -> Optional[TgAgent]:
statement = select(TgAgent).where(TgAgent.tg_id == tg_id)
return db.exec(statement).first()
def get_db():
with Session(AUTH_DB_ENGINE) as session:
yield session
def get_current_account(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
login: str = payload.get("sub")
if login is None:
raise credentials_exception
except InvalidTokenError:
raise credentials_exception
account = get_account_by_login(db, login)
if account is None:
raise credentials_exception
return account
async def get_current_tg_agent(request: Request, db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise credentials_exception
hash_value = auth_header.replace("Bearer ", "").strip()
tg_agent = db.exec(select(TgAgent).where(TgAgent.hash == hash_value)).first()
if tg_agent is None:
raise credentials_exception
return tg_agent
def authenticate_tg_agent(engine, tg_id: int):
with Session(engine) as db:
tg_agent = get_tg_agent_by_tg_id(db, tg_id)
if not tg_agent:
return None
return tg_agent
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_account_by_login(db: Session, login: str) -> Optional[Account]:
statement = select(Account).where(Account.login == login)
return db.exec(statement).first()