init.py#

```## main.py
```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session


from models import schemas


from db_core.db import get_db_connection

import uvicorn

from models import crud

# Create tables
# Remove (refer to init_models)
# Base.metadata.create_all(bind=engine)


# expose auth API
from auth.auth import get_password_hash, auth_server
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from fastapi import status


# from fastapi.security import OAuth2PasswordBearer
# from fastapi import Depends, HTTPException, status

# Bearer-token extractor used as a dependency by protected routes; "token" is
# the path of the login route that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# The ASGI application object; referenced as "main:app" when uvicorn starts.
app = FastAPI()

from starlette.middleware.trustedhost import TrustedHostMiddleware

# NOTE(review): the "*" entry presumably matches every Host header, which
# would make the "*.devtunnels.ms" entry redundant and the middleware a
# no-op -- confirm against the Starlette docs and tighten for production.
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["*", "*.devtunnels.ms"]
)



@app.post("/users/", response_model=schemas.UserRead)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Create a user, storing a hash of the submitted password.

    The incoming payload is copied into a new ``UserCreate`` whose password
    field holds the output of ``get_password_hash`` before being persisted.
    """
    hashed_payload = schemas.UserCreate(
        name=user.name,
        email=user.email,
        password=get_password_hash(user.password),
    )
    return await crud.create_user(db=db, user=hashed_payload)

@app.get("/users/")
async def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db_connection)):
    """Return one page of users: ``limit`` rows after skipping ``skip``.

    NOTE(review): no ``response_model`` is declared, so whatever
    ``crud.get_users`` returns is serialized as-is -- confirm that no
    sensitive columns are exposed.
    """
    return await crud.get_users(db, skip=skip, limit=limit)

@app.get("/users/{username}", response_model=schemas.UserRead)
async def read_user(username: str, db: Session = Depends(get_db_connection)):
    """Return the user whose name equals ``username``.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    # Pass the username itself; the previous ``username==username`` evaluated
    # to ``True`` and was sent to the query as the lookup value.
    db_user = await crud.get_user_by_username(db, username)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

# NOTE(review): updating through POST on the resource URL is unconventional
# (PUT/PATCH is usual); the route is kept as-is for existing clients.
@app.post("/users/{username}")
async def update_user(username: str, user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Overwrite the name, email and password of the user named ``username``.

    The submitted password is hashed before it reaches the CRUD layer, in the
    same way ``create_user`` does, so that stored passwords are always hashes.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    hashed_payload = schemas.UserCreate(
        name=user.name,
        email=user.email,
        password=get_password_hash(user.password),
    )
    db_user = await crud.update_user(db, username=username, user=hashed_payload)
    if db_user is None:
        # crud.update_user returns None when the lookup finds nothing.
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

@app.delete("/users/{username}")
async def delete_user(username: str, db = Depends(get_db_connection)):
    """Delete the user named ``username`` and return the removed record.

    Raises:
        HTTPException: 404 when ``crud.delete_user`` reports nothing deleted.
    """
    removed = await crud.delete_user(db, username)
    if removed:
        return removed
    raise HTTPException(status_code=404, detail="User not found")



@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer token.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}`` -- the JSON object
        an OAuth2 password-flow client (including the one configured through
        ``OAuth2PasswordBearer(tokenUrl="token")``) expects.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    # Imported here because only this handler needs it.
    from auth.auth import UnAuthorizedException

    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        token = await auth_server.issue_token(form_data.username, form_data.password)
    except UnAuthorizedException:
        # The credential check signals failure by raising; without this the
        # client would receive a 500 instead of a 401.
        raise credentials_error from None

    if token is None:
        raise credentials_error

    return {"access_token": token, "token_type": "bearer"}


# from auth.auth_in_fastapi import oauth2_scheme, create_post_with_auth
from models.crud import create_post

@app.post("/post")
async def create_post_with_protected(content: str, db: Session = Depends(get_db_connection), token: str = Depends(oauth2_scheme)):
    """Create a post on behalf of a caller that presents a valid bearer token.

    Raises:
        HTTPException: 401 when the token cannot be decoded or verified.
    """
    # Imported here because only this handler needs it.
    from jose import JWTError

    try:
        # verify_token is a coroutine function: it must be awaited, otherwise
        # the token is never actually checked.
        await auth_server.verify_token(token)
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    post = await create_post(db, content)
    return post



if __name__ == "__main__":
    # TODO: Table creation (init_models) should run here so that every table
    # mapped by the models is registered in `metadata` before serving.
    # asyncio.run(init_models())



    # Alternative: uvicorn.run(app, host="127.0.0.1", port=8000, reload=True)
    # Serves the app via its import string with auto-reload enabled.
    # NOTE(review): host "0.0.0.0" listens on all interfaces -- confirm this
    # is intended outside local development.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
    # Alternative: uvicorn.run("main:app", host="127.0.0.1", port=8000)
```## auth.py
```python
# Build AuthServer
from datetime import datetime, timedelta
from jose import JWTError, jwt
import inspect
from passlib.context import CryptContext

# Need to make async :(
class AuthServerAsync:
    """Base class for asynchronous servers that issue and verify JWTs.

    Subclasses implement ``authen_user``; ``issue_token`` signs whatever dict
    it returns together with an ``exp`` claim.

    Args:
        secret_key: Key used to sign and verify tokens.
        algorithm: JWT signing algorithm name.
        access_token_expire_mins: Token lifetime in minutes.
    """

    def __init__(
        self,
        *,
        secret_key: str = "supersecretkey123",
        algorithm: str = "HS256",
        access_token_expire_mins: int = 30,
    ):
        # NOTE(review): the default secret is a development placeholder; pass
        # a real secret from configuration in any deployed environment.
        self._access_token_expire_mins = access_token_expire_mins
        self._secret_key = secret_key
        self._algorithm = algorithm

    async def authen_user(self, *args, **kwargs) -> dict:
        """Authenticate a caller and return the claims to embed in the token."""
        raise NotImplementedError()

    def _token_expiry(self):
        """Return the timezone-aware UTC instant at which a new token expires."""
        from datetime import timezone

        # A naive local time would be read as UTC when the claim is converted
        # to a timestamp, shifting the expiry by the machine's UTC offset.
        return datetime.now(timezone.utc) + timedelta(minutes=self._access_token_expire_mins)

    async def issue_token(self, *args, **kwargs):
        """Authenticate with ``authen_user`` and return a signed JWT string.

        Raises:
            Exception: when ``authen_user`` returns ``None``. Any exception
                raised by ``authen_user`` itself propagates unchanged.
        """
        user_dict = await self.authen_user(*args, **kwargs)
        if user_dict is None:
            raise Exception("User not authenticated")

        to_encode = user_dict.copy()  # never mutate the caller's dict
        to_encode.update({"exp": self._token_expiry()})
        return jwt.encode(to_encode, self._secret_key, algorithm=self._algorithm)

    async def verify_token(self, token: str) -> dict:
        """Decode ``token`` and return its payload.

        Raises:
            JWTError: propagated from ``jwt.decode`` when decoding fails.
        """
        return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])



