## env.py

```python

from logging.config import fileConfig

# from sqlalchemy import engine_from_config # No need
from sqlalchemy import pool # No need

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata


# Import the models package so every table gets registered on Base.metadata.
# NOTE: PYTHONPATH must be set up at runtime so that the `source` package is
# discoverable when Alembic executes this script.
from source import models
target_metadata = models.expose_base_meta()


# Reuse the application's shared async engine rather than building a new one
# from alembic.ini, so migrations target the same database as the app.
from source.db import engine

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL and not an Engine,
    though an Engine is acceptable here as well.  By skipping the Engine
    creation we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    # Fix: this step was commented out, so offline mode configured the
    # context but never emitted any migration SQL at all.
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' (async) mode.

    Instead of creating a new Engine from the alembic.ini section, this
    reuses the shared async engine from source.db so migrations run
    against exactly the same database the application uses.
    """
    async with engine.connect() as conn:
        # Alembic's migration API is synchronous; bridge it into the
        # async connection via run_sync.
        await conn.run_sync(do_run_migrations)


def do_run_migrations(connection):
    """Configure Alembic against a sync *connection*, then run migrations."""
    configure_opts = {
        "connection": connection,
        "target_metadata": target_metadata,
        "compare_type": True,            # detect column type changes
        "compare_server_default": True,  # detect default value changes
    }
    context.configure(**configure_opts)

    with context.begin_transaction():
        context.run_migrations()






import asyncio

# Fix: the offline/online dispatch was commented out and the async online
# path ran unconditionally, so `alembic --sql` (offline mode) would have
# tried to open a real database connection.
if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())
```

## d94ba253f9be_add_new_column_address.py
```python
"""add new column Address

Revision ID: d94ba253f9be
Revises:
Create Date: 2025-11-06 14:30:23.110333

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'd94ba253f9be'
# First migration in the chain, hence no parent revision.
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply the migration: add a nullable ``address`` column to ``users``."""
    address = sa.Column('address', sa.String(length=250), nullable=True)
    op.add_column('users', address)


def downgrade() -> None:
    """Revert the migration: drop the ``address`` column from ``users``."""
    op.drop_column('users', 'address')

```

## db.py
```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker


import os

# Read the connection string from the environment so credentials are not
# hard-coded in source (the previous inline note asked for exactly this);
# the default keeps local development working unchanged.
# Format: postgresql+asyncpg://user:password@host:port/database
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://root:postgres@localhost:5432/example_pgdb",
)

# echo=True logs every SQL statement -- handy in dev, noisy in production.
engine = create_async_engine(DATABASE_URL, echo=True)

# Session factory producing AsyncSession objects. expire_on_commit=False
# keeps loaded attributes readable after commit without a new round trip,
# which the async endpoints rely on.
AsyncSessionLocal = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


async def get_db_connection():
    """FastAPI dependency: yield a fresh AsyncSession, closing it afterwards."""
    session = AsyncSessionLocal()  # one session per request
    try:
        yield session
    finally:
        await session.close()



# Fix: declarative_base moved to sqlalchemy.orm in SQLAlchemy 1.4; the
# sqlalchemy.ext.declarative import path is deprecated. The asyncio
# extension used above already requires 1.4+, so this is safe.
from sqlalchemy.orm import declarative_base

# Shared declarative base: every model subclassing it registers its table
# on Base.metadata, which init_models() and Alembic both consume.
Base = declarative_base()



async def init_models():
    """Create every table known to Base.metadata (no-op for existing ones)."""
    async with engine.begin() as connection:
        # create_all is synchronous, so bridge it into the async engine.
        await connection.run_sync(Base.metadata.create_all)

# import asyncio
# # Run the async function
# if __name__ == "__main__":
#     asyncio.run(init_models())


```

## models.py
```python
from sqlalchemy import Column, Integer, String
from .db import Base

class User(Base):
    """ORM model for application users (table ``users``)."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Display name, up to 50 characters.
    name = Column(String(50))
    # Unique and indexed for lookup by e-mail address.
    email = Column(String(120), unique=True, index=True)
    # New column, also added to existing databases by migration d94ba253f9be
    # (nullable there, so pre-existing rows stay valid).
    address = Column(String(250))


# HoHai test
# Added as a test/experimental table (original author's note).
class Posts(Base):
    """ORM model for posts (table ``posts``)."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    # Post body, capped at 100 characters.
    content = Column(String(100))


def expose_base_meta():
    """Return the shared declarative metadata (consumed by Alembic's env.py)."""
    metadata = Base.metadata
    return metadata
```

