资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

关于redigo中PubSub的一点小坑分析

前言

网站建设、成都网站建设的关注点不是能为您做些什么网站,而是怎么做网站,有没有做好网站,给成都创新互联一个展示的机会来证明自己,这并不会花费您太多时间,或许会给您带来新的灵感和惊喜。面向用户友好,注重用户体验,一切以用户为中心。

最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……

Redis Client

首先,我们来初始化一个带连接池的 Redis Client:

import (
	"github.com/gomodule/redigo/redis"
)

type RedisClient struct {
	pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
	pool := &redis.Pool{
		MaxIdle:  10,
		IdleTimeout: 300 * time.Second,
		Dial: func() (redis.Conn, error) {
			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
			if err != nil {
				return nil, err
			}
			return c, nil
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			if time.Since(t) < time.Minute {
				return nil
			}
			_, err := c.Do("PING")
			return err
		},
	}
	log.Printf("new redis pool at %s", addr)
	client := &RedisClient{
		pool: pool,
	}
	return client
}

Publish

然后我们可以简单的实现一个 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {
	c := r.pool.Get()
	defer c.Close()
	n, err := redis.Int(c.Do("PUBLISH", channel, message))
	if err != nil {
		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
	}
	return n, nil
}

Subscribe

接下来就是一个稍微复杂点的带有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

最后,我们写一个简单地 main 函数来调用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
	psc := redis.PubSubConn{Conn: r.pool.Get()}
	defer psc.Close()
	log.Printf("redis pubsub subscribe channel: %v", channel)
	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
		return err
	}
	done := make(chan error, 1)
	// start a new goroutine to receive message
	go func() {
		for {
			switch msg := psc.Receive().(type) {
			case error:
				done <- fmt.Errorf("redis pubsub receive err: %v", msg)
				return
			case redis.Message:
				if err := consume(msg); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				if msg.Count == 0 {
					// all channels are unsubscribed
					done <- nil
					return
				}
			}
		}
	}()

	// health check
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			if err := psc.Unsubscribe(); err != nil {
				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
			}
			return nil
		case err := <-done:
			return err
		case <-tick.C:
			if err := psc.Ping(""); err != nil {
				return err
			}
		}
	}

	return nil
}

咋一看之下,好像并没有什么异常?然而,如果我们这时候去看 redis 的 tcp 连接,就可以发现一些猫腻:

$sudo netstat -antp | grep redis
tcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. 
tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一个连接,而 connection pool 似乎没有什么作用。

更进一步地调试,我们发现在 defer psc.Close() 的时候就卡住了,也就是上面的 10 个 goroutine 其实并没有正常退出。

Concurrent

排查许久之后,终于定位到了问题!引用 redigo 的说明:

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是说,虽然一个连接可以在不同的 goroutine 并发调用 Receive() 和 Subscribe()(subscribe调用了send和flush) ,但是却不能再有其他并发操作(比如 Close())。

其他相似的问题还可以参考 issue

Fix

知道了上面的原因之后,我们稍微修改一下 defer psc.Close() 的位置即可解决问题:

	// start a new goroutine to receive message
	go func() {
		// IMPORTANT!
		defer psc.Close()
		for {
			switch msg := psc.Receive().(type) {
			case error:

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对创新互联的支持。


分享文章:关于redigo中PubSub的一点小坑分析
网站路径:http://cdkjz.cn/article/ghpgjs.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

大客户专线   成都:13518219792   座机:028-86922220