Authentication Implementation Overview#

        sequenceDiagram
    participant Client as Client
    participant FastAPI as FastAPI App
    participant AuthServer as AuthServer
    participant DB as User Store
    Client->>FastAPI: POST /token (username + password form)
    FastAPI->>AuthServer: issue_token(username, password)
    AuthServer->>DB: authen_user(username, password)
    DB-->>AuthServer: user dict or None
    AuthServer-->>FastAPI: JWT access token
    FastAPI-->>Client: {access_token, token_type: bearer}
    Client->>FastAPI: POST /protected (Authorization: Bearer token)
    FastAPI->>AuthServer: verify_token(token)
    AuthServer-->>FastAPI: decoded claims
    FastAPI-->>Client: protected resource
    

Authentication in FastAPI typically involves:

  • Verifying user credentials (username/password)

  • Issuing JWT tokens

  • Protecting routes by validating tokens on each request

  • Integrating with FastAPI’s dependency injection system

Build AuthServer#

Install required packages#

pip install python-jose passlib[bcrypt]

Implement base AuthServer#

from datetime import datetime, timedelta
from jose import JWTError, jwt


class AuthServer:
    def __init__(self):
        self._access_token_expire_mins = 30
        self._secret_key = "supersecretkey123"
        self._algorithm = "HS256"

    def authen_user(self, *args, **kwargs) -> dict:
        raise NotImplementedError()

    def issue_token(self, *args, **kwargs):
        user_dict = self.authen_user(*args, **kwargs)
        if user_dict is None:
            raise Exception("User not authenticated")

        to_encode = user_dict.copy()
        expire = datetime.now() + timedelta(minutes=self._access_token_expire_mins)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, self._secret_key, algorithm=self._algorithm)

    def verify_token(self, token: str) -> dict:
        return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])


class FakeAuthServer(AuthServer):
    def authen_user(self, *args, **kwargs):
        return kwargs


# Sample test
auth_server = FakeAuthServer()
token = auth_server.issue_token(user_id="HoHai", password="123")
print(f"Token: {token}")
print(f"Decode token: {auth_server.verify_token(token)}")

Implement AuthUserPassServer#

import inspect
import asyncio
import nest_asyncio

class AuthUserPassServer(AuthServer):
    def __init__(self, fn_check_userpass):
        super().__init__()
        assert fn_check_userpass is not None, "Must set `fn_check_userpass`"
        self.fn = fn_check_userpass

    def authen_user(self, username, password) -> dict:
        # Support async functions
        if inspect.iscoroutinefunction(self.fn):
            nest_asyncio.apply()
            loop = asyncio.get_event_loop()
            task = loop.create_task(self.fn(username, password))
            loop.run_until_complete(task)
            return task.result()
        else:
            return self.fn(username, password)


# Sample test
auth_server = AuthUserPassServer(fn_check_userpass=lambda u, p: {"user": u, "password": p})
token = auth_server.issue_token(username="hohai", password="13")
print(f"Token: {token}")
print(f"Decode token: {auth_server.verify_token(token)}")

Apply in FastAPI (Minimal)#

import uvicorn
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordRequestForm
import nest_asyncio
import threading

nest_asyncio.apply()

app = FastAPI()

auth_server = AuthUserPassServer(fn_check_userpass=lambda u, p: {"user": u, "password": p})


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    token = auth_server.issue_token(form_data.username, form_data.password)
    return {"access_token": token, "token_type": "bearer"}

@app.post("/me")
async def verify(token: str):
    return auth_server.verify_token(token)


def run_app():
    uvicorn.run(app, host="0.0.0.0", port=8000)

thread = threading.Thread(target=run_app, daemon=True, name="Thread run FASTAPI")
thread.start()
        graph TD
    Req[POST /token\nContent-Type: application/x-www-form-urlencoded\ngrant_type=password\nusername=johndoe\npassword=secret] --> DI[FastAPI Depends\nOAuth2PasswordRequestForm]
    DI --> Parse[Parse form fields\nusername password scope\ngrant_type client_id client_secret]
    Parse --> Handler[login form_data\nform_data.username\nform_data.password]
    Handler --> Token[return access_token]
    style DI fill:#9cf,stroke:#333
    style Parse fill:#9f9,stroke:#333
    

What is OAuth2PasswordRequestForm in FastAPI#

In short#

It’s a dependency class that parses and validates a login form sent with these fields:

  • username

  • password

  • (optional) scope

  • (optional) grant_type

  • (optional) client_id

  • (optional) client_secret

Where it comes from#

from fastapi.security import OAuth2PasswordRequestForm

Typical usage:

from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = {"username": form_data.username}
    # verify username & password here...
    return {"access_token": user_dict["username"], "token_type": "bearer"}

Expected request format#

The client must send the data as application/x-www-form-urlencoded, not JSON:

POST /token
Content-Type: application/x-www-form-urlencoded

grant_type=password&username=johndoe&password=secret

Why it’s useful#

  • Automatically reads and validates form data.

  • Matches the OAuth2 “Resource Owner Password Credentials” grant specification.

  • Works seamlessly with FastAPI’s OAuth2 and security utilities.

Password Hashing#

Use passlib or bcrypt for secure hashing. Never store plain passwords.

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str):
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)

Role-Based Access Control (RBAC)#

async def require_role(role: str):
    async def role_checker(current_user = Depends(get_current_user)):
        if current_user.role != role:
            raise HTTPException(status_code=403, detail="Not enough permissions")
    return role_checker

@app.get("/admin")
async def admin_dashboard(role_check = Depends(require_role("admin"))):
    return {"msg": "Welcome Admin"}

CORS Configuration#

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # or specific domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)