class AuthUserPassServerAsync(AuthServerAsync):
    """Auth server that delegates the username/password check to a callback.

    Args:
        fn_check_userpass: Coroutine function called as
            ``await fn(username, password)``; its result becomes the token
            claims.

    Raises:
        ValueError: when ``fn_check_userpass`` is ``None``.
    """

    def __init__(self, fn_check_userpass):
        super().__init__()
        # Explicit check rather than ``assert`` so it survives ``python -O``.
        if fn_check_userpass is None:
            raise ValueError("Must set `fn_check_userpass`")
        self.fn = fn_check_userpass

    async def authen_user(self, username, password) -> dict:
        """Await the configured callback and return its result.

        Raises:
            TypeError: when the callback is not a coroutine function.
        """
        if not inspect.iscoroutinefunction(self.fn):
            raise TypeError("Please make sure you put `async` function")

        return await self.fn(username, password)

class UnAuthorizedException(Exception):
    """Raised when a credential check fails.

    Constructor arguments are accepted but ignored: the message is always
    the fixed text "UnAuthorized.".
    """

    def __init__(self, *args):
        Exception.__init__(self, "UnAuthorized.")





import asyncio
from greenlet import greenlet

async def greenlet_spawn(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` inside a greenlet and await its outcome.

    The return value, or the exception raised, is handed to a future on the
    running event loop; awaiting that future returns or re-raises it.

    NOTE(review): nothing here moves ``func`` to another thread, so a blocking
    ``func`` presumably still blocks the event loop -- confirm before relying
    on this for offloading.
    """
    event_loop = asyncio.get_running_loop()
    outcome = event_loop.create_future()

    def _invoke():
        try:
            value = func(*args, **kwargs)
            event_loop.call_soon_threadsafe(outcome.set_result, value)
        except BaseException as exc:
            event_loop.call_soon_threadsafe(outcome.set_exception, exc)

    greenlet(_invoke).switch()
    return await outcome

# class AuthUserPassServerAsync:
#     def __init__(self, sync_server: AuthUserPassServer):
#         self.sync_server = sync_server

#     async def authen_user(self, username, password):
#         fn = self.sync_server.fn

#         # Case A: async function provided
#         if inspect.iscoroutinefunction(fn):
#             return await fn(username, password)

#         # Case B: sync function provided -> offload to greenlet
#         return await greenlet_spawn(fn, username, password)

#     async def issue_token(self, *args, **kwargs):
#         return await greenlet_spawn(self.sync_server.issue_token, *args, **kwargs)

#     async def verify_token(self, token):
#         return await greenlet_spawn(self.sync_server.verify_token, token)

# import asyncio
# from greenlet import greenlet

# async def greenlet_spawn(func, *args, **kwargs):
#     loop = asyncio.get_running_loop()
#     fut = loop.create_future()

#     def run():
#         try:
#             res = func(*args, **kwargs)
#             loop.call_soon_threadsafe(fut.set_result, res)
#         except BaseException as e:
#             loop.call_soon_threadsafe(fut.set_exception, e)

#     greenlet(run).switch()
#     return await fut


# class AuthUserPassServerAsync:
#     def __init__(self, sync_server: AuthUserPassServer):
#         self.sync = sync_server

#     # -------------------------------------------
#     # ASYNC user authentication
#     # -------------------------------------------
#     async def authen_user(self, username, password):
#         fn = self.sync.fn

#         if inspect.iscoroutinefunction(fn):
#             return await fn(username, password)
#         else:
#             return await greenlet_spawn(fn, username, password)

#     # -------------------------------------------
#     # ASYNC token creation (reimplemented)
#     # -------------------------------------------
#     async def issue_token(self, username, password):
#         """Async version independent of sync logic."""
#         user_dict = await self.authen_user(username, password)

#         if not user_dict:
#             raise Exception("User not authenticated")

#         to_encode = user_dict.copy()
#         expire = datetime.now() + timedelta(
#             minutes=self.sync._access_token_expire_mins
#         )
#         to_encode.update({"exp": expire})

#         return jwt.encode(
#             to_encode,
#             self.sync._secret_key,
#             algorithm=self.sync._algorithm
#         )

#     # -------------------------------------------
#     # VERIFY token (still sync -> run in greenlet)
#     # -------------------------------------------
#     async def verify_token(self, token):
#         return await greenlet_spawn(self.sync.verify_token, token)





pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# bcrypt only takes the first 72 bytes of its input into account.
_BCRYPT_MAX_BYTES = 72


def _truncate_for_bcrypt(password):
    """Cut ``password`` to at most 72 UTF-8 bytes.

    A multi-byte character split by the cut is dropped entirely, so the
    result is always valid text.
    """
    return password.encode("utf-8")[:_BCRYPT_MAX_BYTES].decode("utf-8", errors="ignore")


def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` matches ``hashed_password``.

    The candidate goes through the same truncation as ``get_password_hash``;
    otherwise a long password whose cut falls inside a multi-byte character
    would be hashed and verified from different byte strings.
    """
    return pwd_context.verify(_truncate_for_bcrypt(plain_password), hashed_password)


def get_password_hash(password):
    """Return the bcrypt hash of ``password`` (truncated to 72 bytes)."""
    return pwd_context.hash(_truncate_for_bcrypt(password))

# Specified in FastAPI and our application
from models.crud import get_user_by_username
from db_core.db import get_db_connection

async def fn_check_user(username, password):
    """Check a username/password pair against the database.

    Returns:
        The claims to embed in the token: the matched user's ``username``
        (the stored ``name``) and ``email``.

    Raises:
        UnAuthorizedException: when the user is unknown or the password does
            not verify.
    """
    # get_db_connection is an async generator written for dependency
    # injection; outside a request it is driven by hand: take the yielded
    # session, then close the generator so that its cleanup code runs.
    gen = get_db_connection()
    db = await gen.__anext__()

    try:
        user_in_db = await get_user_by_username(db, username)

        if not user_in_db or not verify_password(password, user_in_db.password):
            raise UnAuthorizedException()

        # Identify the real user instead of returning placeholder values.
        return {
            "username": user_in_db.name,
            "email": user_in_db.email,
        }
    finally:
        await gen.aclose()

# Module-level auth server shared by the API layer; credentials are checked
# through the async ``fn_check_user`` callback.
auth_server = AuthUserPassServerAsync(fn_check_user)

```## [TOBE-REMOVE]auth-bk.py
```python
# Build AuthServer
from datetime import datetime, timedelta
from jose import JWTError, jwt
import inspect
from passlib.context import CryptContext

class AuthServer:
    """Base class for synchronous servers that issue and verify JWTs.

    Subclasses implement ``authen_user``; ``issue_token`` signs whatever dict
    it returns together with an ``exp`` claim.
    """

    def __init__(self):
        # These settings could be made constructor parameters for flexibility.
        # NOTE(review): the secret is a hard-coded development placeholder.
        self._access_token_expire_mins = 30
        self._secret_key = "supersecretkey123"
        self._algorithm = "HS256"

    def authen_user(self, *args, **kwargs) -> dict:
        """Authenticate a caller and return the claims to embed in the token."""
        raise NotImplementedError()

    def _token_expiry(self):
        """Return the timezone-aware UTC instant at which a new token expires."""
        from datetime import timezone

        # A naive local time would be read as UTC when the claim is converted
        # to a timestamp, shifting the expiry by the machine's UTC offset.
        return datetime.now(timezone.utc) + timedelta(minutes=self._access_token_expire_mins)

    def issue_token(self, *args, **kwargs):
        """Authenticate with ``authen_user`` and return a signed JWT string.

        Raises:
            Exception: when ``authen_user`` returns ``None``.
        """
        user_dict = self.authen_user(*args, **kwargs)
        if user_dict is None:
            raise Exception("User not authenticated")

        to_encode = user_dict.copy()  # never mutate the caller's dict
        to_encode.update({"exp": self._token_expiry()})
        return jwt.encode(to_encode, self._secret_key, algorithm=self._algorithm)

    def verify_token(self, token: str) -> dict:
        """Decode ``token`` and return its payload; decode errors propagate."""
        return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])


class AuthUserPassServer(AuthServer):
    """Synchronous auth server backed by a username/password callback.

    The callback must be a plain function; coroutine functions are rejected
    when ``authen_user`` is called.
    """

    def __init__(self, fn_check_userpass):
        super().__init__()
        assert fn_check_userpass is not None, "Must set `fn_check_userpass`"
        self.fn = fn_check_userpass

    def authen_user(self, username, password) -> dict:
        """Run the callback and return its result.

        Raises:
            RuntimeError: when the callback is a coroutine function; the sync
                API never drives async code itself.
        """
        checker = self.fn
        if not inspect.iscoroutinefunction(checker):
            return checker(username, password)

        raise RuntimeError(
            "authen_user() received async function; "
            "use AuthUserPassServerAsync for async usage."
        )

import asyncio
from greenlet import greenlet

async def greenlet_spawn(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` inside a greenlet and await its outcome.

    The return value, or the exception raised, is handed to a future on the
    running event loop; awaiting that future returns or re-raises it.

    NOTE(review): nothing here moves ``func`` to another thread, so a blocking
    ``func`` presumably still blocks the event loop -- confirm before relying
    on this for offloading.
    """
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def run():
        # Deliver either the result or the raised exception to the future.
        try:
            res = func(*args, **kwargs)
            loop.call_soon_threadsafe(fut.set_result, res)
        except BaseException as e:
            loop.call_soon_threadsafe(fut.set_exception, e)

    # Start the greenlet, then wait for the future it resolves.
    g = greenlet(run)
    g.switch()
    return await fut

class AuthUserPassServerAsync:
    """Async facade over a synchronous ``AuthUserPassServer``."""

    def __init__(self, sync_server: AuthUserPassServer):
        self.sync_server = sync_server

    async def authen_user(self, username, password):
        """Run the wrapped server's callback, whether it is sync or async."""
        checker = self.sync_server.fn

        if not inspect.iscoroutinefunction(checker):
            # Sync callback: route it through greenlet_spawn.
            return await greenlet_spawn(checker, username, password)

        # Async callback: await it directly.
        return await checker(username, password)

    async def issue_token(self, *args, **kwargs):
        """Run the wrapped server's ``issue_token`` through greenlet_spawn."""
        issue = self.sync_server.issue_token
        return await greenlet_spawn(issue, *args, **kwargs)

    async def verify_token(self, token):
        """Run the wrapped server's ``verify_token`` through greenlet_spawn."""
        verify = self.sync_server.verify_token
        return await greenlet_spawn(verify, token)


# import inspect
# import asyncio
# from functools import partial

# class AuthUserPassServer(AuthServer):
#     def __init__(self, fn_check_userpass):
#         assert fn_check_userpass, "Must set fn_check_userpass"
#         self.fn = fn_check_userpass

#     def authen_user(self, username, password):
#         """Synchronous API - always safe to call."""
#         if inspect.iscoroutinefunction(self.fn):
#             # Run async fn inside thread-safe executor
#             loop = asyncio.get_event_loop()
#             return loop.run_until_complete(
#                 self.async_authen_user(username, password)
#             )
#         else:
#             return self.fn(username, password)

#     async def async_authen_user(self, username, password):
#         """Async API - never blocks."""
#         if inspect.iscoroutinefunction(self.fn):
#             return await self.fn(username, password)

#         # Offload sync function to a background thread
#         loop = asyncio.get_running_loop()
#         return await loop.run_in_executor(
#             None,
#             partial(self.fn, username, password)
# )


# ## Version `greenlet`
# import asyncio
# from greenlet import greenlet

# # Greenlet utility, similar to SQLAlchemy's `greenlet_spawn`
# async def greenlet_spawn(func, *args, **kwargs):
#     loop = asyncio.get_running_loop()
#     fut = loop.create_future()

#     def run_in_greenlet():
#         try:
#             result = func(*args, **kwargs)
#             loop.call_soon_threadsafe(fut.set_result, result)
#         except BaseException as e:
#             loop.call_soon_threadsafe(fut.set_exception, e)

#     g = greenlet(run_in_greenlet)
#     g.switch()
#     return await fut


# class AuthUserPassServer(AuthServer):
#     def __init__(self, fn_check_userpass):
#         assert fn_check_userpass, "Must set fn_check_userpass"
#         self.fn = fn_check_userpass

#     # -----------------------------------------
#     # 1) Synchronous API
#     # -----------------------------------------
#     def authen_user(self, username, password):
#         """Synchronous wrapper that never blocks the event loop."""
#         if inspect.iscoroutinefunction(self.fn):
#             # Sync API calls async API (LSP preserved)
#             return asyncio.run(
#                 self.async_authen_user(username, password)
#             )
#         else:
#             # Direct sync call
#             return self.fn(username, password)

#     # -----------------------------------------
#     # 2) Asynchronous API
#     # -----------------------------------------
#     async def async_authen_user(self, username, password):
#         """Async API that supports both sync and async functions."""

#         if inspect.iscoroutinefunction(self.fn):
#             # Case A: fn is async -> run its await in a greenlet
#             return await greenlet_spawn(
#                 lambda: asyncio.run(self.fn(username, password))
#             )

#         else:
#             # Case B: fn is sync -> run sync fn in greenlet
#             return await greenlet_spawn(
#                 self.fn, username, password
#             )



class UnAuthorizedException(Exception):
    """Raised when a credential check fails.

    Constructor arguments are accepted but ignored: the message is always
    the fixed text "UnAuthorized.".
    """

    def __init__(self, *args):
        Exception.__init__(self, "UnAuthorized.")




pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# bcrypt only takes the first 72 bytes of its input into account.
_BCRYPT_MAX_BYTES = 72


def _truncate_for_bcrypt(password):
    """Cut ``password`` to at most 72 UTF-8 bytes.

    A multi-byte character split by the cut is dropped entirely, so the
    result is always valid text.
    """
    return password.encode("utf-8")[:_BCRYPT_MAX_BYTES].decode("utf-8", errors="ignore")


def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` matches ``hashed_password``.

    The candidate goes through the same truncation as ``get_password_hash``;
    otherwise a long password whose cut falls inside a multi-byte character
    would be hashed and verified from different byte strings.
    """
    return pwd_context.verify(_truncate_for_bcrypt(plain_password), hashed_password)


def get_password_hash(password):
    """Return the bcrypt hash of ``password`` (truncated to 72 bytes)."""
    return pwd_context.hash(_truncate_for_bcrypt(password))

# Specified in FastAPI and our application
from models.crud import get_user_by_username
from db_core.db import get_db_connection

async def fn_check_user(username, password):
    """Check a username/password pair against the database.

    Returns:
        The claims to embed in the token: the matched user's ``username``
        (the stored ``name``) and ``email``.

    Raises:
        UnAuthorizedException: when the user is unknown or the password does
            not verify.
    """
    # get_db_connection is an async generator written for dependency
    # injection; outside a request it is driven by hand: take the yielded
    # session, then close the generator so that its cleanup code runs.
    gen = get_db_connection()
    db = await gen.__anext__()

    try:
        user_in_db = await get_user_by_username(db, username)

        if not user_in_db or not verify_password(password, user_in_db.password):
            raise UnAuthorizedException()

        # Identify the real user instead of returning placeholder values.
        return {
            "username": user_in_db.name,
            "email": user_in_db.email,
        }
    finally:
        await gen.aclose()

# NOTE(review): fn_check_user is a coroutine function, and
# AuthUserPassServer.authen_user raises RuntimeError for coroutine functions,
# so issuing a token through this instance fails as written.
auth_server = AuthUserPassServer(fn_check_user)

```## util.py
```python
from passlib.context import CryptContext


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# bcrypt only takes the first 72 bytes of its input into account.
_BCRYPT_MAX_BYTES = 72


def _truncate_for_bcrypt(password):
    """Cut ``password`` to at most 72 UTF-8 bytes.

    A multi-byte character split by the cut is dropped entirely, so the
    result is always valid text.
    """
    return password.encode("utf-8")[:_BCRYPT_MAX_BYTES].decode("utf-8", errors="ignore")


def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` matches ``hashed_password``.

    The candidate goes through the same truncation as ``get_password_hash``;
    otherwise a long password whose cut falls inside a multi-byte character
    would be hashed and verified from different byte strings.
    """
    return pwd_context.verify(_truncate_for_bcrypt(plain_password), hashed_password)


def get_password_hash(password):
    """Return the bcrypt hash of ``password`` (truncated to 72 bytes)."""
    return pwd_context.hash(_truncate_for_bcrypt(password))
```## __init__.py
```python

```## [TOBE-REMOVE]auth_in_fastapi.py
```python
from datetime import datetime, timedelta
from jose import JWTError, jwt



# JWT settings. These should be kept in a secret store, not in source code.
SECRET_KEY = "supersecretkey123"  # key used to sign and verify tokens
ALGORITHM = "HS256"  # signing algorithm passed to jwt.encode / jwt.decode
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # lifetime used by create_user_token


from .util import verify_password


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict itself is left untouched.
        expires_delta: Token lifetime; 15 minutes when omitted or falsy.
    """
    lifetime = expires_delta or timedelta(minutes=15)
    claims = data.copy()
    claims["exp"] = datetime.utcnow() + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)



