用 ETCD + NSQ 实现分布式数据分片处理(Golang)

数据分片处理是提高服务数据处理速度的重要手段,而多服务分布式数据分片处理则在现代的云服务时代优势性更大,你不但可以通过这样的处理减少单体服务压力,提高数据处理速度,还可以进行服务的 动态横向拓展,在你的服务处理数据有压力时再不进行暂停当前服务的同时对服务进行动态扩展,从而提高处理性能,避免出现数据积压服务处理不及时的重大问题。

为什么要做分布式数据分片处理

主要是因为可以动态扩容,试想一个大数据量高并发而且数据量会动态变化的项目你设计的服务不支持横向扩容会出现什么样可怕的问题,数据积压,丢失数据,你这个时候要进行纵向扩容将会导致一段时间数据缺失,用户无法访问,这个是不可容忍的,如果你的服务可以动态扩容,那么你发现数据量过大服务器压力过大时你只要增加一个Pod是不是很开心。

为什么使用ETCD + NSQ

ETCD 本身是一个高可用的集群,保证数据的高可用和一致性。本身提供watch方法快速发现k v 变动从而及时响应且本身效率高广泛使用于服务注册发现等场景。下面是etcd的优势:

  • 完全复制:集群中的每个节点都可以使用完整的存档
  • 高可用性:Etcd可用于避免硬件的单点故障或网络问题
  • 一致性:每次读取都会返回跨多主机的最新写入
  • 简单:包括一个定义良好、面向用户的API(gRPC)
  • 安全:实现了带有可选的客户端证书身份验证的自动化TLS
  • 快速:每秒10000次写入的基准速度
  • 可靠:使用Raft算法实现了强一致、高可用的服务存储目录

使用NSQ主要是因为轻量耗费资源少也主要是因为没钱哈哈哈。

其实使用 Kafka 的 ConsumeGroup 是一个更简单的方法,他先天支持数据分区,且在其内部维护了一个coordinate服务,实现的就是我们下面要介绍的使用ETCD实现的方法,就是ConsumeGroup中的消费者会注册到该服务上且该服务不断的监控这个ConsumeGroup中消费者的变动,包括减少,新增,挂掉。

但有两点是Kafka方案落选的原因:1. 太重了,消耗资源过多浪费钱。2.这个coordinate是内至的不可控性太强一旦出了问题无法进行及时定位和解决,网上也有很多它的问题总结,所以把它抽离出来用ETCD来做对于以后的优化和问题排查都有好处,毕竟这个过程一旦出问题那就是大问题。

总结起来就是 ETCD + NSQ 的方案节省资源,可控性高,性能好。

逻辑架构

先看下资源架构图

image-20220922133015417

图中每个云服务就是那个蓝色的云彩都是K8s中的一个pod,在服务压力大时只要增加几个pod就可以完美的解决问题。可以看到在注册到etcd时我是进行啦状态的划分的主和从,主节点用来进行数据的分片工作,它通过接受外来源的信息的ID或是唯一标识等字段把源数据进行分片图中是分成了六片,分别打到了六个不同的nsq topic ,每个从节点也通过主节点的分工去各自消费各自该消费的节点,目前我是没有把主节点设计成不进行消费当然你如果觉得浪费资源可以把主节点也设计成同时消费的节点。

说的再多都是放屁,没有代码的设计都是浪费感情,话不多说我们直接看代码:

代码逻辑:源码在这里

代码结构

老样子我们看下代码的结构:

image-20220922170412129

代码在 apply_etcd 下的 service_frag 主要的 etcd 服务注册监控服务以及控制逻辑在 register.go 中这里是逻辑最核心的地方需要重点来看,leader_consumer.go 就是主节点的结构构造及处理分发数据消费数据的方法,follower_comsumer.go 就是从节点的主题结构以及消费数据的方法。这三个文件构成逻辑的主体。nsqd 文件夹中放的就是进行nsq的消费者生产创建的一些基础逻辑。conf文件夹中放的就是配置文件及配置文件加载的代码。最后 util 文件夹中就放的是处理通过ID或唯一字段进行哈希分片的逻辑。

下面我会先讲主节点分发消费,从节点消费逻辑,最后讲 etcd 注册监控分片逻辑将主从节点数据处理逻辑串联起来。

主节点分发消费逻辑

通过逻辑架构图可以看出主节点的作用是消费源数据并进行过滤等处理然后对数据分片分发到从节点的 nsq topic 中。

主节点消费者结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type LeaderConsumers struct {
consumers []*LeaderConsumer
cancel context.CancelFunc
}

type LeaderConsumer struct {
topic string
QurumCap int
consumer *nsq.Consumer
sche map[int]*LeaderSched
hashIndex map[string]int
}

type LeaderSched struct {
index int
Channel chan []byte
Ctx context.Context
Producer *nsq.Producer
Topic string
}

