2024,江端さんの技術メモ

プロセスを強制終了しなければならないAPIを作っているのですが、「pythonからPIDをkillできない件」で困っていました。
で、以下の実験用プログラムを作成しました。

import sys
import os
import signal
def kill_process(pid):
    try:
        os.kill(pid, signal.SIGKILL)  # 指定したプロセスIDをSIGKILLシグナルで終了
        print(f"Process with PID {pid} has been killed.")
    except OSError:
        print(f"Failed to kill process with PID {pid}.")
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python kill_process.py <PID>")
        sys.exit(1)
    try:
        pid = int(sys.argv[1])
        kill_process(pid)
    except ValueError:
        print("Invalid PID. Please enter a valid integer PID.")

で、

python3 kill_process.py 7870

を連発したのですが、全くプロセスが消えません(ps -ef | grep xxxxxなどでレビュー)

どうやら、最初に"sudo"を付けて、

sudo python3 kill_process.py 7870

で、起動してくれることが分かりました。

で、本家の問題ですが、fastapiを使ったプログラムを試してみたのですが、

sudo uvicorn test:app --host 0.0.0.0 --reload

では、エラーになります。これは環境変数を引きついでいない、とのことで、"-E"を付与することで、動くことを確認しました。

sudo -E uvicorn test:app --host 0.0.0.0 --reload

持っていかれた時間は4時間くらいかなぁ。
それでも、とりあえず動いて、次の開発に進めるので、安堵しています。

(こういう案件を夜に残すと、夜の眠りが浅くなる)。

2024,江端さんの技術メモ

※fastapiの場合、スレッドでは上手くコンロールできない(らしい)ので、プロセス単位で管理します
(1)execute-commandというAPIから、pingを行う回数Xをjson形式入力して、"ping -n X kobore.net"を起動します。返り値はプロセス番号(pid)がjson形式で戻ります。
curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
(2)terminate-processというAPIから、上記のpidをjson形式で入力して、プロセスを強制終了します。返り値は、この成否(1/-1)がjson形式で戻ります。
curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
(3)以下のファイルをtest.pyとして、uvicorn test:app --host 0.0.0.0 --reload を投入してfastapiサーバを起動します。
# curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
# curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process

import subprocess
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ExecuteRequest(BaseModel):
    count: int  # pingコマンドの実行回数を指定

class TerminateRequest(BaseModel):
    pid: int  # 終了させるプロセスのPID

# 実行中のプロセスを格納する辞書
running_processes = {}

@app.post("/execute-command")
def execute_command(request: ExecuteRequest):
    count = request.count

    try:
        command = f"ping -n {count} kobore.net"  # pingコマンドの回数をcountに指定
        # コマンドを非同期で実行し、プロセスを取得
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        pid = process.pid  # プロセスのPIDを取得
        running_processes[pid] = process

        return {"pid": pid}  # PIDのみを返す
    except Exception as e:
        return {"message": f"コマンドの実行中にエラーが発生しました: {str(e)}"}


@app.post("/terminate-process")
def terminate_process(request: TerminateRequest):
    pid = request.pid
    print("pid in terminate-process", pid)

    try:
        # プロセスを取得し、終了させる
        process = running_processes.get(pid)

        process.terminate()  # プロセスを終了させる(SIGTERMを送信)
        process.wait()
        del running_processes[pid]  # プロセスを辞書から削除

        # 成功の場合は1を返す
        return {"status": 1}
    except Exception as e:
        return {"status": -1}  # 失敗の場合は-1を返す

if __name__ == "__main__":
    # FastAPIサーバーを開始
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

出力結果
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"100\"}" http://localhost:8000/execute-command
{"pid":1784}
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"1784\"}" http://localhost:8000/terminate-process
{"status":1}
C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"1784\"}" http://localhost:8000/terminate-process
{"status":-1}


起動したプロセスを監視して、プロセスが予定通り/突然停止した場合、それを通知する仕組みを追加しました。

