コマンドを起動して、必要に応じて停止させ、プロセスID(PID)が消えるまでを監視するfastapiのプログラム

2024年1月14日

※fastapiの場合、スレッドでは上手くコンロールできない(らしい)ので、プロセス単位で管理します
(1)execute-commandというAPIから、pingを行う回数Xをjson形式入力して、"ping -n X kobore.net"を起動します。返り値はプロセス番号(pid)がjson形式で戻ります。
curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
(2)terminate-processというAPIから、上記のpidをjson形式で入力して、プロセスを強制終了します。返り値は、この成否(1/-1)がjson形式で戻ります。
curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
(3)以下のファイルをtest.pyとして、uvicorn test:app --host 0.0.0.0 --reload を投入してfastapiサーバを起動します。
# curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
# curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process

import subprocess
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ExecuteRequest(BaseModel):
    count: int  # pingコマンドの実行回数を指定

class TerminateRequest(BaseModel):
    pid: int  # 終了させるプロセスのPID

# 実行中のプロセスを格納する辞書
running_processes = {}

@app.post("/execute-command")
def execute_command(request: ExecuteRequest):
    count = request.count

    try:
        command = f"ping -n {count} kobore.net"  # pingコマンドの回数をcountに指定
        # コマンドを非同期で実行し、プロセスを取得
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        pid = process.pid  # プロセスのPIDを取得
        running_processes[pid] = process

        return {"pid": pid}  # PIDのみを返す
    except Exception as e:
        return {"message": f"コマンドの実行中にエラーが発生しました: {str(e)}"}


@app.post("/terminate-process")
def terminate_process(request: TerminateRequest):
    pid = request.pid
    print("pid in terminate-process", pid)

    try:
        # プロセスを取得し、終了させる
        process = running_processes.get(pid)

        process.terminate()  # プロセスを終了させる(SIGTERMを送信)
        process.wait()
        del running_processes[pid]  # プロセスを辞書から削除

        # 成功の場合は1を返す
        return {"status": 1}
    except Exception as e:
        return {"status": -1}  # 失敗の場合は-1を返す

if __name__ == "__main__":
    # FastAPIサーバーを開始
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

出力結果
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"100\"}" http://localhost:8000/execute-command
{"pid":1784}
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"1784\"}" http://localhost:8000/terminate-process
{"status":1}
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"1784\"}" http://localhost:8000/terminate-process
{"status":-1}


起動したプロセスを監視して、プロセスが予定通り/突然停止した場合、それを通知する仕組みを追加しました。

# curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
# curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
# C:\Users\ebata\fastapi7>uvicorn test:app --host 0.0.0.0 --reload

import subprocess
import os
import time
import multiprocessing  # multiprocessingモジュールを追加
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ExecuteRequest(BaseModel):
    count: int  # pingコマンドの実行回数を指定

class TerminateRequest(BaseModel):
    pid: int  # 終了させるプロセスのPID

# 実行中のプロセスを格納する辞書
running_processes = {}
process_monitor_processes = {}

@app.post("/execute-command")
def execute_command(request: ExecuteRequest):
    count = request.count

    try:
        command = f"ping -n {count} kobore.net"  # pingコマンドの回数をcountに指定
        # コマンドを非同期で実行し、プロセスを取得
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        pid = process.pid  # プロセスのPIDを取得
        running_processes[pid] = process

        # プロセスを監視するプロセスを起動
        monitor_process = multiprocessing.Process(target=monitor_process_status, args=(pid,))
        monitor_process.start()
        process_monitor_processes[pid] = monitor_process

        return {"pid": pid}  # PIDのみを返す
    except Exception as e:
        return {"message": f"コマンドの実行中にエラーが発生しました: {str(e)}"}


@app.post("/terminate-process")
def terminate_process(request: TerminateRequest):
    pid = request.pid
    print("pid in terminate-process", pid)

    try:
        # プロセスを取得し、終了させる
        process = running_processes.get(pid)

        process.terminate()  # プロセスを終了させる(SIGTERMを送信)
        process.wait()
        del running_processes[pid]  # プロセスを辞書から削除

        # 成功の場合は1を返す
        return {"status": 1}
    except Exception as e:
        return {"status": -1}  # 失敗の場合は-1を返す

def monitor_process_status(pid):
    while True:
        if not is_process_running(pid):
            # プロセスが存在しない場合
            # メッセージを生成して出力(または送信)
            message = {
                "status": "Process Disappeared",
                "pid": pid
            }
            print("Process Disappeared:", message)

            # プロセス監視プロセスを停止
            ### del process_monitor_processes[pid]
            break

        # 一定の待機時間を設定して監視を継続
        time.sleep(10)  # 10秒ごとに監視

def is_process_running(pid):
    #try:
    #    os.kill(pid, 0)  # PIDを使ってプロセスにシグナルを送信し、存在を確認
    #    return True
    #except OSError:
    #    return False
    try:
        # ここの部分Windows特有のやりかたなので、後で、例の、os.kill(pid,0)を試してみること
        # tasklistコマンドを実行してプロセス一覧を取得
        result = subprocess.check_output(["tasklist", "/fi", f"PID eq {pid}"], universal_newlines=True)

        # 結果から指定したPIDの行を検索
        lines = result.splitlines()
        for line in lines:
            if f"{pid}" in line:
                return True

        # 指定したPIDが見つからない場合
        # ここに、停止時のメッセージ送信を組み込めばO.K.のはず   

        return False
    except Exception as e:
        return False





if __name__ == "__main__":
    # FastAPIサーバーを開始
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2024年1月14日2024,江端さんの技術メモ

Posted by ebata