指定されたコマンド "ping -100 kobore.net" をFastAPIのエンドポイントから実行し、それが終了したらAPIを正常に終了させるコードは以下のようになります:
test.py
import subprocess
import threading
import os
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class CommandRequest(BaseModel):
command: str
@app.post("/execute-command")
def execute_command(request: CommandRequest):
command = request.command
# コマンドを非同期で実行
execution_thread = threading.Thread(target=execute_command_async, args=(command,))
execution_thread.start()
return {"message": f"コマンド '{command}' の実行を開始しました"}
def execute_command_async(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait() # コマンドの終了を待つ
print(f"コマンド '{command}' の実行が終了しました")
# os._exit(0) # FastAPIサーバーを終了させる
if __name__ == "__main__":
# FastAPIサーバーを開始
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
$ uvicorn test:app --host 0.0.0.0 --reload
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"command\": \"ping -n 20 kobore.net\"}" http://localhost:8000/execute-command