2024,江端さんの技術メモ

注意点は、通信プロトコルのメトリクスが2以上あろうとも、
main()の開始近くで

GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);
を1回、

main()の終了近くで

// 共有のイベントループを実行
g_main_loop_run(main_loop);

// メインループをクリーンアップ
g_main_loop_unref(main_loop);

を、それぞれ1回だけ発行すること。

g_main_loop_run(main_loop)でプログラムは無限ループに入ってロックするので、その下のプログラムは実行されないことにも注意。

(以下はイメージ。コンパイルしても稼動しません)

#include <glib.h>
#include <gst/gst.h>
#include <pthread.h>

typedef struct {
    GstElement *pipeline;
    GMainLoop *main_loop;  // 共有のイベントループ
} ArgsRtspRTSP;

typedef struct {
    GstElement *srtserversink;
    GMainLoop *main_loop;  // 共有のイベントループ
} Args;

void *print_rtsp_metrics_thread(void *arg) {
    ArgsRtspRTSP *args = (ArgsRtspRTSP *)arg;
    // RTSPメトリクス計測のロジック
    while (1) {
        g_usleep(1000000); // 1秒ごとに計測
        g_print("RTSP Metrics: Calculating...\n");
    }
    return NULL;
}

void *print_srt_metrics_thread(void *arg) {
    Args *args = (Args *)arg;
    // SRTメトリクス計測のロジック
    while (1) {
        g_usleep(1000000); // 1秒ごとに計測
        g_print("SRT Metrics: Calculating...\n");
    }
    return NULL;
}

void initialize_rtsp_pipeline(ArgsRtspRTSP *args) {
    // パイプラインの初期化
    args->pipeline = gst_pipeline_new("rtsp-pipeline");
    GstBus *bus = gst_element_get_bus(args->pipeline);
    gst_bus_add_watch(bus, (GstBusFunc)gst_bus_async_signal_func, args->main_loop);
    gst_object_unref(bus);
}

void initialize_srt_pipeline(Args *args) {
    // パイプラインの初期化
    args->srtserversink = gst_element_factory_make("srtserversink", "srt-sink");
    GstBus *bus = gst_element_get_bus(args->srtserversink);
    gst_bus_add_watch(bus, (GstBusFunc)gst_bus_async_signal_func, args->main_loop);
    gst_object_unref(bus);
}

int main(int argc, char *argv[]) {
    gst_init(&argc, &argv);

    // 共有のイベントループを作成
    GMainLoop *main_loop = g_main_loop_new(NULL, FALSE);

    // RTSP引数の初期化
    ArgsRtspRTSP rtsp_args;
    rtsp_args.main_loop = main_loop;
    initialize_rtsp_pipeline(&rtsp_args);

    // SRT引数の初期化
    Args srt_args;
    srt_args.main_loop = main_loop;
    initialize_srt_pipeline(&srt_args);

    // スレッドの作成
    pthread_t rtsp_thread, srt_thread;

    pthread_create(&rtsp_thread, NULL, print_rtsp_metrics_thread, &rtsp_args);
    pthread_create(&srt_thread, NULL, print_srt_metrics_thread, &srt_args);

    // 共有のイベントループを実行
    g_main_loop_run(main_loop);

    // メインループをクリーンアップ
    g_main_loop_unref(main_loop);
    return 0;
}

ようするに、開始と終了は、1回のみ宣言する、ということ。

g_main_loop_run(main_loop) の後は、プログラムがそこでロックされて、その後のプログラムコードに進まないことに注意。

2024,江端さんの技術メモ

改めてGeoEntityDBの作り方の手順を示して下さい。

 

以下に、GeoEntityDB を作成する手順を改めて示します。


GeoEntityDBの作成手順

1. GeoEntityDBの作成

  1. PostgreSQLに接続します。
    psql -U postgres

     

  2. 新しいデータベースを作成し、PostGISを有効化します。
    CREATE DATABASE GeoEntityDB; \c GeoEntityDB CREATE EXTENSION postgis; CREATE EXTENSION pgrouting; -- 必要に応じて有効化

     


2. 地物テーブルの作成

地物データを格納するためのテーブルを作成します。

  1. テーブルスキーマを定義します。以下はカテゴリ付きの地物データ用のスキーマです。
    CREATE TABLE geo_entities ( id SERIAL PRIMARY KEY, -- 自動生成の一意識別子 category VARCHAR(10), -- カテゴリ (例: A1, B1, D2) stay_time INTEGER, -- 滞在時間 (分) geom GEOMETRY(POINT, 4326) -- 座標データ (EPSG:4326の緯度経度) );

    # id SERIAL PRIMARY KEY の部分は自動的にインポートされるので、csvの先頭の番号は行削除しておくこと

  2. インデックスを作成して検索性能を向上させます。
    CREATE INDEX geo_entities_geom_idx ON geo_entities USING GIST (geom);

     


3. CSVデータのインポート

CSVファイルから地物データをインポートします。

  1. 一時テーブルを作成(緯度・経度データを変換するために使用)。
    CREATE TEMP TABLE temp_geo_entities ( category VARCHAR(10), stay_time INTEGER, latitude DOUBLE PRECISION, longitude DOUBLE PRECISION );

     

  2. CSVをインポート(ファイルのパスは実際のCSVファイルの場所を指定してください)。
    COPY temp_geo_entities (category, stay_time, latitude, longitude) FROM '/path/to/geo_entities.csv' DELIMITER ',' CSV HEADER;

    # 実は、'/path/to/geo_entities.csv'の部分で酷く転けた記憶があるが、結果的にインポートできてしまったので、忘れた。

  3. 本テーブルに挿入(緯度・経度を POINT 型に変換して挿入)。
    INSERT INTO geo_entities (category, stay_time, geom) SELECT category, stay_time, ST_SetSRID(ST_MakePoint(longitude, latitude), 4326) FROM temp_geo_entities;

     

  4. データが正しくインポートされたか確認します。
    SELECT * FROM geo_entities LIMIT 10;

    #ここから先は試していないが、メモに残しておく。


4. 既存DBとの連携設定

方法 1: postgres_fdw を使った連携

hakata_ex_cost_db(道路ネットワークDB)と GeoEntityDB を連携させます。

  1. postgres_fdw を有効化(GeoEntityDBで実行)。
    CREATE EXTENSION postgres_fdw;

     

  2. 外部サーバーを登録
    CREATE SERVER hakata_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (dbname 'hakata_ex_cost_db', host 'localhost');

     

  3. 外部DBへのユーザーマッピングを作成
    CREATE USER MAPPING FOR current_user SERVER hakata_server OPTIONS (user 'postgres', password 'your_password');

     

  4. 必要なテーブルをインポート(例: 道路ネットワークテーブル)。
    IMPORT FOREIGN SCHEMA public FROM SERVER hakata_server INTO public;

     

  5. テーブルが使用可能か確認
    SELECT * FROM ways LIMIT 10;

     


5. 経路計算の設定

最短経路計算の基本例

エージェントの現在地から目的地(地物)までの経路を計算します。

  1. エージェントの現在地に最も近いノードを取得
    SELECT id FROM ways_vertices_pgr ORDER BY geom <-> ST_SetSRID(ST_MakePoint(130.3980676, 33.59543503), 4326) LIMIT 1;

     

  2. 地物の位置に最も近いノードを取得
    SELECT id FROM ways_vertices_pgr ORDER BY geom <-> (SELECT geom FROM geo_entities WHERE id = 1) LIMIT 1;

     

  3. Dijkstraアルゴリズムで経路計算
    SELECT seq, id1 AS node, id2 AS edge, cost FROM pgr_dijkstra( 'SELECT id, source, target, cost FROM ways', start_node_id, end_node_id, directed := false );

     


6. 地物データと経路計算の組み合わせ

以下のようなクエリで、地物データの位置と経路計算を連携させます。

  1. エージェントの現在地から15メートル以内の地物を検索(これ、上手く稼動しません。下の方は使って下さい)
    SELECT id, category, stay_time, ST_Distance(geom, ST_SetSRID(ST_MakePoint(130.3980676, 33.59543503), 4326)) AS distance FROM geo_entities WHERE ST_DWithin(geom, ST_SetSRID(ST_MakePoint(130.3980676, 33.59543503), 4326), 15);

     

  2. 発見した地物への最短経路を計算
    SELECT seq, id1 AS node, id2 AS edge, cost FROM pgr_dijkstra( 'SELECT id, source, target, cost FROM ways', start_node_id, (SELECT id FROM ways_vertices_pgr ORDER BY geom <-> (SELECT geom FROM geo_entities WHERE id = found_geo_id) LIMIT 1), directed := false );

     