## __init__.py
```python

```

## schemas.py
```python
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    """Fields shared by all user schemas."""

    name: str
    email: EmailStr  # validated as an e-mail address by pydantic

class UserCreate(UserBase):
    """Request payload for creating/updating a user; same fields as UserBase."""
    pass

class UserRead(UserBase):
    """Response schema: the shared user fields plus the database id."""

    id: int

    class Config:
        # NOTE(review): `orm_mode` is the Pydantic v1 name; under Pydantic v2
        # this setting is `from_attributes` -- confirm the installed version.
        orm_mode = True  # allows conversion from SQLAlchemy objects

```

## main.py
```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session

import asyncio


import schemas
# from .db import engine, get_db

from db import get_db_connection, init_models

import uvicorn

import crud

# Table creation was removed from import time; see init_models() in db.py,
# which is invoked from the __main__ guard below.
# Base.metadata.create_all(bind=engine)

# Application instance; the routes below register themselves on it at import.
app = FastAPI()

@app.post("/users/", response_model=schemas.UserRead)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Create a new user from the request payload and return it."""
    return await crud.create_user(db=db, user=user)

@app.get("/users/", response_model=list[schemas.UserRead])
async def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db_connection)):
    """Return a paginated list of users."""
    return await crud.get_users(db, skip=skip, limit=limit)

@app.get("/users/{user_id}", response_model=schemas.UserRead)
async def read_user(user_id: int, db: Session = Depends(get_db_connection)):
    """Return a single user by id; 404 if it does not exist."""
    found = await crud.get_user(db, user_id=user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return found

# Route kept as POST (not the RESTful PUT) so existing callers keep working.
@app.post("/users/{user_id}")
async def update_user(user_id: int, user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Update a user's name/email; 404 if the id is unknown.

    Fix: the original returned HTTP 200 with a null body when the user
    did not exist (crud.update_user returns None in that case).
    """
    db_user = await crud.update_user(db, user_id=user_id, user=user)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user




if __name__ == "__main__":
    # Create tables first so every model registered on Base.metadata exists
    # before the server starts accepting requests.
    asyncio.run(init_models()) # Should be run here to make sure that all tables (which mapped with models) will be scan in `metadata`
    # reload=True requires the import-string form ("main:app"); passing the
    # app object directly (line below) is incompatible with the reloader.
    # uvicorn.run(app, host="127.0.0.1", port=8000, reload=True)
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
```

## crud.py
```python
from sqlalchemy.orm import Session
import models

import schemas

async def get_user(db, user_id: int):
    """Fetch one user by primary key; return None if not found.

    Fix: the original used the sync Query API (db.query(...)) although the
    sessions produced by get_db_connection are AsyncSessions, and main.py
    `await`s this call -- awaiting the old non-coroutine result would fail
    at runtime. Rewritten with the async execute/select pattern used by
    the other helpers in this module.
    """
    result = await db.execute(select(models.User).where(models.User.id == user_id))
    return result.scalar_one_or_none()

# def get_users(db: Session, skip: int = 0, limit: int = 10):
#     return db.query(models.User).offset(skip).limit(limit).all()

from sqlalchemy.future import select

async def get_users(db, skip: int = 0, limit: int = 100):
    """Return one page of users (offset *skip*, at most *limit* rows)."""
    stmt = select(models.User).offset(skip).limit(limit)
    rows = await db.execute(stmt)
    return rows.scalars().all()

# Note: despite the Session annotation, `db` is an AsyncSession here
# (it wraps a sync Session), so commit/refresh must be awaited.
async def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user row and return it with its generated id."""
    new_user = models.User(name=user.name, email=user.email)
    db.add(new_user)
    await db.commit()
    await db.refresh(new_user)  # reload server-generated fields (id)
    return new_user


async def update_user(db: Session, user_id: int, user: schemas.UserCreate):
    """Update name/email of the user with *user_id*.

    Returns the updated ORM object, or None when no such user exists.

    Fixes: parameters are now annotated (the original review note flagged
    the missing types), the commit only happens when a row was actually
    found and modified (the original committed unconditionally), and the
    returned object is refreshed so it reflects the persisted state.
    """
    query_results = await db.execute(select(models.User).where(models.User.id == user_id))
    user_in_db = query_results.scalar_one_or_none()

    if user_in_db is None:
        return None

    user_in_db.name = user.name
    user_in_db.email = user.email
    await db.commit()
    await db.refresh(user_in_db)
    return user_in_db




```

