go channelでマルチキャスト、ブロードキャストを何とかできないか

2021年11月25日

=======================

https://github.com/SierraSoftworks/multicast

マルチキャストでは、チャンネルをリスナーのリンクリストとして実装しています。リスナーは、受信したメッセージを次のリスナーに自動的に転送してから、自分のCチャンネルで公開します。

このアプローチにより、チャネルに属するリスナーの配列を維持する必要がなくなり、実装が大幅に簡素化されます。

=======================

以下は、readme.mdに記載されているプログラムを、ちょっとだけ改造したもの(syncを加えた)

package main

import (
	"fmt"
	"sync"

	"github.com/SierraSoftworks/multicast"
)

func main() {
	c := multicast.New()
	wg := sync.WaitGroup{}

	wg.Add(2)

	go func() {
		l := c.Listen()
		wg.Done()
		defer wg.Done()
		for msg := range l.C {
			fmt.Printf("Listener 1: %s\n", msg)
		}
		fmt.Println("Listener 1 Closed")
	}()

	go func() {
		l := c.Listen()
		wg.Done()
		defer wg.Done()
		for msg := range l.C {
			fmt.Printf("Listener 2: %s\n", msg)
		}
		fmt.Println("Listener 2 Closed")
	}()

	wg.Wait()
	wg.Add(2)

	c.C <- "1 Hello World!"
	c.C <- "2 Hello World!"
	c.C <- "3 Hello World!"
	c.C <- "4 Hello World!"

	c.Close()

	wg.Wait()
}

 

ebata@DESKTOP-P6KREM0 MINGW64 ~/goga/3-7
$ go run main.go
Listener 1: 1 Hello World!
Listener 1: 2 Hello World!
Listener 2: 1 Hello World!
Listener 2: 2 Hello World!
Listener 2: 3 Hello World!
Listener 2: 4 Hello World!
Listener 2 Closed
Listener 1: 3 Hello World!
Listener 1: 4 Hello World!
Listener 1 Closed

久しぶりに、「世界に感謝を」という気持ちになりました。

https://github.com/SierraSoftworks/multicast/blob/main/channel.goの、コメントに日本語コメント(翻訳)を付けたもの

package multicast

import "sync"

// Channel represents a multicast channel container which provides
// a writable channel C and allows multiple listeners to be connected.
// Channelは、書き込み可能なチャンネルCを提供し、複数のリスナーの接続を可能にする
// マルチキャストチャンネルコンテナを表します。

type Channel struct {
	// C is a writable channel on which you can send messages which will
	// be delivered to all connected listeners.
	// Cは書き込み可能なチャンネルで、接続されているすべてのリスナーに配信される
        // メッセージを送ることができます。

	C chan<- interface{}

	c chan interface{}
	l *Listener
	m sync.Mutex
}

// New creates a new multicast channel container which can have listeners
// connected to it and messages sent via its C channel property.
// このコンテナにはリスナーが接続され、C channelプロパティでメッセージが送信されます。

func New() *Channel {
	c := make(chan interface{})

	return From(c)
}

// From creates a new multicast channel which exposes messages it receives
// on the provided channel to all connected listeners.
// Fromは、新しいマルチキャストチャネルを作成し、提供されたチャネルで受信した
// メッセージを、接続されているすべてのリスナーに公開します。

func From(c chan interface{}) *Channel {
	return &Channel{
		C: c,
		c: c,
	}
}

// Listen returns a new listener instance attached to this channel.
// Each listener will receive a single instance of each message sent
// to the channel.
// Listenは、このチャンネルに接続された新しいリスナーのインスタンスを返します。
//各リスナーは、チャンネルに送信された各メッセージのインスタンスを1つずつ受信します。

func (c *Channel) Listen() *Listener {
	c.m.Lock()
	defer c.m.Unlock()

	if c.l == nil {
		c.l = NewListener(c.c)
	} else {
		c.l = c.l.Chain()
	}

	return c.l
}

// Close is a convenience function for closing the top level channel.
// You may also close the channel directly by using `close(c.C)`.
// Closeは、トップレベルのチャンネルを閉じるための便利な関数です。
// close(c.C)`を使ってチャンネルを直接閉じることもできます。

func (c *Channel) Close() {
	c.m.Lock()
	defer c.m.Unlock()

	close(c.c)
}

 

https://github.com/SierraSoftworks/multicast/blob/main/listener.goの、コメントに日本語コメント(翻訳)を付けたもの

package multicast

// Listener represents a listener which will receive messages
// from a channel.
// リスナーは、チャンネルからのメッセージを受信するリスナーを表します。
type Listener struct {
	C <-chan interface{}
	f chan interface{}
}

// NewListener creates a new listener which will forward messages
// it receives on its f channel before exposing them on its C
// channel.
// NewListenerは、Cチャンネルで公開する前に、fチャンネルで受信したメッセージを
// 転送する新しいリスナーを作成します。