まとめ

  1. GeoEntityDBの作成とデータインポート:
    • PostGISとpgRoutingを有効化し、CSVデータを地物テーブルに格納。
  2. 既存DBとの連携:
    • postgres_fdw を使用して、hakata_ex_cost_db の道路ネットワークを参照。
  3. 経路計算の実装:
    • エージェントの現在地と地物データを連携させ、道路ネットワーク上で最短経路を計算。

これにより、エージェントが嗜好に応じて地物に向かう動きをリアルにシミュレーションする仕組みを構築できます。

ーーーーー
こういうミスもやったので、以下のメモに残しておく。

SELECT * FROM geo_entities WHERE category = 'B1'; としたらエントリーがでてきませんが、
SELECT * FROM geo_entities WHERE category = ' B1'; としたらエントリーがでてきました。
categoryの要素でえ最初に空白が入っているものを修正したいのですが、どうしたら良いでしょうか

 

category の要素の先頭に空白が含まれている場合、それを修正するには TRIM 関数を使用して先頭と末尾の空白を取り除く必要があります。以下に、データを修正する方法を説明します。


手順

1. データの確認

まず、空白が含まれているデータを確認します。

SELECT * FROM geo_entities WHERE category LIKE ' %' OR category LIKE '% ';

 

  • LIKE ' %' は先頭に空白があるデータを検索します。
  • LIKE '% ' は末尾に空白があるデータを検索します。

2. 空白を修正

TRIM 関数を使用して、category カラムから空白を取り除きます。

すべてのエントリを修正する場合

以下のクエリで、空白を削除した値に更新します。

UPDATE geo_entities SET category = TRIM(category) WHERE category LIKE ' %' OR category LIKE '% ';

 


3. 修正結果の確認

修正後に、データが正しく更新されたか確認します。

SELECT * FROM geo_entities WHERE category LIKE ' %' OR category LIKE '% ';

 

  • クエリの結果が空であれば、すべての空白が正常に削除されたことを意味します。

4. 再度クエリを実行

修正後、以下のようにクエリを実行して正しいデータが取得できるか確認します。

SELECT * FROM geo_entities WHERE category = 'B1';

 


補足

  • TRIM の動作: TRIM 関数は、先頭と末尾の両方から空白を削除します(不要なスペースも含む)。
  • データの一時バックアップ: データ更新は破壊的変更となるため、念のため更新前にバックアップを取得しておくことをおすすめします。

これで問題が解決するはずです!もしさらなるサポートが必要であればお知らせください。


ざっくりとした使い方

 

geoentitydb=# \l
List of databases
Name | Owner | Encoding | Locale Provider | Collate | Ctype | ICU Locale | ICU Rules | Access privileges
--------------------+----------+----------+-----------------+------------+------------+------------+-----------+-----------------------
geoentitydb | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | |
hakata_db | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | |
hakata_ex_cost_db | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | |

geoentitydb=# SELECT id, category, stay_time, geom FROM geo_entities where id = 29;
id | category | stay_time | geom
----+----------+-----------+----------------------------------------------------
29 | B1 | 20 | 0101000020E610000006802A6E9C4C6040099FB5F44BCB4040

 

geoentitydb=# SELECT id, category, stay_time, ST_X(geom) AS longitude, ST_Y(geom) AS latitude FROM geo_entities where id = 29;
id | category | stay_time | longitude | latitude
----+----------+-----------+-------------+-------------
29 | B1 | 20 | 130.3940955 | 33.58825549

こんな感じででてくる。

以下は、130.4064827799044, 33.591348965846656 から 100メートル以内のジオエンティティを抽出するSQLです。(こっちが正しい)

geoentitydb=# SELECT id, category, stay_time, ST_Distance(geom::geography, ST_SetSRID(ST_MakePoint(130.4064827799044, 33.591348965846656), 4326)::geography) AS distance FROM geo_entities WHERE ST_DWithin(geom::geography, ST_SetSRID(ST_MakePoint(130.4064827799044, 33.591348965846656), 4326)::geography, 100);

id | category | stay_time | distance
------+----------+-----------+-------------
304 | A2 | 60 | 68.03775787
357 | A3 | 120 | 75.80618279
366 | A3 | 120 | 76.48825161
414 | D2 | 180 | 80.65210758
415 | D2 | 180 | 79.93815148
429 | D2 | 180 | 61.23626794
435 | D2 | 180 | 62.50441807
546 | E1 | 480 | 77.36793898
550 | E1 | 480 | 90.09868468
813 | E2 | 300 | 54.40053345
1174 | F2 | 10 | 92.85196166
1217 | F2 | 10 | 89.44133931
1933 | H2 | 30 | 64.5772006
1942 | H2 | 30 | 84.70916514
(14 rows)

2024,江端さんの技術メモ

江端がずっとメンテナンスしているgetDijkstraPath関数は、costを変更することによって、ダイクストラ計算を引き曲げているので、距離が正確に出せない。

そこで、agg_costを使わずに、agg_length_mというものを作って、強制的に距離を作り出すようにコードを変更した。

 

func getDijkstraPath(dbMap *sql.DB, locInfoStart, locInfoGoal LocInfo) ([]LocInfo, float64) {

	var path []LocInfo // 経路 (返り値の一つ目)
	var totalDistanceKm float64

	// 例外処理 locInfoStart.source == locInfoGoal.source の場合
	if locInfoStart.Source == locInfoGoal.Source {
		source := locInfoStart.Source

		// SQLクエリの作成
		query := fmt.Sprintf(`
		SELECT x1, y1
		FROM ways
		WHERE source = %d;
	`, source)

		// SQLクエリの実行
		var x1, y1 float64
		err := dbMap.QueryRow(query).Scan(&x1, &y1)
		if err != nil {
			fmt.Println("tools.go line 204 error occures")
			//log.Fatal(err)  // ここで止めたらリターンしなくなる
		}

		var loc LocInfo
		loc.Source = source
		loc.Lon = x1
		loc.Lat = y1

		path = append(path, loc)

		totalDistanceKm = 0.0
		return path, totalDistanceKm
	}

	// こちらは、cost, reverse_cost を使っている
	query := `
	SELECT seq, source, target, x1, y1, x2, y2, agg_cost, length_m
	FROM pgr_dijkstra(
		'SELECT gid as id, source, target, cost, reverse_cost FROM ways', 
		$1::bigint, 
		$2::bigint, 
		directed:=false
	) a 
	INNER JOIN ways b ON (a.edge = b.gid) 
	ORDER BY seq
	`

	rowsDijkstra, errDijkstra := dbMap.Query(query, locInfoStart.Source, locInfoGoal.Source)
	if errDijkstra != nil {
		log.Fatal(errDijkstra)
		os.Exit(1)
	}
	defer rowsDijkstra.Close()

	var agg_cost float64
	var length_m float64
	agg_length_m := 0.0

	var loc LocInfo
	var x1, y1, x2, y2 float64
	var seq int
	var target int
	var source int

	isFirstCheck := true
	isSourceCheck := true

	count := 0

	for rowsDijkstra.Next() {
		// まずnodeを読む
		if err := rowsDijkstra.Scan(&seq, &source, &target, &x1, &y1, &x2, &y2, &agg_cost, &length_m); err != nil {
			fmt.Println(err)
		}

		agg_length_m += length_m
		// fmt.Println("length_m", length_m, "agg_length_m", agg_length_m)

		// 最初の1回だけチェックのために入る これについては、https://wp.kobore.net/江端さんの技術メモ/post-7668/を参照のこと
		// もし rowsDijkstra.Scanで最初のsource値を読みとり、locInfoStart.Source の値と同じであれば、x1,y1をベースとして、異なる値であれば、x2,y2をベースとする

		if isFirstCheck {
			if source == locInfoStart.Source {
				isSourceCheck = true // x1, y1をベースとする処理になる
			} else {
				isSourceCheck = false // x2,y2をベースとする処理になる
			}
			isFirstCheck = false // 最初の1回をチェックすることで、2回目はこのループには入らなくなる
		}

		//var loc LocInfo

		if isSourceCheck { // x1, y1をベースとする処理
			loc.Source = source
			loc.Lon = x1
			loc.Lat = y1
		} else { // x2,y2をベースとする処理
			loc.Source = target
			loc.Lon = x2
			loc.Lat = y2
		}
		path = append(path, loc)

		count++
	}

	// ラストノードだけは手入力 (ここは引っくり返す) (今、ここの部分は無視されている(いいのかな?))
	if isSourceCheck { // x1, y1をベースとする処理
		loc.Source = target
		loc.Lon = x2
		loc.Lat = y2
	} else { // x2,y2をベースとする処理
		loc.Source = source
		loc.Lon = x1
		loc.Lat = y1
	}

	// なんかprg_dijkstraが変な値を返す場合があるので、その対応(ロジカルではないが、パッチ的に対応)
	if loc.Source == locInfoGoal.Source {
		path = append(path, loc)
	}

	fmt.Println("count", count)

	if count == 0 { // 1行のみの場合、ヌルになるという問題に対応するため、
		loc.Source = locInfoGoal.Source
		loc.Lon = locInfoGoal.Lon
		loc.Lat = locInfoGoal.Lat

		// 入力値の指定
		source := locInfoStart.Source
		target := locInfoGoal.Source

		// SQLクエリの作成
		query := fmt.Sprintf(`
		SELECT length_m, x1, y1, x2, y2
		FROM ways
		WHERE source = %d AND target = %d;
	`, source, target)

		// SQLクエリの実行
		var length float64
		var x1, y1, x2, y2 float64
		err := dbMap.QueryRow(query).Scan(&length, &x1, &y1, &x2, &y2)
		if err != nil {
			log.Println("First attempt failed. Retrying with swapped source and target.")
			// 入れ替えたsourceとtargetの値でクエリを再実行
			query = fmt.Sprintf(`
			SELECT length_m, x1, y1, x2, y2
			FROM ways
			WHERE source = %d AND target = %d;
		`, target, source)
			err = dbMap.QueryRow(query).Scan(&length, &x1, &y1, &x2, &y2)
			if err != nil {
				//log.Fatal(err)  // 諦める。とりあえず、エラーを返す
				return nil, 0.0
			}
		}

		// 結果の出力
		fmt.Printf("length_m: %f\n", length)
		fmt.Printf("source: %d\n", source)
		fmt.Printf("target: %d\n", target)
		fmt.Printf("x1: %f\n", x1)
		fmt.Printf("y1: %f\n", y1)
		fmt.Printf("x2: %f\n", x2)
		fmt.Printf("y2: %f\n", y2)

		if source == locInfoGoal.Source {
			loc.Lon = x1
			loc.Lat = y1
		} else {
			loc.Lon = x2
			loc.Lat = y2
		}

		agg_cost = length

		fmt.Println("loc", loc)

		path = append(path, loc) // 354
	} // if count == 0

	//totalDistanceKm = agg_cost / 1000.0
	totalDistanceKm = agg_length_m / 1000.0
	return path, totalDistanceKm
}