## simple-chat-server.py
```python
import socket
import select
import errno

HOST = '127.0.0.1'
PORT = 7777

# --- Step 1: Create server socket ---
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((HOST, PORT))
server_sock.listen()
server_sock.setblocking(False)  # important for event-driven I/O

print(f"Chat server running on {HOST}:{PORT}")

# --- Step 2: Create kqueue (BSD/macOS event notification) ---
kq = select.kqueue()

# Register server socket for READ events (new connections)
kev = select.kevent(server_sock.fileno(),
                    filter=select.KQ_FILTER_READ,
                    flags=select.KQ_EV_ADD)
kq.control([kev], 0, 0)

# Map file descriptor -> socket object
fd_to_socket = {server_sock.fileno(): server_sock}


def disconnect(fd, sock):
    """Deregister *fd* from the kqueue, close its socket and forget it."""
    print(f"Client {fd} disconnected")
    kq.control([select.kevent(fd,
                              filter=select.KQ_FILTER_READ,
                              flags=select.KQ_EV_DELETE)], 0, 0)
    sock.close()
    del fd_to_socket[fd]


# --- Step 3: Event loop ---
while True:
    # Wait for events, max 10 at a time, timeout 1 second
    events = kq.control(None, 10, 1.0)

    for kev in events:
        fd = kev.ident
        # Fix: a peer removed earlier in this same event batch can still
        # have a queued event; direct indexing raised KeyError here.
        sock = fd_to_socket.get(fd)
        if sock is None:
            continue

        if sock is server_sock:
            # --- New client connection ---
            client_sock, addr = server_sock.accept()
            client_sock.setblocking(False)
            fd_to_socket[client_sock.fileno()] = client_sock

            # Register client for READ events
            kev_client = select.kevent(client_sock.fileno(),
                                       filter=select.KQ_FILTER_READ,
                                       flags=select.KQ_EV_ADD)
            kq.control([kev_client], 0, 0)

            print(f"Client connected from {addr}")
            continue

        # --- Client sent data ---
        try:
            data = sock.recv(4096)
        except (BlockingIOError, InterruptedError):
            continue
        except ConnectionResetError:
            # Fix: an abrupt client reset previously crashed the whole server.
            disconnect(fd, sock)
            continue

        if not data:
            # Orderly shutdown by the client (recv returned b'')
            disconnect(fd, sock)
            continue

        # Broadcast to all other clients; iterate over a snapshot because
        # disconnect() may mutate fd_to_socket mid-broadcast.
        msg = data.decode().strip()
        print(f"Received from {fd}: {msg}")
        for other_fd, other_sock in list(fd_to_socket.items()):
            if other_sock is not server_sock and other_sock is not sock:
                try:
                    other_sock.sendall(f"(From: {fd}): {msg}\n".encode('utf-8'))
                except BlockingIOError:
                    pass
                except (BrokenPipeError, ConnectionResetError):
                    # Fix: peers that vanished mid-broadcast crashed the loop.
                    disconnect(other_fd, other_sock)

```

## simple-asgi.py
```python
import uvicorn


class SimpleASGIApp:
    """Minimal ASGI callable: answers every HTTP request with plain text."""

    async def __call__(self, scope, receive, send):
        # Guard clause: anything that is not an HTTP scope is reported
        # and otherwise ignored.
        if scope['type'] != 'http':
            print(f"Not support scope: {scope}")
            return

        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [(b'content-type', b'text/plain')],
        })
        await send({
            'type': 'http.response.body',
            'body': b'Hello, ASGI world!',
        })

# Module-level ASGI callable that uvicorn serves.
app = SimpleASGIApp()

if __name__ == "__main__":
    # Serve on localhost only; the app object is passed directly, so no reload.
    uvicorn.run(app, host="127.0.0.1", port=8000)