# Authentication utilities
import hashlib
import hmac
import logging
import os
import re
from datetime import datetime, timedelta

from bson import ObjectId

from utils.extensions import ext
from utils.config import Config

# PBKDF2 parameters shared by hash_password() and verify_password(); the two
# functions must agree on them or previously stored hashes stop verifying.
_SALT_BYTES = 32
_PBKDF2_ITERATIONS = 100000

# Characters accepted in a stored password hash (hex digits, either case).
_HEX_DIGITS = frozenset('0123456789abcdefABCDEF')

# Compiled once at import time; used with fullmatch() so that a trailing
# newline (which `$` alone would tolerate) is rejected.
_PHONE_RE = re.compile(r'^\+84\d{9}$|^0\d{9}$')
_EMAIL_RE = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')


def hash_password(password: str) -> str:
    """Hash *password* with PBKDF2-HMAC-SHA256 and a fresh random salt.

    Returns:
        Hex string of ``salt + derived_key`` (the first ``_SALT_BYTES`` bytes
        are the salt), which is the format verify_password() expects.
    """
    salt = os.urandom(_SALT_BYTES)
    hashed = hashlib.pbkdf2_hmac(
        'sha256', password.encode('utf-8'), salt, _PBKDF2_ITERATIONS
    )
    return (salt + hashed).hex()


def verify_password(stored_password: str, provided_password: str) -> bool:
    """Check *provided_password* against a hash produced by hash_password().

    Args:
        stored_password: Hex string of ``salt + derived_key``.
        provided_password: Plain-text password to verify.

    Returns:
        True only when the password matches. Malformed or non-string input
        is treated as a failed verification rather than raising.
    """
    if (
        not stored_password
        or not isinstance(stored_password, str)
        or not all(c in _HEX_DIGITS for c in stored_password)
    ):
        logging.error("Định dạng mật khẩu lưu trữ không hợp lệ")
        return False
    if not isinstance(provided_password, str):
        # Fail closed instead of raising AttributeError on .encode().
        return False

    try:
        # Odd-length hex raises ValueError here.
        stored_bytes = bytes.fromhex(stored_password)
    except ValueError as e:
        logging.error(f"Lỗi trong verify_password: {e}")
        return False

    salt = stored_bytes[:_SALT_BYTES]
    stored_hash = stored_bytes[_SALT_BYTES:]
    provided_hash = hashlib.pbkdf2_hmac(
        'sha256', provided_password.encode('utf-8'), salt, _PBKDF2_ITERATIONS
    )
    # Constant-time comparison: `==` on bytes can leak, through timing, how
    # many leading bytes of the hash matched.
    return hmac.compare_digest(stored_hash, provided_hash)


def validate_phone(phone: str) -> bool:
    """Return True if *phone* is ``+84`` followed by 9 digits, or ``0`` followed by 9 digits."""
    return _PHONE_RE.fullmatch(phone) is not None


def validate_email(email: str) -> bool:
    """Return True if *email* has the basic ``local@domain.tld`` shape."""
    return _EMAIL_RE.fullmatch(email) is not None


def reset_query_count(user_id):
    """Reset a user's ``query_count`` to 0 if the last reset was over a day ago.

    Does nothing when the user is not found, when ``account_type`` is
    ``'unlimited'``, or when the user has no ``last_reset`` value. Any
    exception (including an invalid *user_id*) is logged and swallowed, so
    this function never raises.
    """
    try:
        oid = ObjectId(user_id)
        user = ext.db.users.find_one({'_id': oid})
        if not user or user.get('account_type') == 'unlimited':
            return

        last_reset = user.get('last_reset')
        # NOTE(review): a user without `last_reset` is never reset here;
        # presumably the field is set when the account is created - confirm.
        if not last_reset:
            return

        # Naive UTC timestamp, so it can be subtracted from the stored value.
        # NOTE(review): assumes `last_reset` is stored as a naive UTC datetime - confirm.
        now = datetime.utcnow()
        if now - last_reset > timedelta(days=1):
            ext.db.users.update_one(
                {'_id': oid},
                {'$set': {'query_count': 0, 'last_reset': now}}
            )
            logging.info(f"Đã reset số lượt truy vấn cho người dùng {user_id}")
    except Exception as e:
        # Deliberate best-effort: a failed reset must not break the caller.
        logging.error(f"Error resetting query count: {e}")