go-common/app/infra/discovery/model/instance.go
2019-04-22 18:49:16 +08:00

363 lines
8.6 KiB
Go

package model
import (
"encoding/json"
"strconv"
"sync"
"time"
"go-common/library/ecode"
"go-common/library/log"
)
// InstanceStatus Status of instance
// type InstanceStatus uint32
const (
// InstanceStatusUP Ready to receive traffic
InstanceStatusUP = uint32(1)
// InstancestatusWating Intentionally shutdown for traffic
InstancestatusWating = uint32(1) << 1
)
func (i *Instance) filter(status uint32) bool {
return status&i.Status > 0
}
// Action Replicate type of node
type Action int
const (
// Register Replicate the add action to all nodes
Register Action = iota
// Renew Replicate the heartbeat action to all nodes
Renew
// Cancel Replicate the cancel action to all nodes
Cancel
// Weight Replicate the Weight action to all nodes
Weight
// Delete Replicate the Delete action to all nodes
Delete
// Status Replicate the Status action to all nodes
Status
)
// Instance holds information required for registration with
// <Discovery Server> and to be discovered by other components.
type Instance struct {
Region string `json:"region"`
Zone string `json:"zone"`
Env string `json:"env"`
Appid string `json:"appid"`
Treeid int64 `json:"treeid"`
Hostname string `json:"hostname"`
HTTP string `json:"http"`
RPC string `json:"rpc"`
Version string `json:"version"`
Metadata map[string]string `json:"metadata"`
Addrs []string `json:"addrs"`
// Status enum instance status
Status uint32 `json:"status"`
// timestamp
RegTimestamp int64 `json:"reg_timestamp"`
UpTimestamp int64 `json:"up_timestamp"` // NOTE: It is latest timestamp that status becomes UP.
RenewTimestamp int64 `json:"renew_timestamp"`
DirtyTimestamp int64 `json:"dirty_timestamp"`
LatestTimestamp int64 `json:"latest_timestamp"`
}
// NewInstance new a instance.
func NewInstance(arg *ArgRegister) (i *Instance) {
now := time.Now().UnixNano()
i = &Instance{
Region: arg.Region,
Zone: arg.Zone,
Env: arg.Env,
Appid: arg.Appid,
Treeid: arg.Treeid,
Hostname: arg.Hostname,
HTTP: arg.HTTP,
RPC: arg.RPC,
Version: arg.Version,
Status: arg.Status,
Addrs: arg.Addrs,
RegTimestamp: now,
UpTimestamp: now,
LatestTimestamp: now,
RenewTimestamp: now,
DirtyTimestamp: now,
}
i.Metadata = make(map[string]string)
if arg.Metadata != "" {
if err := json.Unmarshal([]byte(arg.Metadata), &i.Metadata); err != nil {
log.Error("json unmarshal metadata err %v", err)
}
}
return
}
// InstanceInfo the info get by consumer.
type InstanceInfo struct {
Instances []*Instance `json:"instances"`
ZoneInstances map[string][]*Instance `json:"zone_instances"`
LatestTimestamp int64 `json:"latest_timestamp"`
LatestTimestampStr string `json:"latest_timestamp_str"`
}
// Apps app distinguished by zone
type Apps struct {
apps map[string]*App
lock sync.RWMutex
latestTimestamp int64
}
// NewApps return new Apps.
func NewApps() *Apps {
return &Apps{
apps: make(map[string]*App),
}
}
// NewApp news a app by appid. If ok=false, returns the app of already exist.
func (p *Apps) NewApp(zone, appid string, treeid, lts int64) (a *App, new bool) {
p.lock.Lock()
a, ok := p.apps[zone]
if !ok {
a = NewApp(zone, appid, treeid)
p.apps[zone] = a
}
if lts <= p.latestTimestamp {
// insure increase
lts = p.latestTimestamp + 1
}
p.latestTimestamp = lts
p.lock.Unlock()
new = !ok
return
}
// App get app by zone.
func (p *Apps) App(zone string) (as []*App) {
p.lock.RLock()
if zone != "" {
a, ok := p.apps[zone]
if !ok {
p.lock.RUnlock()
return
}
as = []*App{a}
} else {
for _, a := range p.apps {
as = append(as, a)
}
}
p.lock.RUnlock()
return
}
// Del del app by zone.
func (p *Apps) Del(zone string) {
p.lock.Lock()
delete(p.apps, zone)
p.lock.Unlock()
}
// InstanceInfo return slice of instances.if up is true,return all status instance else return up status instance
func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *InstanceInfo, err error) {
p.lock.RLock()
defer p.lock.RUnlock()
if latestTime >= p.latestTimestamp {
err = ecode.NotModified
return
}
ci = &InstanceInfo{
LatestTimestamp: p.latestTimestamp,
LatestTimestampStr: strconv.FormatInt(p.latestTimestamp/int64(time.Second), 10),
ZoneInstances: make(map[string][]*Instance),
}
var ok bool
for z, app := range p.apps {
if zone == "" || z == zone {
ok = true
as := app.Instances()
if len(as) == 0 {
continue
}
instance := make([]*Instance, 0, len(as))
for _, i := range as {
// if up is false return all status instance
if i.filter(status) {
// if i.Status == InstanceStatusUP && i.LatestTimestamp > latestTime { // TODO(felix): increase
ni := new(Instance)
*ni = *i
instance = append(instance, ni)
}
}
ci.Instances = append(ci.Instances, instance...)
ci.ZoneInstances[z] = instance
}
}
if !ok {
err = ecode.NothingFound
} else if len(ci.Instances) == 0 {
err = ecode.NotModified
}
return
}
// UpdateLatest update LatestTimestamp.
func (p *Apps) UpdateLatest(latestTime int64) {
if latestTime <= p.latestTimestamp {
// insure increase
latestTime = p.latestTimestamp + 1
}
p.latestTimestamp = latestTime
}
// App Instances distinguished by hostname
type App struct {
AppID string
Treeid int64
Zone string
instances map[string]*Instance
latestTimestamp int64
lock sync.RWMutex
}
// NewApp new App.
func NewApp(zone, appid string, treeid int64) (a *App) {
a = &App{
Treeid: treeid,
AppID: appid,
Zone: zone,
instances: make(map[string]*Instance),
}
return
}
// Instances return slice of instances.
func (a *App) Instances() (is []*Instance) {
a.lock.RLock()
is = make([]*Instance, 0, len(a.instances))
for _, i := range a.instances {
ni := new(Instance)
*ni = *i
is = append(is, ni)
}
a.lock.RUnlock()
return
}
// NewInstance new a instance.
func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) {
i = new(Instance)
a.lock.Lock()
oi, ok := a.instances[ni.Hostname]
if ok {
ni.UpTimestamp = oi.UpTimestamp
if ni.DirtyTimestamp < oi.DirtyTimestamp {
log.Warn("register exist(%v) dirty timestamp over than caller(%v)", oi, ni)
ni = oi
}
}
a.instances[ni.Hostname] = ni
a.updateLatest(latestTime)
*i = *ni
a.lock.Unlock()
ok = !ok
return
}
// Renew new a instance.
func (a *App) Renew(hostname string) (i *Instance, ok bool) {
i = new(Instance)
a.lock.Lock()
defer a.lock.Unlock()
oi, ok := a.instances[hostname]
if !ok {
return
}
oi.RenewTimestamp = time.Now().UnixNano()
*i = *oi
return
}
func (a *App) updateLatest(latestTime int64) {
if latestTime <= a.latestTimestamp {
// insure increase
latestTime = a.latestTimestamp + 1
}
a.latestTimestamp = latestTime
}
// Cancel cancel a instance.
func (a *App) Cancel(hostname string, latestTime int64) (i *Instance, l int, ok bool) {
i = new(Instance)
a.lock.Lock()
defer a.lock.Unlock()
oi, ok := a.instances[hostname]
if !ok {
return
}
delete(a.instances, hostname)
l = len(a.instances)
oi.LatestTimestamp = latestTime
a.updateLatest(latestTime)
*i = *oi
return
}
// Len returns the length of instances.
func (a *App) Len() (l int) {
a.lock.RLock()
l = len(a.instances)
a.lock.RUnlock()
return
}
// Set set new status,metadata of instance .
func (a *App) Set(changes *ArgSet) (ok bool) {
a.lock.Lock()
defer a.lock.Unlock()
var (
dst *Instance
setTime int64
)
if changes.SetTimestamp == 0 {
setTime = time.Now().UnixNano()
}
for i, hostname := range changes.Hostname {
if dst, ok = a.instances[hostname]; !ok {
log.Error("Set hostname(%s) not found", hostname)
return
}
if len(changes.Status) != 0 {
if uint32(changes.Status[i]) != InstanceStatusUP && uint32(changes.Status[i]) != InstancestatusWating {
log.Error("SetStatus change status(%d) is error", changes.Status[i])
ok = false
return
}
dst.Status = uint32(changes.Status[i])
if dst.Status == InstanceStatusUP {
dst.UpTimestamp = setTime
}
}
if len(changes.Metadata) != 0 {
metadata := make(map[string]string)
if err := json.Unmarshal([]byte(changes.Metadata[i]), &metadata); err != nil {
log.Error("set change metadata err %s", changes.Metadata[i])
ok = false
return
}
dst.Metadata = metadata
}
dst.LatestTimestamp = setTime
dst.DirtyTimestamp = setTime
}
a.updateLatest(setTime)
return
}