こぼれネット

MATSimの結果をビューアをpythonで作って貰った件

MATSim 連載準備用

ここの記載を参考にしてoutputの情報を作っていうのですが、via-appを使うのが迂遠です。 もっと簡単にMATSimのシミュレーションを動画にする方法はありませんか?

via-app 不要の「最短・自動」動画化スクリプト "animate_matsim.py"を作って貰いました。
これを配置する場所は、
output_events.xml.gz、output_network.xml.gz (MATSimで生成したもの)
2つのファイルが置かれている場所に、

[animate_matsim.py]

import gzip
import xml.etree.ElementTree as ET
from collections import defaultdict

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

# ========= 設定(あなたの環境に固定) =========
EVENTS_GZ   = "output_events.xml.gz"
NETWORK_GZ  = "output_network.xml.gz"
OUT_MP4     = "matsim_equil.mp4"

FPS = 20
DT  = 1.0          # 1秒刻みで描画
MAX_T = 24*3600    # 24時間
# ============================================

# events の「位置更新に使うタイプ」
POS_TYPES = {"vehicle enters traffic", "entered link"}  # equil の実ログに合わせている

def load_network_midpoints_gz(network_gz):
    """
    output_network.xml.gz を読み、
    linkId -> (midX, midY) の辞書を作る。
    """
    nodes = {}
    link_mid = {}

    with gzip.open(network_gz, "rt", encoding="utf-8") as f:
        for ev, elem in ET.iterparse(f, events=("end",)):
            if elem.tag == "node":
                nid = elem.attrib["id"]
                nodes[nid] = (float(elem.attrib["x"]), float(elem.attrib["y"]))
                elem.clear()
            elif elem.tag == "link":
                lid = elem.attrib["id"]
                fr = elem.attrib["from"]
                to = elem.attrib["to"]
                if fr in nodes and to in nodes:
                    (x0, y0) = nodes[fr]
                    (x1, y1) = nodes[to]
                    link_mid[lid] = ((x0 + x1) / 2.0, (y0 + y1) / 2.0)
                elem.clear()

    return link_mid


def load_position_events(events_gz):
    """
    output_events.xml.gz を iterparse で読み、
    車両(or person)の「リンク入場系イベント」だけ抜き出す。

    戻り値:
      events_by_agent[agentId] = [(time, linkId), ...] (timeで昇順)
      all_times = sorted unique times
    """
    events_by_agent = defaultdict(list)
    all_times = []

    with gzip.open(events_gz, "rt", encoding="utf-8") as f:
        for ev, elem in ET.iterparse(f, events=("end",)):
            if elem.tag != "event":
                elem.clear()
                continue

            etype = elem.attrib.get("type", "")
            if etype not in POS_TYPES:
                elem.clear()
                continue

            # equilでは entered/left は vehicle しか持たないので vehicle を優先
            aid = elem.attrib.get("vehicle") or elem.attrib.get("person")
            t = elem.attrib.get("time")
            lid = elem.attrib.get("link")

            if aid is None or t is None or lid is None:
                elem.clear()
                continue

            t = float(t)
            events_by_agent[aid].append((t, lid))
            all_times.append(t)

            elem.clear()

    # agentごとに時刻順に整列
    for aid in events_by_agent:
        events_by_agent[aid].sort()

    all_times = sorted(set(all_times))
    return events_by_agent, all_times


def build_time_index(events_by_agent):
    """
    アニメーションの高速化のため、
    agentごとに「次に参照すべきイベント位置」を保持する。
    """
    idx = {aid: 0 for aid in events_by_agent.keys()}
    return idx


def main():
    link_mid = load_network_midpoints_gz(NETWORK_GZ)
    events_by_agent, all_times = load_position_events(EVENTS_GZ)

    if not link_mid:
        raise RuntimeError("network の link が読めていません。NETWORK_GZ を確認してください。")
    if not events_by_agent:
        raise RuntimeError(
            "位置更新イベント(vehicle enters traffic / entered link)が読めていません。"
            "events の type を確認してください。"
        )

    # 描画範囲(networkから)
    xs = [v[0] for v in link_mid.values()]
    ys = [v[1] for v in link_mid.values()]
    xmin, xmax = min(xs), max(xs)
    ymin, ymax = min(ys), max(ys)

    fig, ax = plt.subplots()
    ax.set_xlim(xmin - 50, xmax + 50)
    ax.set_ylim(ymin - 50, ymax + 50)
    ax.set_aspect("equal")

    scat = ax.scatter([], [], s=10)

    # アニメーション時刻列を作る(0〜MAX_TをDT刻み)
    times = []
    t = 0.0
    while t <= MAX_T:
        times.append(t)
        t += DT

    agents = list(events_by_agent.keys())
    idx = build_time_index(events_by_agent)

    # 現在位置(未出発は None)
    current_pos = {aid: None for aid in agents}

    def update(frame_idx):
        tnow = times[frame_idx]

        # 各agentについて、tnowまでに起きた entered link を順に反映
        for aid in agents:
            evs = events_by_agent[aid]
            i = idx[aid]
            while i < len(evs) and evs[i][0] <= tnow:
                _, lid = evs[i]
                if lid in link_mid:
                    current_pos[aid] = link_mid[lid]
                i += 1
            idx[aid] = i

        pts = [p for p in current_pos.values() if p is not None]

        if pts:
            scat.set_offsets(pts)
        else:
            scat.set_offsets(np.empty((0, 2)))

        ax.set_title(f"t = {tnow/3600:.2f} h")
        return scat,

    ani = animation.FuncAnimation(
        fig,
        update,
        frames=len(times),
        interval=1000 / FPS,
        blit=False
    )

    ani.save(OUT_MP4, fps=FPS)
    print("saved:", OUT_MP4)


if __name__ == "__main__":
    main()

として、

python3 animate_matsim.py

とすると、
matsim_equil.mp4
という動画ファイルができます。  但し、かなり時間がかかります。止っているのか、と思えるくらいには。

止っているかを確認する方法としては、

$ top
で、

が、出てくれば動いていると思います。
または、

$ ps aux | grep ffmpeg
で、
ffmpeg が動いている → 動画生成中(正常)
ffmpeg が一切いない → events→trajectory 作成中 or 停止中

「点が出る時刻が存在するか」を一発確認する方法としては、以下を使えます。

python3 - <<'PY'
import gzip, xml.etree.ElementTree as ET
POS_TYPES = {"vehicle enters traffic", "entered link"}
times = []
with gzip.open("output_events.xml.gz","rt",encoding="utf-8") as f:
    for ev, elem in ET.iterparse(f, events=("end",)):
        if elem.tag=="event" and elem.attrib.get("type") in POS_TYPES:
            times.append(float(elem.attrib["time"]))
        elem.clear()
print("pos-events:", len(times))
print("min time:", min(times))
print("max time:", max(times))
PY

これで、

pos-events: 727444 (描画対象の更新回数)
min time: 28800.0  (8.00 時スタート)
max time: 92526.0 (25.07 時 終了)

ということが分かります。

 

で、今も計算中です。出てきたら動画ファイル張りつけますね。

モバイルバージョンを終了