// You will very rarely need to use this method directly in your
// applications, prefer using From instead.
// アプリケーションでこのメソッドを直接使用することはほとんどないと思いますが、
// 代わりにFromを使用してください。

func NewListener(source <-chan interface{}) *Listener {
	out := make(chan interface{}, 0)
	l := &Listener{
		C: out,
	}

	go func() {
		for v := range source {
			if l.f != nil {
				l.f <- v
			}
			out <- v
		}

		if l.f != nil {
			close(l.f)
		}
		close(out)
	}()

	return l
}

// Chain is a shortcut which updates an existing listener to forward
// to a new listener and then returns the new listener.
// Chainは、既存のリスナーを更新して新しいリスナーに転送し、
// その新しいリスナーを返すショートカットです。

// You will generally not need to make use of this method in your
// applications.
// 一般的には、このメソッドを使用する必要はありません。

func (l *Listener) Chain() *Listener {
	f := make(chan interface{})
	l.f = f
	return NewListener(f)
}

 

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/SierraSoftworks/multicast"
)

func bus_func(i int, wg *sync.WaitGroup, ch *multicast.Channel) {

	l := ch.Listen()
	wg.Done()
	defer wg.Done()

	/*
		// これだといいんだけど、多面待ちができない
		for msg := range l.C {
			fmt.Printf("Listener %d: %s\n", i, msg)
		}

	*/

	for {
		select {
		case v := <-l.C:
			if v == nil {
				return
			}
			fmt.Println(i, v)
		}
		//time.Sleep(1 * time.Second)
	}

	//fmt.Printf("Listener %d Closed\n", i)
}

func main() {
	c := multicast.New()
	wg := sync.WaitGroup{}

	/*
		wg.Add(2)
		go func() {
			l := c.Listen()
			wg.Done()
			defer wg.Done()
			for msg := range l.C {
				fmt.Printf("Listener 1: %s\n", msg)
			}
			fmt.Println("Listener 1 Closed")
		}()

		go func() {
			l := c.Listen()
			wg.Done()
			defer wg.Done()
			for msg := range l.C {
				fmt.Printf("Listener 2: %s\n", msg)
			}
			fmt.Println("Listener 2 Closed")
		}()
	*/

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go bus_func(i, &wg, c)
	}

	wg.Wait()
	wg.Add(2)

	c.C <- "1 Hello World!"
	time.Sleep(time.Second)

	c.C <- "2 Hello World!"
	time.Sleep(time.Second)

	c.C <- "3 Hello World!"
	time.Sleep(time.Second)
	c.C <- "4 Hello World!"

	c.Close()

	wg.Wait()
}

 

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/SierraSoftworks/multicast"
)

func bus_func(i int, wg *sync.WaitGroup, ch *multicast.Channel) {

	l := ch.Listen()
	wg.Done()
	defer wg.Done()

	/*
		// これだといいんだけど、多面待ちができない
		for msg := range l.C {
			fmt.Printf("Listener %d: %s\n", i, msg)
		}

	*/

	tick := time.Tick(time.Millisecond * 10000)

	for {
		select {
		case v := <-l.C:
			if v == nil { // c.Close() で goroutineをクローズする
				return
			}
			fmt.Println(i, v)

		case t := <-tick:
			// ここに送信するものを入れる
			fmt.Println("tiched", t)
		}

		//time.Sleep(1 * time.Second)
	}

	//fmt.Printf("Listener %d Closed\n", i)
}


func main() {
	c := multicast.New()
	wg := sync.WaitGroup{}

	/*
		wg.Add(2)
		go func() {
			l := c.Listen()
			wg.Done()
			defer wg.Done()
			for msg := range l.C {
				fmt.Printf("Listener 1: %s\n", msg)
			}
			fmt.Println("Listener 1 Closed")
		}()

		go func() {
			l := c.Listen()
			wg.Done()
			defer wg.Done()
			for msg := range l.C {
				fmt.Printf("Listener 2: %s\n", msg)
			}
			fmt.Println("Listener 2 Closed")
		}()
	*/

	for i := 0; i < 5; i++ { // ここの"5"は  (Aに飛ぶ)
		wg.Add(1)
		go bus_func(i, &wg, c)
	}

	wg.Wait()

	wg.Add(5) // (Aから)ここの"5"と一致している必要がある(が理由が分からん)

	c.C <- "11111!"
	//time.Sleep(time.Second)

	c.C <- "22222!"
	//time.Sleep(time.Second)

	c.C <- "33333!"
	//time.Sleep(time.Second)

	c.C <- "44444!"
	//time.Sleep(3 * time.Second)

	c.Close() // これでスレッドを全部止める

	wg.Wait()
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

2021年11月25日2021/11,江端さんの技術メモ

Posted by ebata