コマンドを起動して、必要に応じて停止させ、プロセスID(PID)が消えるまでを監視するfastapiのプログラム
※fastapiの場合、スレッドでは上手くコンロールできない(らしい)ので、プロセス単位で管理します
(1)execute-commandというAPIから、pingを行う回数Xをjson形式入力して、"ping -n X kobore.net"を起動します。返り値はプロセス番号(pid)がjson形式で戻ります。
curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
(2)terminate-processというAPIから、上記のpidをjson形式で入力して、プロセスを強制終了します。返り値は、この成否(1/-1)がjson形式で戻ります。
curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
(3)以下のファイルをtest.pyとして、uvicorn test:app --host 0.0.0.0 --reload を投入してfastapiサーバを起動します。
# curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
# curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
import subprocess
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class ExecuteRequest(BaseModel):
count: int # pingコマンドの実行回数を指定
class TerminateRequest(BaseModel):
pid: int # 終了させるプロセスのPID
# 実行中のプロセスを格納する辞書
running_processes = {}
@app.post("/execute-command")
def execute_command(request: ExecuteRequest):
count = request.count
try:
command = f"ping -n {count} kobore.net" # pingコマンドの回数をcountに指定
# コマンドを非同期で実行し、プロセスを取得
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
pid = process.pid # プロセスのPIDを取得
running_processes[pid] = process
return {"pid": pid} # PIDのみを返す
except Exception as e:
return {"message": f"コマンドの実行中にエラーが発生しました: {str(e)}"}
@app.post("/terminate-process")
def terminate_process(request: TerminateRequest):
pid = request.pid
print("pid in terminate-process", pid)
try:
# プロセスを取得し、終了させる
process = running_processes.get(pid)
process.terminate() # プロセスを終了させる(SIGTERMを送信)
process.wait()
del running_processes[pid] # プロセスを辞書から削除
# 成功の場合は1を返す
return {"status": 1}
except Exception as e:
return {"status": -1} # 失敗の場合は-1を返す
if __name__ == "__main__":
# FastAPIサーバーを開始
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
出力結果
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"100\"}" http://localhost:8000/execute-command
{"pid":1784}
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"1784\"}" http://localhost:8000/terminate-process
{"status":1}
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"1784\"}" http://localhost:8000/terminate-process
{"status":-1}
起動したプロセスを監視して、プロセスが予定通り/突然停止した場合、それを通知する仕組みを追加しました。
# curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
# curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
# C:\Users\ebata\fastapi7>uvicorn test:app --host 0.0.0.0 --reload
import subprocess
import os
import time
import multiprocessing # multiprocessingモジュールを追加
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class ExecuteRequest(BaseModel):
count: int # pingコマンドの実行回数を指定
class TerminateRequest(BaseModel):
pid: int # 終了させるプロセスのPID
# 実行中のプロセスを格納する辞書
running_processes = {}
process_monitor_processes = {}
@app.post("/execute-command")
def execute_command(request: ExecuteRequest):
count = request.count
try:
command = f"ping -n {count} kobore.net" # pingコマンドの回数をcountに指定
# コマンドを非同期で実行し、プロセスを取得
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
pid = process.pid # プロセスのPIDを取得
running_processes[pid] = process
# プロセスを監視するプロセスを起動
monitor_process = multiprocessing.Process(target=monitor_process_status, args=(pid,))
monitor_process.start()
process_monitor_processes[pid] = monitor_process
return {"pid": pid} # PIDのみを返す
except Exception as e:
return {"message": f"コマンドの実行中にエラーが発生しました: {str(e)}"}
@app.post("/terminate-process")
def terminate_process(request: TerminateRequest):
pid = request.pid
print("pid in terminate-process", pid)
try:
# プロセスを取得し、終了させる
process = running_processes.get(pid)
process.terminate() # プロセスを終了させる(SIGTERMを送信)
process.wait()
del running_processes[pid] # プロセスを辞書から削除
# 成功の場合は1を返す
return {"status": 1}
except Exception as e:
return {"status": -1} # 失敗の場合は-1を返す
def monitor_process_status(pid):
while True:
if not is_process_running(pid):
# プロセスが存在しない場合
# メッセージを生成して出力(または送信)
message = {
"status": "Process Disappeared",
"pid": pid
}
print("Process Disappeared:", message)
# プロセス監視プロセスを停止
### del process_monitor_processes[pid]
break
# 一定の待機時間を設定して監視を継続
time.sleep(10) # 10秒ごとに監視
def is_process_running(pid):
#try:
# os.kill(pid, 0) # PIDを使ってプロセスにシグナルを送信し、存在を確認
# return True
#except OSError:
# return False
try:
# ここの部分Windows特有のやりかたなので、後で、例の、os.kill(pid,0)を試してみること
# tasklistコマンドを実行してプロセス一覧を取得
result = subprocess.check_output(["tasklist", "/fi", f"PID eq {pid}"], universal_newlines=True)
# 結果から指定したPIDの行を検索
lines = result.splitlines()
for line in lines:
if f"{pid}" in line:
return True
# 指定したPIDが見つからない場合
# ここに、停止時のメッセージ送信を組み込めばO.K.のはず
return False
except Exception as e:
return False
if __name__ == "__main__":
# FastAPIサーバーを開始
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)