Chang Crazy's Blog

升级自己的操作系统


  • 首页

  • 归档

  • 标签

  • 分类

  • 日程表

  • 关于

  • 搜索

site

2023-12-15

website

  • google

ETCD 服务注册和发现

2021-09-27

ETCD 服务发现

在微服务中各个服务都是无状态的,不会在配置中写死上下游的访问地址,所以需要有一个地方去维护各个节点的信息。

服务起来的时候会去注册中心拉取其他服务的节点信息,并且把自己的信息推送到注册中心。

当有运行的服务下线或者出现问题的时候,把自己从配置中心摘除,或者配置中心来检测服务状态(心跳、健康检查的协议)来摘除服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"context"
"log"
)

type Discovery struct {
cli *clientv3.Client
info *NodeInfo
nodes *NodesManager
}

func NewDiscovery(info *NodeInfo, conf clientv3.Config, mgr *NodesManager) (dis *Discovery, err error) {
d := &Discovery{}
d.info = info
if mgr == nil {
return nil, fmt.Errorf("[Discovery] mgr == nil")
}
d.nodes = mgr
d.cli, err = clientv3.New(conf)
return d, err
}

func (d *Discovery) pull() {
kv := clientv3.NewKV(d.cli)
resp, err := kv.Get(context.TODO(), "discovery/", clientv3.WithPrefix())
if err != nil {
log.Fatalf("[Discovery] kv.Get err:%+v", err)
return
}
for _, v := range resp.Kvs{
node := &NodeInfo{}
err = json.Unmarshal(v.Value, node)
if err != nil {
log.Fatalf("[Discovery] json.Unmarshal err:%+v", err)
continue
}
d.nodes.AddNode(node)
log.Printf("[Discovery] pull node:%+v", node)
}
}

func (d *Discovery) watch() {
watcher := clientv3.NewWatcher(d.cli)
watchChan := watcher.Watch(context.TODO(), "discovery", clientv3.WithPrefix())
for {
select {
case resp := <-watchChan:
d.watchEvent(resp.Events)
}
}
}

func (d *Discovery) watchEvent(evs []*clientv3.Event) {
for _, ev := range evs{
switch ev.Type{
case clientv3.EventTypePut:
node := &NodeInfo{}
err := json.Unmarshal(ev.Kv.Value, node)
if err != nil {
log.Fatalf("[Discovery] json.Unmarshal err:%+v", err)
continue
}
d.nodes.AddNode(node)
log.Printf(fmt.Sprintf("[Discovery] new node:%s",string(ev.Kv.Value)))
case clientv3.EventTypeDelete:
d.nodes.DelNode(string(ev.Kv.Key))
log.Printf(fmt.Sprintf("[Discovery] del node:%s data:%s",string(ev.Kv.Key),string(ev.Kv.Value)))
}
}
}

服务注册

在etcd中服务的注册使用租约(lease)来实现,设置租约的时候按需求设置租约的时间(ttl),类似redis中的EXPIRE key,再把服务自身的节点等信息写到对应的key中。然后定时去调用KeepAliveOnce来保持租约,如果在期间KeepAliveOnce的消息丢失或者延迟大于这个租约的ttl则etcd中将会把这个节点的信息删除,恢复正常时重新发起租约流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100


package main

import (
"encoding/json"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/pkg/errors"
"log"
"time"
"context"
)

const (
_ttl = 10
)


type Register struct {
cli *clientv3.Client
leaseId clientv3.LeaseID
lease clientv3.Lease
info *NodeInfo
closeChan chan error
}

func NewRegister(info *NodeInfo, conf clientv3.Config) (reg *Register, err error) {
r := &Register{}
r.closeChan = make(chan error)
r.info = info
r.cli, err = clientv3.New(conf)
return r, err
}


func (r *Register) Run() {
dur := time.Duration(time.Second)
timer := time.NewTicker(dur)
r.register()
for {
select {
case <-timer.C:
r.keepAlive()
case <-r.closeChan:
goto EXIT
}
}
EXIT:
log.Printf("[Register] Run exit...")
}

func (r *Register) Stop() {
r.revoke()
close(r.closeChan)
}

func (r *Register) register() (err error) {
r.leaseId = 0
kv := clientv3.NewKV(r.cli)
r.lease = clientv3.NewLease(r.cli)
leaseResp, err := r.lease.Grant(context.TODO(), _ttl)
if err != nil {
err = errors.Wrapf(err, "[Register] register Grant err")
return
}
data, err := json.Marshal(r.info)
_, err = kv.Put(context.TODO(), r.info.UniqueId, string(data), clientv3.WithLease(leaseResp.ID))
if err != nil {
err = errors.Wrapf(err, "[Register] register kv.Put err %s-%+v", r.info.Name, string(data))
return
}
r.leaseId = leaseResp.ID
return
}

func (r *Register) keepAlive() (err error) {
_, err = r.lease.KeepAliveOnce(context.TODO(), r.leaseId)
if err != nil {
// 租约丢失,重新注册
if err == rpctypes.ErrLeaseNotFound {
r.register()
err = nil
}
err = errors.Wrapf(err, "[Register] keepAlive err")
}
log.Printf(fmt.Sprintf("[Register] keepalive... leaseId:%+v", r.leaseId))
return err
}

