# test
import asyncio
# asyncio.set_event_loop_policy(asyncio.unix_events.DefaultEventLoopPolicy())
# Fetch the current process-wide event loop policy object for inspection.
test = asyncio.get_event_loop_policy()
# NOTE(review): `_hai_thread` is not a standard asyncio policy attribute;
# presumably it is added by a local debug patch -- confirm before relying on it.
test._hai_thread
# dir(test)
# NOTE(review): `_hai_stack` is likewise non-standard; presumably from the same
# debug patch -- confirm.
test._hai_stack
import uvicorn
from fastapi import FastAPI
import threading
# Application instance on which the route below is registered.
app = FastAPI()


@app.get("/")
def read_root():
    """Handle ``GET /`` by returning a fixed greeting mapping."""
    return {"hello": "world"}
def run_app():
    """Serve ``app`` with uvicorn on 0.0.0.0:8888.

    ``uvicorn.run`` blocks the calling thread for as long as the server runs,
    which is why this function is used as a thread target below.
    """
    uvicorn.run(app, host="0.0.0.0", port=8888)


# Run the blocking server in a background thread so the calling session stays
# usable; daemon=True means the thread does not keep the interpreter alive
# at exit.
thread = threading.Thread(target=run_app, daemon=True, name="Thread run FASTAPI")
thread.start()
INFO: Started server process [35520]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8888 (Press CTRL+C to quit)
INFO: 127.0.0.1:55111 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:55143 - "GET / HTTP/1.1" 200 OK
import sys
# Bare expression: in an interactive session this displays the mapping of
# module names to the modules loaded so far.
sys.modules
import asyncio
# Read the private module-level variable in asyncio.events that stores the
# policy object (private API, may differ between Python versions).
# NOTE(review): presumably None until a policy has been created -- confirm.
test = asyncio.events._event_loop_policy
# Private `_local` attribute of the policy; the captured output that follows
# shows it is a BaseDefaultEventLoopPolicy._Local instance.
test._local
<asyncio.events.BaseDefaultEventLoopPolicy._Local at 0x10320fef0>
# Display the identity of the object stored in the private `_loop` attribute
# of the policy's `_local` holder.
id(test._local._loop)
# HoHai: Debug
# Record this instance in a class-level "_hai_loop" list so the instances
# registered so far can be inspected later; the list is created on first use.
# The lookup uses normal attribute resolution, so a list already present on a
# base class is reused (appended to) rather than shadowed.
owner = self.__class__
instances = getattr(owner, "_hai_loop", None)
if instances is None:
    owner._hai_loop = [self]
else:
    instances.append(self)