上面说了主节点承担的任务就是对源数据进行消费并通过源数据的ID等唯一字段对其分片最后分发到不同的从节点消费的nsq topic 中,那么通过结构可以看出,首先是维护多个 leaderConsumer,这个例子我们只用到了一个当然可以处理多个源。之后是一个context的cancel方法进行协程统一控制,如果这个基础知识不知道建议想看下context的用法。在LeaderConsumer中可以看到有源数据的 topic 字段,通过这个topic获取源数据。

  • QurumCap 是分片数量

  • consumer 就是消费源数据的nsq消费者。

  • sche 这个字段很重要这个是进行数据分片分发的结构,这个结构通过将源数据经过分片后的index,将要发送到的nsq topic 统一在一起,通过对数据分片获取到数据的分片下标index,通过sche这个散列获取到对应的 LeaderSched 该结构存储了数据对应的从节点nsq topic以及producer,通过这两个标识将数据分发到对应的从节点。

  • hashIndex 这个结构是建立一个唯一字段和分片下标的一个缓存,可以不用每次都去计算数据源对应的下标。

新建主节点消费者并进行分发处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func InitLeaderDistribute(nsqConf conf.ConfigNSQ, channel chan []byte, ctx context.Context, index int) *LeaderSched {
sched := &LeaderSched{
index: index,
Channel: channel,
Ctx: ctx,
Producer: nsqd.CreateProducer(nsqConf.NsqdAddress),
Topic: fmt.Sprintf(followerTopic, index),
}
sched.Publish()
return sched
}

func (self *LeaderSched) Publish() {
go func() {
for {
select {
case v, ok := <-self.Channel:
if !ok {
return
}
self.Producer.PublishAsync(self.Topic, v, nil)
case <-self.Ctx.Done():
return
}
}
}()
}

func InitLeaderConsumer(topics []string, config conf.ConfigNSQ) *LeaderConsumers {
consumers := new(LeaderConsumers)
souces := make([]*LeaderConsumer, 0, len(topics))
var cap int
if ServiceInstance.quorum.IsMasterConsume {
cap = ServiceInstance.quorum.QuorumCap
} else {
cap = ServiceInstance.quorum.QuorumCap - 1
}
sche := make(map[int]*LeaderSched)
ctx, cancel := context.WithCancel(context.Background())
consumers.cancel = cancel
for i := 0; i < cap; i++ {
channel := make(chan []byte, 10000)
sched := InitLeaderDistribute(config, channel, ctx, i)
sche[i] = sched
}
for _, topic := range topics {
consumer := new(LeaderConsumer)
consumer.topic = topic
consumer.sche = sche
consumer.QurumCap = cap
consumer.hashIndex = make(map[string]int)
consumer.consumer = nsqd.CreateNSQConsumer(topic, channel, config.LookupAddress, consumer)
souces = append(souces, consumer)
}
consumers.consumers = souces
return consumers
}


建立主节点消费结构的方法是 InitLeaderConsumer 这个主节点消费结构其实是建立了一个消费队列,可以从多个不同的数据源进行消费并分发到不同从节点。如果你要主节点也消费分片数据的的话就对 QuorumCap 分片量减一操作。

重要的一步是 InitLeaderDistribute 做分片数据分发结构 sche ,该结构我们通过遍历分片数量每个分片建立一个 LeaderSched 结构并通过分片下标形成对应的分发 topic 。并把分片下标和leaderSche 存入散列以便主节点在分发数据时候使用,每形成一个leaderSched都通过 publish 方法形成一个数据分发者该分发者,该分发者通过接收 channel 的数据将数据分发到已经构建好的源数据对应的从节点nsq topic 中。

主节点分片并消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type Data struct {
ID string
}

func (self *LeaderConsumer) HandleMessage(message *nsq.Message) error {

fmt.Println("收到消息: ", string(message.Body))
var testData = new(Data)
if err := json.Unmarshal(message.Body, testData); err != nil {
fmt.Printf("HandleMessage, failed to unmarshal tick data: %s, err: %v", string(message.Body), err)
return err
}

index := self.GetIndex(testData.ID)
fmt.Printf("发送消息 : %s 到 %d \n", testData.ID, index)
self.sche[index].Channel <- message.Body

return nil
}

func (self *LeaderConsumer) GetIndex(code string) int {
if v, ok := self.hashIndex[code]; ok {
return v
} else {
index := util.GetIndex(code, self.QurumCap, 1)
self.hashIndex[code] = index
return index
}
}

func (self *LeaderConsumers) Stop() {
for _, c := range self.consumers {
c.consumer.Stop()
<-c.consumer.StopChan
fmt.Println("leader consumer consume topic exit:", c.topic)
}
self.cancel()
}

这是主节点的消费主逻辑,主节点在获取到源数据后对源数据进行分片并通过分片下标找到刚刚我们讲到的 leaderSche 结构送入其分发数据的分发者中进行分发。那么如何分片的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func HashCode(s string) int { //固定的hashcode
v := int(crc32.ChecksumIEEE([]byte(s)))
if v >= 0 {
return v
}
if -v >= 0 {
return -v
}
// v == MinInt
return 0
}

