You
■fastapi側プログラム dummy2.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio # asyncio モジュールをインポート
app = FastAPI()
class InputMessage(BaseModel):
value: int
class OutputMessage(BaseModel):
response: int # 数値を含むレスポンスメッセージ
@app.post("/process")
async def process(input_message: InputMessage):
# 5秒間の待機
await asyncio.sleep(5)
# サーバー側で数値を受け取り、同じ数値を含むレスポンスメッセージを返す
response_value = input_message.value
output_message = OutputMessage(response=response_value)
return output_message
起動方法は、
uvicorn dummy2:app --host 0.0.0.0 --reload
■メッセージ送信側(クライアント)プログラム dummy3.py
import httpx
import sys
import asyncio # asyncio モジュールをインポート
async def send_message_to_api(value: int):
url = "http://localhost:8000/process" # FastAPIサーバーのエンドポイントURLを指定
input_message = {"value": value} # 送信する数値をJSONメッセージに含める
async with httpx.AsyncClient() as client:
response = await client.post(url, json=input_message, timeout=10.0)
if response.status_code == 200:
data = response.json()
print(f"APIからのレスポンス: {data['response']}")
else:
print(f"エラーが発生しました。ステータスコード: {response.status_code}")
if __name__ == "__main__":
if len(sys.argv) != 2:
print("使用法: python send.py <数値>")
sys.exit(1)
try:
value_to_send = int(sys.argv[1]) # コマンドライン引数から数値を取得
except ValueError:
print("数値を指定してください。")
sys.exit(1)
asyncio.run(send_message_to_api(value_to_send))
起動方法は、
>python dummy3.py 10
平行に3つ起動しても、正確に非同期処理してくれるようです。
昨夜から、デッドロック問題で、困っていたので、原点に戻って考え中です。