from logging.config import fileConfig
# from sqlalchemy import engine_from_config # No need
from sqlalchemy import pool # No need
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata

# HoHai: the models are imported from the `source` package, so be careful
# with the PYTHONPATH that is set up at runtime: it must let Python
# discover that package.
from source import models
# MetaData that Alembic compares against the database.
target_metadata = models.expose_base_meta()
# The engine defined by the application is used for the online migrations.
from source.db import engine

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    # NOTE(review): assumes "sqlalchemy.url" is set in the .ini file in use.
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    # Configuring the context alone produces nothing; the migrations have to
    # be run explicitly.  No engine or connection is involved in this mode,
    # so a plain (synchronous) transaction block is enough.
    with context.begin_transaction():
        context.run_migrations()
async def run_migrations_online() -> None:
    """Run migrations in 'online' (async) mode.

    A connection is opened on the engine imported from ``source.db``
    (shared with the application) rather than on an engine built from the
    .ini settings.  The migrations themselves are synchronous, so they are
    executed through ``run_sync`` with ``do_run_migrations``.
    """
    try:
        async with engine.connect() as conn:
            await conn.run_sync(do_run_migrations)
    finally:
        # Release the pooled connections while the event loop is still
        # running; this script drives the coroutine with asyncio.run().
        await engine.dispose()
def do_run_migrations(connection):
    """Configure the Alembic context on *connection* and run the migrations.

    Meant to be called from a synchronous context (it is handed to
    ``run_sync`` by the online runner).
    """
    configure_options = {
        "connection": connection,
        "target_metadata": target_metadata,
        "compare_type": True,  # detect column type changes
        "compare_server_default": True,  # detect default value changes
    }
    context.configure(**configure_options)

    with context.begin_transaction():
        context.run_migrations()
# The offline/online dispatch below is disabled: the online (async) path
# is always taken when this script runs.
# if context.is_offline_mode():
#     run_migrations_offline()
# else:
#     run_migrations_online()
import asyncio

# run_migrations_online is a coroutine function, so it is driven to
# completion with asyncio.run.
asyncio.run(run_migrations_online())
```

## d94ba253f9be_add_new_column_address.py

```python
"""add new column Address
Revision ID: d94ba253f9be
Revises:
Create Date: 2025-11-06 14:30:23.110333
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'd94ba253f9be'
# None: this revision has no parent (the "Revises:" header is empty).
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: add the nullable ``address`` column to ``users``."""
    # ### commands auto generated by Alembic - please adjust! ###
    address_column = sa.Column('address', sa.String(length=250), nullable=True)
    op.add_column('users', address_column)
    # ### end Alembic commands ###
def downgrade() -> None:
    """Downgrade schema: drop the ``address`` column from ``users``."""
    # ### commands auto generated by Alembic - please adjust! ###
    table, column = 'users', 'address'
    op.drop_column(table, column)
    # ### end Alembic commands ###
```

## db.py

```python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import os

# The connection URL is read from the DATABASE_URL environment variable so
# that credentials do not have to live in the source tree.  The literal
# below is only the fallback used when the variable is not set.
# Format: postgresql+asyncpg://user:password@host:port/database
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://root:postgres@localhost:5432/example_pgdb",
)

# Async engine shared by the whole application.
engine = create_async_engine(DATABASE_URL, echo=True)

# Factory producing AsyncSession objects bound to the engine above.
AsyncSessionLocal = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)
async def get_db_connection():
    """Yield a new database session and close it when the caller is done.

    A fresh session is created from ``AsyncSessionLocal`` on every call.
    The session's own async context manager closes it on exit, whether the
    consumer finished normally or raised.
    """
    async with AsyncSessionLocal() as db:
        yield db