func GetIndex(code string, length, div int) int {
hash := HashCode(code)
index := hash % (length * div)
return index
}

主要是这个GetIndex逻辑,他在util下的hash.go文件中。通过取模来进行分片。

最后这个 Stop 方法是结束主节点的消费队列并且结束所有数据分发者。

从节点消费逻辑

从节点消费结构

1
2
3
4
5
type FollowerConsumer struct {
topic string
consumer *nsq.Consumer
}

从节点消费者就很简单了,因为它不要进行任何的额外逻辑处理只需要把消费逻辑定义好就好。

我定义了一个简单的消费逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func InitFollowerConsumer(index int, config conf.ConfigNSQ) *FollowerConsumer {
consumer := new(FollowerConsumer)
consumer.topic = fmt.Sprintf(followerTopic, index)
consumer.consumer = nsqd.CreateNSQConsumer(consumer.topic, channel, config.LookupAddress, consumer)
fmt.Println("start consume real::::", consumer.topic)
return consumer
}

func (self *FollowerConsumer) HandleMessage(message *nsq.Message) error {
var testData = new(Data)
if err := json.Unmarshal(message.Body, testData); err != nil {
fmt.Printf("HandleMessage, failed to unmarshal tick data: %s, err: %v \n", string(message.Body), err)
return err
}
fmt.Printf("follower consumer message : %s topic index : %d \n", string(message.Body), util.GetIndex(testData.ID, conf.Config.QuorumCap-1, 1))

return nil
}

func (self *FollowerConsumer) Stop() {
self.consumer.Stop()
fmt.Println("stop consume real::::", self.topic)
}

这里不做nsq的使用解释了大家可以自己google

ETCD 服务注册监控与数据源分片处理

这个是我们的重头戏也是稍微复杂的,用 ETCD 串联起来了master 和 follower 的节点消费,不难想象 ETCD 需要做的工作有什么

  • 每个节点启动都要进行注册,注册并维护自身生命周期。
  • 每个节点需要监控自身:从看消费分片下标是否改变,主看自己是否被删掉了结束分发工作并重连。
  • 每个节点都竞选主节点。
  • 当选到主节点后开始:1. 数据接收并分片。2. 分配从节点需要消费的分片下标topic。3. 分发分片任务给对应的分片下标topic。
  • 主节点监控所有节点是否变动,若变动进行重新分片并分配任务。

我们从代码来看看以上的任务是如何实现的:

ETCD 服务注册结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type InstanceInfo struct {
name string
leaseid clientv3.LeaseID
client *clientv3.Client
clusterId uint64
memberId uint64
ServicePath string
ElectionPath string
}

type QuorumInfo struct {
QuorumCap int //集群数量
IsMasterConsume bool
}

type MasterInfo struct {
Master *atomic.Value
Consumers *LeaderConsumers
watchNodesChan chan struct{}
}

type SlaveInfo struct {
ChildrenNodeData *atomic.Value //订阅某个队列的数据
Consumers map[int]*FollowerConsumer
}

type Service struct {
instance *InstanceInfo
quorum *QuorumInfo
master *MasterInfo
slave *SlaveInfo
}

这个结构其实也很简单,其中 InstanceInfo 结构是与ETCD进行够通的主要结构,其中包含了节点的名称name,这个name是注册到ETCD的唯一标识,用来标识该节点。其中 ServicePath,ElectionPath 都是用于生成该 name 做其前缀的。leaseid 是该 name 对应的租期ID用于定义并监控 name 的生命周期。最重要的当然是和ETCD 交流的客户端 client。最后 clusterId 和 memberId 是租约ID对应的一些信息,并么有用到可以忽略。

QuorumInfo 结构就是存储了分片数量分片数据量 QuorumCap 还有就是 IsMasterConsume 定义主节点是否需要消费分片数据。主节点如果想让他同时分发数据且消费数据就可以定义为true 。但我建议在资源够用的时候不要让主节点进行消费,这样主节点资源消耗将会大于其他节点,这会产生问题。

MasterInfo 结构就是主节点的一些信息包括,是否是主的 Master ,Consumers 主节点消费队列。其中这个 LeaderConsumers 是之前详细介绍过的,忘记的可以往上翻一翻。还有一个 watchNodesChan 在自己的 ETCD key 被删掉时去控制结束自己的分发任务的一个管道。

SlaveInfo 从节点信息 ChildrenNodeData 是存储的当前消耗的分片数据下标。Consumers 是维护的当前分片数据下标对应 topic 的对应消费者的散列。其中 FollowerConsumer 也是刚刚介绍过的

Sevice 就是这几个结构的集合。

下面我们一点点来看刚刚列举的的 ETCD 需要做的事情:

