go-common/app/infra/config/service/v2/client.go
2019-04-22 18:49:16 +08:00

770 lines
17 KiB
Go

package v2
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"go-common/app/infra/config/model"
"go-common/library/ecode"
"go-common/library/log"
xtime "go-common/library/time"
)
// Format strings for the in-memory map keys and cache names built by the
// key helper functions in this file.
const (
// _buildVerKey: integer ID followed by build version; see buildVerKey.
_buildVerKey = "%d_%s"
// _pushKey: service name followed by host name; see pushKey.
_pushKey = "%s_%s"
// _cacheKey: service name followed by config version; see cacheKey.
_cacheKey = "%s_%d"
// _pushForceKey: service name, host name and IP; see pushForceKey.
_pushForceKey = "%s_%s_%s"
// _tagForceKey: integer ID, build version and tag ID; see tagForceKey.
_tagForceKey = "%d_%s_%d_tagforce"
// _tagIDkey: integer ID and build version; see tagIDkey.
_tagIDkey = "%d_%s_tagID"
// _lastForceKey: integer ID and build version, suffixed with "lastForce".
_lastForceKey = "%d_%s_lastForce"
)
func lastForceKey(svr int64, bver string) string {
return fmt.Sprintf(_tagIDkey, svr, bver)
}
// tagIDkey builds the map key used to look up the cached tag ID for an
// (svr, bver) pair.
func tagIDkey(svr int64, bver string) string {
	key := fmt.Sprintf(_tagIDkey, svr, bver)
	return key
}
// tagForceKey builds the map key used to look up the cached force flag of
// tag tagID for an (svr, bver) pair.
func tagForceKey(svr int64, bver string, tagID int64) string {
	key := fmt.Sprintf(_tagForceKey, svr, bver, tagID)
	return key
}
// pushForceKey builds the map key identifying a forced version entry for
// one host of a service: "<svr>_<host>_<ip>".
func pushForceKey(svr, host, ip string) string {
	key := fmt.Sprintf(_pushForceKey, svr, host, ip)
	return key
}
// pushKey builds the subscription key of one host of a service:
// "<svr>_<host>".
func pushKey(svr, host string) string {
	key := fmt.Sprintf(_pushKey, svr, host)
	return key
}
// buildVerKey builds the key of the build-version-to-tag mapping:
// "<svr>_<bver>".
func buildVerKey(svr int64, bver string) string {
	key := fmt.Sprintf(_buildVerKey, svr, bver)
	return key
}
// cacheKey builds the name of the cached config content for one version
// of a service: "<svr>_<ver>".
func cacheKey(svr string, ver int64) string {
	name := fmt.Sprintf(_cacheKey, svr, ver)
	return name
}
// genConfig serializes values to JSON and wraps the result in a
// model.Content carrying the given version and the hex-encoded MD5 of the
// JSON bytes. It returns a nil content and the marshal error on failure.
func genConfig(ver int64, values []*model.Value) (conf *model.Content, err error) {
	raw, err := json.Marshal(values)
	if err != nil {
		return nil, err
	}
	sum := md5.Sum(raw)
	return &model.Content{
		Version: ver,
		Md5:     hex.EncodeToString(sum[:]),
		Content: string(raw),
	}, nil
}
// Push makes svr.Version the current version of svr.BuildVersion and
// notifies subscribed online hosts.
//
// It loads the config values of the version, stores the generated content
// under the version's cache name, updates the in-memory tag and tag ID,
// records the version as last forced version when the tag's force flag is
// 1, and finally publishes a diff event to every online host that has a
// registered subscription. A failure to list hosts is logged and swallowed:
// Push returns nil in that case.
func (s *Service) Push(c context.Context, svr *model.Service) (err error) {
	ids, err := s.dao.ConfIDs(svr.Version)
	if err != nil {
		return
	}
	values, err := s.dao.ConfigsByIDs(ids)
	if err != nil {
		return
	}
	if len(values) == 0 {
		err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
		log.Error("%v", err)
		return
	}
	// compatible old version sdk
	conf, err := genConfig(svr.Version, values)
	if err != nil {
		log.Error("get config value:%s error(%v) ", values, err)
		return
	}
	if err = s.dao.SetFile(cacheKey(svr.Name, svr.Version), conf); err != nil {
		return
	}
	app, err := s.app(svr.Name)
	if err != nil {
		return
	}
	tag := s.setTag(app.ID, svr.BuildVersion, &cacheTag{Tag: svr.Version, ConfIDs: ids})
	s.setTagID(app.ID, tag.C.Tag, svr.BuildVersion)
	force, err := s.dao.TagForce(svr.Version)
	if err != nil {
		return
	}
	if force == 1 {
		s.setLFVForces(svr.Version, app.ID, svr.BuildVersion)
	}
	// push hosts
	hosts, err := s.dao.Hosts(c, svr.Name)
	if err != nil {
		// Best effort: the version is already switched, so only log.
		log.Error("get hosts error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Version)
		return nil
	}
	for _, h := range hosts {
		if h.State != model.HostOnline {
			continue
		}
		subKey := pushKey(h.Service, h.Name)
		delivered := s.pubEvent(subKey, &model.Diff{Version: svr.Version, Diffs: tag.diff()})
		if delivered {
			log.Info("s.events.Pub(%s, %d) ok: %t", subKey, conf.Version, delivered)
		}
	}
	return nil
}
// Config returns the config content of version ver for service svrName.
//
// The caller is authenticated with token first. The full content is taken
// from the cache when present; otherwise it is rebuilt from the stored
// values and written back to the cache. With an empty ids the full content
// is returned. Otherwise only the values whose ConfigID appears in ids are
// kept, and ecode.NothingFound is returned when there are no values at all
// or none of them match.
func (s *Service) Config(c context.Context, svrName, token string, ver int64, ids []int64) (conf *model.Content, err error) {
	if _, err = s.appAuth(c, svrName, token); err != nil {
		return
	}
	var full []*model.Value
	name := cacheKey(svrName, ver)
	conf, err = s.dao.File(name)
	cached := err == nil
	if !cached {
		// Cache miss: regenerate the full content and store it.
		if full, err = s.values(ver); err != nil {
			return
		}
		if conf, err = genConfig(ver, full); err != nil {
			return
		}
		if err = s.dao.SetFile(name, conf); err != nil {
			return
		}
	}
	if len(ids) == 0 {
		return
	}
	if cached {
		// The cached content holds the JSON-encoded value list.
		if err = json.Unmarshal([]byte(conf.Content), &full); err != nil {
			return
		}
	}
	if len(full) == 0 {
		err = ecode.NothingFound
		return
	}
	var picked []*model.Value
	for _, val := range full {
		for _, want := range ids {
			if want == val.ConfigID {
				picked = append(picked, val)
			}
		}
	}
	if len(picked) == 0 {
		log.Error("Config(%s,%v) error", svrName, ids)
		err = ecode.NothingFound
		return
	}
	if conf, err = genConfig(ver, picked); err != nil {
		log.Error("get config value:%s error(%v) ", picked, err)
	}
	return
}
// File returns the content of the single config file named svr.File.
//
// The caller is authenticated with svr.Token. The version used is
// svr.Version, or the current tag of svr.BuildVersion when svr.Version is
// model.UnknownVersion. The value list of that version is read from the
// cache when present; otherwise it is rebuilt from the stored values and
// written back to the cache. If no value is named svr.File, the result is
// an empty string with a nil error.
func (s *Service) File(c context.Context, svr *model.Service) (val string, err error) {
	var (
		curVer, appID int64
		all           []*model.Value
		conf          *model.Content
		tag           *curTag
	)
	if appID, err = s.appAuth(c, svr.Name, svr.Token); err != nil {
		return
	}
	if svr.Version != model.UnknownVersion {
		curVer = svr.Version
	} else {
		// No explicit version requested: fall back to the current tag.
		if tag, err = s.curTag(appID, svr.BuildVersion); err != nil {
			return
		}
		curVer = tag.cur()
	}
	// Look the cache up by the resolved version. Using svr.Version here
	// would never match the name written by SetFile below when the caller
	// passed UnknownVersion.
	cacheName := cacheKey(svr.Name, curVer)
	if conf, err = s.dao.File(cacheName); err == nil {
		if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
			return
		}
	} else {
		if all, err = s.values(curVer); err != nil {
			return
		}
		if conf, err = genConfig(curVer, all); err != nil {
			return
		}
		if err = s.dao.SetFile(cacheName, conf); err != nil {
			return
		}
	}
	for _, v := range all {
		if v.Name == svr.File {
			val = v.Config
			break
		}
	}
	return
}
// CheckVersion checks the version reported by client host rhost and returns
// a channel on which the version the client should switch to is delivered.
//
// After authenticating with token, the target version is chosen in this
// order, and an event is sent immediately only when the target differs from
// rhost.ConfigVersion:
//  1. the forced version from curForce (only when rhost.ConfigVersion > 0),
//  2. the last forced version from lastForce, when rhost.ConfigVersion is
//     not greater than it (only when rhost.ConfigVersion > 0),
//  3. rhost.Appoint, when it is greater than zero,
//  4. the current tag of rhost.BuildVersion.
//
// When the client is already on the current tag, the channel is registered
// in s.events under the host's push key so that pubEvent can deliver to it
// later. ecode.NothingFound is returned when the current tag is 0.
func (s *Service) CheckVersion(c context.Context, rhost *model.Host, token string) (evt chan *model.Diff, err error) {
var (
appID, tagID int64
tag *curTag
ForceVersion int64
tagForce int8
lastForce int64
)
if appID, err = s.appAuth(c, rhost.Service, token); err != nil {
return
}
// set heartbeat
rhost.HeartbeatTime = xtime.Time(time.Now().Unix())
// The result of SetHost is not inspected here.
s.dao.SetHost(c, rhost, rhost.Service)
// One buffered slot, so the sends below do not need a waiting receiver.
evt = make(chan *model.Diff, 1)
// Start of the force / appoint handling (original comment, roughly: "let the show begin").
tagForce, err = s.tagForce(appID, rhost.BuildVersion)
if err != nil {
return
}
// Record the tag's force flag on the host and store the host again.
rhost.Force = tagForce
s.dao.SetHost(c, rhost, rhost.Service)
if rhost.ConfigVersion > 0 {
// force has a higher priority than appoint
if ForceVersion, err = s.curForce(rhost, appID); err != nil {
return
}
if ForceVersion > 0 {
rhost.ForceVersion = ForceVersion
s.dao.SetHost(c, rhost, rhost.Service)
// Notify only when the client is not already on the forced version.
if ForceVersion != rhost.ConfigVersion {
evt <- &model.Diff{Version: ForceVersion}
}
return
}
if lastForce, err = s.lastForce(appID, rhost.ConfigVersion, rhost.BuildVersion); err != nil {
return
}
// A client at or below the last forced version is moved to it and
// handled no further.
if rhost.ConfigVersion <= lastForce {
if rhost.ConfigVersion != lastForce {
evt <- &model.Diff{Version: lastForce}
}
return
}
}
// An appointed version takes precedence over the current tag.
if rhost.Appoint > 0 {
if rhost.Appoint != rhost.ConfigVersion {
evt <- &model.Diff{Version: rhost.Appoint}
}
return
}
// End of the force / appoint handling (original comment: "end of the show").
// get current version, return if has new config version
if tag, err = s.curTag(appID, rhost.BuildVersion); err != nil {
return
}
if tagID = tag.cur(); tagID == 0 {
err = ecode.NothingFound
return
}
if tagID != rhost.ConfigVersion {
ver := &model.Diff{Version: tagID}
// Diffs are attached only when the client is on the tag's old version.
if rhost.ConfigVersion == tag.old() {
ver.Diffs = tag.diff()
}
evt <- ver
return
}
// Client is up to date: register the channel for later pushes.
pushKey := pushKey(rhost.Service, rhost.Name)
s.eLock.Lock()
s.events[pushKey] = evt
s.eLock.Unlock()
return
}
// values loads the config values of tag tagID: it resolves the tag's config
// IDs and then fetches the values for those IDs.
func (s *Service) values(tagID int64) (values []*model.Value, err error) {
	ids, err := s.dao.ConfIDs(tagID)
	if err != nil {
		return
	}
	return s.dao.ConfigsByIDs(ids)
}
// appAuth verifies that token matches the token of the app named svr and
// returns the app's ID. It returns ecode.AccessDenied on a token mismatch,
// or the lookup error when the app cannot be resolved.
func (s *Service) appAuth(c context.Context, svr, token string) (appID int64, err error) {
	var app *model.App
	if app, err = s.app(svr); err != nil {
		return
	}
	if app.Token == token {
		return app.ID, nil
	}
	return 0, ecode.AccessDenied
}
// Hosts returns the client hosts of service svr as reported by the dao.
func (s *Service) Hosts(c context.Context, svr string) (hosts []*model.Host, err error) {
	hosts, err = s.dao.Hosts(c, svr)
	return
}
// Builds returns all build names of the app identified by svr.
func (s *Service) Builds(c context.Context, svr string) (builds []string, err error) {
	app, err := s.app(svr)
	if err != nil {
		return nil, err
	}
	builds, err = s.dao.BuildsByAppID(app.ID)
	return
}
// VersionSuccess returns the tags of the app identified by svr together
// with the tag ID currently mapped to build bver (as DefVer).
func (s *Service) VersionSuccess(c context.Context, svr, bver string) (versions *model.Versions, err error) {
	app, err := s.app(svr)
	if err != nil {
		return nil, err
	}
	defVer, err := s.dao.TagID(app.ID, bver)
	if err != nil {
		log.Error("BuildVersion(%v) error(%v)", svr, err)
		return nil, err
	}
	tags, err := s.dao.Tags(app.ID)
	if err != nil {
		return nil, err
	}
	return &model.Versions{Version: tags, DefVer: defVer}, nil
}
// ClearHost removes the stored hosts of service svr via the dao.
func (s *Service) ClearHost(c context.Context, svr string) (err error) {
	err = s.dao.ClearHost(c, svr)
	return
}
// pubEvent delivers evt to the subscriber channel registered under key.
//
// It returns true only when the event was actually handed to the channel.
// It returns false when no subscriber is registered, or when the
// subscriber's channel cannot accept the event right now.
//
// The send is deliberately non-blocking. Subscriber channels are created
// with a single buffered slot, so a blocking send to a channel whose slot is
// already occupied (and whose reader is slow or gone) would stall the caller
// — and with it the notification of every remaining host — indefinitely.
func (s *Service) pubEvent(key string, evt *model.Diff) (ok bool) {
	s.eLock.RLock()
	ch, found := s.events[key]
	s.eLock.RUnlock()
	if !found {
		return false
	}
	select {
	case ch <- evt:
		return true
	default:
		// An undelivered event is already pending for this subscriber.
		log.Info("s.events.Pub(%s) dropped: subscriber channel is full", key)
		return false
	}
}
// Unsub removes the event subscription of host host of service svr.
func (s *Service) Unsub(svr, host string) {
	subKey := pushKey(svr, host)
	s.eLock.Lock()
	defer s.eLock.Unlock()
	delete(s.events, subKey)
}
// curTag returns the current tag of build build of app appID.
//
// The in-memory tag map is consulted first. On a miss the tag ID, its
// config IDs and its force flag are loaded through the dao and stored with
// setTag.
func (s *Service) curTag(appID int64, build string) (tag *curTag, err error) {
	key := buildVerKey(appID, build)
	s.tLock.RLock()
	tag, found := s.tags[key]
	s.tLock.RUnlock()
	if found {
		return
	}
	tagID, err := s.dao.TagID(appID, build)
	if err != nil {
		return
	}
	ids, err := s.dao.ConfIDs(tagID)
	if err != nil {
		return
	}
	force, err := s.dao.TagForce(tagID)
	if err != nil {
		return
	}
	tag = s.setTag(appID, build, &cacheTag{Tag: tagID, ConfIDs: ids, Force: force})
	return
}
// setTag stores cTag as the current tag of build bver of app appID and
// returns the stored entry. When an entry with a non-nil current tag already
// exists, that tag is kept as the new entry's old tag (O).
func (s *Service) setTag(appID int64, bver string, cTag *cacheTag) (nTag *curTag) {
	nTag = &curTag{C: cTag}
	key := buildVerKey(appID, bver)
	s.tLock.Lock()
	defer s.tLock.Unlock()
	if prev, exists := s.tags[key]; exists && prev.C != nil {
		nTag.O = prev.C
	}
	s.tags[key] = nTag
	return nTag
}
// app resolves the app identified by svr.
//
// The in-memory app map is consulted first. On a miss, svr must consist of
// exactly three "_"-separated parts whose first part is a base-10 tree ID;
// the app is then loaded with AppByTree(third part, second part, tree ID)
// and cached. A malformed svr yields ecode.RequestErr.
func (s *Service) app(svr string) (app *model.App, err error) {
	s.aLock.RLock()
	app, cached := s.apps[svr]
	s.aLock.RUnlock()
	if cached {
		return app, nil
	}
	parts := strings.Split(svr, "_")
	if len(parts) != 3 {
		err = ecode.RequestErr
		return
	}
	treeID, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return
	}
	if app, err = s.dao.AppByTree(parts[2], parts[1], treeID); err != nil {
		log.Error("Token(%v) error(%v)", svr, err)
		return
	}
	s.aLock.Lock()
	s.apps[svr] = app
	s.aLock.Unlock()
	return app, nil
}
// SetToken refreshes the cached app entry of svr.
//
// When svr is already cached, the entry is replaced by a copy carrying the
// same ID and Name and the given token. Otherwise svr is parsed as three
// "_"-separated parts (the first being a base-10 tree ID) and the app is
// loaded with AppByTree and cached as returned; token is not applied in
// that case. A malformed svr yields ecode.RequestErr.
func (s *Service) SetToken(svr, token string) (err error) {
	s.aLock.RLock()
	cur, known := s.apps[svr]
	s.aLock.RUnlock()

	var next *model.App
	if known {
		next = &model.App{ID: cur.ID, Name: cur.Name, Token: token}
	} else {
		parts := strings.Split(svr, "_")
		if len(parts) != 3 {
			return ecode.RequestErr
		}
		treeID, perr := strconv.ParseInt(parts[0], 10, 64)
		if perr != nil {
			return perr
		}
		if next, err = s.dao.AppByTree(parts[2], parts[1], treeID); err != nil {
			log.Error("Token(%v) error(%v)", svr, err)
			return
		}
	}
	s.aLock.Lock()
	s.apps[svr] = next
	s.aLock.Unlock()
	return
}
// TmpBuilds returns the builds of every app named svr in environment env.
//
// The numeric environment codes "10", "11", "13", "14" and "3" are mapped to
// "dev", "fat1", "uat", "pre" and "prod"; any other value is used as given.
// ecode.NothingFound is returned when no app matches.
func (s *Service) TmpBuilds(svr, env string) (builds []string, err error) {
	switch env {
	case "3":
		env = "prod"
	case "10":
		env = "dev"
	case "11":
		env = "fat1"
	case "13":
		env = "uat"
	case "14":
		env = "pre"
	}
	apps, err := s.dao.AppsByNameEnv(svr, env)
	if err != nil {
		return
	}
	if len(apps) == 0 {
		return nil, ecode.NothingFound
	}
	appIDs := make([]int64, len(apps))
	for i := range apps {
		appIDs[i] = apps[i].ID
	}
	return s.dao.BuildsByAppIDs(appIDs)
}
// AppService returns the service name "<TreeID>_<Env>_<Zone>" of the app
// identified by zone, env and token. Apps are cached in memory under
// "<zone>_<env>_<token>" and loaded with AppGet on a miss.
func (s *Service) AppService(zone, env, token string) (service string, err error) {
	key := fmt.Sprintf("%s_%s_%s", zone, env, token)
	s.bLock.RLock()
	app, hit := s.services[key]
	s.bLock.RUnlock()
	if !hit {
		if app, err = s.dao.AppGet(zone, env, token); err != nil {
			log.Error("AppService error(%v)", err)
			return
		}
		s.bLock.Lock()
		s.services[key] = app
		s.bLock.Unlock()
	}
	return fmt.Sprintf("%d_%s_%s", app.TreeID, app.Env, app.Zone), nil
}
// Force records svr.Version as the forced version of every host in
// hostnames, which maps a host name to its IP.
//
// When sType is 1, the version is first checked to have at least one config
// value; an error is returned if it has none or the lookup fails.
func (s *Service) Force(c context.Context, svr *model.Service, hostnames map[string]string, sType int8) (err error) {
	if sType == 1 { // push 1 clear other
		var loaded []*model.Value
		if loaded, err = s.values(svr.Version); err != nil {
			return
		}
		if len(loaded) == 0 {
			err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
			log.Error("%v", err)
			return
		}
	}
	for host, ip := range hostnames {
		forceKey := pushForceKey(svr.Name, host, ip)
		s.pubForce(forceKey, svr.Version)
		log.Info("s.events.Pub(%s, %d)", forceKey, svr.Version)
	}
	return
}
// curForce returns the forced version of host rhost. The in-memory force
// map is consulted first; on a miss the value is loaded with
// dao.Force(appID, rhost.Name) and cached.
func (s *Service) curForce(rhost *model.Host, appID int64) (version int64, err error) {
	// get force version
	key := pushForceKey(rhost.Service, rhost.Name, rhost.IP)
	s.fLock.RLock()
	version, cached := s.forces[key]
	s.fLock.RUnlock()
	if cached {
		return
	}
	if version, err = s.dao.Force(appID, rhost.Name); err != nil {
		return
	}
	s.fLock.Lock()
	s.forces[key] = version
	s.fLock.Unlock()
	return
}
// pubForce stores version as the forced version under key in the in-memory
// force map.
func (s *Service) pubForce(key string, version int64) {
	s.fLock.Lock()
	defer s.fLock.Unlock()
	s.forces[key] = version
}
// tagForce returns the force flag of the tag currently mapped to build
// build of app appID.
//
// Both steps are cached in memory: first the tag ID of the build, then the
// force flag of that tag. Each is loaded through the dao on a miss.
func (s *Service) tagForce(appID int64, build string) (force int8, err error) {
	idKey := tagIDkey(appID, build)
	s.tagIDLock.RLock()
	tagID, known := s.tagID[idKey]
	s.tagIDLock.RUnlock()
	if !known {
		if tagID, err = s.dao.TagID(appID, build); err != nil {
			return
		}
		s.setTagID(appID, tagID, build)
	}

	flagKey := tagForceKey(appID, build, tagID)
	s.tfLock.RLock()
	force, known = s.forceType[flagKey]
	s.tfLock.RUnlock()
	if known {
		return
	}
	if force, err = s.dao.TagForce(tagID); err != nil {
		return
	}
	s.tfLock.Lock()
	s.forceType[flagKey] = force
	s.tfLock.Unlock()
	return
}
// setTagID caches tagID as the tag ID of build build of app appID.
func (s *Service) setTagID(appID, tagID int64, build string) {
	idKey := tagIDkey(appID, build)
	s.tagIDLock.Lock()
	defer s.tagIDLock.Unlock()
	s.tagID[idKey] = tagID
}
// lastForce returns the last forced version of build build of app appID.
// The in-memory map is consulted first; on a miss the build ID is resolved
// and the value is loaded with dao.LastForce and cached. The version
// parameter is not used.
func (s *Service) lastForce(appID, version int64, build string) (lastForce int64, err error) {
	key := lastForceKey(appID, build)
	s.lfvLock.RLock()
	lastForce, hit := s.lfvforces[key]
	s.lfvLock.RUnlock()
	if hit {
		return
	}
	buildID, err := s.dao.BuildID(appID, build)
	if err != nil {
		return
	}
	if lastForce, err = s.dao.LastForce(appID, buildID); err != nil {
		return
	}
	s.lfvLock.Lock()
	s.lfvforces[key] = lastForce
	s.lfvLock.Unlock()
	return
}
// setLFVForces caches lastForce as the last forced version of build build
// of app appID.
func (s *Service) setLFVForces(lastForce, appID int64, build string) {
	key := lastForceKey(appID, build)
	s.lfvLock.Lock()
	defer s.lfvLock.Unlock()
	s.lfvforces[key] = lastForce
}
// CheckLatest returns the current tag ID of rhost.BuildVersion for the
// service rhost.Service after authenticating with token. It returns
// ecode.NothingFound when the current tag ID is 0.
func (s *Service) CheckLatest(c context.Context, rhost *model.Host, token string) (ver int64, err error) {
	appID, err := s.appAuth(c, rhost.Service, token)
	if err != nil {
		return 0, err
	}
	// get current version, return if has new config version
	tag, err := s.curTag(appID, rhost.BuildVersion)
	if err != nil {
		return 0, err
	}
	if ver = tag.cur(); ver == 0 {
		return 0, ecode.NothingFound
	}
	return ver, nil
}
// ConfigCheck returns the config content of version ver for service
// svrName, after verifying that the version belongs to the caller's app.
//
// The caller is authenticated with token, and ecode.RequestErr is returned
// when the tag ver is owned by a different app. The full content is taken
// from the cache when present; otherwise it is rebuilt from the stored
// values and written back to the cache. With an empty ids the full content
// is returned. Otherwise only the values whose ConfigID appears in ids are
// kept, and ecode.NothingFound is returned when there are no values at all
// or none of them match.
func (s *Service) ConfigCheck(c context.Context, svrName, token string, ver int64, ids []int64) (conf *model.Content, err error) {
	appID, err := s.appAuth(c, svrName, token)
	if err != nil {
		return
	}
	owner, err := s.dao.TagAll(ver)
	if err != nil {
		return
	}
	if owner.AppID != appID {
		err = ecode.RequestErr
		return
	}

	var full []*model.Value
	name := cacheKey(svrName, ver)
	conf, err = s.dao.File(name)
	cached := err == nil
	if !cached {
		// Cache miss: regenerate the full content and store it.
		if full, err = s.values(ver); err != nil {
			return
		}
		if conf, err = genConfig(ver, full); err != nil {
			return
		}
		if err = s.dao.SetFile(name, conf); err != nil {
			return
		}
	}
	if len(ids) == 0 {
		return
	}
	if cached {
		// The cached content holds the JSON-encoded value list.
		if err = json.Unmarshal([]byte(conf.Content), &full); err != nil {
			return
		}
	}
	if len(full) == 0 {
		err = ecode.NothingFound
		return
	}
	var picked []*model.Value
	for _, val := range full {
		for _, want := range ids {
			if want == val.ConfigID {
				picked = append(picked, val)
			}
		}
	}
	if len(picked) == 0 {
		log.Error("Config(%s,%v) error", svrName, ids)
		err = ecode.NothingFound
		return
	}
	if conf, err = genConfig(ver, picked); err != nil {
		log.Error("get config value:%s error(%v) ", picked, err)
	}
	return
}