# curl -X POST -H "Content-Type: application/json" -d "{\"count\": \"5\"}" http://localhost:8000/execute-command
# curl -X POST -H "Content-Type: application/json" -d "{\"pid\": \"10164\"}" http://localhost:8000/terminate-process
# C:\Users\ebata\fastapi7>uvicorn test:app --host 0.0.0.0 --reload

import subprocess
import os
import time
import multiprocessing  # multiprocessingモジュールを追加
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ExecuteRequest(BaseModel):
    count: int  # pingコマンドの実行回数を指定

class TerminateRequest(BaseModel):
    pid: int  # 終了させるプロセスのPID

# 実行中のプロセスを格納する辞書
running_processes = {}
process_monitor_processes = {}

@app.post("/execute-command")
def execute_command(request: ExecuteRequest):
    count = request.count

    try:
        command = f"ping -n {count} kobore.net"  # pingコマンドの回数をcountに指定
        # コマンドを非同期で実行し、プロセスを取得
        process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        pid = process.pid  # プロセスのPIDを取得
        running_processes[pid] = process

        # プロセスを監視するプロセスを起動
        monitor_process = multiprocessing.Process(target=monitor_process_status, args=(pid,))
        monitor_process.start()
        process_monitor_processes[pid] = monitor_process

        return {"pid": pid}  # PIDのみを返す
    except Exception as e:
        return {"message": f"コマンドの実行中にエラーが発生しました: {str(e)}"}


@app.post("/terminate-process")
def terminate_process(request: TerminateRequest):
    pid = request.pid
    print("pid in terminate-process", pid)

    try:
        # プロセスを取得し、終了させる
        process = running_processes.get(pid)

        process.terminate()  # プロセスを終了させる(SIGTERMを送信)
        process.wait()
        del running_processes[pid]  # プロセスを辞書から削除

        # 成功の場合は1を返す
        return {"status": 1}
    except Exception as e:
        return {"status": -1}  # 失敗の場合は-1を返す

def monitor_process_status(pid):
    while True:
        if not is_process_running(pid):
            # プロセスが存在しない場合
            # メッセージを生成して出力(または送信)
            message = {
                "status": "Process Disappeared",
                "pid": pid
            }
            print("Process Disappeared:", message)

            # プロセス監視プロセスを停止
            ### del process_monitor_processes[pid]
            break

        # 一定の待機時間を設定して監視を継続
        time.sleep(10)  # 10秒ごとに監視

def is_process_running(pid):
    #try:
    #    os.kill(pid, 0)  # PIDを使ってプロセスにシグナルを送信し、存在を確認
    #    return True
    #except OSError:
    #    return False
    try:
        # ここの部分Windows特有のやりかたなので、後で、例の、os.kill(pid,0)を試してみること
        # tasklistコマンドを実行してプロセス一覧を取得
        result = subprocess.check_output(["tasklist", "/fi", f"PID eq {pid}"], universal_newlines=True)

        # 結果から指定したPIDの行を検索
        lines = result.splitlines()
        for line in lines:
            if f"{pid}" in line:
                return True

        # 指定したPIDが見つからない場合
        # ここに、停止時のメッセージ送信を組み込めばO.K.のはず   

        return False
    except Exception as e:
        return False





if __name__ == "__main__":
    # FastAPIサーバーを開始
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

2024,江端さんの技術メモ

指定されたコマンド "ping -100 kobore.net" をFastAPIのエンドポイントから実行し、それが終了したらAPIを正常に終了させるコードは以下のようになります:

test.py

import subprocess
import threading
import os
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class CommandRequest(BaseModel):
    command: str

@app.post("/execute-command")
def execute_command(request: CommandRequest):
    command = request.command
    # コマンドを非同期で実行
    execution_thread = threading.Thread(target=execute_command_async, args=(command,))
    execution_thread.start()
    return {"message": f"コマンド '{command}' の実行を開始しました"}