2024,江端さんの技術メモ

仮想RTSPカメラの作り方と使い方のメモ

2024/11/21
江端
この仮想RTSPカメラは、クライアント(受信側)の接続を止めて、再度接続しようとするとクライアント(受信側)に動画が表示されなくなる、という欠点が発覚しております(2024/11/26) 

1. 背景と要件

1.1. 背景

  1. 地車間実験において複数カメラを準備する為、複数の仮想のRTSPカメラが必要となったため、これに対応する

1.2. 仮想RTSPカメラの要件

  1. 1台のWindowsPCにて最大4台程度の仮想RTSPカメラが実現できること
  2. 仮想カメラは指定したmp4ファイルの映像を配送する
  3. 上記のmp4ファイルは繰り返し再生できることが望ましい
  4. 仮想RTSPカメラは、通常のRTSPカメラと同様にrtsp://127.0.0.1:8554/testという形式で指定できること
  5. できるだけ簡易かつ汎用的なコマンドで実現できることが望ましい。

1.3. 事前検討

  1. 仮想RTSPカメラの実現方法としては、(1)ffmpeg, (2)Gstreamer, (3)VLC(VideoLAN Client), (4)C言語(Gstreamerライブラリ)が挙げられている。
  2. 検討の結果、mp4ファイルを繰り返し再生できる方法は指定されているが、試したところ実際に動いたのはVLCのみだったの、VLCで実現することにした。
  3. vlcのバージョンは、3.0.20以上であること(3.0.14では稼動しないことを確認済)。

2. 説明用の設定

  • 仮想RTSPカメラを設定するWindows10パソコンのIPアドレスを、便宜的に、192.168.0.3として取り扱うこととする。
  • 仮想RTSPカメラを再生するWindows10パソコンのIPアドレスを、便宜的に、192.168.0.25として取り扱うこととする。

3. VLCによる仮想カメラの作り方

3.1. 事前準備: 送信側(192.168.0.3)のファイアウォール設定設定

本節の設定は、設定しなくても動くこともあるので、動かなくなった場合のみに対応する。

送信側のVLCは、RTSP制御パケット(TCP:8554)とRTPデータストリーム(UDPポート)を送信する。これらがブロックされないようにする。

3.1.1. 受信規則の設定(クライアントからのRTSP制御パケットを許可)を許可)

  1. 「Windowsセキュリティ」懼「ファイアウォールとネットワーク保護」懼「詳細設定」を開きます。
  2. 左側の「受信規則」を右クリックして「新しい規則」を選択。
  3. **「ポート」**を選択して「次へ」。
  4. TCPを選択し、「特定のローカルポート」に8554を入力して「次へ」。
  5. 「接続を許可する」を選択し「次へ」。
  6. プロファイルを選択(プライベートにチェックを入れる)し「次へ」。
  7. 名前を設定(例: VLC RTSP TCP)して「完了」をクリック。

3.1.2. 送信規則の設定(RTP/RTCPデータを送信)を送信)

  1. 同様に「送信規則」を右クリックして「新しい規則」を選択。
  2. **「ポート」**を選択して「次へ」。
  3. UDPを選択し、「特定のローカルポート」に5000-5500を入力して「次へ」。
  4. 「接続を許可する」を選択し「次へ」。
  5. プロファイルを選択(プライベートにチェックを入れる)し「次へ」。
  6. 名前を設定(例: VLC RTP UDP)して「完了」をクリック。

3.2. 事前準備: 受信側(192.168.0.25)のファイアウォール設定設定

受信側が、Windows10でない場合は、本節は無視して下さい。

受信側のGStreamerは、RTSP制御パケット(TCP:8554)とRTPデータストリーム(UDPポート)を受信します。これらを許可します。

3.2.1. 受信規則の設定(RTSPとRTPストリームの受信を許可)を許可)

  1. 「Windowsセキュリティ」懼「ファイアウォールとネットワーク保護」懼「詳細設定」を開きます。
  2. 左側の「受信規則」を右クリックして「新しい規則」を選択。
  3. **「ポート」**を選択して「次へ」。
  4. TCPを選択し、「特定のローカルポート」に8554を入力して「次へ」。
  5. 「接続を許可する」を選択し「次へ」。
  6. プロファイルを選択(プライベートにチェックを入れる)し「次へ」。
  7. 名前を設定(例: GStreamer RTSP TCP)して「完了」をクリック。

3.2.1.1. RTPストリームのポート設定設定**

  1. 再度「受信規則」を右クリックして「新しい規則」を選択。
  2. **「ポート」**を選択して「次へ」。
  3. UDPを選択し、「特定のローカルポート」に5000-5500を入力して「次へ」。
  4. 「接続を許可する」を選択し「次へ」。
  5. プロファイルを選択(プライベートにチェックを入れる)し「次へ」。
  6. 名前を設定(例: GStreamer RTP UDP)して「完了」をクリック。

3.2.2. アプリケーション許可(必要に応じて)応じて)

GStreamerがファイアウォールでブロックされている場合、gst-launch-1.0の実行ファイルを許可します。

  1. 「Windows Defender ファイアウォールを介したアプリまたは機能を許可する」をクリック。
  2. 「別のアプリを許可する」をクリックし、GStreamerの実行ファイル(例: C:\msys64\mingw64\bin\gst-launch-1.0.exe)を指定して追加。
  3. 「プライベート」にチェックを入れて「OK」をクリック。

3.3. 仮想RTSPカメラの起動方法

以下のコマンド(例示)で起動する。

$ "C:\Program Files\VideoLAN\VLC\vlc.exe" -vvv "0326_JP.mp4" --sout="#rtp{sdp=rtsp://0.0.0.0:8554/test}" --loop

コマンドの内容は、以下の通りである。

  • "C:\Program Files\VideoLAN\VLC\vlc.exe": vlc.exeを起動するコマンドをフルパスで指定。
  • vvv: デバッグ情報を詳細に出力する。問題発生時のトラブルシューティングに役立つ。
  • "0326_JP.mp4": 再生するmp4のファイル名(このファイル名は例示であるので、使用時のファイル名を使用する)
  • rtsp://0.0.0.0:8554/test: RTSPアドレス名。0.0.0.0は特殊なアドレスで、「すべてのネットワークインターフェース」を意味する。サーバーがリッスンする対象が、特定のIPアドレスではなく、すべての有効なインターフェースで接続を受け付ける設定である。8554は、ポート番号である。RTSPのデフォルトポート番号は554であるが、アプリケーションによって異なるポート番号が指定されることがある。この例では8554を使用している。/testは、ストリームのパス名を示す。サーバー内でストリームを識別するための名前である。
  • loop: ファイルの繰り返し再生を指定する。

