Goの複数のgoroutineに対して、一斉ブロードキャストを行いたい

https://teratail.com/questions/370090?nli=619ad631-d4fc-405f-a649-44670a040506#reply-502318

もの凄く助かりました。

nobonoboさんに心からの感謝を

C:\Users\ebata\goga\3-10

 

/*
いっせいブロードキャストをするのに一般的に使われているのはPubSubと呼ばれる方式です。

サブスクライブを複数あらかじめおこなっておき、パブリッシュでメッセージを送ると複数のサブスクライブ先に同じメッセージを配信するというものです。

おそらくこの方式は発見済みで、想像するに複数のサブスクライブ先をループで巡って複数回送信することでブロードキャスト相当を実現するのではなく、もっと真にブロードキャストしたいということが質問者さんの意図なのかなと。

そういうものを実現するのに「sync.Cond」という標準ライブラリ機能があります。
これの活用方法は実はちゃんとした実例が見つけにくいです。たいてい前者のやり方で済ましてしまっているのと、sync.Condの挙動は若干わかりづらいです。

すこし解説は端折りますが、以下のように記述することで実現できると思います。

ポイントは

タイミングだけをsync.CondのBroadcastで伝える
複数のタスクには共有メモリを通して渡したいメッセージを伝えます
送る方も受ける方も排他ロックを併用するのがCondの使い方でロック期間であれば共有メモリをコンフリクトなくアクセスできます

この方法はPubSubにくらべ、共有メモリをすべてのgoroutineタスクに伝播したかどうかを保証する仕組みがないです
つまり、この方法は「低頻度のイベントを大量のタスクに配信」するか、もしくは「最新の値さえ受け取れればOK」という用途向けです。
*/

package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

type BroadCaster struct {
	cond *sync.Cond
	id   int64
	msg  string
}

func (bc *BroadCaster) Send(msg string) {
	bc.cond.L.Lock()
	defer bc.cond.L.Unlock()
	bc.id++
	bc.msg = msg
	bc.cond.Broadcast()
}

func (bc *BroadCaster) Recv(last int64) (int64, string) {
	bc.cond.L.Lock()
	defer bc.cond.L.Unlock()
	for bc.id == last {
		bc.cond.Wait()
	}
	return bc.id, bc.msg
}

var (
	broadcaster = &BroadCaster{
		cond: sync.NewCond(&sync.Mutex{}),
	}
)

func task(i int) {
	log.Println("task:", i, " start")
	defer log.Println("task:", i, " stop")
	last := int64(0)
	for {
		id, msg := broadcaster.Recv(last)
		last = id
		log.Println("task:", i, msg)
	}
}

func main() {
	for i := 0; i < 3; i++ {
		go task(i)
	}
	for i := 0; i < 3; i++ {
		time.Sleep(1 * time.Second)
		broadcaster.Send(fmt.Sprintf("hello, world: %d", i))
	}
	time.Sleep(1 * time.Second)
}

 

2021/11,江端さんの技術メモ

Posted by ebata