go-common/app/infra/config/service/v1/client.go
2019-04-22 18:49:16 +08:00

571 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package v1
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"time"
"go-common/app/infra/config/model"
"go-common/library/database/sql"
"go-common/library/ecode"
"go-common/library/log"
xtime "go-common/library/time"
)
// Format strings for the string keys built by the helper functions below.
const (
// _buildVerKey is filled by buildVerKey with service, build version and env.
_buildVerKey = "%s_%s_%s"
// _pushKey is filled by pushKey with service, host and env.
_pushKey = "%s_%s_%s"
// _cacheKey is filled by cacheKey with service, build version, version and
// env (note that the numeric version comes before env).
_cacheKey = "%s_%s_%d_%s"
// _cacheKey2 is filled by cacheKey2 with the same arguments as _cacheKey;
// the "_2" suffix keeps its keys distinct from the cacheKey ones.
_cacheKey2 = "%s_%s_%d_%s_2"
// _fileKey is filled by fileKey with file name and version.
_fileKey = "%s_%d"
)
// Labels and detail templates handed to s.dao.InsertLog after a successful
// AddConfigs, CopyConfigs or UpdateConfigs call. The texts are runtime data
// and are written as-is.
var (
// addBusiness is the label logged by AddConfigs
// (roughly: "front-end API: add config version").
addBusiness = "前端接口api添加配置版本"
// addInfo is the AddConfigs detail template; %d receives the new version.
addInfo = "添加版本:%d"
// copyBusiness is the label logged by CopyConfigs
// (roughly: "front-end API: copy config version").
copyBusiness = "前端接口api拷贝配置版本"
// copyInfo is the CopyConfigs detail template; the first %d receives the
// source version and the second %d the newly created version.
copyInfo = "拷贝版本:%d新的版本:%d"
// updateBusiness is the label logged by UpdateConfigs
// (roughly: "front-end API: update config").
updateBusiness = "前端接口api更新配置"
// updateInfo is the UpdateConfigs detail template; %d receives the version.
updateInfo = "更新版本:%d"
)
// pushKey returns the key that identifies one host's pending version-event
// subscription: service, host and env rendered through _pushKey.
func pushKey(svr, host, env string) string {
	key := fmt.Sprintf(_pushKey, svr, host, env)
	return key
}
// buildVerKey returns the key used to look up the current config version of a
// build: service, build version and env rendered through _buildVerKey.
func buildVerKey(svr, bver, env string) string {
	key := fmt.Sprintf(_buildVerKey, svr, bver, env)
	return key
}
// cacheKey returns the cache key of the flat (genConfig) rendering of a config
// version. The rendered order is service, build version, version, env.
func cacheKey(svr, bver, env string, ver int64) string {
	key := fmt.Sprintf(_cacheKey, svr, bver, ver, env)
	return key
}
// cacheKey2 returns the cache key of the namespaced (genConfig2) rendering of
// a config version. It takes the same arguments as cacheKey but renders them
// through _cacheKey2, so the two renderings never share a key.
func cacheKey2(svr, bver, env string, ver int64) string {
	key := fmt.Sprintf(_cacheKey2, svr, bver, ver, env)
	return key
}
// fileKey returns the cache key of a single config file at a given version:
// file name and version rendered through _fileKey.
func fileKey(filename string, ver int64) string {
	key := fmt.Sprintf(_fileKey, filename, ver)
	return key
}
// tokenKey returns the key under which the token of a service/env pair is
// kept: the two values joined by an underscore.
func tokenKey(svr, env string) string {
	const layout = "%s_%s"
	return fmt.Sprintf(layout, svr, env)
}
// genConfig renders the flat form of a config version.
//
// The values are collapsed into a name -> config map (when two values share a
// name the later one wins), the map is marshalled to JSON, and the result is
// wrapped in a model.Content carrying ver, the hex-encoded MD5 of the JSON
// bytes, and the JSON text itself. A marshalling error is returned together
// with a nil Content.
func genConfig(ver int64, cs []*model.NSValue) (conf *model.Content, err error) {
	flat := make(map[string]string)
	for i := range cs {
		flat[cs[i].Name] = cs[i].Config
	}
	raw, err := json.Marshal(flat)
	if err != nil {
		return nil, err
	}
	sum := md5.Sum(raw)
	return &model.Content{
		Version: ver,
		Md5:     hex.EncodeToString(sum[:]),
		Content: string(raw),
	}, nil
}
// genConfig2 renders the namespaced form of a config version.
//
// Each value is placed into a model.Namespace selected by looking up its
// NamespaceID in ns. A value whose non-zero NamespaceID is missing from ns is
// skipped; a value with NamespaceID 0 that is missing from ns is grouped under
// the empty name. The resulting name -> namespace map is marshalled to JSON
// and wrapped in a model.Content carrying ver, the hex-encoded MD5 of the JSON
// bytes, and the JSON text. A marshalling error is returned together with a
// nil Content.
func genConfig2(ver int64, cs []*model.NSValue, ns map[int64]string) (conf *model.Content, err error) {
	grouped := make(map[string]*model.Namespace)
	for _, val := range cs {
		name, known := ns[val.NamespaceID]
		if !known && val.NamespaceID != 0 {
			continue
		}
		bucket, exists := grouped[name]
		if !exists {
			bucket = &model.Namespace{Name: name, Data: map[string]string{}}
			grouped[name] = bucket
		}
		bucket.Data[val.Name] = val.Config
	}
	raw, err := json.Marshal(grouped)
	if err != nil {
		return nil, err
	}
	sum := md5.Sum(raw)
	return &model.Content{
		Version: ver,
		Md5:     hex.EncodeToString(sum[:]),
		Content: string(raw),
	}, nil
}
// Push makes svr.Version the current version of a build and notifies clients.
//
// It loads the values and namespaces of svr.Version, renders both the flat
// (genConfig) and the namespaced (genConfig2) content, stores them under
// cacheKey / cacheKey2, records the version via setVersion, and finally
// publishes a model.Version event to every host of the service whose State is
// model.HostOnline.
//
// Errors from loading values or namespaces, an empty value set, and rendering
// failures are returned. Cache write failures and a failing host lookup are
// only logged; Push returns nil in those cases.
func (s *Service) Push(c context.Context, svr *model.Service) (err error) {
	var (
		hosts      []*model.Host
		values     []*model.NSValue
		conf       *model.Content
		conf2      *model.Content
		namespaces map[int64]string
	)
	if values, err = s.dao.Values(c, svr.Version); err != nil {
		return
	}
	if namespaces, err = s.dao.Namespaces(c, svr.Version); err != nil {
		return
	}
	if len(values) == 0 {
		err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
		log.Error("%v", err)
		return
	}
	// Flat rendering, kept for compatibility with old SDK versions.
	if conf, err = genConfig(svr.Version, values); err != nil {
		log.Error("get config value:%v error(%v) ", values, err)
		return
	}
	key := cacheKey(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
	if err = s.dao.SetFile(key, conf); err != nil {
		// Best effort: a failed cache write must not abort the push.
		log.Error("set confCashe error(%v). svr:%s, buildVer:%s, ver:%d", err, svr.Name, svr.BuildVersion, svr.Version)
		err = nil
	}
	// Namespaced rendering.
	if conf2, err = genConfig2(svr.Version, values, namespaces); err != nil {
		log.Error("get config2 value:%v error(%v) ", values, err)
		return
	}
	key2 := cacheKey2(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
	if err = s.dao.SetFile(key2, conf2); err != nil {
		// Best effort, same as above.
		log.Error("set confCashe2 error(%v). svr:%s, buildVer:%s, ver:%d", err, svr.Name, svr.BuildVersion, svr.Version)
		err = nil
	}
	s.setVersion(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
	// push hosts
	if hosts, err = s.dao.Hosts(c, svr.Name, svr.Env); err != nil {
		// The version is already recorded, so a host lookup failure is
		// reported in the log only.
		log.Error("get hosts error(%v). svr:%s, buildVer:%s, ver:%d", err, svr.Name, svr.BuildVersion, svr.Version)
		err = nil
		return
	}
	for _, h := range hosts {
		if h.State != model.HostOnline {
			continue
		}
		pk := pushKey(h.Service, h.Name, svr.Env)
		if ok := s.pubEvent(pk, &model.Version{Version: conf.Version}); ok {
			log.Info("s.events.Pub(%s, %d) ok: %t", pk, conf.Version, ok)
		}
	}
	return
}
// Config returns the flat (genConfig) content of svr.Version.
//
// The caller is authenticated with appAuth first. The content is served from
// the file cache when s.dao.File succeeds; any error from the cache is treated
// as a miss, in which case the values are loaded, rendered and written back to
// the cache. A failed cache write is logged and otherwise ignored, so the
// freshly rendered content is still returned.
//
// Errors: the appAuth error, the error from loading values, an error when the
// version has no values, or the rendering error.
func (s *Service) Config(c context.Context, svr *model.Service) (conf *model.Content, err error) {
	var values []*model.NSValue
	if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
		return
	}
	cacheName := cacheKey(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
	if conf, err = s.dao.File(cacheName); err == nil {
		return
	}
	if values, err = s.dao.Values(c, svr.Version); err != nil {
		return
	}
	if len(values) == 0 {
		err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
		log.Error("%v", err)
		return
	}
	if conf, err = genConfig(svr.Version, values); err != nil {
		log.Error("get config value:%v error(%v) ", values, err)
		return
	}
	if err = s.dao.SetFile(cacheName, conf); err != nil {
		// Best effort: report the failed cache write but still serve conf.
		log.Error("set confCashe error(%v). svr:%s, buildVer:%s, ver:%d", err, svr.Name, svr.BuildVersion, svr.Version)
		err = nil
	}
	return
}
// Config2 returns the namespaced (genConfig2) content of svr.Version.
//
// The caller is authenticated with appAuth first. The content is served from
// the file cache when s.dao.File succeeds; any error from the cache is treated
// as a miss, in which case namespaces and values are loaded, rendered and
// written back to the cache. A failed cache write is logged and otherwise
// ignored, so the freshly rendered content is still returned.
//
// Errors: the appAuth error, the error from loading namespaces or values, an
// error when the version has no values, or the rendering error.
func (s *Service) Config2(c context.Context, svr *model.Service) (conf *model.Content, err error) {
	var (
		values     []*model.NSValue
		namespaces map[int64]string
	)
	if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
		return
	}
	cacheName := cacheKey2(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
	if conf, err = s.dao.File(cacheName); err == nil {
		return
	}
	if namespaces, err = s.dao.Namespaces(c, svr.Version); err != nil {
		return
	}
	if values, err = s.dao.Values(c, svr.Version); err != nil {
		return
	}
	if len(values) == 0 {
		err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
		log.Error("%v", err)
		return
	}
	if conf, err = genConfig2(svr.Version, values, namespaces); err != nil {
		log.Error("get config value:(%v) error(%v) ", values, err)
		return
	}
	if err = s.dao.SetFile(cacheName, conf); err != nil {
		// Best effort: report the failed cache write but still serve conf.
		log.Error("set confCashe2 error(%v). svr:%s, buildVer:%s, ver:%d", err, svr.Name, svr.BuildVersion, svr.Version)
		err = nil
	}
	return
}
// File returns the content of the single config file svr.File.
//
// After authenticating through appAuth, the version is taken from svr.Version
// unless it equals model.UnknownVersion, in which case the current version of
// the build is resolved via curVer (in-memory version map first, then
// dao.BuildVersion, whose result is remembered). The content is served from
// the string cache when s.dao.FileStr succeeds; otherwise it is read with
// s.dao.Value and stored back with s.dao.SetFileStr.
func (s *Service) File(c context.Context, svr *model.Service) (val string, err error) {
	if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
		return
	}
	target := svr.Version
	if target == model.UnknownVersion {
		if target, err = s.curVer(c, svr.Name, svr.BuildVersion, svr.Env); err != nil {
			log.Error("BuildVersion(%v) error(%v)", svr, err)
			return
		}
	}
	key := fileKey(svr.File, target)
	// Any cache error counts as a miss and falls through to the dao lookup.
	if val, err = s.dao.FileStr(key); err == nil {
		return
	}
	val, err = s.dao.Value(c, svr.File, target)
	if err != nil {
		log.Error("Value(%v) error(%v)", svr.File, err)
		return
	}
	s.dao.SetFileStr(key, val)
	return
}
// CheckVersion answers a client's version poll.
//
// After authenticating through appAuth it stamps rhost.HeartbeatTime with the
// current Unix time and stores the host via s.dao.SetHost (a failure there is
// dropped). It then returns a channel with capacity 1:
//
//   - rhost.Appoint > 0: the channel already holds the appointed version if it
//     differs from rhost.ConfigVersion, otherwise it is empty; nothing is
//     registered.
//   - current version (curVer) is model.UnknownVersion: err is
//     ecode.NothingFound.
//   - current version differs from rhost.ConfigVersion: the channel already
//     holds the current version.
//   - otherwise the empty channel is registered in s.events under the host's
//     pushKey so that a later pubEvent can deliver a version to it.
func (s *Service) CheckVersion(c context.Context, rhost *model.Host, env, token string) (evt chan *model.Version, err error) {
	if err = s.appAuth(c, rhost.Service, env, token); err != nil {
		return
	}
	// set heartbeat; persisting it is best effort, so the result is discarded.
	rhost.HeartbeatTime = xtime.Time(time.Now().Unix())
	_ = s.dao.SetHost(c, rhost, rhost.Service, env)

	evt = make(chan *model.Version, 1)
	if rhost.Appoint > 0 {
		if rhost.Appoint != rhost.ConfigVersion {
			evt <- &model.Version{Version: rhost.Appoint}
		}
		return
	}
	// get current version, return if has new config version
	var latest int64
	if latest, err = s.curVer(c, rhost.Service, rhost.BuildVersion, env); err != nil {
		return
	}
	switch {
	case latest == model.UnknownVersion:
		err = ecode.NothingFound
	case latest != rhost.ConfigVersion:
		evt <- &model.Version{Version: latest}
	default:
		key := pushKey(rhost.Service, rhost.Name, env)
		s.eLock.Lock()
		s.events[key] = evt
		s.eLock.Unlock()
	}
	return
}
// appAuth checks that token is the token registered for the svr/env pair.
//
// The expected token is read from the in-memory s.token map; on a miss it is
// fetched with s.dao.Token and remembered through SetToken. A dao failure is
// logged and returned; a mismatch yields ecode.AccessDenied.
func (s *Service) appAuth(c context.Context, svr, env, token string) (err error) {
	key := tokenKey(svr, env)
	s.eLock.RLock()
	expected, cached := s.token[key]
	s.eLock.RUnlock()
	if !cached {
		expected, err = s.dao.Token(c, svr, env)
		if err != nil {
			log.Error("Token(%v,%v) error(%v)", svr, env, err)
			return
		}
		s.SetToken(c, svr, env, expected)
	}
	if expected == token {
		return
	}
	err = ecode.AccessDenied
	return
}
// SetToken stores token as the in-memory token of the svr/env pair, replacing
// any previous entry. The write is done while holding s.eLock.
func (s *Service) SetToken(c context.Context, svr, env, token string) {
	key := tokenKey(svr, env)
	s.eLock.Lock()
	defer s.eLock.Unlock()
	s.token[key] = token
}
// Hosts returns the client hosts of a service/env pair exactly as reported by
// s.dao.Hosts.
func (s *Service) Hosts(c context.Context, svr, env string) (hosts []*model.Host, err error) {
	hosts, err = s.dao.Hosts(c, svr, env)
	return
}
// VersionSuccess returns the versions of svr/env that s.dao.Versions reports
// for state model.ConfigEnd, together with the version that
// s.dao.BuildVersion reports for build bver (returned as DefVer). Either dao
// failure is logged and returned with a nil result.
func (s *Service) VersionSuccess(c context.Context, svr, env, bver string) (versions *model.Versions, err error) {
	finished, err := s.dao.Versions(c, svr, env, model.ConfigEnd)
	if err != nil {
		log.Error("Versions(%v,%v,%v) error(%v)", svr, env, bver, err)
		return nil, err
	}
	defVer, err := s.dao.BuildVersion(c, svr, bver, env)
	if err != nil {
		log.Error("BuildVersion(%v) error(%v)", svr, err)
		return nil, err
	}
	return &model.Versions{Version: finished, DefVer: defVer}, nil
}
// VersionIng returns the version numbers of svr/env that s.dao.Versions
// reports for state model.ConfigIng. On success the slice is never nil, even
// when there are no such versions. A dao failure is logged and returned.
func (s *Service) VersionIng(c context.Context, svr, env string) (vers []int64, err error) {
	pending, err := s.dao.Versions(c, svr, env, model.ConfigIng)
	if err != nil {
		log.Error("Versions(%v,%v) error(%v)", svr, env, err)
		return
	}
	vers = make([]int64, len(pending))
	for i, rv := range pending {
		vers[i] = rv.Version
	}
	return
}
// Builds returns the builds of a service/env pair exactly as reported by
// s.dao.Builds.
func (s *Service) Builds(c context.Context, svr, env string) (builds []string, err error) {
	builds, err = s.dao.Builds(c, svr, env)
	return
}
// AddConfigs creates a new config version for svr/env and stores data in it.
//
// The caller is authenticated with appAuth and the service ID is resolved with
// s.dao.ServiceID. Inside one transaction a version row is inserted on behalf
// of user and, when data is non-empty, its name -> config pairs are inserted
// for that version. Any failing step rolls the transaction back and returns
// the error. After a successful commit a log entry is written through
// s.dao.InsertLog.
func (s *Service) AddConfigs(c context.Context, svr, env, token, user string, data map[string]string) (err error) {
	if err = s.appAuth(c, svr, env, token); err != nil {
		return
	}
	svrID, err := s.dao.ServiceID(c, svr, env)
	if err != nil {
		return
	}
	var tx *sql.Tx
	tx, err = s.dao.BeginTran(c)
	if err != nil {
		log.Error("begin tran error(%v)", err)
		return
	}
	newVer, err := s.dao.TxInsertVer(tx, svrID, user)
	if err != nil {
		tx.Rollback()
		return
	}
	if len(data) > 0 {
		err = s.dao.TxInsertValues(c, tx, newVer, user, data)
		if err != nil {
			tx.Rollback()
			return
		}
	}
	err = tx.Commit()
	if err != nil {
		log.Error("tx.Commit error(%v)", err)
		return
	}
	s.dao.InsertLog(c, user, addBusiness, fmt.Sprintf(addInfo, newVer))
	return
}
// CopyConfigs creates a new config version for svr/env whose values are copied
// from the current version of the given build, and returns the new version.
//
// The caller is authenticated with appAuth. The source version comes from
// curVer and its values from s.dao.Values; they are collapsed into a
// name -> config map (a later value with the same name wins). Inside one
// transaction a version row is inserted on behalf of user and the map is
// inserted for it. Any failing step rolls the transaction back and returns the
// error. After a successful commit a log entry naming both versions is
// written through s.dao.InsertLog.
func (s *Service) CopyConfigs(c context.Context, svr, env, token, user string, build string) (ver int64, err error) {
	if err = s.appAuth(c, svr, env, token); err != nil {
		return
	}
	srcVer, err := s.curVer(c, svr, build, env)
	if err != nil {
		return
	}
	srcValues, err := s.dao.Values(c, srcVer)
	if err != nil {
		return
	}
	copied := make(map[string]string)
	for i := range srcValues {
		copied[srcValues[i].Name] = srcValues[i].Config
	}
	svrID, err := s.dao.ServiceID(c, svr, env)
	if err != nil {
		return
	}
	var tx *sql.Tx
	tx, err = s.dao.BeginTran(c)
	if err != nil {
		log.Error("begin tran error(%v)", err)
		return
	}
	ver, err = s.dao.TxInsertVer(tx, svrID, user)
	if err != nil {
		tx.Rollback()
		return
	}
	err = s.dao.TxInsertValues(c, tx, ver, user, copied)
	if err != nil {
		tx.Rollback()
		return
	}
	err = tx.Commit()
	if err != nil {
		log.Error("tx.Commit error(%v)", err)
		return
	}
	s.dao.InsertLog(c, user, copyBusiness, fmt.Sprintf(copyInfo, srcVer, ver))
	return
}
// UpdateConfigs applies data to the existing config version ver.
//
// The caller is authenticated with appAuth. Empty data is a no-op that returns
// nil. If the version has no values ecode.NothingFound is returned. Otherwise
// data is split by name: names that already exist in the version are updated
// with s.dao.TxUpdateValues, the rest are inserted with s.dao.TxInsertValues,
// both inside one transaction. Any failing step rolls the transaction back and
// returns the error. After a successful commit a log entry is written through
// s.dao.InsertLog.
func (s *Service) UpdateConfigs(c context.Context, svr, env, token, user string, ver int64, data map[string]string) (err error) {
	if err = s.appAuth(c, svr, env, token); err != nil {
		return
	}
	if len(data) == 0 {
		return
	}
	current, err := s.dao.Values(c, ver)
	if err != nil {
		return
	}
	if len(current) == 0 {
		return ecode.NothingFound
	}
	// Only the names matter for deciding between insert and update.
	known := make(map[string]struct{}, len(current))
	for i := range current {
		known[current[i].Name] = struct{}{}
	}
	toInsert := make(map[string]string)
	toUpdate := make(map[string]string)
	for name, cfg := range data {
		if _, exists := known[name]; exists {
			toUpdate[name] = cfg
			continue
		}
		toInsert[name] = cfg
	}
	var tx *sql.Tx
	tx, err = s.dao.BeginTran(c)
	if err != nil {
		log.Error("begin tran error(%v)", err)
		return
	}
	if len(toInsert) > 0 {
		err = s.dao.TxInsertValues(c, tx, ver, user, toInsert)
		if err != nil {
			tx.Rollback()
			return
		}
	}
	if len(toUpdate) > 0 {
		err = s.dao.TxUpdateValues(tx, ver, user, toUpdate)
		if err != nil {
			tx.Rollback()
			return
		}
	}
	err = tx.Commit()
	if err != nil {
		log.Error("tx.Commit error(%v)", err)
		return
	}
	s.dao.InsertLog(c, user, updateBusiness, fmt.Sprintf(updateInfo, ver))
	return
}
// version looks up the remembered current version of a build in s.versions.
// ok is false when nothing is stored for the svr/bver/env key. The read is
// done while holding the read side of s.vLock.
func (s *Service) version(svr, bver, env string) (ver int64, ok bool) {
	key := buildVerKey(svr, bver, env)
	s.vLock.RLock()
	defer s.vLock.RUnlock()
	ver, ok = s.versions[key]
	return
}
// setVersion remembers ver as the current version of a build in s.versions,
// replacing any previous entry for the svr/bver/env key. The write is done
// while holding s.vLock.
func (s *Service) setVersion(svr, bver, env string, ver int64) {
	key := buildVerKey(svr, bver, env)
	s.vLock.Lock()
	defer s.vLock.Unlock()
	s.versions[key] = ver
}
// ClearHost clears the hosts of a service/env pair by delegating to
// s.dao.ClearHost and returns its error unchanged.
func (s *Service) ClearHost(c context.Context, svr, env string) (err error) {
	err = s.dao.ClearHost(c, svr, env)
	return
}
// pubEvent delivers evt to the channel registered in s.events under key.
// It reports false when no channel is registered, true after the send.
//
// The channel is looked up under the read lock but the send itself happens
// after the lock is released, so a slow receiver does not hold s.eLock.
// NOTE(review): the send is blocking; the channels created in CheckVersion
// have capacity 1, so a second event for the same unread channel would
// presumably block the caller — confirm whether that can happen in practice.
func (s *Service) pubEvent(key string, evt *model.Version) (ok bool) {
	s.eLock.RLock()
	ch, found := s.events[key]
	s.eLock.RUnlock()
	if !found {
		return false
	}
	ch <- evt
	return true
}
// Unsub removes the event channel registered for the svr/host/env pushKey from
// s.events. Removing a key that is not registered is a no-op.
func (s *Service) Unsub(svr, host, env string) {
	s.eLock.Lock()
	defer s.eLock.Unlock()
	delete(s.events, pushKey(svr, host, env))
}
// curVer returns the current config version of a build.
//
// The in-memory version map is consulted first. On a miss the version is read
// with s.dao.BuildVersion and, when that succeeds, remembered via setVersion
// so later calls are answered from memory. A dao error is returned as is.
func (s *Service) curVer(c context.Context, svr, build, env string) (ver int64, err error) {
	if cached, hit := s.version(svr, build, env); hit {
		return cached, nil
	}
	ver, err = s.dao.BuildVersion(c, svr, build, env)
	if err != nil {
		return
	}
	s.setVersion(svr, build, env, ver)
	return
}