注册并维护自身生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func NewService(config conf.ConfigEtcdOption, quormCap int, isMasterConsume bool) (*Service, error) {
fmt.Println(config)
cli, err := clientv3.New(clientv3.Config{
Endpoints: config.Hosts,
DialTimeout: 2 * time.Second,
})
if err != nil {
panic(err)
}
ServiceInstance = &Service{
instance: &InstanceInfo{
client: cli,
ServicePath: config.SlavePath,
ElectionPath: config.ElectionPath,
},
quorum: &QuorumInfo{
QuorumCap: quormCap,
IsMasterConsume: isMasterConsume,
},
master: &MasterInfo{
Master: new(atomic.Value),
watchNodesChan: make(chan struct{}),
},
}
ServiceInstance.slave = ServiceInstance.InitSlaveInfo()
ServiceInstance.master.Master.Store(false)
ServiceInstance.Grant()
ServiceInstance.instance.name = ServiceInstance.instance.ServicePath +
"/" + serviceRegistryPrefix +
"#" + util.GetInternal() +
"#" + strconv.FormatInt(int64(ServiceInstance.instance.leaseid), 10)
ServiceInstance.WatchSelfNodesData()
go ServiceInstance.Election()
return ServiceInstance, nil
}

func (s *Service) InitSlaveInfo() *SlaveInfo {
return &SlaveInfo{
ChildrenNodeData: new(atomic.Value),
Consumers: make(map[int]*FollowerConsumer),
}
}

func (s *Service) Start() error {
ch, err := s.keepAlive(s.instance.name, s.instance.leaseid)
if err != nil {
fmt.Println(err)
return err
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
for {
select {
case <-s.instance.client.Ctx().Done():
return errors.New("server closed")

case interSig := <-sig:
fmt.Println("internal : ", interSig)
return errors.New("server closed by internal signal : " + interSig.String())

case _, ok := <-ch:
if !ok {
fmt.Println("keep alive channel closed")
s.revoke()
return nil
}
//else {
//log.Printf("Recv reply from service: %s, ttl:%d ", s.Name, ka.TTL, s.leaseid, ka.ID, ka.ClusterId, ka.MemberId, ka.Revision)
//}
}
}
}

func (s *Service) revoke() error {
_, err := s.instance.client.Revoke(context.TODO(), s.instance.leaseid)
if err != nil {
fmt.Println(err)
}
fmt.Printf("servide:%s stop \n", s.instance.name)
return err
}

func (s *Service) keepAlive(key string, id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
_, err := s.instance.client.Put(context.TODO(), key, string([]byte{}), clientv3.WithLease(id))
if err != nil {
fmt.Println(err)
panic(err)
}
return s.instance.client.KeepAlive(context.TODO(), id)
}

func (s *Service) Grant() {
resp, err := s.instance.client.Grant(context.TODO(), 2)
if err != nil {
fmt.Println("create lease err : ", err)
panic(err)
}
s.instance.leaseid = resp.ID
s.instance.clusterId = resp.ClusterId
s.instance.memberId = resp.MemberId
}

这个 NewSevice + Start() 其实就是一步步做了所有的我刚刚列举的事情。我们重点先看一下注册和监控自身健康。这个两个操作其实就是NewService 中生成 Name 字段并使用 Grant() 生成租约 ID。然后是Start()中注册Name到ETCD,关联租约ID,调用KeepAlive 方法不断的去监控并维护该Name的生命周期,当服务挂掉,该Name也不会再被维护。

监控自身

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (s *Service) WatchSelfNodesData() {
rch := s.instance.client.Watch(context.Background(), s.instance.name, clientv3.WithKeysOnly())
go func() {
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case clientv3.EventTypePut:
fmt.Println("watch node data put ", ev.IsModify(), string(ev.Kv.Key), string(ev.Kv.Value))
if ev.IsModify() {
if ev.Kv == nil || len(ev.Kv.Value) == 0 {
continue
}
data := make([]string, 0)
err := json.Unmarshal(ev.Kv.Value, &data)
if err != nil {
fmt.Println("node data deserial occured err::", err)
continue
}
value := s.slave.ChildrenNodeData.Load()
fmt.Println("之前监控的节点为:", value)
fmt.Println("目前新监控的节点为:", data)

s.slave.ChildrenNodeData.Store(data)
if value == nil {
s.startConsume(data)
} else {
oldIndexes := value.([]string)
newIndexMap := make(map[string]bool, 0)
for _, v := range data {
newIndexMap[v] = true
}
for _, v := range oldIndexes {
if _, ok := newIndexMap[v]; !ok {
s.stopConsume(v)
}
}
s.startConsume(data)
}
}
case clientv3.EventTypeDelete:
fmt.Println("watch node data delete ", ev.IsModify(), string(ev.Kv.Key), string(ev.Kv.Value))
if s.master.Master.Load().(bool) {
s.master.Master.Store(false)
s.master.Consumers.Stop()
s.master.watchNodesChan <- struct{}{}
go s.Election()
}
//重连
s.instance.client.Put(context.TODO(), s.instance.name, string([]byte{}), clientv3.WithLease(s.instance.leaseid))
}
}
}
}()
}