func(r *Register) revoke() (err error) {
_, err = r.cli.Revoke(context.TODO(), r.leaseId)
if err != nil {
err = errors.Wrapf(err, "[Register] revoke err")
return
}
log.Printf(fmt.Sprintf("[Register] revoke node:%+v", r.leaseId))
return
}

节点管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"log"
"math/rand"
"strings"
"sync"
)

type NodeInfo struct {
Addr string
Name string
UniqueId string
}

type NodesManager struct {
sync.RWMutex
// <name,<id,node>>
nodes map[string]map[string]*NodeInfo
}

func NewNodeManager() (m *NodesManager){
return &NodesManager{
nodes: map[string]map[string]*NodeInfo{},
}
}

func (n *NodesManager) AddNode(node *NodeInfo) {
if node == nil {
return
}
n.Lock()
defer n.Unlock()
if _, exist := n.nodes[node.Name]; !exist {
n.nodes[node.Name] = map[string]*NodeInfo{}
}
n.nodes[node.Name][node.UniqueId] = node
}

func (n *NodesManager) DelNode(id string) {
sli := strings.Split(id,"/")
name := sli[len(sli)-2]
n.Lock()
defer n.Unlock()
if _, exist := n.nodes[name]; exist {
delete(n.nodes[name], id)
}
}

func (n *NodesManager) Pick(name string) *NodeInfo {
n.RLock()
defer n.RUnlock()
if nodes, exist := n.nodes[name]; !exist {
return nil
} else {
// 纯随机取节点
idx := rand.Intn(len(nodes))
for _, v := range nodes {
if idx == 0 {
return v
}
idx--
}
}
return nil
}

func (n *NodesManager) Dump () {
for k, v := range n.nodes {
for kk, vv := range v {
log.Printf("[NodesManager] Name:%s Id:%s Node:%+v", k, kk, vv)
}
}

}

测试效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"github.com/coreos/etcd/clientv3"
"log"
"time"
)

func main() {

nodes := NewNodeManager()
dis, _ := NewDiscovery(&NodeInfo{
Name: "server name/aaaa",
Addr: "127.0.0.1:8888",
}, clientv3.Config{
Endpoints: []string{"tx.cxc233.com:8888"},
DialTimeout: 5 * time.Second,
}, nodes)

reg, _ := NewRegister(&NodeInfo{
Name: "testsvr",
Addr: "127.0.0.1:8888",
UniqueId: "discovery/testsvr/instance_id/aaabbbccc",
}, clientv3.Config{
Endpoints: []string{"tx.cxc233.com:8888"},
DialTimeout: 5 * time.Second,
})

reg2, _ := NewRegister(&NodeInfo{
Name: "testsvr",
Addr: "127.0.0.1:8888",
UniqueId: "discovery/testsvr/instance_id/testqqqqq",
}, clientv3.Config{
Endpoints: []string{"tx.cxc233.com:8888"},
DialTimeout: 5 * time.Second,
})

go reg.Run()
time.Sleep(time.Second * 2)
dis.pull()
go dis.watch()
time.Sleep(time.Second * 1)
go reg2.Run()
time.Sleep(time.Second * 1)
nodes.Dump()
log.Printf("[Main] nodes pick:%+v",nodes.Pick("testsvr"))
time.Sleep(time.Second * 5)
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2020-10-18 22:36:39.102233 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:40.100365 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:40.100365 I | [Discovery] pull node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/aaabbbccc}
2020-10-18 22:36:41.100389 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:41.143304 I | [Discovery] new node:{"Addr":"127.0.0.1:8888","Name":"testsvr","UniqueId":"discovery/testsvr/instance_id/testqqqqq"}
2020-10-18 22:36:42.097662 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:42.100819 I | [NodesManager] Name:testsvr Id:discovery/testsvr/instance_id/aaabbbccc Node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/aaabbbccc}
2020-10-18 22:36:42.100819 I | [NodesManager] Name:testsvr Id:discovery/testsvr/instance_id/testqqqqq Node:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/testqqqqq}
2020-10-18 22:36:42.100819 I | [Main] nodes pick:&{Addr:127.0.0.1:8888 Name:testsvr UniqueId:discovery/testsvr/instance_id/testqqqqq}
2020-10-18 22:36:42.107588 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:43.091650 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:43.109634 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:44.093966 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:44.108130 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:45.093075 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:45.114071 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:46.107634 I | [Register] keepalive... leaseId:7587849142468224212
2020-10-18 22:36:46.114016 I | [Register] keepalive... leaseId:7587849142468224216
2020-10-18 22:36:47.091863 I | [Register] keepalive... leaseId:7587849142468224212

golang项目部署(Dockerfile)

2021-09-27

Go Dockerfile 构建

FROM golang:1.15

docker有一个基本镜像叫做scratch,它是一个空的镜像,在临时基础镜像上运行的应用程序只能访问内核

