Добавлено хеширование паролей для аккаунтов, обновлены функции авторизации и заполнения базы данных. Теперь пароли хранятся в виде хешей, улучшена безопасность системы.

This commit is contained in:
Redsandyg 2025-06-03 11:36:54 +03:00
parent 4c4a84eefe
commit 1f11bd8012
2 changed files with 46 additions and 13 deletions

View File

@ -5,6 +5,7 @@ from main import AUTH_DB_ENGINE, TgAgent, Ref, Sale, Transaction, Account
from sqlalchemy import text from sqlalchemy import text
from datetime import datetime, timedelta from datetime import datetime, timedelta
from hashlib import sha256 from hashlib import sha256
from passlib.context import CryptContext
# Константа: список user_ids # Константа: список user_ids
@ -62,6 +63,11 @@ LOGINS = [
ALL_DESCRIPTIONS = DESCRIPTIONS ALL_DESCRIPTIONS = DESCRIPTIONS
# --- # ---
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password):
return pwd_context.hash(password)
def get_date_list(days=7): def get_date_list(days=7):
today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
return [today - timedelta(days=i) for i in range(days, -1, -1)] return [today - timedelta(days=i) for i in range(days, -1, -1)]
@ -80,7 +86,7 @@ def fill_db():
for i in range(4): for i in range(4):
acc = Account( acc = Account(
login=f"user{i+1}", login=f"user{i+1}",
password="password123", # В реальном проекте пароли должны быть захешированы! password_hash=get_password_hash("password123"), # теперь храним хеш
name=NAMES[i % len(NAMES)], name=NAMES[i % len(NAMES)],
email=f"user{i+1}@example.com", email=f"user{i+1}@example.com",
balance=round(random.uniform(1000, 10000), 2) balance=round(random.uniform(1000, 10000), 2)

51
main.py
View File

@ -9,6 +9,7 @@ from uuid import uuid4
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from sqlalchemy import func from sqlalchemy import func
from hashlib import sha256 from hashlib import sha256
import jwt
# Конфигурация # Конфигурация
AUTH_DATABASE_ADDRESS = "sqlite:///partner.db" AUTH_DATABASE_ADDRESS = "sqlite:///partner.db"
@ -54,7 +55,7 @@ class Transaction(SQLModel, table=True):
class Account(SQLModel, table=True): class Account(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True) id: Optional[int] = Field(default=None, primary_key=True)
login: str = Field(index=True, unique=True) login: str = Field(index=True, unique=True)
password: str password_hash: str # теперь хранится hash пароля
name: Optional[str] = None name: Optional[str] = None
email: Optional[str] = None email: Optional[str] = None
balance: float = 0.0 balance: float = 0.0
@ -127,25 +128,41 @@ def authenticate_tg_agent(engine, tg_id: int):
return None return None
return tg_agent return tg_agent
# Защищённый эндпоинт
@app.get("/protected", tags=["partner-tg"])
def protected_route(current_tg_agent: TgAgent = Depends(get_current_tg_agent)):
return {"msg": f"Hello, {current_tg_agent.tg_id}! This is a protected route."}
# Авторизация # Авторизация
SECRET_KEY = "supersecretkey" # Лучше вынести в .env
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60
@app.post("/token", response_model=Token, tags=["partner-tg"]) def create_access_token(data: dict, expires_delta: timedelta = None):
async def login_for_access_token(req: TokenRequest): to_encode = data.copy()
tg_id = req.tg_id if expires_delta:
tg_agent = authenticate_tg_agent(AUTH_DB_ENGINE, tg_id) expire = datetime.utcnow() + expires_delta
if not tg_agent: else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token", response_model=Token, tags=["bff"])
def login_account_for_access_token(
login: str = Body(...),
password: str = Body(...),
db: Session = Depends(get_db)
):
account = get_account_by_login(db, login)
if not account or not verify_password(password, account.password_hash):
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect tg_id", detail="Incorrect login or password",
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
access_token = f"session_for_{tg_agent.tg_id}" access_token = create_access_token(
data={"sub": account.login},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return Token(access_token=access_token, token_type="bearer") return Token(access_token=access_token, token_type="bearer")
@ -474,3 +491,13 @@ def tg_auth(hash: str = Body(..., embed=True), db: Session = Depends(get_db)):
if not tg_agent: if not tg_agent:
raise HTTPException(status_code=401, detail="Hash not found") raise HTTPException(status_code=401, detail="Hash not found")
return {"msg": "Auth success", "tg_id": tg_agent.tg_id} return {"msg": "Auth success", "tg_id": tg_agent.tg_id}
# --- Новый функционал для Account ---
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_account_by_login(db: Session, login: str) -> Optional[Account]:
statement = select(Account).where(Account.login == login)
return db.exec(statement).first()