なお、上記のコマンドを投入すると、以下の黒い画面が立ち上がり、コマンドは直ちにリターンする。矢印は再生している位置を示している。

このコマンドを停止するには、この画面を右上の『・』を押下する。

3.4. 仮想RTSPカメラのフレームレート/画像サイズの変更方法

VLCを使って仮想RTSPカメラを実現した際に、送信するフレームレートや画像サイズ(解像度)を変更することは可能である。以下の方法で設定を変更する。

$ "C:\Program Files\VideoLAN\VLC\vlc.exe" -vvv "0326_JP.mp4" --sout="#transcode{vcodec=h264,fps=15,width=640,height=360}:rtp{sdp=rtsp://0.0.0.0:8554/test}" --loop

  • fps=15:
    • 送信するフレームレートを15fpsに設定する。
    • 必要に応じて、他の値(例: 3060)に変更可能。
  • width=640,height=360:
    • 解像度を640x360に設定しています。
    • 必要に応じて、他の解像度(例: 1920x1080640x480)に変更可能。

3.5. 複数の仮想RTSPカメラの起動方法

今回のケースでは、1台のPCで複数のカメラを実現するので、IPアドレスを固定として、異なるポート番号を使って、複数のカメラを実現することとする。

具体的には、以下のポート番号(8554, 8555, 8556)を変えて、それぞれコマンドを押下することで、同じmp4ファイルで3台のカメラが実現できる(もちろん、mp4ファイルを変えても良い)。

$ "C:\Program Files\VideoLAN\VLC\vlc.exe" -vvv "0326_JP.mp4" --sout="#rtp{sdp=rtsp://0.0.0.0:8554/test}" --loop

$ "C:\Program Files\VideoLAN\VLC\vlc.exe" -vvv "0326_JP.mp4" --sout="#rtp{sdp=rtsp://0.0.0.0:8555/test}" --loop

$ "C:\Program Files\VideoLAN\VLC\vlc.exe" -vvv "0326_JP.mp4" --sout="#rtp{sdp=rtsp://0.0.0.0:8556/test}" --loop

4. 仮想RTSPカメラの起動確認方法

4.1. VLCを使った起動確認方法

簡易な確認方法として、画像受信もVLCを使用する方法を提示する。

受信側Windows10(192.168.0.25)のVLCを立ち上げて、以下の操作を行う。

ネットワークURLに、rtsp://192.168.0.3:8554/testと入力する。

これで映像が表示されれば成功である。

4.2. GStreamerを使った起動確認方法

以下のコマンドを投入して下さい。

$ gst-launch-1.0 -v rtspsrc location=rtsp://192.168.0.3:8554/test latency=0 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! glimagesink

(Windows10の場合は、autovideosinkでは動かないことが多い)

4.3. 注意点

仮想RTSPカメラはmp4ファイルを繰り返し再生するが、ファイル終了時にVLC(および殆どのクライアント)は、映像が停止したものと判断して再生を終了する。

現時点で、この対応方法は不明である。

以上

2024,江端さんの技術メモ

前提

  • iPad用HDMI変換ポートを購入(純正は高いが、iPhoneでネトフリ見る時など重宝しているので、純正の購入をお勧めします)
  • PowerPointのレビュー専用のアプリをインストールしておくこと(レビュー専用ならタダ)
  • 資料はmail経由などを使ってiPadにダウンロードしておく

手順

Step 1 "ファイルのフォルダを開く"

Step 2 ファイルを開く

Step 3 ここをクリックする

Step 4 こういう画面が出てくる(場合によっては、プレゼンテーション画面そのものが出てくることもある)

Step 5 ディスプレイ/プロジェクタの方にプレゼンテーションモードで表示される(といいな)

Step 5 "Step 4"の画面を操作すると、画面が変わる(ちなみに、画面の左下の方を叩くとページが前に戻り、右下の方を叩くとページが次に動く

2024,江端さんの技術メモ

博多駅のNodeを選んで、GIS-DBを作りなおしてみたが、博多駅から西に抜けるwayがなかったので、抜けるNodeに接続するレコードを手動で作った。

Postgresql + PostGISのデータベースに、エントリを直接投入して、結線道路を一本作る

gidとosm_idは衝突しない数値を選んで、tag_idは、既存の数値を選ぶ(選ばないと弾かれる)

あとは、sourceとtargetからsource_osm、target_osm とそれぞれの座標を引っ張って、あとは、chatGPTに座標を入れると2点間の距離を教えてくれる。

この距離が、 length_m となる。 lengthは、length_m / 92854.2986 の値になる。 length = cost = reverse_cost となる。 cost_s,  reverse_cost_sは、時間を含めたコストらしいが、cost x 6685.509499 の値を入れておいた(まあ、どうでも言いのだろう)

あとthe_geomの値も、ChatGPTが教えてくれる。このSQL文を教えて貰った。

SELECT ST_AsEWKB(ST_MakeLine(
ST_SetSRID(ST_MakePoint(130.4210743, 33.5897456), 4326),
ST_SetSRID(ST_MakePoint(130.4206351, 33.5899416), 4326)
)) AS the_geom_hex;

で、最終的に、こうなった。

INSERT INTO ways (gid, osm_id, tag_id, length, length_m, name, source, target, source_osm, target_osm, cost, reverse_cost, cost_s, reverse_cost_s, rule, one_way, oneway, x1, y1, x2, y2, maxspeed_forward, maxspeed_backward, priority, the_geom)
VALUES (200001, 2000000001, 501, 0.000497554, 46.2, 'ebata', 44351 , 89839, 1882462094, 7992560451, 0.000497554, 0.000497554, 3.3264, 3.3264, NULL, 0, 'UNKNOWN', 130.4210743 , 33.5897456 , 130.4206351 , 33.5899416 , 10 , 10 , 0 , '0102000020E610000002000000f475CF70794D6040DE7AA8C87CCB404097c1BDD7754D60406446D33483CB4040');

で、上手く動いているようです。

SELECT seq, edge, b.the_geom AS "the_geom" FROM pgr_dijkstra('SELECT gid as id, source, target, cost, reverse_cost FROM ways', 51913,23498) a INNER JOIN ways b ON (a.edge = b.gid) ORDER BY seq;

として、

2024,江端さんの技術メモ

第三章を20回ほど読み直して、方式よりメソッドの理解に努めました。

資料5 『非集計ロジットのRプログラム例』のMNLモデルのサンプルコードを拡張したものを使っていたのですが、上手く動かなかったので、GO言語で作ってみました。

/*
このプログラムは、Go言語を用いて多項ロジットモデル(Multinomial Logit Model, MNL)の推定を行い、以下の要素を計算・表示するものです。

### プログラムの主要な機能

1. **データ読み込み**
   - `loadData` 関数で、`transport_data.csv` ファイルからデータを読み込み、各選択肢の所要時間、費用、利用可能性、および実際に選択された結果を `Data` 構造体に格納します。

2. **対数尤度関数の定義**
   - `logLikelihood` 関数で、ロジットモデルの対数尤度関数を定義します。
   - 各選択肢について、効用を計算し、その効用に基づいた選択確率を計算します。実際の選択結果に基づいて対数尤度を計算し、最適化のために負の対数尤度を返します。

3. **初期尤度の計算**
   - `initialLogLikelihood` 関数で、選択肢が全て等確率(均等選択)で選ばれると仮定した場合の初期尤度を計算します。
   - 初期尤度は、モデルの改善度を測るための基準として使用します。

4. **ヘッセ行列の計算(数値微分による近似)**
   - `hessianMatrix` 関数で、数値微分を用いて対数尤度関数の二階微分からヘッセ行列を計算します。
   - `epsilon` は微小量として設定され、4つの関数値の組み合わせを使って二階微分を近似的に計算します。
   
5. **最適化の実行**
   - `optimize.Problem` 構造体を用いて、対数尤度関数を最小化(尤度最大化)する問題を定義します。
   - `optimize.Minimize` 関数で最適化を実行し、最適なパラメータを推定します。

6. **結果の表示**
   - 最適化が終了すると、次の項目が表示されます:
     - 最適化されたパラメータ
     - 最終対数尤度
     - ヘッセ行列
     - t値
     - 尤度比と補正済み尤度比

### t値の計算

- `t値` は、推定されたパラメータの統計的有意性を示す指標であり、各パラメータの推定値をヘッセ行列の対角要素の平方根で割ることで求められます。
- ヘッセ行列の対角要素が非常に小さい場合や不安定な値の場合、計算が `NaN` になるのを防ぐため、`diag > 1e-5` の条件で計算しています。

### 尤度比と補正済み尤度比

- **尤度比**:初期尤度と最終尤度の差を初期尤度で割ったもので、モデルがどれだけデータに適合しているかの指標です。
- **補正済み尤度比**:尤度比に、パラメータ数の補正を加えたものです。

### 実行結果の例

実行時には、次のような情報が出力されます:

1. **初期尤度**(均等選択の仮定による尤度)
2. **最適化されたパラメータ**
3. **最終尤度**(モデルの最大尤度)
4. **ヘッセ行列**(二階微分を基に計算された行列)
5. **t値**(推定パラメータの統計的有意性の指標)
6. **尤度比**および**補正済み尤度比**(モデルの改善度の指標)

### 全体の流れ

1. `transport_data.csv` からデータを読み込む。
2. 初期尤度と、最適化前の準備を行う。
3. `optimize.Minimize` 関数を使って対数尤度を最大化し、パラメータを推定する。
4. 最適化後の結果(パラメータ、最終尤度、ヘッセ行列、t値、尤度比)を表示する。

### 注意点

- このプログラムは `gonum` パッケージを使用しているため、事前に `gonum` をインストールしておく必要があります。
- ヘッセ行列の数値微分による計算には誤差が生じやすく、`epsilon` の設定を適切に行う必要があります。

*/

package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"math"
	"os"
	"strconv"

	"gonum.org/v1/gonum/mat"
	"gonum.org/v1/gonum/optimize"
)