func (s *Service) startConsume(indexes []string) {

for _, index := range indexes {
index, _ := strconv.Atoi(index)
if _, ok := s.slave.Consumers[index]; !ok {
s.slave.Consumers[index] = InitFollowerConsumer(index, conf.Config.Nsqd)
}
}
}

func (s *Service) stopConsume(indexNode string) {
index, _ := strconv.Atoi(indexNode)
if consumer, ok := s.slave.Consumers[index]; ok {
consumer.Stop()
delete(s.slave.Consumers, index)
}
}

每个节点都需要监控自身的对应数据变动。从节点需要监控是否自己被分配了新的分片消费 topic ,或是其是否有改变,从而在第一时间开始重新进行原消费的停止,现消费的开始。主节点需要看自己是否被删除,如果删除说明自己挂了或是keepalive出现了问题,就需要立刻重连并进行重新选主。这些方法都定义在以上的WatchSelfNodesData中。

竞选主节点

在关注自身变动的同时每个节点也同时需要竞选主节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (s *Service) Election() {
s1, err := concurrency.NewSession(s.instance.client, concurrency.WithTTL(2))
if err != nil {
fmt.Println("new session:", err)
panic(err)
}
e1 := concurrency.NewElection(s1, s.instance.ElectionPath)
if err := e1.Campaign(context.Background(), util.GetInternal()+"_"+uuid.New()); err != nil {
fmt.Println("campaign :", err)
panic(err)
}
fmt.Println("=============================================== 我已经成为主节点 ==============================================================")
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
fmt.Println("elect success info,", (<-e1.Observe(cctx)), s.instance.name)
s.master.Master.Store(true)
s.StopSlaveOperator()
s.master.Consumers = InitLeaderConsumer(conf.Config.MasterTopic, conf.Config.Nsqd)
s.WatchNodes()
}

在NewSeverice的方法中我们看到最后会调用 go ServiceInstance.Election() 这个方法,其实就是开启了选主。选主的方法如上,如果选到了主节点将会进行主节点的分片处理及任务分发等工作,这个接下来会说。如果没有选到主节点将会一直等待值到下一次自己被选上。

当选到主节点后开始:

  1. 数据接收并分片。
  2. 分配从节点需要消费的分片下标topic。
  3. 分发分片任务给对应的分片下标topic。
  4. 主节点监控所有节点是否变动,若变动进行重新分片并分配任务。

上面选主的方法在成功选主后会停掉之前作为从节点对分片的消费者。然后调用 WatchNodes 来进行以上的1,2,3,4步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func (s *Service) WatchNodes() {
rch := s.instance.client.Watch(context.Background(), s.instance.ServicePath, clientv3.WithPrefix())
s.distributedNodeJob()
go func() {
for {
select {
case wresp := <-rch:
for _, ev := range wresp.Events {
switch ev.Type {
case clientv3.EventTypePut:
fmt.Println("发现增加了节点: ", ev.IsCreate(), string(ev.Kv.Key), string(ev.Kv.Value))
fmt.Println("即将任务重新分配")
if ev.IsCreate() {
s.distributedNodeJob()
}
case clientv3.EventTypeDelete:
fmt.Println("发现删除了节点: ", string(ev.Kv.Key), string(ev.Kv.Value))
fmt.Println("即将任务重新分配")
if s.master.Master.Load().(bool) && string(ev.Kv.Key) != s.instance.name {
s.distributedNodeJob()
}
}
}
case <-s.master.watchNodesChan:
return
}
}
}()
}

func (s *Service) distributedNodeJob() {
keys, err := s.getNodeInfo()
fmt.Println("获取到目前所有节点的 Keys :", keys)
if err != nil {
fmt.Println("get node info occured error,", err)
return
}
data := s.AssignNode(keys)
s.LoadNodeToEtcd(data)
}