# Implement OAuth workflow couple with FastAPI
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException, status

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

from models.crud import get_user_by_username

async def authenticate_user(db, user_name:str, password: str):
    """Verify credentials and return a bearer-token response dict.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 when the user is unknown or the password is wrong.
    """
    account = await get_user_by_username(db, user_name)

    # The password is only checked when a matching account exists.
    credentials_ok = bool(account) and verify_password(password, account.password)
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return {"access_token": create_user_token(account), "token_type": "bearer"}

    # return user_in_db


def create_user_token(user: dict):
    """Return an access token whose ``user`` claim is ``user.name``.

    NOTE(review): the parameter is annotated ``dict`` but is read through
    attribute access (``user.name``), so callers presumably pass a model
    object -- confirm and correct the annotation.
    """
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"user": user.name}
    return create_access_token(data=claims, expires_delta=lifetime)

# async def get_current_user(token: str = Depends(oauth2_scheme)):
#     credentials_exception = HTTPException(
#         status_code=status.HTTP_401_UNAUTHORIZED,
#         detail="Could not validate credentials",
#         headers={"WWW-Authenticate": "Bearer"},
#     )

#     try:
#         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
#         username = payload.get("user")
#         if username is None:
#             raise credentials_exception

#         user = await get_user_by_username(username)
#         if user is None:
#             raise credentials_exception