// データ構造体の定義
type Data struct {
	所要時間  [6]float64
	費用    [6]float64
	利用可能性 [6]int
	選択結果  int
}

// データの読み込み
func loadData(filename string) ([]Data, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	reader := csv.NewReader(file)
	records, err := reader.ReadAll()
	if err != nil {
		return nil, err
	}

	// データの解析
	var data []Data
	for _, record := range records[1:] { // ヘッダをスキップ
		var d Data
		for i := 0; i < 6; i++ {
			d.所要時間[i], _ = strconv.ParseFloat(record[7+i], 64)
			d.費用[i], _ = strconv.ParseFloat(record[13+i], 64)
			d.利用可能性[i], _ = strconv.Atoi(record[1+i])
		}
		d.選択結果, _ = strconv.Atoi(record[0])
		data = append(data, d)
	}

	return data, nil
}

// 対数尤度関数の定義
func logLikelihood(x []float64, data []Data) float64 {
	// パラメータの取得
	b := x[:6] // 定数項
	d1 := x[6]
	f1 := x[7]
	epsilon := 1e-10
	LL := 0.0

	for _, d := range data {
		// 効用の計算
		var v [6]float64
		for i := 0; i < 6; i++ {
			v[i] = b[i] + d1*(d.所要時間[i]/10) + f1*(d.費用[i]/10)
		}

		// 各選択肢の効用
		var vehicle [6]float64
		var deno float64
		for i := 0; i < 6; i++ {
			if d.利用可能性[i] == 1 {
				vehicle[i] = math.Exp(v[i])
				deno += vehicle[i]
			}
		}
		deno += epsilon

		// 選択確率の計算と対数尤度の加算
		selected := d.選択結果 - 1
		if selected >= 0 && selected < 6 {
			P := vehicle[selected] / deno
			LL += math.Log(P + epsilon)
		}
	}

	return -LL // 最小化のため、対数尤度を負にする
}

// 初期尤度の計算
func initialLogLikelihood(data []Data) float64 {
	numChoices := 6
	initialProb := 1.0 / float64(numChoices)
	L0 := float64(len(data)) * math.Log(initialProb)
	return L0
}

// ヘッセ行列の計算(数値微分)
func hessianMatrix(f func([]float64) float64, x []float64) *mat.Dense {
	n := len(x)
	hessian := mat.NewDense(n, n, nil)
	epsilon := 1e-7 // 微小量を小さめに調整

	for i := 0; i < n; i++ {
		for j := 0; j < n; j++ {
			x1 := make([]float64, n)
			x2 := make([]float64, n)
			x3 := make([]float64, n)
			x4 := make([]float64, n)
			copy(x1, x)
			copy(x2, x)
			copy(x3, x)
			copy(x4, x)

			x1[i] += epsilon
			x1[j] += epsilon
			x2[i] += epsilon
			x2[j] -= epsilon
			x3[i] -= epsilon
			x3[j] += epsilon
			x4[i] -= epsilon
			x4[j] -= epsilon

			f1 := f(x1)
			f2 := f(x2)
			f3 := f(x3)
			f4 := f(x4)

			hessian.Set(i, j, (f1-f2-f3+f4)/(4*epsilon*epsilon))
		}
	}

	return hessian
}

func main() {
	// データの読み込み
	data, err := loadData("transport_data.csv")
	if err != nil {
		log.Fatalf("データの読み込みに失敗しました: %v", err)
	}

	// 初期尤度の計算
	L0 := initialLogLikelihood(data)
	fmt.Printf("初期尤度: %.4f\n", L0)

	// 初期パラメータの設定
	b0 := []float64{0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.01, 0.01}

	// 最適化問題の設定
	problem := optimize.Problem{
		Func: func(x []float64) float64 {
			return logLikelihood(x, data)
		},
	}

	// 最適化の実行
	result, err := optimize.Minimize(problem, b0, nil, nil)
	if err != nil {
		log.Fatalf("最適化に失敗しました: %v", err)
	}

	// 結果の表示
	fmt.Println("パラメータ:", result.X)
	fmt.Println("最終尤度:", -result.F) // 対数尤度は負にしているため、正に戻す

	// ヘッセ行列の計算
	hessian := hessianMatrix(problem.Func, result.X)
	fmt.Println("ヘッセ行列:")
	fmt.Printf("%v\n", mat.Formatted(hessian, mat.Prefix(" ")))

	// t値の計算(対角要素が小さい場合は NaN の出力を回避)
	tValues := make([]float64, len(result.X))
	for i := 0; i < len(result.X); i++ {
		//diag := -hessian.At(i, i)  // 負数だと失敗する t値: [NaN NaN NaN NaN NaN NaN NaN NaN]
		diag := hessian.At(i, i) // この値だと t値: [0.27283268674968236 0.4899627491877213 NaN 0.5670961673406928 0.5233519195569353 0.4461558961138306 2.2512263380604414e-07 0.00010138601282162748]となる
		/*
			ChatGPTのご意見

			ヘッセ行列の対角要素は、対数尤度関数の二階微分を表します。尤度関数の最大化問題において、対数尤度関数の最大点近傍では、通常、二階微分は負になります。
			これにより、ヘッセ行列の対角成分も負になる傾向があります。このため、-hessian.At(i, i) として二階微分の符号を反転し、正の値を使うことで、t値 の計算が可能になるのです。
			diag := +hessian.At(i, i) を用いることで t値 の計算が安定する場合があることから、この方法を適用するのは合理的です。
			この修正により、数値計算上の不安定性が軽減され、t値 の NaN が減少する結果になっています。このまま diag := +hessian.At(i, i) の設定を維持するのが適切でしょう。
		*/

		fmt.Println("diag:", diag)
		if diag > 1e-5 { // 計算が安定する閾値を設定
			tValues[i] = result.X[i] / math.Sqrt(diag)
		} else {
			tValues[i] = math.NaN() // 計算が不安定な場合は NaN を設定
		}
	}
	fmt.Println("t値:", tValues)

	// 尤度比と補正済み尤度比の計算
	LL := -result.F
	likelihoodRatio := (L0 - LL) / L0
	adjustedLikelihoodRatio := (L0 - (LL - float64(len(result.X)))) / L0
	fmt.Printf("尤度比: %.4f\n", likelihoodRatio)
	fmt.Printf("補正済み尤度比: %.4f\n", adjustedLikelihoodRatio)
}

データ(transport_data.csv)はこんな感じでした。

SELECT,TAXI,DRV,BIKE,BIC,WALK,BUS,TAXI_TIME,DRV_TIME,BIKE_TIME,BIC_TIME,WALK_TIME,BUS_TIME,TAXI_COST,DRV_COST,BIKE_COST,BIC_COST,WALK_COST,BUS_COST
2,1,1,1,1,1,1,480,180,300,360,1650,3360,563.22,318.16,6.05,2.42,1.21,210.00
6,1,1,1,1,1,1,180,150,240,300,1380,2760,500.00,317.02,5.67,2.27,1.13,210.00
5,1,1,1,1,1,1,90,60,120,150,720,900,500.00,309.33,3.11,1.24,0.62,210.00
2,1,1,1,1,1,1,450,120,180,210,840,2310,500.00,309.40,3.13,1.25,0.63,210.00
6,1,1,1,1,1,1,300,150,240,330,1470,870,500.00,317.06,5.69,2.27,1.14,210.00
6,1,1,1,1,1,1,330,180,270,330,1530,2040,500.00,317.06,5.69,2.27,1.14,210.00
6,1,1,1,1,1,1,330,210,330,420,1890,2340,593.18,319.66,6.55,2.62,1.31,210.00
6,1,1,1,1,1,1,300,180,330,420,1890,1410,593.18,319.66,6.55,2.62,1.31,210.00
6,1,1,1,1,1,1,450,120,210,270,1290,31770,500.00,315.31,5.10,2.04,1.02,210.00
6,1,1,1,1,1,1,450,150,240,270,1290,1920,500.00,315.31,5.10,2.04,1.02,210.00
6,1,1,1,1,1,1,90,60,90,90,390,24630,500.00,305.88,1.96,0.78,0.39,210.00