func (s *Service) getNodeInfo() ([]string, error) {
response, err := s.instance.client.Get(context.Background(), s.instance.ServicePath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
if err != nil {
fmt.Println("err::", err)
return nil, err
}
//重新分发
if response == nil || response.Kvs == nil || len(response.Kvs) == 0 {
return nil, errors.New("err node is not found")
}
resp := make([]string, 0, len(response.Kvs))
for _, v := range response.Kvs {
resp = append(resp, string(v.Key))
}
return resp, nil
}

func (s *Service) AssignNode(keys []string) map[string][]string {
distributed := make(map[string][]string)
quorumCap := s.quorum.QuorumCap
if !s.quorum.IsMasterConsume {
quorumCap--
}
s.assignNode(quorumCap, keys, distributed)
return distributed
}

func (s *Service) assignNode(quorumCap int, keys []string, distributed map[string][]string) {
if quorumCap == 0 {
return
}
if s.quorum.IsMasterConsume {
if len(keys) == 0 {
return
}
} else {
if len(keys) == 1 {
return
}
}
for _, path := range keys {
val := s.master.Master.Load()
if val == nil {
fmt.Println("还没有master,暂时不分发数据,略过...........................")
return
}
master := val.(bool)
if master && !s.quorum.IsMasterConsume && path == s.instance.name {
fmt.Println("主节点不消费数据,略过...........................")
continue
}
distributed[path] = append(distributed[path], strconv.Itoa(quorumCap-1))
quorumCap--
if quorumCap <= 0 {
return
}
}
s.assignNode(quorumCap, keys, distributed)
}

func (s *Service) LoadNodeToEtcd(data map[string][]string) {
for key, value := range data {
fmt.Printf("开始向节点 %s 分配分片:%#v 的任务 \n", key, data)
s.SetData(key, value)
}
}


func (s *Service) SetData(path string, data interface{}) {
pathArray := strings.Split(path, "#")
leaseId, err := strconv.ParseInt(pathArray[2], 10, 64)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = s.instance.client.Put(ctx, path, util.FromObject(data), clientv3.WithLease(clientv3.LeaseID(leaseId)))
cancel()
if err != nil {
fmt.Println("put failed, err:", err)
return
}
}

整个过程就是选到主之后开始分配任务,首先主节点去获取所有服务前缀的ETCD注册服务Name,然后根据我们指定好的分片数量进行Name对应分片下标的呼应,直到把所有下标分配完为止,这个过程中可以选择主节点到底是否也同时去消费分片数据。呼应好之后,主节点会把对应的Name的分片下标数组分发给ETCD,因为每个节点都进行自我监控,并及时响应对消费分片下标的变化,所以每个从节点会自动完成分配的消费任务。分发完成后主节点开始监控所有节点的变化(增删)当有变化的时候进行重新的分配。

好了最后我们看下main函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
defer func() {
time.Sleep(2 * time.Second)
fmt.Println("clean and eligible Quit...")
}()

conf.InitConfig("/Users/mac/go/src/CoolGoPkg/apply_etcd/service_frag/conf/conf.yaml")
fmt.Println(conf.Config)

srv, err := NewService(conf.Config.Registry, conf.Config.QuorumCap, false)
if err != nil {
panic(err)
}

err = srv.Start()
if err != nil {
fmt.Println("service start err :", err)
}

}

就是加载配置并新建服务然后调用Start。

测试案例

纸上得来终觉浅,我们跑起我们的集群进行数据观测,更加真切的去感知分布式分片数据处理。

首先当然是启动 ETCD ,NSQ 这些如何本地安装我就不说了,自己google一下。

之后我们进入到 service_frag 这个目录下,运行go run .

注意gomod要开起来,当然你想用什么方式跑都可以。

