1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
package main
import ( "encoding/json" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/pkg/errors" "log" "time" "context" )
// _ttl is the TTL handed to Lease.Grant when the node registers itself
// (the etcd clientv3 API interprets this value as seconds).
const _ttl = 10
// Register keeps one node's entry alive in etcd: it writes the node info
// under a lease and periodically refreshes that lease until stopped.
type Register struct {
	// cli is the etcd client created by NewRegister.
	cli *clientv3.Client
	// leaseId is the lease the node key is attached to; register resets it
	// to 0 and only sets it again after a successful Put.
	leaseId clientv3.LeaseID
	// lease is the lease API handle; register replaces it on every call.
	lease clientv3.Lease
	// info is the node description that register JSON-encodes and stores
	// under info.UniqueId.
	info *NodeInfo
	// closeChan is closed by Stop to make Run leave its loop.
	closeChan chan error
}
func NewRegister(info *NodeInfo, conf clientv3.Config) (reg *Register, err error) { r := &Register{} r.closeChan = make(chan error) r.info = info r.cli, err = clientv3.New(conf) return r, err }
func (r *Register) Run() { dur := time.Duration(time.Second) timer := time.NewTicker(dur) r.register() for { select { case <-timer.C: r.keepAlive() case <-r.closeChan: goto EXIT } } EXIT: log.Printf("[Register] Run exit...") }
func (r *Register) Stop() { r.revoke() close(r.closeChan) }
func (r *Register) register() (err error) { r.leaseId = 0 kv := clientv3.NewKV(r.cli) r.lease = clientv3.NewLease(r.cli) leaseResp, err := r.lease.Grant(context.TODO(), _ttl) if err != nil { err = errors.Wrapf(err, "[Register] register Grant err") return } data, err := json.Marshal(r.info) _, err = kv.Put(context.TODO(), r.info.UniqueId, string(data), clientv3.WithLease(leaseResp.ID)) if err != nil { err = errors.Wrapf(err, "[Register] register kv.Put err %s-%+v", r.info.Name, string(data)) return } r.leaseId = leaseResp.ID return }
func (r *Register) keepAlive() (err error) { _, err = r.lease.KeepAliveOnce(context.TODO(), r.leaseId) if err != nil { if err == rpctypes.ErrLeaseNotFound { r.register() err = nil } err = errors.Wrapf(err, "[Register] keepAlive err") } log.Printf(fmt.Sprintf("[Register] keepalive... leaseId:%+v", r.leaseId)) return err }
func(r *Register) revoke() (err error) { _, err = r.cli.Revoke(context.TODO(), r.leaseId) if err != nil { err = errors.Wrapf(err, "[Register] revoke err") return } log.Printf(fmt.Sprintf("[Register] revoke node:%+v", r.leaseId)) return }
|