で計算結果は、10秒ほどで出てきました。

G:\home\ebata\tomioka3B\src\others\main102>go run .
初期尤度: -1014.1359
パラメータ: [2.2058979412543716 5.024455710944181 -25.15815870069447 4.874931119599283 7.05844290286456 6.064128738785668 0.0007331345928576025 0.03238097122760465]
最終尤度: -678.9446456155601
ヘッセ行列:
? 65.36993168992923 -11.368683772161605 0 5.684341886080802 -5.684341886080802 17.053025658242408 -1082.867129298393 378.00873542437336?
? -11.368683772161605 105.16032489249484 0 -14.210854715202005 -14.210854715202005 -19.89519660128281 -9362.111086375082 1477.9288903810086?
? 0 0 0 0 0 0 0 0?
? 5.684341886080802 -14.210854715202005 0 73.89644451905043 -5.684341886080802 28.42170943040401 -2611.9550966541287 -335.37617127876734?
? -5.684341886080802 -14.210854715202005 0 -5.684341886080802 181.89894035458568 -93.79164112033324 -2444.267011014745 -3129.2302082874817?
? 17.053025658242408 -19.89519660128281 0 28.42170943040401 -93.79164112033324 184.74111129762608 15529.622032772751 1568.8783605583014?
? -1082.867129298393 -9362.111086375082 0 -2611.9550966541287 -2444.267011014745 15529.622032772751 1.0605450029288478e+07 -26716.40686457977?
? 378.00873542437336 1477.9288903810086 0 -335.37617127876734 -3129.2302082874817 1568.8783605583014 -26716.40686457977 102005.51514572?
diag: 65.36993168992923
diag: 105.16032489249484
diag: 0
diag: 73.89644451905043
diag: 181.89894035458568
diag: 184.74111129762608
diag: 1.0605450029288478e+07
diag: 102005.51514572
t値: [0.27283268674968236 0.4899627491877213 NaN 0.5670961673406928 0.5233519195569353 0.4461558961138306 2.2512263380604414e-07 0.00010138601282162748]
尤度比: 0.3305
補正済み尤度比: 0.3226

ーーーーー

これを説明すると、
2.2058979412543716 → b_taxi
5.024455710944181 → b_drv
-25.15815870069447  →  b_bike
(以下省略)

となっており、
0.0007331345928576025 → d1
0.03238097122760465 → f1

効用関数は、次のようになります。

で、これをどう使うかは、以下の通り(ここからは分かった→シミュレータで使い倒すところ)

で、最後にパラメータの説明ですが、

となるようです。

ーーーーーーー

初期尤度: -1014.1359 と 最終尤度: -678.9446456155601の意味と意義は以下の通り。

======

今回の計算では、初期尤度: -1014.1359 に対して、最終尤度: -678.9446456155601が小さくなっているので、パラメータを使った方が良い、というのは理解できました。その場合、この
尤度比: 0.3305
および
補正済み尤度比: 0.3226
は何を表わしているか、は以下の通り。

======

負数値がついているので分かりにくいけど、
初期尤度: -1014.1359 → 最終尤度: -678.9446456155601 は、値が335も大きくなったということになります

======
尤度をもっとも簡単に説明する方法

つまるところ、偏ったコインを作る = パラメータをつけくわえる、という理解で良いようです。

=====

ところで、今回作成して頂いたモデルでマルチエージェントシミュレーションのエージェントに適用させたい場合、何を入力値として、エージェントの交通選択を選ぶことになるか?

"所要時間"と"費用"を入力値とすれば良い。

t値: [0.27283268674968236 0.4899627491877213 NaN 0.5670961673406928 0.5233519195569353 0.4461558961138306 2.2512263380604414e-07 0.00010138601282162748] のように値が小さいです

t値を大きくするためには、以下の3つの具体的なアプローチが考えられます。それぞれ、具体例を交えて説明します。

1. 標本数を増やす
標本数(サンプル数)を増やすと、t値が増加する可能性が高まります。これは、標本数が増えることで推定の**標準誤差が小さく**なり、結果としてt値が大きくなるためです。

- 例: ある多項ロジットモデルで、交通手段の選択確率を説明するための「時間」と「費用」の変数があるとします。最初のサンプル数が50の場合、時間の係数に対するt値が1.8で有意ではなかったとします。標本数を100に増やすと、標準誤差が小さくなり、同じ係数であってもt値が2.5になり、有意となる可能性が高まります。

2. 変数のスケールを調整する
変数のスケールを調整することで、標準誤差を減らしt値を大きくすることができます。特に、変数の分布が狭い範囲に集中している場合、その変数をスケール調整すると係数の推定がより安定します。

- 例: 収入に関するデータが1000円単位で表されているとします。この場合、係数の標準誤差が大きくなりやすいことがあります。これを1万円単位にスケール変換すると、収入の変動がモデルに対してより明確に影響し、標準誤差が小さくなることでt値が増える可能性があります。

 3. 説明変数の分散を大きくする
説明変数の値が全体として似通っている場合(例えば、分散が小さい場合)、t値が小さくなりやすいです。このため、変数の分散を増やすようにデータを調整することも効果的です。可能であれば、サンプルに多様な値を含むようにデータを収集します。

- 例: 交通手段の選択モデルにおいて「通勤距離」を変数に入れたとします。もしすべてのサンプルがほぼ同じ距離(例えば、全員が2~3 kmの範囲)であれば、分散が少なくt値も小さくなります。データ収集の際に5~10 kmなどの多様な距離を持つサンプルを増やすことで、距離の変化に応じた係数の影響が明確になり、t値が大きくなることが期待できます。

これくさいなぁ。今回の移動距離は、高々1km以内だったからなぁ。終端距離まで入れると変わるかもしれん

---

これらの方法は、すべて統計的にt値を大きくするために役立ちますが、注意が必要です。無理にt値を大きくすることは、分析結果の解釈を歪める可能性もあるため、変数やデータの内容、意味を十分に理解した上で適用することが重要です。

2024,江端さんの技術メモ

fastapiによるプロセスのスレッド停止に関する原因究明のメモ

1. 問題の背景と経緯

1.1. 問題の内容

  1. fastapiからC言語で作成したプログラムを起動すると、Cプログラムの中にあるスレッドが途中で停止するという問題が発生している。

  2. Cプログラム をfastapiプログラムから起動したの場合、udpマルチキャストの受信を⾏うスレッドが、数回〜10数回⽬で停⽌することが発覚。

1.2. 事前確認事項

  1. fastapiプログラムから Cプログラムを起動せず、コマンド(手入力)で起動した場合、この問題は発⽣しないことは確認済。

  2. また、1のCプログラム起動だけでなく、複数のCプログラムを起動した場合においても、この問題が発生しないことも確認済み。

  3. 以上より、この問題は、Cプログラム本体ではなくて、それを起動するfastapi(fastapiプログラム)側に問題があると推認された。

  4. この問題は、以下のfastapiプログラムのコードの差し替えによって解決することが判明している。

問題が発生するケースでのfastapiプログラムのコマンド発生プロセスのコード

(1)
pid = await run_command_and_get_pid(command)

(2)
task = asyncio.create_task(run_command_async(command))
process = await task
pid = process.pid

# なお、上記(1)(2)は、実質的には同じ処理である
問題が発生しないケースでのfastapiプログラムのコマンド発生プロセスのコード

with open(os.devnull, 'w') as devnull:
process = subprocess.Popen(shlex.split(command), stdout=devnull,
stderr=devnull)
pid = process.pid

2. 問題点の差分の明確化

この問題の発生/不発生は、以下の2点に集約できることが確認された。

  1. asyncio.create_task と await を併用した非同期処理
  2. 上記を使用しない、subprocess.Popen を使用したプロセス処理

3. fastapiプログラムの制御対象である、Cプログラムの特徴

