2つのクロック(goroutine)を用意して、異なるエージェントで受けとれるかどうかの実験

Go言語で、redisを使って2つの型の異なるデータをブロードキャストしている場合、その受信している部分を1つのswitchで受けとるにはどうしたら良いですか

を、異なるエージェントで、異なるメッセージを受信できるか試してみた件。

 

// C:\Users\ebata\tomioka3B\src\others\main28.go
// 2つのクロック(goroutine)を用意して、異なるエージェントで受けとれるかどうかの実験

package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/gomodule/redigo/redis"
)


type Clock_Info struct {
	VirtualTime time.Time
	RealTime    time.Time
}

type SubClockInfo_2 struct {
    // 異なるデータ型のフィールドをここに追加
    SomeField string
    AnotherField int
}

func BaseClock() {

	// 接続
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// スタート時刻を指定
	startTime := time.Date(2023, 10, 1, 7, 0, 0, 0, time.UTC)

	// 1秒値を保持する変数
	seconds := 0

	var ci Clock_Info

	// ループを開始
	for {
		// 現在の時刻を計算
		ci.VirtualTime = startTime.Add(time.Duration(seconds) * time.Second)
		ci.RealTime = time.Now()

		// 現在の時刻を表示
		// fmt.Println("シミュレータの時刻:", ci.VirtualTime.Format("2006/01/02 15:04:05"))
		// fmt.Println("現在の時刻:", ci.RealTime.Format("2006/01/02 15:04:05")) // "2006/01/02 15:04:05"はフォーマットの形を真似るもので、内容に意味なし

		// パブリッシュ
		json_ci, _ := json.Marshal(ci)
		r, err := redis.Int(conn.Do("PUBLISH", "ClockInfo_1", json_ci))
		if err != nil {
			panic(err)
		}
		fmt.Println(r)

		// 5秒待つ (実際は、0.05秒くらいだが、確認用に長くしている)
		time.Sleep(5000 * time.Millisecond)

		// 1秒値を増加させる
		seconds++
	}
}


func SubClock() {  // 実験用に追加(時間ではなく、単なる文字列と数値を送り込むだけ)

	// 接続
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()


	// 1秒値を保持する変数
	seconds := 0

	var sci2 SubClockInfo_2

	// ループを開始
	for {
		// 現在の時刻を計算
		sci2.SomeField = "ebata is great"
    	sci2.AnotherField = seconds

		// パブリッシュ
		json_sci2, _ := json.Marshal(sci2)
		r, err := redis.Int(conn.Do("PUBLISH", "SubClockInfo_2", json_sci2))
		if err != nil {
			panic(err)
		}
		fmt.Println(r)

		// 7秒待つ (実際は、0.05秒くらいだが、確認用に長くしている)
		time.Sleep(7000 * time.Millisecond)

		// 1秒値を増加させる
		seconds += 1 
	}
}



func person_1(person_num int, wg *sync.WaitGroup) {
	defer wg.Done()
	// 接続
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)

	}
	defer conn.Close()

	psc := redis.PubSubConn{Conn: conn}
	psc.Subscribe("ClockInfo_1") // 2つに増やした

	for {	
		switch v := psc.Receive().(type) { // redisのメッセージを受けとると、ここでロックが外れる

		case redis.Message:

			switch v.Channel{

			case "ClockInfo_1":  // ブロードキャスト"ClockInfo_1"のメッセージは、こっちでキャッチ
				ci := new(Clock_Info) 
        		_ = json.Unmarshal(v.Data, &ci)
        		fmt.Println("Person_1:", person_num, "VirtualTime (ClockInfo_1):", ci.VirtualTime)

			case "SubClockInfo_2": // ブロードキャスト"SubClockInfo_2"のメッセージは、こっちでキャッチ
        		subClockData := new(SubClockInfo_2)
        		_ = json.Unmarshal(v.Data, &subClockData)
        		fmt.Println("Person_1:", person_num, "SomeField (SubClockInfo_2):", subClockData.SomeField)
         		fmt.Println("Person_1:", person_num, "AnotherField (SubClockInfo_2):", subClockData.AnotherField)


    	}
		
		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)

		case error:
			return
		}
	}

}

func person_2(person_num int, wg *sync.WaitGroup) {
	defer wg.Done()
	// 接続
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)

	}
	defer conn.Close()

	psc := redis.PubSubConn{Conn: conn}
	psc.Subscribe("SubClockInfo_2") // 2つに増やした

	for {	
		switch v := psc.Receive().(type) { // redisのメッセージを受けとると、ここでロックが外れる

		case redis.Message:

			switch v.Channel{


			case "ClockInfo_1":  // ブロードキャスト"ClockInfo_1"のメッセージは、こっちでキャッチ
				ci := new(Clock_Info) 
        		_ = json.Unmarshal(v.Data, &ci)
        		fmt.Println("Person_2:", person_num, "VirtualTime (ClockInfo_1):", ci.VirtualTime)

			case "SubClockInfo_2": // ブロードキャスト"SubClockInfo_2"のメッセージは、こっちでキャッチ
        		subClockData := new(SubClockInfo_2)
        		_ = json.Unmarshal(v.Data, &subClockData)
        		fmt.Println("Person_2:", person_num, "SomeField (SubClockInfo_2):", subClockData.SomeField)
         		fmt.Println("Person_2:", person_num, "AnotherField (SubClockInfo_2):", subClockData.AnotherField)
    	}
		
		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)

		case error:
			return
		}
	}

}


func main() {

	wg := sync.WaitGroup{}

	//wg.Add(1)
	//go BaseClock(&wg)
	go BaseClock()
	go SubClock()

	for i := 0; i < 5; i++ { // 5人
		wg.Add(1)
		go person_1(i, &wg)
	}

	for i := 0; i < 5; i++ { // 5人
		wg.Add(1)
		go person_2(i, &wg)
	}


	wg.Wait()
	fmt.Println("end of ... main()")
}

ちゃんと動くみたいです。

2023,江端さんの技術メモ

Posted by ebata