def execute_command_async(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    process.wait()  # コマンドの終了を待つ
    print(f"コマンド '{command}' の実行が終了しました")
    # os._exit(0)  # FastAPIサーバーを終了させる

if __name__ == "__main__":
    # FastAPIサーバーを開始
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

$ uvicorn test:app --host 0.0.0.0 --reload

 

C:\Users\ebata\fastapi7>curl -X POST -H "Content-Type: application/json" -d "{\"command\": \"ping -n 20 kobore.net\"}" http://localhost:8000/execute-command

2024,江端さんの技術メモ

運行情報(ダイヤグラム)を反映した最短時間で到着する計算方法は、通常のダイクストラ法を拡張して実行することができます。以下は、ダイヤ情報を考慮する最短到着時間を計算する一般的なアプローチです:

  1. グラフの作成:
    • ダイヤグラムに基づいて、駅間の接続を表すグラフを作成します。各エッジ(駅間の接続)には、所要時間が含まれます。
    • グラフのノードは駅を表し、エッジの重みは駅間の移動にかかる時間です。
    • 通常のダイクストラ法と同様に、出発駅を始点としてグラフを探索します。
  2. ノードの拡張と更新:
    • 通常のダイクストラ法と同様に、出発駅から各駅への最短到着時間を記録するデータ構造を使用します。初期状態では、出発駅の最短到着時間を0に設定し、他の駅は無限大(無効値)とします。
    • プライオリティキューを使用して、最短到着時間が最小の駅を選択します。
  3. ダイヤ情報の適用:
    • 選択した駅から出発するエッジを調べ、ダイヤ情報を考慮して最短到着時間を計算します。
    • ダイヤ情報には、駅への到着時間や運行間隔などが含まれます。現在の到着時間とダイヤ情報を使用して、次の駅への最短到着時間を計算します。
    • 新しい到着時間が現在の最短到着時間よりも短い場合、その駅の最短到着時間を更新します。
  4. プライオリティキューから次の最短到着時間の駅を選択し、ステップ3を繰り返します。目的の駅に到達した場合、計算を終了します。
  5. 最短到着時間を使用して、目的の駅への最短経路を復元します。

このアプローチを使用すると、ダイヤ情報を反映した最短到着時間を計算できます。ダイヤ情報は、駅への到着時間や運行間隔を正確に取得し、計算に組み込む必要があります。また、プライオリティキューの実装や適切なデータ構造の設計も重要です。

2024,江端さんの技術メモ

/* 
   簡易バスダイヤ作成プログラム    c:\users\ebata\dummy1.go

   (1)バス路線があり、5つの停留所("A0", "A1", "A2", "A3", "A4", "A5")があります。
   (2)このバスは始点から運行を開始し、路線の終点でで一定時間停車した後、再び逆方向に運行を開始します。
   (3)バスは朝6時に出発して、5分単位で次の停留所で停止し、終端で10分間停止します。
   (4)これを3往復するものとします。

*/



package main

import (
	"fmt"
	"strings" // strings パッケージをインポート
	"time"
)

// バスのダイヤグラム
type BusSchedule struct {
	Route             []string             // 停留所のリスト
	DepartureTime     time.Time            // 出発時刻
	ArrivalTimeStops  map[string][]string // 各停留所の到着時刻
	ArrivalTimeRounds int                  // 往復回数
}

// バスのダイヤグラムを生成する関数
func GenerateBusSchedule(route []string, departureTime time.Time, numRoundTrips int) *BusSchedule {
	schedule := &BusSchedule{
		Route:             route,
		DepartureTime:     departureTime,
		ArrivalTimeStops:  make(map[string][]string),
		ArrivalTimeRounds: numRoundTrips,
	}

	currentTime := departureTime
	reverse := false // 逆向き運行を切り替えるフラグ

	for round := 0; round < numRoundTrips; round++ {
		routeOrder := make([]string, 0, len(route)*2-1)

		if reverse {
			// 逆向き運行の場合、終点から始点に戻る
			for i := len(route) - 1; i >= 0; i-- {
				stop := route[i]
				arrivalTime := currentTime.Format("15:04")
				schedule.ArrivalTimeStops[stop] = append(schedule.ArrivalTimeStops[stop], arrivalTime)
				routeOrder = append(routeOrder, fmt.Sprintf("%s(%d): %s", stop, len(schedule.ArrivalTimeStops[stop]), arrivalTime))
				if i > 0 {
					currentTime = currentTime.Add(5 * time.Minute)
				}
			}
			reverse = false
		} else {
			// 正向き運行の場合、始点から終点に向かう
			for i := 0; i < len(route); i++ {
				stop := route[i]
				arrivalTime := currentTime.Format("15:04")
				schedule.ArrivalTimeStops[stop] = append(schedule.ArrivalTimeStops[stop], arrivalTime)
				routeOrder = append(routeOrder, fmt.Sprintf("%s(%d): %s", stop, len(schedule.ArrivalTimeStops[stop]), arrivalTime))
				if i < len(route)-1 {
					currentTime = currentTime.Add(5 * time.Minute)
				}
			}
			reverse = true
		}

		fmt.Println(strings.Join(routeOrder, "->"))
		currentTime = currentTime.Add(10 * time.Minute) // 終点での停止時間
	}

	return schedule
}

func main() {
	route := []string{"A0", "A1", "A2", "A3", "A4", "A5"}
	departureTime := time.Date(2024, 1, 6, 6, 0, 0, 0, time.UTC)
	numRoundTrips := 3

	schedule := GenerateBusSchedule(route, departureTime, numRoundTrips)

	// routeOrder を表示
	fmt.Println("routeOrder:")
	for _, stop := range schedule.Route {
		fmt.Printf("%s:\n", stop)
		for i, arrivalTime := range schedule.ArrivalTimeStops[stop] {
			fmt.Printf("  通過%d: %s\n", i+1, arrivalTime)
		}
	}
}

出力結果

C:\Users\ebata>go run dummy1.go
A0(1): 06:00->A1(1): 06:05->A2(1): 06:10->A3(1): 06:15->A4(1): 06:20->A5(1): 06:25
A5(2): 06:35->A4(2): 06:40->A3(2): 06:45->A2(2): 06:50->A1(2): 06:55->A0(2): 07:00
A0(3): 07:10->A1(3): 07:15->A2(3): 07:20->A3(3): 07:25->A4(3): 07:30->A5(3): 07:35
routeOrder:
A0:
通過1: 06:00
通過2: 07:00
通過3: 07:10
A1:
通過1: 06:05
通過2: 06:55
通過3: 07:15
A2:
通過1: 06:10
通過2: 06:50
通過3: 07:20
A3:
通過1: 06:15
通過2: 06:45
通過3: 07:25
A4:
通過1: 06:20
通過2: 06:40
通過3: 07:30
A5:
通過1: 06:25
通過2: 06:35
通過3: 07:35

2024,江端さんの技術メモ

このプログラムの目的は、時刻表の乗り換え案内のアルゴリズムを実現する為のテストプログラムです。
「到着時刻より早い時間の電車やバスには乗れない」をダイクストラに組み込むことができるかを調べたものです。

ノードのValueが到着・出発時間を表わすと考えて下さい。

package main

import (
	"fmt"
	"math"
)

type Node struct {
	Name  string
	Value float64 // 各ノードに設定された数値
}

type Edge struct {
	From   *Node
	To     *Node
	Weight float64
}

func main() {
	/*
		// ノードとエッジを初期化
		nodeA := &Node{Name: "A", Value: 5}
		nodeB := &Node{Name: "B", Value: 8}
		nodeC := &Node{Name: "C", Value: 6}
		nodeD := &Node{Name: "D", Value: 2}
		nodeE := &Node{Name: "E", Value: 4}
	*/

	// ノードとエッジを初期化
	nodeA := &Node{Name: "A", Value: 1}
	nodeB := &Node{Name: "B", Value: 1}
	nodeC := &Node{Name: "C", Value: 0}
	nodeD := &Node{Name: "D", Value: 1}
	nodeE := &Node{Name: "E", Value: 1}

	/*
		edges := []Edge{
			{nodeA, nodeB, 2},
			{nodeA, nodeC, 4},
			{nodeB, nodeC, 1},
			{nodeB, nodeD, 7},
			{nodeC, nodeD, 3},
			{nodeC, nodeE, 5},
			{nodeE, nodeD, 2},
		}
	*/

	edges := []Edge{ // "方向性あり"に注意
		{nodeA, nodeB, 1},
		{nodeA, nodeC, 1},
		{nodeB, nodeC, 1},
		{nodeB, nodeD, 1},
		{nodeC, nodeD, 1},
		{nodeC, nodeE, 1},
		{nodeE, nodeD, 1},
		{nodeD, nodeE, 1},
	}

	startNode := nodeA
	targetNode := nodeE

	// ダイクストラアルゴリズムを実行
	shortestPath, totalWeight := dijkstra(startNode, targetNode, edges)

	if shortestPath == nil {
		fmt.Println("最短経路が見つかりませんでした。")
	} else {
		fmt.Printf("最短経路: %v\n", getNodeNames(shortestPath))
		fmt.Printf("最短経路の総重み: %.2f\n", totalWeight)
	}
}

func dijkstra(startNode, targetNode *Node, edges []Edge) ([]*Node, float64) {
	// ノード間の最短距離を格納するマップを初期化
	shortestDistances := make(map[*Node]float64)
	// 各ノードの前のノードを格納するマップを初期化
	predecessors := make(map[*Node]*Node)

	// 最短距離を無限大で初期化し、開始ノードの最短距離を0に設定
	for _, edge := range edges {
		shortestDistances[edge.From] = math.Inf(1)
		shortestDistances[edge.To] = math.Inf(1)
	}
	shortestDistances[startNode] = 0

	// 訪問済みのノードを格納するセットを初期化
	visitedNodes := make(map[*Node]bool)

	// まだ訪問していないノードが残っている間ループ
	for len(visitedNodes) < len(shortestDistances) {
		// 未訪問のノードの中から最短距離のノードを選択
		currentNode := getClosestUnvisitedNode(shortestDistances, visitedNodes)

		// ノードがない場合やターゲットノードに到達した場合は終了
		if currentNode == nil || currentNode == targetNode {
			break
		}

		// 隣接ノードの最短距離を更新
		for _, edge := range edges {
			//
			if edge.From == currentNode && edge.To.Value >= currentNode.Value { // ここがポイント
				distance := shortestDistances[currentNode] + edge.Weight
				if distance < shortestDistances[edge.To] {
					shortestDistances[edge.To] = distance
					predecessors[edge.To] = currentNode
				}
			}
		}

		// このノードを訪問済みとしてマーク
		visitedNodes[currentNode] = true
	}

	// 最短経路を復元
	shortestPath := make([]*Node, 0)
	currentNode := targetNode
	for currentNode != nil {
		shortestPath = append([]*Node{currentNode}, shortestPath...)
		currentNode = predecessors[currentNode]
	}

	// 最短経路の総重みを計算
	totalWeight := shortestDistances[targetNode]

	return shortestPath, totalWeight
}

func getClosestUnvisitedNode(distances map[*Node]float64, visitedNodes map[*Node]bool) *Node {
	minDistance := math.Inf(1)
	var closestNode *Node

	for node, distance := range distances {
		if !visitedNodes[node] && distance < minDistance {
			minDistance = distance
			closestNode = node
		}
	}

	return closestNode
}

func getNodeNames(nodes []*Node) []string {
	names := make([]string, len(nodes))
	for i, node := range nodes {
		names[i] = node.Name
	}
	return names
}