#         return user
#     except JWTError:
#         raise credentials_exception

from models.crud import create_post
async def create_post_with_auth(content, db, token):
    """Create a post after checking that ``token`` carries a ``user`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or has no
            ``user`` claim.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if claims.get("user") is None:
            raise unauthorized

        return await create_post(db, content)
    except JWTError:
        raise unauthorized

```## env.py
```python
from logging.config import fileConfig

# from sqlalchemy import engine_from_config # No need
from sqlalchemy import pool # No need

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata


# HoHai: Import the models from the application sources. Be careful that
# PYTHONPATH is set up at runtime so that Python can discover the
# project packages.
from models import models
target_metadata = models.expose_base_meta()


from db_core.db import engine

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    # Configuring alone emits nothing; the migrations have to be run inside
    # the context's transaction block.
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' (async) mode.

    Reuses the application's shared async ``engine`` instead of building one
    from the Alembic config, and runs the synchronous migration helper on a
    connection from that engine.
    """
    # Show which tables the target metadata contains.
    print(target_metadata.tables)

    async with engine.connect() as connection:
        await connection.run_sync(do_run_migrations)


def do_run_migrations(connection):
    """Configure the Alembic context on ``connection`` and run migrations.

    Synchronous helper meant to be invoked through ``run_sync``.
    """
    options = {
        "connection": connection,
        "target_metadata": target_metadata,
        "compare_type": True,            # detect column type changes
        "compare_server_default": True,  # detect default value changes
    }
    context.configure(**options)

    with context.begin_transaction():
        context.run_migrations()






# The offline/online dispatch is currently disabled:
# if context.is_offline_mode():
#     run_migrations_offline()
# else:
#     run_migrations_online()
# NOTE(review): as written, online migrations always run, whether or not
# offline mode was requested -- confirm this is intended.


