Redisのpsc.ReceiveWithTimeoutの使い方が、分かりません

redisのブロードキャストで構造体データを運ぶ時の注意(というか、golangのキャストがC/C++みたいに単純でない件)

で記載していますが、Redisのブロードキャストを大変「ありがたく」使っているのですが、

/switch v := psc.Receive().(type) {

では、受信でロックしてしまうので、これを破る(Break)する方法を探して、以下の方法を見つけたのですが

switch v := psc.ReceiveWithTimeout(5 * time.Second).(type) {

これが、上手く動きません。

// go run sub.go
// goga\1-9-6\others\sub2.go

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

type Ch5_info struct {
	Bus_num int     // バスの番号
	CC      int     //  1: 座標情報 2: 停車位置情報
	Present int     // 停車位置番号
	Lat     float64 //
	Lon     float64 //
}

func main() {
	// 接続
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	psc := redis.PubSubConn{Conn: conn}
	psc.Subscribe("channel_1")

	for {
		c5i := new(Ch5_info)

		//switch v := psc.Receive().(type) {
		switch v := psc.ReceiveWithTimeout(5 * time.Second).(type) {
		case redis.Message:
			fmt.Printf("%s: message: %s\n", v.Channel, v.Data)

			_ = json.Unmarshal(v.Data, &c5i)

			// 試しに2つほど出力してみる
			fmt.Println(c5i.Bus_num)
			fmt.Println(c5i.Lon)

		case redis.Subscription:
			fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
		case error:
			fmt.Println("After 10 seconds. out of switch")
			time.Sleep(time.Millisecond * 1000)
			//psc.Conn.Flush()
			//psc.Subscribe("channel_1")
		}

		//		fmt.Println("out of switch")

		//if c5i.CC == -1 {
		//	break
		//}

	}

	fmt.Println("Catch break")
}

"case error"
これで、5秒を経過すると"case error"に飛んでしまい、その後、forループで戻しても、常に"case error"に行ってしまいます。

どなたか、5後にswitchを抜けた後、また同じようにswitchで5秒を待つようにする方法をご存知の方、御一報下されば大変助かります。

ちなみに、参考までに、データ配送側のプログラムはこんな感じです(1回配送、走り切りです)。

// go run pub.go
// goga\1-9-6\others\pub.go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/gomodule/redigo/redis"
)

type Ch5_info struct {
	Bus_num int     // バスの番号
	CC      int     //  1: 座標情報 2: 停車位置情報
	Present int     // 停車位置番号
	Lat     float64 //
	Lon     float64 //
}

func main() {
	// 接続
	conn, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	c5i := new(Ch5_info)

	c5i.Bus_num = 1
	c5i.CC = -1
	c5i.Present = 23
	c5i.Lat = 12.34
	c5i.Lon = 56.78

	json_c5i, _ := json.Marshal(c5i)

	// パブリッシュ
	r, err := redis.Int(conn.Do("PUBLISH", "channel_1", json_c5i))
	if err != nil {
		panic(err)
	}
	fmt.Println(r)
}

 

 

2022/08,江端さんの技術メモ

Posted by ebata