由于需要依赖cgo,所以我们使用scratch无法满足需求,我们需要另外一个运行时基础镜像alpine,看下dockerhub官方的介绍,它也仅仅只有5MB大小。

生成镜像,使用当前文件构建一个镜像

1
docker build -t changcrazy/service.doumi.com:v1  .

启动一个容器

1
docker run -d centos_nginx:v1 /usr/local/nginx/sbin/nginx -g "daemon off;"
阅读全文 »

Yaf Yar使用

2020-05-10

Yar (Yet Another RPC Framework) 一个rpc框架

Yar 是一个轻量级, 高效的RPC框架, 它提供了一种简单方法来让PHP项目之间可以互相远程调用对方的本地方法. 并且Yar也提供了并行调用的能力. 可以支持同时调用多个远程服务的方法.

阅读全文 »

Go 并发

2020-05-02

常见的并发模型:

进程与线程(apache) C10K 问题

  • nginx,nodejs 异步非阻塞 复杂度高
  • golang ,lua,erlang 协程方式

golang 中使用goroutine 实现并发的,channels 在多个goroutine 间的数据同步和通信,select在多个channels中选择数据的读取和写入。

并发与并行

并发: 指同一时刻,系统通过调度,来回切换交替的运行多个任务,“看起来”是同时进行

并行: 指同一个时刻,两个任务”真正的“同时进行

讲解地址 搬运废书问题

将复杂的任务拆分,通过goroutine去并发执行。

Coroutine

  • 指针,& 取地址。

修改结构体的变量内容的时候,方法传入的结构体变量参数需要使用指针,也就是结构体的地址

阅读全文 »

Go Cheat Sheet

2020-05-02

Go Cheat Sheet

一个整理了很多语言的cs

GO cheatsheet

GO cheatsheet

类库和框架等

Index

  1. Basic Syntax
  2. Operators
    • Arithmetic
    • Comparison
    • Logical
    • Other
  3. Declarations
  4. Functions
    • Functions as values and closures
    • Variadic Functions
  5. Built-in Types
  6. Type Conversions
  7. Packages
  8. Control structures
    • If
    • Loops
    • Switch
  9. Arrays, Slices, Ranges
    • Arrays
    • Slices
    • Operations on Arrays and Slices
  10. Maps
  11. Structs
  12. Pointers
  13. Interfaces
  14. Embedding
  15. Errors
  16. Concurrency
    • Goroutines
    • Channels
    • Channel Axioms
  17. Printing
  18. Reflection
    • Type Switch
    • Examples
  19. Snippets
    • Http-Server
阅读全文 »

Mysql 5.7 以上版本对JSON字段的操作

2020-05-02

官方文档mysql 5.7 json
官方文档mysql 5.7 json functions

官方文档mysql 8.0 json

阅读全文 »

Google 搜索技巧

2020-05-02

Google 搜索技巧

  1. 使用“本尊”

最好使用 http://www.google.com/ncr NCR: No Country Redirection,而不是http://www.google.com.hk;有时,直接输入http://www.google.com也会被自动转到“本地Google”,比如,我用日本的 VPN,浏览器就会把我转到http://www.google.co.jp……

  1. 优先使用英文关键字搜索

这是个好习惯。别说不会英文,不会就学,没那么难。

  1. 基本技巧

Google 搜索引擎也许是世界上最简单的应用界面,只有一个输入框和一个按钮。然而,用好它还是需要花点时间去学习的。Google 有帮助文档,还专门设计了个学习网站 A Google A Day

a)加号

在 Google 的输入框里,所有的空格都被 Google 理解为加号+。如果你输入的是 purpose of education那么 Google 返回的文章里既有“purpose”存在,也有“education”存在,但不一定有“purpose of education”存在。另外,过分常用的、单独存在没有意义的词汇往往被忽略掉,比如冠词“a”、“the”;介词“of”、“in”、“on”、“at”、“to”;连词“and”、“or”、“but”;从属连词“that”、“which”、“when”;代词“my”、“his”、“them”等等。

阅读全文 »

Go Install

2020-05-02

快速安装:

yum install go

源码安装最新版

git clone https://github.com/golang/go.gitcd go/git checkout 1.13版本cd srcall.bash

成功后就将go/bin/go 添加到环境变量即可。
环境变量

goexport GOPATH=$HOME/www/go:$HOME/www/baxian/baxian_sea/doc/data/tokumxexport GOBIN=$HOME/www/bin

阅读全文 »

Elastic Search

2019-01-30

ES 命令

delete index

curl -X DELETE ‘http://localhost:9200/samples'

list all indexes

curl -X GET ‘http://localhost:9200/_cat/indices?v'

list all docs in index

curl -X GET ‘http://localhost:9200/sample/_search'

阅读全文 »
12…7
changyuan

changyuan

不断升级自己的操作系统

69 日志
1 分类
26 标签
GitHub Twitter Weibo Zhihu
Links
  • Weibo
  • GitHub
© 2023 changyuan
载入天数... 载入时分秒...