import asyncio
# run_migrations_online is a coroutine, so it is driven by an event loop here.
asyncio.run(run_migrations_online())
```## 0002b4bcec43_init_database.py
```python
"""init database

Revision ID: 0002b4bcec43
Revises:
Create Date: 2025-11-10 19:34:30.997117

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '0002b4bcec43'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema: create the ``posts`` and ``users`` tables and indexes."""
    # ### commands auto generated by Alembic - please adjust! ###
    # posts: integer primary key plus a content string of up to 100 chars.
    op.create_table('posts',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('content', sa.String(length=100), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_posts_id'), 'posts', ['id'], unique=False)
    # users: integer primary key plus name, email, address and password.
    op.create_table('users',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('name', sa.String(length=50), nullable=True),
    sa.Column('email', sa.String(length=120), nullable=True),
    sa.Column('address', sa.String(length=250), nullable=True),
    sa.Column('password', sa.String(length=100), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    # Only the email index is unique.
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
    op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Downgrade schema: drop everything ``upgrade`` created, in reverse order."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Each table's indexes are dropped before the table itself.
    op.drop_index(op.f('ix_users_id'), table_name='users')
    op.drop_index(op.f('ix_users_email'), table_name='users')
    op.drop_table('users')
    op.drop_index(op.f('ix_posts_id'), table_name='posts')
    op.drop_table('posts')
    # ### end Alembic commands ###

```## models.py
```python
from sqlalchemy import Column, Integer, String
from db_core.db import Base

class User(Base):
    """ORM mapping for the ``users`` table."""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # NOTE(review): no unique constraint on name, although lookups by
    # username expect at most one match -- confirm.
    name = Column(String(50))
    email = Column(String(120), unique=True, index=True)
    address = Column(String(250)) # Column added after the initial version
    # NOTE(review): presumably holds a password hash rather than plain
    # text -- verify against the callers that write it.
    password = Column(String(100))


# HoHai test
class Posts(Base):
    """ORM mapping for the ``posts`` table."""
    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(String(100))  # post text, at most 100 characters


def expose_base_meta():
    """Return the ``MetaData`` object of the shared declarative ``Base``."""
    metadata = Base.metadata
    return metadata
```## schemas.py
```python
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    """Fields shared by every user payload, inbound and outbound."""
    name: str
    email: EmailStr


class UserCreate(UserBase):
    """Request body for creating or updating a user."""
    # Only accepted on input; it is deliberately not part of UserBase so that
    # it can never be serialized back in a response.
    password: str


class UserRead(UserBase):
    """Response body for a user; it does not carry the password."""
    id: int

    class Config:
        orm_mode = True  # allows conversion from SQLAlchemy objects

```## crud.py
```python
from sqlalchemy.orm import Session
from models import models

from models import schemas

async def get_user(db: Session, user_id: int):
    """Return the user with primary key ``user_id``, or ``None``.

    Written as a coroutine using ``select``/``execute`` like the other
    helpers in this module: the session handed in is an async session, which
    has no synchronous ``query`` API, and the caller awaits the result.
    """
    result = await db.execute(select(models.User).where(models.User.id == user_id))
    return result.scalar_one_or_none()

# def get_users(db: Session, skip: int = 0, limit: int = 10):
#     return db.query(models.User).offset(skip).limit(limit).all()

from sqlalchemy.future import select




# Update in auth: we can expose this as a utility
# from auth.util import get_password_hash



async def get_users(db, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users after skipping ``skip``, ordered by id.

    The explicit ordering keeps pages stable: without ORDER BY the database
    may return rows in any order, so OFFSET/LIMIT pages could overlap or
    skip rows between calls.
    """
    query = select(models.User).order_by(models.User.id).offset(skip).limit(limit)
    result = await db.execute(query)
    return result.scalars().all()

# Actually db is not Session in the case AsyncSession (AsyncSession will wrap a Session instance)
async def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user built from ``user`` and return the refreshed row.

    The password is stored exactly as received.
    """
    new_row = models.User(
        name=user.name,
        email=user.email,
        password=user.password,
    )
    db.add(new_row)
    await db.commit()  # commit is a coroutine on the async session
    await db.refresh(new_row)
    return new_row


# Update a user
# Bad behavior: the `user` parameter has no type annotation
async def update_user(db: Session, username: str, user):
    """Copy email, name and password from ``user`` onto the stored user.

    Returns:
        The updated row, or ``None`` when no user is named ``username``.
        The session is committed in both cases.
    """
    user_in_db = await get_user_by_username(db, username)

    if user_in_db:
        for field in ("email", "name", "password"):
            setattr(user_in_db, field, getattr(user, field))

    await db.commit()
    return user_in_db


async def delete_user(db: Session, username: str):
    """Delete the user named ``username``.

    Returns:
        The deleted row, or ``None`` when there was nothing to delete.
    """
    target = await get_user_by_username(db, username)
    if target is not None:
        await db.delete(target)
        await db.commit()  # commit is a coroutine on the async session
    return target


# add new method: getting user by user_name. Assume that username is unique in our app
async def get_user_by_username(db: Session, username):
    """Return the user whose ``name`` equals ``username``, or ``None``.

    Usernames are assumed to be unique in this application.
    """
    by_name = select(models.User).where(models.User.name == username)
    rows = await db.execute(by_name)
    return rows.scalar_one_or_none()


# Create/update post
async def create_post(db: Session, content: str):
    """Insert a post holding ``content`` and return the refreshed row."""
    new_post = models.Posts(content=content)
    db.add(new_post)
    await db.commit()
    await db.refresh(new_post)
    return new_post

# async def get_posts(db:Session):
#     result = await db.execute(select(models.Posts))


```## db.py
```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker


import os

# Connection URL for the async engine.
# Format: postgresql+asyncpg://user:password@host:port/database
# Set the DATABASE_URL environment variable to keep real credentials out of
# the source; the literal below is only the local-development default.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://root:postgres@localhost:5432/example_pgdb",
)


# echo=True logs the SQL statements the engine emits.
engine = create_async_engine(DATABASE_URL, echo=True)


# Factory for AsyncSession objects bound to the engine. expire_on_commit=False
# keeps loaded attributes readable after a commit.
AsyncSessionLocal = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


async def get_db_connection():
    """Yield a new ``AsyncSessionLocal`` session and close it afterwards.

    Written as an async generator so it can serve as a dependency: the
    session is closed once the consumer is done, even after an error.
    """
    session = AsyncSessionLocal()  # a new session for every call
    try:
        yield session
    finally:
        await session.close()



from sqlalchemy.ext.declarative import declarative_base


# Declarative base class that the ORM models inherit from; its metadata is
# what init_models passes to create_all.
Base = declarative_base()



async def init_models():
    """Create the tables registered on ``Base.metadata``."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        # create_all is synchronous, so it runs through run_sync.
        await connection.run_sync(create_tables)

# import asyncio
# # Run the async function
# if __name__ == "__main__":
#     asyncio.run(init_models())


```## __init__.py
```python

```## __init__.py
```python

```## main.py
```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session


from models import schemas


from db_core.db import get_db_connection

import uvicorn

from models import crud

# Create tables
# Remove (refer to init_models)
# Base.metadata.create_all(bind=engine)

app = FastAPI()

@app.post("/users/", response_model=schemas.UserRead)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Create a user from the request body and return the stored record.

    NOTE(review): the payload is handed to the CRUD layer unchanged, so the
    password is presumably stored as submitted -- confirm whether hashing
    happens elsewhere.
    """
    return await crud.create_user(db=db, user=user)

@app.get("/users/")
async def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db_connection)):
    """Return one page of users: ``limit`` rows after skipping ``skip``."""
    return await crud.get_users(db, skip=skip, limit=limit)

@app.get("/users/{user_id}", response_model=schemas.UserRead)
async def read_user(user_id: int, db: Session = Depends(get_db_connection)):
    """Return the user with id ``user_id``.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    found = await crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")

# Bad behavior
@app.post("/users/{username}")
async def update_user(username: str, user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Update the user named ``username`` with the fields of the request body.

    Raises:
        HTTPException: 404 when no such user exists. ``crud.update_user``
            returns ``None`` in that case; without this check the handler
            would answer 200 with a ``null`` body, unlike ``read_user``.
    """
    db_user = await crud.update_user(db, username=username, user=user)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user




# expose auth API
from auth.auth_in_fastapi import authenticate_user
from fastapi.security import OAuth2PasswordRequestForm
from fastapi import status

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db_connection)):
    """Exchange the submitted username/password form for a token object.

    The credential check and token creation are done by ``authenticate_user``;
    its return value is sent back unchanged.
    """
    return await authenticate_user(db, form_data.username, form_data.password)