# declarative_base is provided by sqlalchemy.orm since SQLAlchemy 1.4; the
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
async def init_models():
    """Create the tables registered on ``Base.metadata``."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        # create_all is synchronous, so it is handed to run_sync instead of
        # being called directly inside the async context.
        await connection.run_sync(create_tables)
# import asyncio
# # Run the async function
# if __name__ == "__main__":
# asyncio.run(init_models())
```

## models.py

```python
from sqlalchemy import Column, Integer, String
from .db import Base
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50))
    # Unique and indexed.
    email = Column(String(120), unique=True, index=True)
    address = Column(String(250))  # We want to add new columns
# HoHai test
class Posts(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    content = Column(String(100))
def expose_base_meta():
    """Return the ``metadata`` object of the declarative ``Base``."""
    metadata = Base.metadata
    return metadata
```

## __init__.py

```python
```

## schemas.py

```python
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
    """Fields shared by the user schemas: a name and an e-mail address."""

    name: str
    email: EmailStr
class UserCreate(UserBase):
    """Payload for creating a user; adds nothing to ``UserBase``."""
class UserRead(UserBase):
    """User schema that adds the ``id`` field to ``UserBase``."""

    id: int

    class Config:
        orm_mode = True  # allows conversion from SQLAlchemy objects
```

## main.py

```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
import asyncio
import schemas
# from .db import engine, get_db
from db import get_db_connection, init_models
import uvicorn
import crud
# Tables are no longer created at import time; init_models is run from the
# __main__ block instead.
# Base.metadata.create_all(bind=engine)

# The FastAPI application the route decorators below attach to.
app = FastAPI()
@app.post("/users/", response_model=schemas.UserRead)
async def create_user(user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Create a user from the request payload and return the stored record."""
    return await crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.UserRead])
async def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db_connection)):
    """Return one page of users, selected by ``skip`` and ``limit``."""
    return await crud.get_users(db, skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=schemas.UserRead)
async def read_user(user_id: int, db: Session = Depends(get_db_connection)):
    """Return the user with ``user_id``; respond with 404 when none exists."""
    found = await crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
# NOTE: the update is exposed through POST; the route and method are kept
# unchanged so existing clients continue to work.
@app.post("/users/{user_id}")
async def update_user(user_id: int, user: schemas.UserCreate, db: Session = Depends(get_db_connection)):
    """Update the name and e-mail of the user with ``user_id``.

    Raises:
        HTTPException: 404 when ``crud.update_user`` finds no such user,
            matching the behaviour of ``read_user``.
    """
    db_user = await crud.update_user(db, user_id=user_id, user=user)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
if __name__ == "__main__":
    # Create the tables before the server starts.  This should run here so
    # that all tables (which are mapped by the models) have been registered
    # in `metadata`.
    asyncio.run(init_models())
    # uvicorn.run(app, host="127.0.0.1", port=8000, reload=True)
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
```

## crud.py

```python
from sqlalchemy.orm import Session
import models
import schemas
async def get_user(db: Session, user_id: int):
    """Return the user whose id is ``user_id``, or ``None`` if there is none.

    The function is a coroutine and goes through ``db.execute`` like the
    other helpers in this module: the caller awaits it and passes the
    session produced by the async session factory.
    """
    # `select` is imported at module level further down in this file.
    result = await db.execute(select(models.User).where(models.User.id == user_id))
    return result.scalars().first()