我们看到出现以下日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 mac@MacdeMacBook-Pro  ~/go/src/CoolGoPkg/apply_etcd/service_frag   master ±  go run .
&{{[127.0.0.1:2379] /myetcd_election /myetcd_service } {[127.0.0.1:4161] 127.0.0.1:4150} 7 false [master-data-topic-1]}
{[127.0.0.1:2379] /myetcd_election /myetcd_service }
watch node data put false /myetcd_service/service#192.168.100.122#7587865433662652252
=============================================== 我已经成为主节点 ==============================================================
elect success info, {cluster_id:14841639068965178418 member_id:10276657743932975437 revision:39 raft_term:4 [key:"/myetcd_election/694d8368e0ed7f5f" create_revision:39 mod_revision:39 version:1 value:"192.168.100.122_68929376-2f2a-4989-b16d-139bf61e2626" lease:7587865433662652255 ] false 0 {} [] 0} /myetcd_service/service#192.168.100.122#7587865433662652252
2022/09/23 15:58:06 INF 1 (127.0.0.1:4150) connecting to nsqd
2022/09/23 15:58:06 INF 2 (127.0.0.1:4150) connecting to nsqd
2022/09/23 15:58:06 INF 3 (127.0.0.1:4150) connecting to nsqd
2022/09/23 15:58:06 INF 4 (127.0.0.1:4150) connecting to nsqd
2022/09/23 15:58:06 INF 5 (127.0.0.1:4150) connecting to nsqd
2022/09/23 15:58:06 INF 6 (127.0.0.1:4150) connecting to nsqd
2022/09/23 15:58:06 INF 7 [master-data-topic-1/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=master-data-topic-1
2022/09/23 15:58:06 INF 7 [master-data-topic-1/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
获取到目前所有节点的 Keys : [/myetcd_service/service#192.168.100.122#7587865433662652252]

说明它已经开始进行主节点的监控,并构建了leaderconsuemers。

接下来我们另起一个终端再次go run . 启动一个服务。

我们看到出现以下日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 ✘ mac@MacdeMacBook-Pro  ~/go/src/CoolGoPkg/apply_etcd/service_frag   master ±  go run .
&{{[127.0.0.1:2379] /myetcd_election /myetcd_service } {[127.0.0.1:4161] 127.0.0.1:4150} 7 false [master-data-topic-1]}
{[127.0.0.1:2379] /myetcd_election /myetcd_service }
watch node data put false /myetcd_service/service#192.168.100.122#7587865433662652284
watch node data put true /myetcd_service/service#192.168.100.122#7587865433662652284 ["5","4","3","2","1","0"]
之前监控的节点为: <nil>
目前新监控的节点为: [5 4 3 2 1 0]
2022/09/23 16:01:45 INF 1 [follower.queue.test.5.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.5.topic
2022/09/23 16:01:45 INF 1 [follower.queue.test.5.topic/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
start consume real:::: follower.queue.test.5.topic
2022/09/23 16:01:45 INF 2 [follower.queue.test.4.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.4.topic
2022/09/23 16:01:45 INF 2 [follower.queue.test.4.topic/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
start consume real:::: follower.queue.test.4.topic
2022/09/23 16:01:45 INF 3 [follower.queue.test.3.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.3.topic
2022/09/23 16:01:45 ERR 3 [follower.queue.test.3.topic/channel.queue.test] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=follower.queue.test.3.topic) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
start consume real:::: follower.queue.test.3.topic
2022/09/23 16:01:45 INF 4 [follower.queue.test.2.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.2.topic
2022/09/23 16:01:45 ERR 4 [follower.queue.test.2.topic/channel.queue.test] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=follower.queue.test.2.topic) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
start consume real:::: follower.queue.test.2.topic
2022/09/23 16:01:45 INF 5 [follower.queue.test.1.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.1.topic
2022/09/23 16:01:45 INF 5 [follower.queue.test.1.topic/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
start consume real:::: follower.queue.test.1.topic
2022/09/23 16:01:45 INF 6 [follower.queue.test.0.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.0.topic
2022/09/23 16:01:45 INF 6 [follower.queue.test.0.topic/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
start consume real:::: follower.queue.test.0.topic

看到了吗这个节点作为从节点开始消费0-5的所有分片数据topic。

我们再看主节点有什么日志打印变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
发现增加了节点:  true /myetcd_service/service#192.168.100.122#7587865433662652284
即将任务重新分配
获取到目前所有节点的 Keys : [/myetcd_service/service#192.168.100.122#7587865433662652275 /myetcd_service/service#192.168.100.122#7587865433662652284]
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
开始向节点 /myetcd_service/service#192.168.100.122#7587865433662652284 分配分片:map[string][]string{"/myetcd_service/service#192.168.100.122#7587865433662652284":[]string{"5", "4", "3", "2", "1", "0"}} 的任务
发现增加了节点: false /myetcd_service/service#192.168.100.122#7587865433662652284 ["5","4","3","2","1","0"]
即将任务重新分配
2022/09/23 16:02:41 INF 7 [master-data-topic-1/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=master-data-topic-1
2022/09/23 16:03:41 INF 7 [master-data-topic-1/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=master-data-topic-1

主节点发现了从节点/myetcd_service/service#192.168.100.122#7587865433662652284 并把任务[]string{“5”, “4”, “3”, “2”, “1”, “0”}分给了它。

我们再启动一个服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 mac@MacdeMacBook-Pro  ~/go/src/CoolGoPkg/apply_etcd/service_frag   master ±  go run .
&{{[127.0.0.1:2379] /myetcd_election /myetcd_service } {[127.0.0.1:4161] 127.0.0.1:4150} 7 false [master-data-topic-1]}
{[127.0.0.1:2379] /myetcd_election /myetcd_service }
watch node data put false /myetcd_service/service#192.168.100.122#7587865433662652293
watch node data put true /myetcd_service/service#192.168.100.122#7587865433662652293 ["4","2","0"]
之前监控的节点为: <nil>
目前新监控的节点为: [4 2 0]
2022/09/23 16:05:50 INF 1 [follower.queue.test.4.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.4.topic
2022/09/23 16:05:50 INF 1 [follower.queue.test.4.topic/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
start consume real:::: follower.queue.test.4.topic
2022/09/23 16:05:50 INF 2 [follower.queue.test.2.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.2.topic
2022/09/23 16:05:50 ERR 2 [follower.queue.test.2.topic/channel.queue.test] error querying nsqlookupd (http://127.0.0.1:4161/lookup?topic=follower.queue.test.2.topic) - got response 404 Not Found "{\"message\":\"TOPIC_NOT_FOUND\"}"
start consume real:::: follower.queue.test.2.topic
2022/09/23 16:05:50 INF 3 [follower.queue.test.0.topic/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=follower.queue.test.0.topic
2022/09/23 16:05:50 INF 3 [follower.queue.test.0.topic/channel.queue.test] (MacdeMacBook-Pro.local:4150) connecting to nsqd
start consume real:::: follower.queue.test.0.topic

这个节点作为从节点开始消费[4 2 0]的分片数据topic。

同样的我们在看主节点日志变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
发现增加了节点:  true /myetcd_service/service#192.168.100.122#7587865433662652293
即将任务重新分配
获取到目前所有节点的 Keys : [/myetcd_service/service#192.168.100.122#7587865433662652275 /myetcd_service/service#192.168.100.122#7587865433662652284 /myetcd_service/service#192.168.100.122#7587865433662652293]
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
开始向节点 /myetcd_service/service#192.168.100.122#7587865433662652293 分配分片:map[string][]string{"/myetcd_service/service#192.168.100.122#7587865433662652284":[]string{"5", "3", "1"}, "/myetcd_service/service#192.168.100.122#7587865433662652293":[]string{"4", "2", "0"}} 的 任务
开始向节点 /myetcd_service/service#192.168.100.122#7587865433662652284 分配分片:map[string][]string{"/myetcd_service/service#192.168.100.122#7587865433662652284":[]string{"5", "3", "1"}, "/myetcd_service/service#192.168.100.122#7587865433662652293":[]string{"4", "2", "0"}} 的 任务
发现增加了节点: false /myetcd_service/service#192.168.100.122#7587865433662652293 ["4","2","0"]
即将任务重新分配
发现增加了节点: false /myetcd_service/service#192.168.100.122#7587865433662652284 ["5","3","1"]
即将任务重新分配
2022/09/23 16:06:41 INF 7 [master-data-topic-1/channel.queue.test] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=master-data-topic-1

主节点发现发现增加了节点: /myetcd_service/service#192.168.100.122#7587865433662652293

并把新任务[]string{“5”, “3”, “1”}分给了上个节点 /myetcd_service/service#192.168.100.122#7587865433662652284。

把任务[]string{“4”, “2”, “0”}这个节点 /myetcd_service/service#192.168.100.122#7587865433662652293。

那么从节点1的日志有什么变化呢?

1
2
3
4
5
6
7
8
9
watch node data put  true /myetcd_service/service#192.168.100.122#7587865433662652284 ["5","3","1"]
之前监控的节点为: [5 4 3 2 1 0]
目前新监控的节点为: [5 3 1]
2022/09/23 16:05:50 INF 2 [follower.queue.test.4.topic/channel.queue.test] stopping...
stop consume real:::: follower.queue.test.4.topic
2022/09/23 16:05:50 INF 4 [follower.queue.test.2.topic/channel.queue.test] stopping...
2022/09/23 16:05:50 INF 4 [follower.queue.test.2.topic/channel.queue.test] stopping handlers
stop consume real:::: follower.queue.test.2.topic

发现变化的它开始从新消费了

好接下来我们开始发送测试数据到主节点topic,继续观察这三个节点的日志变化。

我们进入nsqd 目录下的producer_test.go文件,看到我们的测试分发函数运行它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Data struct {
ID string
}

func TestCreateProducer(t *testing.T) {
nsqAddr := "127.0.0.1:4150"
producer := CreateProducer(nsqAddr)
fmt.Println("create success")
for i := 0; i < 10000; i++ {
fmt.Println(i)
testData := Data{
ID: "测试消息:" + strconv.Itoa(i),
}
sendData, err := json.Marshal(testData)
if err != nil {
t.Log("marshal err : ", err)
return
}
err = producer.Publish("master-data-topic-1", sendData)
if err != nil {
fmt.Println(err)
t.Log("pubulish err : ", err)
continue
}
time.Sleep(time.Second)
}

}

他将会不断地发测试数据一秒一个。

运行之后我们看我们刚刚启动的三个节点日志变化如下:

image-20220923173812951

我们看到从节点已经开始各司其职去消费自己的分片的数据了。

接下来我们退出看会有什么变化

退出从节点2

主节点日志变更:

1
2
3
4
5
6
7
8
9
10
11
12
13
发现删除了节点:  /myetcd_service/service#192.168.100.122#7587865433662652525
即将任务重新分配
获取到目前所有节点的 Keys : [/myetcd_service/service#192.168.100.122#7587865433662652507 /myetcd_service/service#192.168.100.122#7587865433662652516]
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
主节点不消费数据,略过...........................
开始向节点 /myetcd_service/service#192.168.100.122#7587865433662652516 分配分片:map[string][]string{"/myetcd_service/service#192.168.100.122#7587865433662652516":[]string{"5", "4", "3", "2", "1", "0"}} 的任务
发现增加了节点: false /myetcd_service/service#192.168.100.122#7587865433662652516 ["5","4","3","2","1","0"]
即将任务重新分配

主节点发现有节点退出立刻给了从节点1增加了任务量,变为了全量消费。

从节点1的变化:

1
2
3
4
watch node data put  true /myetcd_service/service#192.168.100.122#7587865433662652516 ["5","4","3","2","1","0"]
之前监控的节点为: [5 3 1]
目前新监控的节点为: [5 4 3 2 1 0]

从节点1也任劳任怨的接收了所有的topic 消费那么我们在跑一下测试数据

image-20220923174519154

从节点1 已经承受了所有。好了完全满足我们的需求。

总结

如果要写一个高并发大数据量处理的程序,那么横向拓展一定是最基本的要求,如果你的程序不能灵活拓展那么最终出现的问题你一定要能承受,这篇文章是一个非常通用的横向拓展架构,希望可以对你有启发和帮助,如果有问题欢迎留言。