from auth.auth_in_fastapi import oauth2_scheme, create_post_with_auth
@app.post("/post")
async def create_post_with_protected(content: str, db: Session = Depends(get_db_connection), token: str = Depends(oauth2_scheme)):
    """Create a post on behalf of the bearer of ``token``.

    The token is obtained through the ``oauth2_scheme`` dependency and is
    validated inside ``create_post_with_auth``.
    """
    return await create_post_with_auth(content=content, db=db, token=token)



if __name__ == "__main__":
    # TODO: init_models() should be run here so that every table mapped to a
    # model is registered in `metadata` before the server starts.
    # asyncio.run(init_models())



    # uvicorn.run(app, host="127.0.0.1", port=8000, reload=True)
    # The application is passed as the import string "main:app" rather than as
    # the `app` object (the object form is the commented-out variant above).
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
    # uvicorn.run("main:app", host="127.0.0.1", port=8000)
```## auth.py
```python
# Build AuthServer
from datetime import datetime, timedelta
from jose import JWTError, jwt
import inspect
from passlib.context import CryptContext

class AuthServer:
    """Base class that issues and verifies signed JWT access tokens.

    Subclasses implement :meth:`authen_user`. :meth:`issue_token` turns the
    claims it returns into a token and :meth:`verify_token` decodes a token.
    """

    def __init__(self, secret_key="supersecretkey123", algorithm="HS256",
                 access_token_expire_mins=30):
        """Store the signing settings.

        Args:
            secret_key: Key used to sign and verify tokens. The default is the
                previously hardcoded value and is only suitable for demos.
            algorithm: JWT signing algorithm name.
            access_token_expire_mins: Token lifetime in minutes.
        """
        self._access_token_expire_mins = access_token_expire_mins
        self._secret_key = secret_key
        self._algorithm = algorithm

    def authen_user(self, *args, **kwargs) -> dict:
        """Return the claims for an authenticated user; subclasses override."""
        raise NotImplementedError()

    def issue_token(self, *args, **kwargs):
        """Authenticate with the given arguments and return a signed token.

        Raises:
            Exception: when ``authen_user`` returns ``None``.
        """
        from datetime import timezone

        user_dict = self.authen_user(*args, **kwargs)
        if user_dict is None:
            raise Exception("User not authenticated")

        to_encode = user_dict.copy()
        # Use an aware UTC timestamp: a naive local time would be read as UTC
        # when converted to the numeric "exp" claim, shifting the expiry by
        # the machine's UTC offset.
        expire = datetime.now(timezone.utc) + timedelta(minutes=self._access_token_expire_mins)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, self._secret_key, algorithm=self._algorithm)

    def verify_token(self, token: str) -> dict:
        """Decode ``token`` with this server's key and algorithm.

        Any error raised by ``jwt.decode`` propagates to the caller.
        """
        return jwt.decode(token, self._secret_key, algorithms=[self._algorithm])


class AuthUserPassServer(AuthServer):
    """``AuthServer`` that authenticates through a caller-supplied check.

    ``fn_check_userpass(username, password)`` may be a plain function or a
    coroutine function; it returns the claims dict for a valid user.
    """

    def __init__(self, fn_check_userpass):
        """Store the credential check.

        Raises:
            ValueError: when ``fn_check_userpass`` is ``None``.
        """
        super().__init__()
        # Raised explicitly instead of using `assert`, which is stripped when
        # Python runs with -O.
        if fn_check_userpass is None:
            raise ValueError("Must set `fn_check_userpass`")
        self.fn = fn_check_userpass

    # Kept synchronous on purpose: overriding the base class's sync method
    # with an async one would break Liskov substitution.
    def authen_user(self, username, password) -> dict:
        """Run the credential check and return its result.

        Coroutine functions are driven to completion here so callers always
        get the result synchronously.
        """
        if not inspect.iscoroutinefunction(self.fn):
            return self.fn(username, password)

        import asyncio

        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: a plain asyncio.run() works
            # and needs no third-party patching.
            return asyncio.run(self.fn(username, password))

        # Called from inside a running loop (e.g. Jupyter): re-entering the
        # loop is only possible once nest_asyncio has patched it.
        import nest_asyncio

        nest_asyncio.apply()
        task = running_loop.create_task(self.fn(username, password))
        running_loop.run_until_complete(task)
        return task.result()

class UnAuthorizedException(Exception):
    """Signals rejected credentials; the message is always ``"UnAuthorized."``."""

    def __init__(self, *args):
        # Positional arguments are accepted but deliberately not forwarded:
        # the exception always carries the same fixed message.
        Exception.__init__(self, "UnAuthorized.")




# Shared passlib context used by verify_password / get_password_hash;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
    """Return True when ``plain_password`` matches ``hashed_password``.

    The candidate is capped exactly as ``get_password_hash`` caps a password
    before hashing (72 UTF-8 bytes, a cut-off trailing character is dropped).
    Without this, a password longer than the cap is hashed in its truncated
    form but verified in its full form, so the two can disagree.
    """
    candidate = plain_password.encode("utf-8")[:72].decode("utf-8", errors="ignore")
    return pwd_context.verify(candidate, hashed_password)

def get_password_hash(password):
    """Hash ``password`` after capping it at 72 UTF-8 bytes.

    The cap is applied to the encoded bytes; a multi-byte character split by
    the cut is dropped when decoding back (``errors="ignore"``).
    Presumably this respects bcrypt's 72-byte input limit.
    """
    encoded = password.encode("utf-8")
    capped = encoded[:72].decode("utf-8", errors="ignore")
    return pwd_context.hash(capped)

# Specified in FastAPI and our application
from models.crud import get_user_by_username
from db_core.db import get_db_connection

async def fn_check_user(user_name, password):
    """Check a username/password pair against the database.

    Returns:
        ``{"user": <name>}`` for valid credentials. The user's name is
        returned, not the ORM object, because the dict is meant to become
        JWT claims and those must be JSON-serializable.

    Raises:
        UnAuthorizedException: when the user does not exist or the password
            does not match.
    """
    user_in_db = None
    # get_db_connection is an async generator (it yields one session and
    # closes it afterwards), so it has to be iterated; awaiting it raises
    # TypeError. Letting the loop finish also lets the generator close the
    # session.
    async for db in get_db_connection():
        user_in_db = await get_user_by_username(db, user_name)

    # Reject unknown users and non-matching passwords.
    if user_in_db is None or not verify_password(password, user_in_db.password):
        raise UnAuthorizedException()
    return {
        "user": user_in_db.name
    }


```## util.py
```python
from passlib.context import CryptContext


