fastapiサーバを2段に非同期に連携させるテストプログラム

■startkick.py

import httpx
import sys
import asyncio

async def call_start_endpoint(value: int):
    url = "http://localhost:8000/start"  # 1つ目のFastAPIアプリケーションのエンドポイントURLを指定

    input_data = {"value": value}

    async with httpx.AsyncClient() as client:
        #response = await client.post(url, json=input_data)
        response = await client.post(url, json=input_data, timeout=10.0) #10秒間応答を待つ

    if response.status_code == 200:
        data = response.json()
        return data
    else:
        print(f"エラーが発生しました。ステータスコード: {response.status_code}")
        return None

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("使用法: python startkick.py <数値>")
        sys.exit(1)

    try:
        value_to_send = int(sys.argv[1])  # コマンドライン引数から数値を取得
    except ValueError:
        print("数値を指定してください。")
        sys.exit(1)

    result = asyncio.run(call_start_endpoint(value_to_send))
    if result:
        print(f"1つ目のFastAPIアプリケーションの結果: {result}")

■app1.py

from fastapi import FastAPI
import httpx
from pydantic import BaseModel

app = FastAPI()

class InputData(BaseModel): # json形式の場合これが必要。これが無いと422エラーで弾かれる
    value: int

@app.post("/start")
#async def start(value: int):  # json形式を無視した場合
async def start(input_data: InputData):
    # 2つ目のFastAPIアプリケーションのprocessエンドポイントを呼び出す
    url = "http://localhost:8001/process"  # 2つ目のアプリケーションのエンドポイントURLを指定

    value2 = input_data.value
    input_data2 = {"value": value2}

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=input_data2, timeout=10.0) #10秒間応答を待つ

    if response.status_code == 200:
        data = response.json()
        return data
    else:
        return {"error": f"エラーが発生しました。ステータスコード: {response.status_code}"}

■app2.py

from fastapi import FastAPI
from pydantic import BaseModel
import asyncio

app = FastAPI()

class InputData(BaseModel):
    value: int

class OutputData(BaseModel):
    result: int

@app.post("/process")
async def process(input_data: InputData):
    value = input_data.value

    # 5秒間の待機
    await asyncio.sleep(5)


    # ここで2つ目のFastAPIアプリケーション独自の処理を行う(例: valueを2倍にする)
    result = value * 2
    return {"result": result}

2024,江端さんの忘備録

Posted by ebata