Goの複数のgoroutineに対して、一斉ブロードキャストを行いたい

2022年3月9日

https://teratail.com/questions/370090?nli=619ad631-d4fc-405f-a649-44670a040506#reply-502318

もの凄く助かりました。

nobonoboさんに心からの感謝を

C:\Users\ebata\goga\3-10

/*
いっせいブロードキャストをするのに一般的に使われているのはPubSubと呼ばれる方式です。
サブスクライブを複数あらかじめおこなっておき、パブリッシュでメッセージを送ると
複数のサブスクライブ先に同じメッセージを配信するというものです。
おそらくこの方式は発見済みで、想像するに複数のサブスクライブ先をループで巡って
複数回送信することでブロードキャスト相当を実現するのではなく、もっと真に
ブロードキャストしたいということが質問者さんの意図なのかなと。

そういうものを実現するのに「sync.Cond」という標準ライブラリ機能があります。
これの活用方法は実はちゃんとした実例が見つけにくいです。たいてい前者のやり方で済まして
しまっているのと、sync.Condの挙動は若干わかりづらいです。

すこし解説は端折りますが、以下のように記述することで実現できると思います。

ポイントは

タイミングだけをsync.CondのBroadcastで伝える
複数のタスクには共有メモリを通して渡したいメッセージを伝えます
送る方も受ける方も排他ロックを併用するのがCondの使い方でロック期間であれば
共有メモリをコンフリクトなくアクセスできます

この方法はPubSubにくらべ、共有メモリをすべてのgoroutineタスクに伝播したか
どうかを保証する仕組みがないです
つまり、この方法は「低頻度のイベントを大量のタスクに配信」するか、もしくは
「最新の値さえ受け取れればOK」という用途向けです。
*/

package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

type BroadCaster struct {
	cond *sync.Cond
	id   int64
	msg  string
}

func (bc *BroadCaster) Send(msg string) {
	bc.cond.L.Lock()
	defer bc.cond.L.Unlock()
	bc.id++
	bc.msg = msg
	bc.cond.Broadcast()
}

func (bc *BroadCaster) Recv(last int64) (int64, string) {
	bc.cond.L.Lock()
	defer bc.cond.L.Unlock()
	for bc.id == last {
		bc.cond.Wait()
	}
	return bc.id, bc.msg
}

var (
	broadcaster = &BroadCaster{
		cond: sync.NewCond(&sync.Mutex{}),
	}
)

func task(i int) {
	log.Println("task:", i, " start")
	defer log.Println("task:", i, " stop")
	last := int64(0)
	for {
		id, msg := broadcaster.Recv(last)
		last = id
		log.Println("task:", i, msg)
	}
}

func main() {
	for i := 0; i < 3; i++ {
		go task(i)
	}
	for i := 0; i < 3; i++ {
		time.Sleep(1 * time.Second)
		broadcaster.Send(fmt.Sprintf("hello, world: %d", i))
	}
	time.Sleep(1 * time.Second)
}

Keyword sync.Cond, sync.Broadcast,

参考文献: sync.Cond/コンディション変数についての解説

2022年3月9日2021/11,江端さんの技術メモ

Posted by ebata