# Shared passlib context used by verify_password / get_password_hash;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
    """Return True when ``plain_password`` matches ``hashed_password``.

    The candidate is capped exactly as ``get_password_hash`` caps a password
    before hashing (72 UTF-8 bytes, a cut-off trailing character is dropped).
    Without this, a password longer than the cap is hashed in its truncated
    form but verified in its full form, so the two can disagree.
    """
    candidate = plain_password.encode("utf-8")[:72].decode("utf-8", errors="ignore")
    return pwd_context.verify(candidate, hashed_password)

def get_password_hash(password):
    """Hash ``password`` after capping it at 72 UTF-8 bytes.

    The cap is applied to the encoded bytes rather than to the character
    count; a multi-byte character split by the cut is dropped when decoding
    back (``errors="ignore"``). Presumably this respects bcrypt's 72-byte
    input limit.
    """
    encoded = password.encode("utf-8")
    capped = encoded[:72].decode("utf-8", errors="ignore")
    return pwd_context.hash(capped)
```## __init__.py
```python

```## auth_in_fastapi.py
```python
from datetime import datetime, timedelta
from jose import JWTError, jwt



# JWT settings. NOTE: the signing key is hardcoded here; it should be kept in
# a secret store or environment variable instead of the source tree.
SECRET_KEY = "supersecretkey123"  # key used to sign and to verify tokens
ALGORITHM = "HS256"  # algorithm passed to jwt.encode / jwt.decode
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # token lifetime used by create_user_token


from .util import verify_password


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. Defaults to 15 minutes when ``None``.
    """
    from datetime import timezone

    # Compare against None instead of relying on truthiness: an explicit
    # zero timedelta is falsy and would otherwise be replaced by the default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    to_encode = data.copy()
    # Aware UTC timestamp; replaces the deprecated naive datetime.utcnow().
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)



# Implement OAuth workflow couple with FastAPI
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException, status

# Bearer-token dependency used by protected routes; tokenUrl names the
# endpoint that issues tokens (the application exposes it as POST /token).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

from models.crud import get_user_by_username

async def authenticate_user(db, user_name:str, password: str):
    """Verify the credentials and return a bearer-token response dict.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 when the user is unknown or the password does not
            match the stored hash.
    """
    user_in_db = await get_user_by_username(db, user_name)

    # Happy path first: the user exists and the password matches.
    if user_in_db and verify_password(password, user_in_db.password):
        return {"access_token": create_user_token(user_in_db), "token_type": "bearer"}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )


def create_user_token(user: dict):
    """Build an access token whose ``user`` claim is the user's name.

    NOTE(review): the parameter is annotated as ``dict`` but it is accessed
    through ``user.name``; ``authenticate_user`` passes the object returned by
    ``get_user_by_username``.
    """
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"user": user.name}
    return create_access_token(data=claims, expires_delta=lifetime)

# async def get_current_user(token: str = Depends(oauth2_scheme)):
#     credentials_exception = HTTPException(
#         status_code=status.HTTP_401_UNAUTHORIZED,
#         detail="Could not validate credentials",
#         headers={"WWW-Authenticate": "Bearer"},
#     )

#     try:
#         payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
#         username = payload.get("user")
#         if username is None:
#             raise credentials_exception

#         user = await get_user_by_username(username)
#         if user is None:
#             raise credentials_exception

#         return user
#     except JWTError:
#         raise credentials_exception

from models.crud import create_post
async def create_post_with_auth(content, db, token):
    """Create a post after validating the bearer ``token``.

    The token must decode with ``SECRET_KEY`` / ``ALGORITHM`` and carry a
    ``user`` claim. Only the presence of the claim is checked here.

    Raises:
        HTTPException: 401 when decoding fails or the claim is missing.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if claims.get("user") is None:
            raise unauthorized

        return await create_post(db, content)
    except JWTError:
        raise unauthorized

```## env.py
```python
from logging.config import fileConfig

# from sqlalchemy import engine_from_config # No need
from sqlalchemy import pool # No need

from alembic import context

# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging (this sets up the loggers).
# Skipped when no config file name is available.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata


# The models are imported from the application sources, so be careful with
# the PYTHONPATH set up at runtime: it must allow Python to discover the
# project's packages (`models`, `db_core`).
from models import models
target_metadata = models.expose_base_meta()


# Reuse the application's engine instead of building one from the .ini file.
from db_core.db import engine

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    # Configuring the context alone does nothing: the migrations have to be
    # run inside a transaction block for anything to be emitted.
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' (async) mode.

    Rather than creating an Engine from the Alembic config section, the
    engine shared with the application is reused. A connection is opened on
    it and the synchronous ``do_run_migrations`` helper is executed through
    ``run_sync``.
    """
    # Show which tables the target metadata knows about.
    print(target_metadata.tables)

    async with engine.connect() as connection:
        await connection.run_sync(do_run_migrations)


def do_run_migrations(connection):
    """Configure the Alembic context on ``connection`` and run the migrations.

    Synchronous helper; ``run_migrations_online`` invokes it via ``run_sync``.
    """
    configure_options = dict(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,            # detect column type changes
        compare_server_default=True,  # detect default value changes
    )
    context.configure(**configure_options)

    with context.begin_transaction():
        context.run_migrations()






# NOTE(review): the usual offline/online dispatch is disabled below, so this
# script always takes the online (async) path regardless of Alembic's mode.
# if context.is_offline_mode():
#     run_migrations_offline()
# else:
#     run_migrations_online()


# run_migrations_online is a coroutine function, so it is driven to
# completion with asyncio.run.
import asyncio
asyncio.run(run_migrations_online())
```## 0002b4bcec43_init_database.py
```python
"""init database

