golang nsq

安装

brew install nsq

nsqd :

负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机器部署,当你使用客户端向一个topic发送消息时,可以配置多个nsqd地址,消息会随机的 分配到各个nsqd上,nsqd优先把消息存储到内存channel中,当内存channel满了之后,则把消息写到磁盘文件中。他监听了两个tcp端口,一个用来服务客 户端,一个用来提供http的接口 ,nsqd 启动时置顶下nsqlookupd地址即可: 在一个 shell 中,运行 nsqlookupd:

nsqlookupd:

主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态

➜nsqlookupd 
[nsqlookupd] 2017/11/27 22:48:11.186029 nsqlookupd v1.0.0-compat (built w/go1.8)
[nsqlookupd] 2017/11/27 22:48:11.186266 TCP: listening on [::]:4160
[nsqlookupd] 2017/11/27 22:48:11.186314 HTTP: listening on [::]:4161

再开启一个 shell,运行 nsqd:

➜  bin ./nsqd --lookupd-tcp-address=127.0.0.1:4160
[nsqd] 2017/11/27 22:49:11.433686 nsqd v1.0.0-compat (built w/go1.8)
[nsqd] 2017/11/27 22:49:11.433755 ID: 280
[nsqd] 2017/11/27 22:49:11.433797 NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2017/11/27 22:49:11.434504 TCP: listening on [::]:4150
[nsqd] 2017/11/27 22:49:11.434596 HTTP: listening on [::]:4151
[nsqd] 2017/11/27 22:49:11.434641 LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2017/11/27 22:49:11.434653 LOOKUP connecting to 127.0.0.1:4160

再开启第三个 shell,运行 nsqadmin:

➜  bin ./nsqadmin --lookupd-http-address=127.0.0.1:4161
[nsqadmin] 2017/11/27 22:50:18.517989 nsqadmin v1.0.0-compat (built w/go1.8)
[nsqadmin] 2017/11/27 22:50:18.518233 HTTP: listening on [::]:4171

开启第四个 shell,推送一条初始化数据(并且在集群中创建一个 topic):

➜  bin  curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test' 
OK

按照预先设想的,在浏览器中打开 http://127.0.0.1:4171/ 就能查看 nsqadmin 的 UI 界面和队列统计数据。同时,还可以在 /tmp 目录下检查 (test.*.log) 文件.

producer

package main

import (
	"fmt"
	"time"

	"github.com/nsqio/go-nsq"
)

// Producer 生产者
func Producer() {
	producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
	if err != nil {
		fmt.Println("NewProducer", err)
		panic(err)
	}

	i := 1
	for {
		if err := producer.Publish("test", []byte(fmt.Sprintf("Hello World %d", i))); err != nil {
				fmt.Println("Publish", err)
			panic(err)
		}

		time.Sleep(time.Second * 5)

		i++
		fmt.Println(i)
	}
}

func main() {
	Producer()
}

consumer

package main

import (
	"fmt"
	"sync"
	"github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}

func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}

func testNSQ() {
	waiter := sync.WaitGroup{}
	waiter.Add(1)

	go func() {
		defer waiter.Done()
		config:=nsq.NewConfig()
		config.MaxInFlight=9

		//建立多个连接
		for i := 0; i<10; i++ {
			consumer, err := nsq.NewConsumer("test", "struggle", config)
			if nil != err {
				fmt.Println("err", err)
				return
			}

			consumer.AddHandler(&NSQHandler{})
			err = consumer.ConnectToNSQD("127.0.0.1:4150")
			if nil != err {
				fmt.Println("err", err)
				return
			}
		}
		select{}

	}()

	waiter.Wait()
}
func main() {
	testNSQ();

}