# def get_users(db: Session, skip: int = 0, limit: int = 10):
# return db.query(models.User).offset(skip).limit(limit).all()
from sqlalchemy.future import select
async def get_users(db, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users after skipping the first ``skip``."""
    page_query = select(models.User).offset(skip).limit(limit)
    rows = await db.execute(page_query)
    return rows.scalars().all()
# NOTE: with AsyncSession, `db` is not actually a Session (AsyncSession wraps
# a Session instance).
async def create_user(db: Session, user: schemas.UserCreate):
    """Insert a user built from ``user`` and return the refreshed row."""
    new_user = models.User(name=user.name, email=user.email)
    db.add(new_user)
    # Both calls are coroutines on the async session and must be awaited.
    await db.commit()
    await db.refresh(new_user)
    return new_user
# Update a user
async def update_user(db: Session, user_id: int, user: schemas.UserCreate):
    """Overwrite the e-mail and name of the user with ``user_id``.

    Args:
        db: session used to run the query and commit the change.
        user_id: primary key of the user to update.
        user: payload carrying the new ``email`` and ``name``.

    Returns:
        The updated user, or ``None`` when no user has that id (in which
        case nothing is committed).
    """
    query_results = await db.execute(select(models.User).where(models.User.id == user_id))
    user_in_db = query_results.scalar_one_or_none()
    if user_in_db is None:
        return None

    user_in_db.email = user.email
    user_in_db.name = user.name
    await db.commit()
    return user_in_db
```

## simple-chat-server.py

```python
import socket
import select
import errno
HOST = '127.0.0.1'
PORT = 7777

# --- Step 1: Create server socket ---
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((HOST, PORT))
server_sock.listen()
server_sock.setblocking(False)  # important for event-driven I/O
print(f"Chat server running on {HOST}:{PORT}")

# --- Step 2: Create kqueue ---
kq = select.kqueue()

# Register server socket for READ events (new connections)
kev = select.kevent(server_sock.fileno(),
                    filter=select.KQ_FILTER_READ,
                    flags=select.KQ_EV_ADD)
kq.control([kev], 0, 0)

# Map file descriptor -> socket object
fd_to_socket = {server_sock.fileno(): server_sock}


def _disconnect(fd, sock):
    """Unregister *fd* from the kqueue, close its socket and forget it."""
    print(f"Client {fd} disconnected")
    kq.control([select.kevent(fd,
                              filter=select.KQ_FILTER_READ,
                              flags=select.KQ_EV_DELETE)], 0, 0)
    sock.close()
    del fd_to_socket[fd]


def _broadcast(sender_fd, sender_sock, msg):
    """Send *msg* to every connected client except the sender."""
    payload = f"(From: {sender_fd}): {msg}\n".encode('utf-8')
    for other_fd, other_sock in fd_to_socket.items():
        if other_sock is server_sock or other_sock is sender_sock:
            continue
        try:
            other_sock.sendall(payload)
        except OSError as exc:
            # Best effort: a full send buffer or a dead peer must not stop
            # the broadcast or the server.  The mapping is not touched here
            # because it is being iterated; a dead peer is dropped when its
            # own read event reports the closed connection.
            print(f"Could not deliver to {other_fd}: {exc}")


# --- Step 3: Event loop ---
try:
    while True:
        # Wait for events, max 10 at a time, timeout 1 second
        events = kq.control(None, 10, 1.0)
        for kev in events:
            fd = kev.ident
            sock = fd_to_socket.get(fd)
            if sock is None:
                # Event for a descriptor that is no longer tracked.
                continue

            if sock is server_sock:
                # --- New client connection ---
                try:
                    client_sock, addr = server_sock.accept()
                except (BlockingIOError, InterruptedError):
                    # The pending connection went away before accept().
                    continue
                client_sock.setblocking(False)
                fd_to_socket[client_sock.fileno()] = client_sock

                # Register client for READ events
                kev_client = select.kevent(client_sock.fileno(),
                                           filter=select.KQ_FILTER_READ,
                                           flags=select.KQ_EV_ADD)
                kq.control([kev_client], 0, 0)
                print(f"Client connected from {addr}")
                continue

            # --- Client sent data ---
            try:
                data = sock.recv(4096)
            except (BlockingIOError, InterruptedError):
                continue
            except ConnectionError:
                # An abrupt reset is handled like an orderly close.
                data = b""

            if data:
                # Broadcast to all other clients.  Undecodable bytes are
                # replaced so that malformed input cannot crash the server.
                msg = data.decode(errors="replace").strip()
                print(f"Received from {fd}: {msg}")
                _broadcast(fd, sock, msg)
            else:
                # Client closed connection
                _disconnect(fd, sock)
finally:
    # Close every remaining socket (the listening one included) and the
    # kqueue when the loop is left, e.g. on Ctrl-C.
    for open_sock in list(fd_to_socket.values()):
        open_sock.close()
    kq.close()
```

## simple-asgi.py

```python
import uvicorn
class SimpleASGIApp:
    """Minimal ASGI application answering HTTP requests with plain text."""

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection.

        For an ``http`` scope a 200 response with a fixed text body is sent
        through *send*.  Any other scope type is only reported on stdout.
        """
        if scope['type'] != 'http':
            print(f"Not support scope: {scope}")
            return

        start_event = {
            'type': 'http.response.start',
            'status': 200,
            'headers': [(b'content-type', b'text/plain')],
        }
        body_event = {
            'type': 'http.response.body',
            'body': b'Hello, ASGI world!',
        }
        # The response start must be sent before the body.
        for event in (start_event, body_event):
            await send(event)
# Module-level application instance served by uvicorn below.
app = SimpleASGIApp()

if __name__ == "__main__":
    # Serve the app on 127.0.0.1:8000 when the file is run as a script.
    uvicorn.run(app, host="127.0.0.1", port=8000)