Revision ID: 0002b4bcec43
Revises:
Create Date: 2025-11-10 19:34:30.997117

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# Revision identifiers, used by Alembic.
# down_revision is None: this revision does not revise an earlier one
# (the module docstring's "Revises:" field is empty).
revision: str = '0002b4bcec43'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade schema: create the ``posts`` and ``users`` tables with their indexes."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "posts",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("content", sa.String(length=100), nullable=True),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(op.f("ix_posts_id"), "posts", ["id"], unique=False)

    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(length=50), nullable=True),
        sa.Column("email", sa.String(length=120), nullable=True),
        sa.Column("address", sa.String(length=250), nullable=True),
        sa.Column("password", sa.String(length=100), nullable=True),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(op.f("ix_users_email"), "users", ["email"], unique=True)
    op.create_index(op.f("ix_users_id"), "users", ["id"], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Downgrade schema: drop the indexes and tables created by ``upgrade``."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Indexes are dropped before the table they belong to.
    for users_index in ("ix_users_id", "ix_users_email"):
        op.drop_index(op.f(users_index), table_name="users")
    op.drop_table("users")

    op.drop_index(op.f("ix_posts_id"), table_name="posts")
    op.drop_table("posts")
    # ### end Alembic commands ###
    # ### end Alembic commands ###

```## models.py
```python
from sqlalchemy import Column, Integer, String
from db_core.db import Base

class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Used as the login name by crud.get_user_by_username.
    # NOTE(review): that lookup assumes names are unique, but no unique
    # constraint is declared on this column.
    name = Column(String(50))
    email = Column(String(120), unique=True, index=True)
    address = Column(String(250)) # Column added after the initial model
    # Holds the value produced by get_password_hash, not the plain password.
    password = Column(String(100))


# HoHai test
class Posts(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    # Text of the post, limited to 100 characters by the column type.
    content = Column(String(100))


def expose_base_meta():
    """Return the ``MetaData`` of the shared declarative ``Base``.

    Importing this module defines the models above first, so their tables are
    registered on the returned metadata.
    """
    metadata = Base.metadata
    return metadata
```## schemas.py
```python
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    """Fields shared by every user schema; all of them are safe to return."""

    name: str
    email: EmailStr


class UserCreate(UserBase):
    """Request body for creating or updating a user.

    The password lives only on this input schema so it never appears on the
    schemas used for responses.
    """

    password: str


class UserRead(UserBase):
    """Response schema for a stored user.

    It deliberately has no ``password`` field: routes declaring
    ``response_model=UserRead`` would otherwise send the stored password hash
    back to the client.
    """

    id: int

    class Config:
        orm_mode = True  # allows conversion from SQLAlchemy objects

```## crud.py
```python
from sqlalchemy.orm import Session
from models import models

from models import schemas

async def get_user(db: Session, user_id: int):
    """Return the user with primary key ``user_id``, or ``None``.

    The function is a coroutine and uses ``select``/``execute`` like the other
    helpers in this module: the session handed in is an ``AsyncSession``
    (which has no ``query`` method) and the route handler awaits this call.
    """
    result = await db.execute(select(models.User).where(models.User.id == user_id))
    return result.scalars().first()

# def get_users(db: Session, skip: int = 0, limit: int = 10):
#     return db.query(models.User).offset(skip).limit(limit).all()

from sqlalchemy.future import select




# Update in auth: we can expose to utitilty
from auth.util import get_password_hash



async def get_users(db, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users, skipping the first ``skip`` rows."""
    page_query = select(models.User).offset(skip).limit(limit)
    rows = await db.execute(page_query)
    return rows.scalars().all()

# NOTE: `db` is annotated as Session, but with the async setup it is actually
# an AsyncSession (which wraps a Session instance).
async def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user and return the refreshed ORM instance.

    The plain-text password from ``user`` is hashed before it is stored.
    """
    hashed = get_password_hash(user.password)
    new_user = models.User(name=user.name, email=user.email, password=hashed)
    db.add(new_user)
    await db.commit()  # commit/refresh are coroutines on an AsyncSession
    await db.refresh(new_user)
    return new_user


# Update a user
# Bad behavior: the `user` parameter carries no type annotation
async def update_user(db: Session, username: str, user):
    """Overwrite email, name and password of the user named ``username``.

    Returns:
        The updated ORM instance, or ``None`` when no such user exists. The
        session is committed in both cases.
    """
    existing = await get_user_by_username(db, username)

    if existing:
        existing.email = user.email
        existing.name = user.name
        # Store the hash, never the plain-text password.
        existing.password = get_password_hash(user.password)

    await db.commit()
    return existing

# Look a user up by name. Usernames are assumed to be unique in this app.
async def get_user_by_username(db: Session, username):
    """Return the user whose ``name`` equals ``username``, or ``None``."""
    by_name = select(models.User).where(models.User.name == username)
    rows = await db.execute(by_name)
    return rows.scalar_one_or_none()


# Create a post
async def create_post(db: Session, content: str):
    """Insert a post with the given ``content`` and return the refreshed row."""
    new_post = models.Posts(content=content)
    db.add(new_post)
    await db.commit()
    await db.refresh(new_post)
    return new_post

# async def get_posts(db:Session):
#     result = await db.execute(select(models.Posts))


```## db.py
```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker


import os

# The connection URL embeds credentials, so it is taken from the DATABASE_URL
# environment variable when that is set. The literal below is only a fallback
# for local development and should not be used anywhere else.
# Format: postgresql+asyncpg://user:password@host:port/database
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://root:postgres@localhost:5432/example_pgdb",
)


# Async engine shared by the application; echo=True turns on SQL echo logging.
engine = create_async_engine(DATABASE_URL, echo=True)


# Factory producing AsyncSession objects bound to `engine`.
# expire_on_commit=False: instances are not expired when the session commits.
AsyncSessionLocal = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


async def get_db_connection():
    """Yield a new ``AsyncSession`` and close it when the consumer is done.

    This is an async generator: it is meant to be consumed through
    ``Depends(get_db_connection)`` (or iterated), not awaited directly.
    """
    session = AsyncSessionLocal()  # a fresh session for every call
    try:
        yield session
    finally:
        # Reached when the generator is resumed or closed, also on errors.
        await session.close()



from sqlalchemy.ext.declarative import declarative_base


# Declarative base class for the ORM models; init_models() creates the tables
# registered on Base.metadata.
Base = declarative_base()



async def init_models():
    """Create all tables registered on ``Base.metadata``.

    ``create_all`` is a synchronous call, so it is handed to ``run_sync`` on a
    connection opened with ``engine.begin()``.
    """
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)

# import asyncio
# # Run the async function
# if __name__ == "__main__":
#     asyncio.run(init_models())


```## __init__.py
```python

```## wrap_async_inside_normal_function.py
```python
import asyncio
import inspect

import nest_asyncio
# Patch asyncio at import time so run_until_complete() can be used while a
# loop is already running; this is only needed in Jupyter.
nest_asyncio.apply()  # only needed in Jupyter

async def fn_check_user_pass_async(u, p):
    """Demo credential check: wait one second, then echo the inputs back.

    Returns:
        ``{"user": u, "password": p}``.
    """
    await asyncio.sleep(1.0)
    credentials = {"user": u, "password": p}
    return credentials

# asyncio.get_event_loop().run_until_complete(coro)


def wrap_fn():
    """Call ``fn_check_user_pass_async`` from synchronous code and print the result.

    A coroutine function is scheduled as a task on the event loop and run to
    completion; anything else is called directly.
    """
    if not inspect.iscoroutinefunction(fn_check_user_pass_async):
        print(fn_check_user_pass_async("HaiHT", "123"))
        return

    event_loop = asyncio.get_event_loop()
    pending = event_loop.create_task(fn_check_user_pass_async("HaiHT", "123"))
    event_loop.run_until_complete(pending)

    print(pending.result())

# Run the demo when the script is executed.
wrap_fn()

```## something.py
```python
import asyncio
import threading

def worker():
    """Create an event loop for the current thread and print its identity.

    The loop is only registered as the thread's current loop; it is never
    run. It is closed again before the function returns.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # set_event_loop() does not start the loop, so get_running_loop()
        # would raise RuntimeError here; ask for the thread's current loop.
        print("Worker thread loop:", id(asyncio.get_event_loop()))
    finally:
        # Release the loop's resources and unregister it from the thread.
        asyncio.set_event_loop(None)
        loop.close()


# Run the worker in its own thread to show that it gets its own loop.
threading.Thread(target=worker).start()

main_loop = asyncio.new_event_loop()
asyncio.set_event_loop(main_loop)
# No loop is *running* at module level, so get_running_loop() would raise
# RuntimeError; get_event_loop() returns the loop registered just above.
print("Main thread loop:", id(asyncio.get_event_loop()))