Cプログラムは、映像転送を行うメインルーチンと、SRT転送の状況を常時把握するスレッド、自分と自分以外のCプログラムのプロセスをリアルタイムでカウントするスレッドと、稼動中のCプログラムに通信を行うためのスレッド(現在未使用)からなる、映像転送&プロトコル変換プログラムである。

alt text

1Mbps以上の映像転送を行いながら、映像転送環境を常に把握し続ける必要のある、リアルタイム性能が要求される高負荷の制御プログラムである。

4. 検討

問題の原因を分析するためには、いくつかの可能性を考慮する必要がある。特に、asyncio.create_task や await を使用した非同期処理と、subprocess.Popen を使用したプロセス管理の違いが影響している可能性がある。この点を中心に考察する。

4.1. asyncio.create_task や await を使用した非同期処理において、排除できる可能性

4.1.1. 非同期処理の影響の可能性

asyncio.create_task と await を使用した場合、非同期処理によってプロセスの状態が意図しない形で処理されている可能性があるが、今回はこれには該当していない。プロセス自体が稼動していることは確認している。したがって、Python のガベージコレクションやスケジューリングの問題によって、起動したプロセスが適切に管理されないケースは考慮しなくてよい。

4.1.2. タスクが解放された可能性

ログファイルを用いて確認した結果、asyncio.create_task を使用して作成されたタスクが適切に監視されず終了した形跡はないので、この件について考慮しなくてよい。

4.1.3. await によるブロックの影響の可能性

非同期関数内で await を使用してプロセスの終了を待っていると、FastAPI のリクエストが別の処理と競合し、プロセスの動作に影響を与える可能性はあるが、今回はCプログラムが1個の場合でも発生しているので、これも考慮する必要はない。

4.2. asyncio.create_task や await を使用した非同期処理において、排除できない可能性

4.2.1. イベントループのスケジューリングとリソースの競合

asyncio を使用すると、イベントループによってすべての非同期タスクがスケジュールされる。しかし、他のタスクやリクエスト処理が増えると、イベントループ全体の負荷が高まり、処理が遅延する可能性がある。

つまり、fastapiプログラムが、Cプログラムを含めた全体のタスク管理を行うため、Cプログラムの自由な実行が制限されるという問題が発生しうる。

この状況では、特定のタスク(今回の場合はsend_bitrate_or_count_processes のスレッド)への割り当てが遅れることで、スレッドが停止したかのような挙動が発生することがありえる。

4.2.2. プロセス管理の不安定さ

非同期タスクでは、プロセス管理がイベントループの中で行われるため、イベントループ自体の負荷やスケジュールの影響を受けやすくなる。

つまり、リアルタイム性の高い高タスクのCプログラムの実行が、fastapiプログラムの動作を遅らせ、その結果、fastapiプログラムの管理下にあるCプログラムの実行が制限されるという悪循環を発生させると考えうる。

4.2.3. プロセスのモニタリングの不安定さ

非同期処理では、プロセスの監視 (monitor_process_status) を含むさまざまなタスクが並行して実行される。この際、イベントループがプロセスの管理や状態確認の処理を後回しにすることで、プロセスの実行状態が適切に反映されず、スレッドが停止しているかのような挙動になったと考えうる。

4.3. subprocess.PopenによってCプログラムのスレッド問題が解決した理由理由

4.3.1. subprocess.Popen との違い

subprocess.Popen を使用して起動する場合は、Python の非同期処理機構に依存されることなく(fastapiプログラムの管理に組込まれないため)、独立してプロセスが管理される。このため、プロセスの状態が安定して保たれたと考える。

これは事実上、手入力でコマンドを押下する処理と同じことをやっており、前述の「コマンド(手入力)で起動した場合、この問題は発生しないことが確認済」という内容と整合が取れている。

5. 結論

今回のプログラムにおける問題の原因は、非同期タスクがイベントループのスケジューリングに依存している点に起因していると考えられる。

具体的には、非同期処理 (asyncio.create_task + await) のリソース競合やスケジューリングのタイミングによって、プロセスの管理や監視が適切に行われず、スレッドが途中で停止しているように見えたと考えられる。

以上より、subprocess.Popenの利用によってCプログラムのスレッド問題が解決したことは、合理的に説明ができる

以上

6. 付録

6.1. 「非同期タスクでのコマンド実行 (asyncio.create_task + await)」の意義(なんのために、このような仕組みが用意されているのか)

非同期タスク (asyncio.create_task + await) の意義は、主に以下の3つの観点に集約される

6.1.1. 高いスループットと効率的なリソース使用

非同期タスクは、待機中のリソース(たとえば、ネットワークI/OやファイルI/Oなど)を効率的に使用しながら、他のタスクの実行を並行して進めることを可能にする。これにより、CPUの使用率を高め、多くのリクエストやタスクを同時に処理することができる。例えば、クライアントからのHTTPリクエストに対する応答待ちの間に、他のタスクの処理を進めることができるため、高いスループットが実現される

6.1.2. ブロッキングの回避とレスポンスの向上

従来の同期的な実装では、一つの処理が完了するまで他の処理を待たせる必要がある(ブロッキング)。これに対して、非同期処理では、あるタスクがI/O待機などで停止している間にも、他のタスクが進行するため、アプリケーション全体のレスポンスが向上する。具体的には、ネットワークのレスポンスを待っている間に他の処理を進行させることで、CPUのアイドル時間を減らし、待機時間のロスを最小化する。

6.1.3. 大量の同時接続やリクエストの処理に強い

非同期タスクの仕組みは、大量の接続やリクエストを効率的に処理するために設計されている。特に、Webアプリケーションやマイクロサービスでは、多くのクライアントからの接続を管理し、レスポンスを返す必要がある。例えば、FastAPI などの非同期対応Webフレームワークでは、同時に多数のリクエストを処理し、レスポンスを返すことが求められる。このとき、asyncio.create_task と await を利用することで、各リクエストを非同期タスクとして並列に処理できる。

6.2. Cプログラムは、映像転送(中継)とプロトコル変換を行う典型的な制御プログラムで、リアルタイムのパフォーマンスが要求されるプログラムである。このようなプログラムの起動にfastapiを用いるのは正しい選択だったか?

FastAPIを使用してリアルタイムパフォーマンスが要求される制御プログラム(Cプログラム)を起動することは、設計上の選択としていくつかの利点と課題が存在すると考えた。

6.2.1. FastAPIを使用する利点

6.2.1.1. 軽量で高速な非同期フレームワーク:

FastAPIはPythonの非同期処理を活かして、高速かつ効率的にリクエストを処理できる。これにより、複数のリクエストを並列に処理することが可能であり、APIベースのコントロールが求められる場面では有効である。

6.2.1.2. シンプルで明快なAPI構築:

RESTful APIを簡単に構築できるため、外部からのコントロールやモニタリングが要求される制御プログラムにとっては適切である。また、外部から簡単にステータス確認やプロセスの制御が可能になる

6.2.1.3. スケーラビリティの確保:

FastAPIは、UvicornやGunicornといったASGIサーバーと組み合わせることで、スケーラブルな構成をとることができる。リクエストの多い状況や複数の制御要求が同時に発生するような環境で柔軟に対応できる。

6.2.2. FastAPIを制御プログラムに用いる際の課題と考慮点

一方で、リアルタイムのパフォーマンスを要求される制御プログラムにおいて、FastAPIの非同期タスクやイベントループの特性が以下のような問題を引き起こす可能性がある。

6.2.2.1. イベントループの負荷と競合:

FastAPIは非同期フレームワークであるため、複数のタスクが同時に実行されることを前提としている。しかし、リアルタイム制御を要するプログラム(映像転送やプロトコル変換など)の場合、イベントループの負荷が高まると、タスクの遅延やリソース競合が発生しやすくなる。

6.2.2.2. リアルタイム性能の不安定さ:

映像の中継やプロトコル変換を行うプログラムでは、タイミングやデータ転送の精度が重要である。FastAPIを介したプロセスの管理やコマンドの実行は、イベントループに依存するため、Pythonのインタープリタの遅延やスレッド管理の問題がリアルタイム性能を損なうリスクがある。

6.3. (付録の)結論

プロセスの制御や起動部分をFastAPIから分離する本件の方式は、fastapi + 制御プログラム の併用方式としては最適である、と考える。

2024,江端さんの技術メモ

//  G:\home\ebata\hakata\src\others\main26\main.c

