fastapiサーバを2段に非同期に連携させるテストプログラム
■startkick.py
import httpx
import sys
import asyncio
async def call_start_endpoint(value: int):
url = "http://localhost:8000/start" # 1つ目のFastAPIアプリケーションのエンドポイントURLを指定
input_data = {"value": value}
async with httpx.AsyncClient() as client:
#response = await client.post(url, json=input_data)
response = await client.post(url, json=input_data, timeout=10.0) #10秒間応答を待つ
if response.status_code == 200:
data = response.json()
return data
else:
print(f"エラーが発生しました。ステータスコード: {response.status_code}")
return None
if __name__ == "__main__":
if len(sys.argv) != 2:
print("使用法: python startkick.py <数値>")
sys.exit(1)
try:
value_to_send = int(sys.argv[1]) # コマンドライン引数から数値を取得
except ValueError:
print("数値を指定してください。")
sys.exit(1)
result = asyncio.run(call_start_endpoint(value_to_send))
if result:
print(f"1つ目のFastAPIアプリケーションの結果: {result}")
■app1.py
from fastapi import FastAPI
import httpx
from pydantic import BaseModel
app = FastAPI()
class InputData(BaseModel): # json形式の場合これが必要。これが無いと422エラーで弾かれる
value: int
@app.post("/start")
#async def start(value: int): # json形式を無視した場合
async def start(input_data: InputData):
# 2つ目のFastAPIアプリケーションのprocessエンドポイントを呼び出す
url = "http://localhost:8001/process" # 2つ目のアプリケーションのエンドポイントURLを指定
value2 = input_data.value
input_data2 = {"value": value2}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=input_data2, timeout=10.0) #10秒間応答を待つ
if response.status_code == 200:
data = response.json()
return data
else:
return {"error": f"エラーが発生しました。ステータスコード: {response.status_code}"}
■app2.py
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
app = FastAPI()
class InputData(BaseModel):
value: int
class OutputData(BaseModel):
result: int
@app.post("/process")
async def process(input_data: InputData):
value = input_data.value
# 5秒間の待機
await asyncio.sleep(5)
# ここで2つ目のFastAPIアプリケーション独自の処理を行う(例: valueを2倍にする)
result = value * 2
return {"result": result}