363 lines
8.6 KiB
Go
363 lines
8.6 KiB
Go
package model
|
|
|
|
import (
|
|
"encoding/json"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"go-common/library/ecode"
|
|
"go-common/library/log"
|
|
)
|
|
|
|
// InstanceStatus Status of instance
|
|
// type InstanceStatus uint32
|
|
|
|
const (
|
|
// InstanceStatusUP Ready to receive traffic
|
|
InstanceStatusUP = uint32(1)
|
|
// InstancestatusWating Intentionally shutdown for traffic
|
|
InstancestatusWating = uint32(1) << 1
|
|
)
|
|
|
|
func (i *Instance) filter(status uint32) bool {
|
|
return status&i.Status > 0
|
|
}
|
|
|
|
// Action Replicate type of node
|
|
type Action int
|
|
|
|
const (
|
|
// Register Replicate the add action to all nodes
|
|
Register Action = iota
|
|
// Renew Replicate the heartbeat action to all nodes
|
|
Renew
|
|
// Cancel Replicate the cancel action to all nodes
|
|
Cancel
|
|
// Weight Replicate the Weight action to all nodes
|
|
Weight
|
|
// Delete Replicate the Delete action to all nodes
|
|
Delete
|
|
// Status Replicate the Status action to all nodes
|
|
Status
|
|
)
|
|
|
|
// Instance holds information required for registration with
|
|
// <Discovery Server> and to be discovered by other components.
|
|
type Instance struct {
|
|
Region string `json:"region"`
|
|
Zone string `json:"zone"`
|
|
Env string `json:"env"`
|
|
Appid string `json:"appid"`
|
|
Treeid int64 `json:"treeid"`
|
|
Hostname string `json:"hostname"`
|
|
HTTP string `json:"http"`
|
|
RPC string `json:"rpc"`
|
|
Version string `json:"version"`
|
|
Metadata map[string]string `json:"metadata"`
|
|
Addrs []string `json:"addrs"`
|
|
// Status enum instance status
|
|
Status uint32 `json:"status"`
|
|
|
|
// timestamp
|
|
RegTimestamp int64 `json:"reg_timestamp"`
|
|
UpTimestamp int64 `json:"up_timestamp"` // NOTE: It is latest timestamp that status becomes UP.
|
|
RenewTimestamp int64 `json:"renew_timestamp"`
|
|
DirtyTimestamp int64 `json:"dirty_timestamp"`
|
|
|
|
LatestTimestamp int64 `json:"latest_timestamp"`
|
|
}
|
|
|
|
// NewInstance new a instance.
|
|
func NewInstance(arg *ArgRegister) (i *Instance) {
|
|
now := time.Now().UnixNano()
|
|
i = &Instance{
|
|
Region: arg.Region,
|
|
Zone: arg.Zone,
|
|
Env: arg.Env,
|
|
Appid: arg.Appid,
|
|
Treeid: arg.Treeid,
|
|
Hostname: arg.Hostname,
|
|
HTTP: arg.HTTP,
|
|
RPC: arg.RPC,
|
|
Version: arg.Version,
|
|
Status: arg.Status,
|
|
Addrs: arg.Addrs,
|
|
RegTimestamp: now,
|
|
UpTimestamp: now,
|
|
LatestTimestamp: now,
|
|
RenewTimestamp: now,
|
|
DirtyTimestamp: now,
|
|
}
|
|
i.Metadata = make(map[string]string)
|
|
if arg.Metadata != "" {
|
|
if err := json.Unmarshal([]byte(arg.Metadata), &i.Metadata); err != nil {
|
|
log.Error("json unmarshal metadata err %v", err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// InstanceInfo the info get by consumer.
|
|
type InstanceInfo struct {
|
|
Instances []*Instance `json:"instances"`
|
|
ZoneInstances map[string][]*Instance `json:"zone_instances"`
|
|
LatestTimestamp int64 `json:"latest_timestamp"`
|
|
LatestTimestampStr string `json:"latest_timestamp_str"`
|
|
}
|
|
|
|
// Apps app distinguished by zone
|
|
type Apps struct {
|
|
apps map[string]*App
|
|
lock sync.RWMutex
|
|
latestTimestamp int64
|
|
}
|
|
|
|
// NewApps return new Apps.
|
|
func NewApps() *Apps {
|
|
return &Apps{
|
|
apps: make(map[string]*App),
|
|
}
|
|
}
|
|
|
|
// NewApp news a app by appid. If ok=false, returns the app of already exist.
|
|
func (p *Apps) NewApp(zone, appid string, treeid, lts int64) (a *App, new bool) {
|
|
p.lock.Lock()
|
|
a, ok := p.apps[zone]
|
|
if !ok {
|
|
a = NewApp(zone, appid, treeid)
|
|
p.apps[zone] = a
|
|
}
|
|
if lts <= p.latestTimestamp {
|
|
// insure increase
|
|
lts = p.latestTimestamp + 1
|
|
}
|
|
p.latestTimestamp = lts
|
|
p.lock.Unlock()
|
|
new = !ok
|
|
return
|
|
}
|
|
|
|
// App get app by zone.
|
|
func (p *Apps) App(zone string) (as []*App) {
|
|
p.lock.RLock()
|
|
if zone != "" {
|
|
a, ok := p.apps[zone]
|
|
if !ok {
|
|
p.lock.RUnlock()
|
|
return
|
|
}
|
|
as = []*App{a}
|
|
} else {
|
|
for _, a := range p.apps {
|
|
as = append(as, a)
|
|
}
|
|
}
|
|
p.lock.RUnlock()
|
|
return
|
|
}
|
|
|
|
// Del del app by zone.
|
|
func (p *Apps) Del(zone string) {
|
|
p.lock.Lock()
|
|
delete(p.apps, zone)
|
|
p.lock.Unlock()
|
|
}
|
|
|
|
// InstanceInfo return slice of instances.if up is true,return all status instance else return up status instance
|
|
func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *InstanceInfo, err error) {
|
|
p.lock.RLock()
|
|
defer p.lock.RUnlock()
|
|
|
|
if latestTime >= p.latestTimestamp {
|
|
err = ecode.NotModified
|
|
return
|
|
}
|
|
ci = &InstanceInfo{
|
|
LatestTimestamp: p.latestTimestamp,
|
|
LatestTimestampStr: strconv.FormatInt(p.latestTimestamp/int64(time.Second), 10),
|
|
ZoneInstances: make(map[string][]*Instance),
|
|
}
|
|
var ok bool
|
|
for z, app := range p.apps {
|
|
if zone == "" || z == zone {
|
|
ok = true
|
|
as := app.Instances()
|
|
if len(as) == 0 {
|
|
continue
|
|
}
|
|
instance := make([]*Instance, 0, len(as))
|
|
for _, i := range as {
|
|
// if up is false return all status instance
|
|
if i.filter(status) {
|
|
// if i.Status == InstanceStatusUP && i.LatestTimestamp > latestTime { // TODO(felix): increase
|
|
ni := new(Instance)
|
|
*ni = *i
|
|
instance = append(instance, ni)
|
|
}
|
|
}
|
|
ci.Instances = append(ci.Instances, instance...)
|
|
ci.ZoneInstances[z] = instance
|
|
}
|
|
}
|
|
if !ok {
|
|
err = ecode.NothingFound
|
|
} else if len(ci.Instances) == 0 {
|
|
err = ecode.NotModified
|
|
}
|
|
return
|
|
}
|
|
|
|
// UpdateLatest update LatestTimestamp.
|
|
func (p *Apps) UpdateLatest(latestTime int64) {
|
|
if latestTime <= p.latestTimestamp {
|
|
// insure increase
|
|
latestTime = p.latestTimestamp + 1
|
|
}
|
|
p.latestTimestamp = latestTime
|
|
}
|
|
|
|
// App Instances distinguished by hostname
|
|
type App struct {
|
|
AppID string
|
|
Treeid int64
|
|
Zone string
|
|
instances map[string]*Instance
|
|
latestTimestamp int64
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
// NewApp new App.
|
|
func NewApp(zone, appid string, treeid int64) (a *App) {
|
|
a = &App{
|
|
Treeid: treeid,
|
|
AppID: appid,
|
|
Zone: zone,
|
|
instances: make(map[string]*Instance),
|
|
}
|
|
return
|
|
}
|
|
|
|
// Instances return slice of instances.
|
|
func (a *App) Instances() (is []*Instance) {
|
|
a.lock.RLock()
|
|
is = make([]*Instance, 0, len(a.instances))
|
|
for _, i := range a.instances {
|
|
ni := new(Instance)
|
|
*ni = *i
|
|
is = append(is, ni)
|
|
}
|
|
a.lock.RUnlock()
|
|
return
|
|
}
|
|
|
|
// NewInstance new a instance.
|
|
func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) {
|
|
i = new(Instance)
|
|
a.lock.Lock()
|
|
oi, ok := a.instances[ni.Hostname]
|
|
if ok {
|
|
ni.UpTimestamp = oi.UpTimestamp
|
|
if ni.DirtyTimestamp < oi.DirtyTimestamp {
|
|
log.Warn("register exist(%v) dirty timestamp over than caller(%v)", oi, ni)
|
|
ni = oi
|
|
}
|
|
}
|
|
a.instances[ni.Hostname] = ni
|
|
a.updateLatest(latestTime)
|
|
*i = *ni
|
|
a.lock.Unlock()
|
|
ok = !ok
|
|
return
|
|
}
|
|
|
|
// Renew new a instance.
|
|
func (a *App) Renew(hostname string) (i *Instance, ok bool) {
|
|
i = new(Instance)
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
oi, ok := a.instances[hostname]
|
|
if !ok {
|
|
return
|
|
}
|
|
oi.RenewTimestamp = time.Now().UnixNano()
|
|
*i = *oi
|
|
return
|
|
}
|
|
|
|
func (a *App) updateLatest(latestTime int64) {
|
|
if latestTime <= a.latestTimestamp {
|
|
// insure increase
|
|
latestTime = a.latestTimestamp + 1
|
|
}
|
|
a.latestTimestamp = latestTime
|
|
}
|
|
|
|
// Cancel cancel a instance.
|
|
func (a *App) Cancel(hostname string, latestTime int64) (i *Instance, l int, ok bool) {
|
|
i = new(Instance)
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
oi, ok := a.instances[hostname]
|
|
if !ok {
|
|
return
|
|
}
|
|
delete(a.instances, hostname)
|
|
l = len(a.instances)
|
|
oi.LatestTimestamp = latestTime
|
|
a.updateLatest(latestTime)
|
|
*i = *oi
|
|
return
|
|
}
|
|
|
|
// Len returns the length of instances.
|
|
func (a *App) Len() (l int) {
|
|
a.lock.RLock()
|
|
l = len(a.instances)
|
|
a.lock.RUnlock()
|
|
return
|
|
}
|
|
|
|
// Set set new status,metadata of instance .
|
|
func (a *App) Set(changes *ArgSet) (ok bool) {
|
|
a.lock.Lock()
|
|
defer a.lock.Unlock()
|
|
var (
|
|
dst *Instance
|
|
setTime int64
|
|
)
|
|
if changes.SetTimestamp == 0 {
|
|
setTime = time.Now().UnixNano()
|
|
}
|
|
for i, hostname := range changes.Hostname {
|
|
if dst, ok = a.instances[hostname]; !ok {
|
|
log.Error("Set hostname(%s) not found", hostname)
|
|
return
|
|
}
|
|
if len(changes.Status) != 0 {
|
|
if uint32(changes.Status[i]) != InstanceStatusUP && uint32(changes.Status[i]) != InstancestatusWating {
|
|
log.Error("SetStatus change status(%d) is error", changes.Status[i])
|
|
ok = false
|
|
return
|
|
}
|
|
dst.Status = uint32(changes.Status[i])
|
|
if dst.Status == InstanceStatusUP {
|
|
dst.UpTimestamp = setTime
|
|
}
|
|
}
|
|
if len(changes.Metadata) != 0 {
|
|
metadata := make(map[string]string)
|
|
if err := json.Unmarshal([]byte(changes.Metadata[i]), &metadata); err != nil {
|
|
log.Error("set change metadata err %s", changes.Metadata[i])
|
|
ok = false
|
|
return
|
|
}
|
|
dst.Metadata = metadata
|
|
}
|
|
dst.LatestTimestamp = setTime
|
|
dst.DirtyTimestamp = setTime
|
|
}
|
|
a.updateLatest(setTime)
|
|
return
|
|
}
|