/*
このプログラムは、PostGISを使用したPostgreSQLデータベースに対して、ランダムウォークを実行し、その経路をCSVファイルに記録するものです。プログラムの概要を以下に説明します。

■プログラムの目的
出発ノードと目的ノード(天神駅に近いノード)を設定し、データベース上のノード間をランダムに移動しながら、最終的に目的ノードに到達する経路を記録します。
経路をCSVファイルに保存し、各ノードの緯度、経度情報を含めます。

■主な機能と処理の流れ
1.データベースへの接続
sql.Openを使って、PostgreSQLデータベースに接続します。接続情報はconnStr変数で定義されます。
2。出発ノードと目的ノードの設定
startNodeとして、仮に32758を設定し、goalNodeとして天神駅のノードIDである40922を設定しています。

3.CSVファイルの作成
os.Createでファイルを作成し、CSV形式で書き込むための準備をします。

4.ランダムウォークの実行
performRandomWalk関数内で、出発ノードから目的ノードへと移動を行います。
各ノードの緯度と経度を取得し、CSVファイルに記録します。

5.ノード間の移動の処理
getClosestConnectedNode関数を使って、現在のノードから接続されているノードを取得し、目的地に最も近いノードを選択します。
選択の基準として、訪問回数が少ないノードを優先し、ランダム要素を加えます(袋小路の回避)。

6.経路の記録
移動したノードの情報を、performRandomWalk関数内でCSVファイルに記録します。ノードID、緯度、経度が書き込まれます。

7.座標の取得
getNodeCoordinates関数で、ノードIDに対応する緯度と経度をPostGISの関数ST_YとST_Xを使用して取得します。

8.2点間の距離の計算
calculateDistance関数で、2つの座標間の距離を計算します。地球の半径を用いて、Haversineの公式で計算しています。

■プログラムの特徴

1. 訪問回数の管理: 同じノードを何度も訪問しないように、各ノードの訪問回数を管理しています(訪問回数が多いノードは回避します)。
2. ランダム要素の導入: 最短経路だけではなく、一定の確率で他の接続ノードを選択し、袋小路を避けるように工夫しています。
3. バックトラッキング: 袋小路に入った場合に、直前のノードに戻るバックトラッキングの処理を行っています。

■プログラムの全体の流れ
1.データベース接続を確立。
2.出発ノードと目的ノードを設定。
3.CSVファイルを作成し、ヘッダーを記述。
4.ランダムウォークを実行し、ノード間を移動。
5.各ノードで緯度・経度を取得し、CSVファイルに書き込む。
6.目的ノードに到達するまで、最も適切な接続ノードを選択し続ける。

このプログラムは、ランダムウォークアルゴリズムをPostGISとPostgreSQLの地理情報を活用して実現しており、目的地に到達するまでの経路を記録する機能を持っています。

*/

package main

import (
	"database/sql"
	"encoding/csv"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"

	_ "github.com/lib/pq"
)

const (
	connStr = "user=postgres password=password host=127.0.0.1 port=15432 dbname=hakata_db sslmode=disable"
)

func main() {
	// PostgreSQLデータベースへの接続
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 出発ノードを設定(天神駅の最も近いノードを選択するなど)

	startNode := 32758 // 例として、初期ノードIDを設定
	goalNode := 40922  // 目的ノードIDを天神駅に設定

	// CSVファイルを作成
	csvFile, err := os.Create("random_walk_result.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer csvFile.Close()

	writer := csv.NewWriter(csvFile)
	defer writer.Flush()

	// CSVのヘッダーを作成
	err = writer.Write([]string{"Node", "Latitude", "Longitude"})
	if err != nil {
		log.Fatal(err)
	}

	// ランダムウォークの実行
	err = performRandomWalk(db, startNode, goalNode, writer)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Random walk completed and saved to CSV.")
}

// performRandomWalk ランダムウォークを実行し、結果をCSVに保存
func performRandomWalk(db *sql.DB, startNode, goalNode int, writer *csv.Writer) error {
	currentNode := startNode
	visited := make(map[int]int) // ノードの訪問回数を追跡
	pathStack := []int{}         // バックトラッキング用のスタック

	for currentNode != goalNode {
		// 現在のノードの緯度と経度を取得し、CSVに書き込み
		lat, lon, err := getNodeCoordinates(db, currentNode)
		if err != nil {
			return err
		}
		fmt.Printf("Node %d: Latitude: %f, Longitude: %f\n", currentNode, lat, lon)
		err = writer.Write([]string{
			fmt.Sprintf("%d", currentNode),
			fmt.Sprintf("%f", lat),
			fmt.Sprintf("%f", lon),
		})
		if err != nil {
			return err
		}
		writer.Flush()

		// 現在のノードを訪問済みとして記録
		visited[currentNode]++

		// 次のノードを取得(目的地に近づくノードを優先)
		nextNode, err := getClosestConnectedNode(db, currentNode, goalNode, visited)
		if err != nil || visited[nextNode] > 2 { // 訪問回数が多い場合バックトラック
			if len(pathStack) > 0 {
				// スタックから一つ戻る
				fmt.Printf("Backtracking from Node %d\n", currentNode)
				currentNode = pathStack[len(pathStack)-1]
				pathStack = pathStack[:len(pathStack)-1]
				continue
			}
			return fmt.Errorf("no viable path found")
		}

		// 次のノードをスタックに追加
		pathStack = append(pathStack, currentNode)
		currentNode = nextNode
	}

	// ゴールノードの座標をCSVに保存
	lat, lon, err := getNodeCoordinates(db, goalNode)
	if err != nil {
		return err
	}
	fmt.Printf("Arrived at Goal Node %d: Latitude: %f, Longitude: %f\n", goalNode, lat, lon)
	err = writer.Write([]string{
		fmt.Sprintf("%d", goalNode),
		fmt.Sprintf("%f", lat),
		fmt.Sprintf("%f", lon),
	})
	if err != nil {
		return err
	}
	writer.Flush()

	return nil
}

// getClosestConnectedNode 現在のノードから接続されているノードのうち、目的地に最も近いノードを取得
func getClosestConnectedNode(db *sql.DB, currentNode, goalNode int, visited map[int]int) (int, error) {
	// 現在のノードから接続されているノードを取得
	query := `
		SELECT target 
		FROM ways 
		WHERE source = $1
		UNION 
		SELECT source 
		FROM ways 
		WHERE target = $1;
	`
	rows, err := db.Query(query, currentNode)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	// 接続ノードのリストを作成
	type NodeDistance struct {
		ID       int
		Distance float64
	}
	var connectedNodes []NodeDistance
	for rows.Next() {
		var node int
		if err := rows.Scan(&node); err != nil {
			return 0, err
		}

		// ノードの座標を取得
		lat, lon, err := getNodeCoordinates(db, node)
		if err != nil {
			return 0, err
		}
		// 目的地との距離を計算
		goalLat, goalLon, err := getNodeCoordinates(db, goalNode)
		if err != nil {
			return 0, err
		}
		distance := calculateDistance(lat, lon, goalLat, goalLon)

		// 訪問回数が少ないノードを優先
		if visited[node] < 3 {
			connectedNodes = append(connectedNodes, NodeDistance{
				ID:       node,
				Distance: distance,
			})
		}
	}

	if len(connectedNodes) == 0 {
		return 0, fmt.Errorf("no connected nodes found")
	}

	// 目的地に最も近いノードを優先して選択
	var closestNode NodeDistance
	minDistance := math.MaxFloat64
	for _, node := range connectedNodes {
		// 最も目的地に近いノードを選択
		if node.Distance < minDistance {
			closestNode = node
			minDistance = node.Distance
		}
	}

	// ランダム選択の要素を追加(袋小路を避けるため)
	if len(connectedNodes) > 1 && rand.Float64() < 0.2 {
		return connectedNodes[rand.Intn(len(connectedNodes))].ID, nil
	}

	return closestNode.ID, nil
}

// getNodeCoordinates ノードIDに対応する緯度と経度を取得
func getNodeCoordinates(db *sql.DB, nodeID int) (float64, float64, error) {
	var lat, lon float64
	query := `
		SELECT ST_Y(the_geom) AS lat, ST_X(the_geom) AS lon
		FROM ways_vertices_pgr
		WHERE id = $1;
	`
	err := db.QueryRow(query, nodeID).Scan(&lat, &lon)
	if err != nil {
		return 0, 0, err
	}
	return lat, lon, nil
}

// calculateDistance 2つの座標間の距離を計算
func calculateDistance(lat1, lon1, lat2, lon2 float64) float64 {
	rad := func(deg float64) float64 {
		return deg * math.Pi / 180
	}
	lat1Rad, lon1Rad := rad(lat1), rad(lon1)
	lat2Rad, lon2Rad := rad(lat2), rad(lon2)
	dlat := lat2Rad - lat1Rad
	dlon := lon2Rad - lon1Rad
	a := math.Sin(dlat/2)*math.Sin(dlat/2) + math.Cos(lat1Rad)*math.Cos(lat2Rad)*math.Sin(dlon/2)*math.Sin(dlon/2)
	c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
	const EarthRadius = 6371e3 // 地球の半径(メートル)
	return EarthRadius * c
}