Event Loop#
import asyncio

# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is set for the thread and raises RuntimeError on Python 3.14+.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)


def normal_fn():
    """Plain function: calling it runs the body immediately."""
    print("Here is normal function")


async def async_fn():
    """Coroutine function: calling it only builds a coroutine object.

    The body runs when an event loop drives that coroutine.
    """
    print("Here is async function")


try:
    # loop.run_until_complete(normal_fn)  # This will throw an exception
    loop.run_until_complete(async_fn())
    print(loop)
finally:
    # Release the selector/self-pipe resources owned by the loop.
    loop.close()
Best Practice#
# Preferred entry point: hand the coroutine to asyncio.run() instead of
# driving an event loop by hand as in the example above.
# `my_async_fn` is a placeholder for your own coroutine function; it is not
# defined in this snippet.
asyncio.run(my_async_fn())
Diagram#
      ┌───────────────────────────────┐
      │        Python Program         │
      └───────────────────────────────┘
                      │
                      ▼
          ┌───────────────────────┐
          │    Main OS Thread     │
          └───────────────────────┘
                      │
        ┌─────────────┼───────────────────────────┐
        │             │                           │
        ▼             ▼                           ▼
     Loop A        Loop B                      Loop C
(new_event_loop) (new_event_loop)         (new_event_loop)
    ┌───────┐     ┌───────┐                   ┌───────┐
    │ Idle  │     │ Idle  │                   │ Idle  │
    └───────┘     └───────┘                   └───────┘
        ▲
        │
        │ Only ONE loop can be RUNNING
        │ in this thread at a time:
        │
        ▼
┌─────────────────────────────────────────┐
│ Running Event Loop (e.g., A)            │
│ - Executes async tasks                  │
│ - Handles timers / sockets              │
│ - Manages await points                  │
└─────────────────────────────────────────┘
Threads and Event loop#
┌────────────────────┐
│    Main Thread     │
└────────────────────┘
          │
          ▼
     Event Loop A
   (running or idle)

┌────────────────────┐
│  Worker Thread 1   │
└────────────────────┘
          │
          ▼
     Event Loop B
   (running or idle)
Important asyncio rules#
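Rule 1 — One running event loop per thread: each thread may have at most one running event loop at a time.
Rule 2 — An event loop is NOT shared between threads: loop methods are not thread-safe, so if you try to use a loop created in Thread A while in Thread B, asyncio may raise RuntimeError (the check is performed in debug mode) or misbehave silently.
Rule 3 — Threads communicate with a loop via thread-safe asyncio functions, for example: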
loop.call_soon_threadsafe(callback, arg)
Examples#
import threading
import asyncio


def thread_worker():
    """Create, run and close an event loop owned by the calling thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def run():
        print("Hello from worker loop in thread:", threading.current_thread().name)

    try:
        loop.run_until_complete(run())
    finally:
        # Close the loop even when the coroutine raises.
        loop.close()


# Main thread
main_loop = asyncio.new_event_loop()
asyncio.set_event_loop(main_loop)

t = threading.Thread(target=thread_worker)
t.start()


async def main():
    """Coroutine executed by the main thread's own loop."""
    print("Hello from main loop in thread:", threading.current_thread().name)


try:
    main_loop.run_until_complete(main())
finally:
    # Always release the loop and wait for the worker, even on error.
    main_loop.close()
    t.join()
Implement signal stop#
graph TD
Main[Main Thread] -->|sets stop_event| SE[threading.Event\nstop_event]
SH[signal_handler\nSIGINT / SIGTERM] -->|stop_event.set| SE
subgraph WorkerThread[Worker Thread]
EL[Event Loop] --> CS[call_soon check_stop]
CS -->|every 0.1s| CheckStop{stop_event\nset?}
CheckStop -->|No| CL[call_later 0.1s check_stop]
CL --> CheckStop
CheckStop -->|Yes| Stop[loop.stop]
EL --> PT[periodic task\nprint Tick]
end
SE -->|is_set| CheckStop
style SE fill:#f96,stroke:#333
style WorkerThread fill:#eef,stroke:#669
import threading
import asyncio
import signal
import time

# Set by the signal handler (main thread), polled by the worker's event loop.
stop_event = threading.Event()


def thread_worker():
    """Run an event loop in the calling thread until ``stop_event`` is set.

    The loop hosts a ``periodic`` task that prints a tick every second and a
    self-rescheduling ``check_stop`` callback that polls ``stop_event`` every
    0.1 s and stops the loop once the event is set.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def periodic():
        while not stop_event.is_set():
            print("Tick from worker thread")
            await asyncio.sleep(1)

    def check_stop():
        if stop_event.is_set():
            loop.stop()
        else:
            # Re-arm the poll; threading.Event cannot wake the loop by itself.
            loop.call_later(0.1, check_stop)

    loop.call_soon(check_stop)
    task = loop.create_task(periodic())
    try:
        loop.run_forever()
    finally:
        # loop.stop() usually leaves periodic() suspended inside sleep(1).
        # Cancel it and let the loop process the cancellation so the task is
        # not destroyed while still pending when the loop is closed.
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass
        loop.close()
    print("Worker thread stopped.")


def signal_handler(sig, frame):
    """Report which signal arrived and ask the worker to stop."""
    if sig == signal.SIGINT:
        print("\nCtrl+C pressed, stopping...")
    elif sig == signal.SIGTERM:
        print("\nSIGTERM received, stopping...")
    stop_event.set()


def main():
    """Start the worker thread and block until SIGINT/SIGTERM stops it."""
    # Install the handlers before starting the worker so that a signal
    # arriving early is not handled by the default action.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    t = threading.Thread(target=thread_worker, name="worker")
    t.start()

    try:
        # Wait with a timeout so the main thread keeps returning to Python
        # code, which is where signal handlers get a chance to run.
        while not stop_event.wait(0.1):
            pass
    except KeyboardInterrupt:
        stop_event.set()

    t.join()
    print("Main thread exiting.")


if __name__ == "__main__":
    main()
- `call_soon` and `call_later` are used to schedule `check_stop`, which monitors the `stop_event` event.
- `check_stop` re-registers itself to keep checking the event every 0.1 seconds.
- An alternative approach uses a task with a loop instead of `call_soon`/`call_later`:
async def check_stop_task():
    """Poll ``stop_event`` every 0.1 s, then stop ``loop`` once it is set.

    ``stop_event`` and ``loop`` come from the enclosing worker example.
    """
    while True:
        if stop_event.is_set():
            break
        await asyncio.sleep(0.1)
    loop.stop()


loop.create_task(check_stop_task())
Event loop internals#
graph TD
Loop[Event Loop Iteration] --> Phase1[Phase 1: Run immediate callbacks\nloop._ready queue]
Phase1 --> Phase2[Phase 2: Wait for I/O\nselector.select timeout]
Phase2 -->|I/O events ready| Convert[Convert I/O events\nto immediate callbacks]
Convert --> Phase3[Phase 3: Move expired timers\nloop._scheduled into _ready]
Phase3 --> Loop
style Loop fill:#9cf,stroke:#333
style Phase1 fill:#9f9,stroke:#333
style Phase2 fill:#fc9,stroke:#333
style Phase3 fill:#f9c,stroke:#333
# Simplified pseudo-code of one event-loop iteration. `loop`, `selector` and
# `compute_timeout_from_scheduled` are placeholders, not real module names.
while True:
    # Phase 1 — RUN immediate callbacks
    # Only the callbacks that were ready when the iteration started are run.
    # Anything they schedule with call_soon() waits for the next iteration,
    # so a self-rescheduling callback cannot starve I/O and timers.
    for _ in range(len(loop._ready)):
        handle = loop._ready.popleft()  # _ready is a deque: O(1) pop from the left
        handle._run()

    # Phase 2 — WAIT for I/O until next timer
    # If phase 1 queued new callbacks, just poll (timeout 0) instead of
    # sleeping until the next timer.
    timeout = 0 if loop._ready else compute_timeout_from_scheduled()
    events = selector.select(timeout)

    # Convert I/O events into immediate callbacks
    for event in events:
        loop._ready.append(event.handle)

    # Phase 3 — MOVE expired timers into ready queue
    # _scheduled is a heap ordered by deadline, so index 0 is the next timer.
    now = loop.time()
    while loop._scheduled and loop._scheduled[0].when() <= now:
        timer = heapq.heappop(loop._scheduled)
        loop._ready.append(timer)
event-loop is thread-based#
import asyncio
import threading


def worker():
    """Install a fresh loop as the calling thread's current loop and print its id."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        print("Worker thread loop:", id(asyncio.get_event_loop()))
    finally:
        loop.close()


# Create the main loop first and keep it alive while the worker runs: id() is
# only unique among objects that exist at the same time, so this guarantees
# the two printed ids differ.
main_loop = asyncio.new_event_loop()
asyncio.set_event_loop(main_loop)

worker_thread = threading.Thread(target=worker)
worker_thread.start()
worker_thread.join()

try:
    print("Main thread loop:", id(asyncio.get_event_loop()))
finally:
    main_loop.close()
running-loop is the event-loop that is running#
import asyncio
import threading
import nest_asyncio
nest_asyncio.apply()
async def report():
    """Print the current thread and the ids of the running and current loops."""
    print(f"Hey man, I'm running inside {threading.current_thread()}")
    print(f"Running loop: {id(asyncio.get_running_loop())}. Current loop: {id(asyncio.get_event_loop())}")


def loop_run_to_complete(loop):
    """Run ``report()`` to completion on ``loop``."""
    # Drive the coroutine directly instead of a fire-and-forget task plus a
    # fixed one-second sleep: the call returns as soon as report() finishes
    # and any exception it raises propagates to the caller.
    loop.run_until_complete(report())


def worker():
    """Give the calling thread its own loop and run ``report()`` on it."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop_run_to_complete(loop)
    finally:
        loop.close()


# The main loop is created before the worker starts and stays alive until the
# worker is joined, so both loops exist at the same time and their ids differ.
main_loop = asyncio.new_event_loop()
asyncio.set_event_loop(main_loop)

worker_thread = threading.Thread(target=worker)
worker_thread.start()

try:
    loop_run_to_complete(main_loop)
finally:
    worker_thread.join()
    main_loop.close()
Hey man, I'm running inside <Thread(Thread-10 (worker), started 6305296384)>
Running loop: 4366786128. Current loop: 4366786128
Hey man, I'm running inside <_MainThread(MainThread, started 8697635008)>
Running loop: 4366783392. Current loop: 4366783392
Socket Server Sample#
Key concepts:
- Create a server that exposes a `server_socket` listening on a specified port.
- The `server_socket` is registered in a `selector` object for I/O event bookkeeping.
- A main loop monitors all incoming events and processes them:
  - Accept connection: when a client first connects, register the client socket.
  - Handle connection: read and write data via the registered client socket.
graph TD
Start[Create server_socket\nbind + listen port 5555] --> Register[sel.register server_sock\nEVENT_READ]
Register --> MainLoop[Main Loop\nsel.select]
MainLoop -->|event on server_sock| Accept[accept_connection\naccept new client]
Accept -->|register client_sock| Register
MainLoop -->|event on client_sock| Handle[handle_client\nrecv and sendall]
Handle -->|data received| Echo[send Message received]
Handle -->|no data / disconnect| Cleanup[sel.unregister\nsock.close]
Cleanup --> MainLoop
Echo --> MainLoop
style MainLoop fill:#9cf,stroke:#333
style Accept fill:#9f9,stroke:#333
style Handle fill:#fc9,stroke:#333
Testing:
- Run the code on the server: `python this-code.py`
- From a client (same machine):
  - macOS: `nc 127.0.0.1 5555`
  - Windows: `telnet 127.0.0.1 5555`
import os
import selectors
import socket

# One selector tracks the listening socket and every connected client.
sel = selectors.DefaultSelector()


def accept_connection(sock):
    """Accept a pending client on listening socket ``sock`` and watch it for reads."""
    try:
        client_sock, addr = sock.accept()
    except (BlockingIOError, ConnectionAbortedError):
        # The client went away between select() and accept(); nothing to do.
        return
    print(f"Accepted connection from {addr}")
    client_sock.setblocking(False)
    sel.register(client_sock, selectors.EVENT_READ)


def _drop_client(sock):
    """Stop watching ``sock`` and close it."""
    print("Client disconnected")
    sel.unregister(sock)
    sock.close()


def handle_client(sock):
    """Acknowledge data received on ``sock``, or clean up a closed connection."""
    try:
        data = sock.recv(1024)
    except BlockingIOError:
        # Spurious readiness on a non-blocking socket: wait for the next event.
        return
    except ConnectionError:
        # A reset/aborted connection is handled like an orderly disconnect.
        data = b""

    if not data:
        _drop_client(sock)
        return

    # errors="replace" keeps one client's invalid UTF-8 from crashing the server.
    print("Received:", data.decode(errors="replace"))
    try:
        sock.sendall(b"Message received\n")
    except OSError:
        # The peer vanished or its buffer is full; drop this client only.
        _drop_client(sock)


def main():
    """Listen on port 5555 and dispatch selector events until interrupted."""
    # --- Step 1: Create listening socket ---
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if os.name == "posix":
        # Allow an immediate restart while old connections sit in TIME_WAIT.
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_sock.bind(("0.0.0.0", 5555))
    server_sock.listen()
    server_sock.setblocking(False)

    # Register the server socket for "read" events (incoming connections)
    sel.register(server_sock, selectors.EVENT_READ)

    print("Server listening on port 5555...")

    # --- Step 2: Main event loop ---
    try:
        while True:
            for key, _mask in sel.select():
                sock = key.fileobj
                if sock is server_sock:
                    accept_connection(sock)
                else:
                    handle_client(sock)
    except KeyboardInterrupt:
        print("Server shutting down")
    finally:
        # Close every socket still registered (listener and clients).
        for key in list(sel.get_map().values()):
            key.fileobj.close()
        sel.close()


if __name__ == "__main__":
    main()