373 lines
8.0 KiB
Go
373 lines
8.0 KiB
Go
package paladin
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"go-common/library/conf/env"
|
|
"go-common/library/ecode"
|
|
"go-common/library/log"
|
|
xip "go-common/library/net/ip"
|
|
"go-common/library/net/netutil"
|
|
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
const (
|
|
_apiGet = "http://%s/config/v2/get?%s"
|
|
_apiCheck = "http://%s/config/v2/check?%s"
|
|
|
|
_maxLoadRetries = 3
|
|
)
|
|
|
|
var (
|
|
_ Client = &sven{}
|
|
|
|
svenHost string
|
|
svenVersion string
|
|
svenPath string
|
|
svenToken string
|
|
svenAppoint string
|
|
svenTreeid string
|
|
|
|
_debug bool
|
|
)
|
|
|
|
func init() {
|
|
flag.StringVar(&svenHost, "conf_host", os.Getenv("CONF_HOST"), `config api host.`)
|
|
flag.StringVar(&svenVersion, "conf_version", os.Getenv("CONF_VERSION"), `app version.`)
|
|
flag.StringVar(&svenPath, "conf_path", os.Getenv("CONF_PATH"), `config file path.`)
|
|
flag.StringVar(&svenToken, "conf_token", os.Getenv("CONF_TOKEN"), `config token.`)
|
|
flag.StringVar(&svenAppoint, "conf_appoint", os.Getenv("CONF_APPOINT"), `config appoint.`)
|
|
flag.StringVar(&svenTreeid, "tree_id", os.Getenv("TREE_ID"), `tree id.`)
|
|
|
|
if env.DeployEnv == env.DeployEnvDev {
|
|
_debug = true
|
|
}
|
|
}
|
|
|
|
type watcher struct {
|
|
keys []string
|
|
ch chan Event
|
|
}
|
|
|
|
func newWatcher(keys []string) *watcher {
|
|
return &watcher{keys: keys, ch: make(chan Event, 5)}
|
|
}
|
|
|
|
func (w *watcher) HasKey(key string) bool {
|
|
if len(w.keys) == 0 {
|
|
return true
|
|
}
|
|
for _, k := range w.keys {
|
|
if k == key {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (w *watcher) Handle(event Event) {
|
|
select {
|
|
case w.ch <- event:
|
|
default:
|
|
log.Error("paladin: discard event:%+v", event)
|
|
}
|
|
}
|
|
|
|
func (w *watcher) Chan() <-chan Event {
|
|
return w.ch
|
|
}
|
|
|
|
func (w *watcher) Close() {
|
|
close(w.ch)
|
|
}
|
|
|
|
// sven is sven config client.
|
|
type sven struct {
|
|
values *Map
|
|
wmu sync.RWMutex
|
|
watchers map[*watcher]struct{}
|
|
|
|
httpCli *http.Client
|
|
backoff *netutil.BackoffConfig
|
|
}
|
|
|
|
// NewSven new a config client.
|
|
func NewSven() (Client, error) {
|
|
s := &sven{
|
|
values: new(Map),
|
|
watchers: make(map[*watcher]struct{}),
|
|
httpCli: &http.Client{Timeout: 60 * time.Second},
|
|
backoff: &netutil.BackoffConfig{
|
|
MaxDelay: 5 * time.Second,
|
|
BaseDelay: 1.0 * time.Second,
|
|
Factor: 1.6,
|
|
Jitter: 0.2,
|
|
},
|
|
}
|
|
if err := s.checkEnv(); err != nil {
|
|
return nil, err
|
|
}
|
|
ver, err := s.load()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
go s.watchproc(ver)
|
|
return s, nil
|
|
}
|
|
|
|
func (s *sven) checkEnv() error {
|
|
if svenHost == "" || svenVersion == "" || svenPath == "" || svenToken == "" || svenTreeid == "" {
|
|
return fmt.Errorf("config env invalid. conf_host(%s) conf_version(%s) conf_path(%s) conf_token(%s) conf_appoint(%s) tree_id(%s)", svenHost, svenVersion, svenPath, svenToken, svenAppoint, svenTreeid)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Get return value by key.
|
|
func (s *sven) Get(key string) *Value {
|
|
return s.values.Get(key)
|
|
}
|
|
|
|
// GetAll return value map.
|
|
func (s *sven) GetAll() *Map {
|
|
return s.values
|
|
}
|
|
|
|
// WatchEvent watch with the specified keys.
|
|
func (s *sven) WatchEvent(ctx context.Context, keys ...string) <-chan Event {
|
|
w := newWatcher(keys)
|
|
s.wmu.Lock()
|
|
s.watchers[w] = struct{}{}
|
|
s.wmu.Unlock()
|
|
return w.Chan()
|
|
}
|
|
|
|
// Close close watcher.
|
|
func (s *sven) Close() (err error) {
|
|
s.wmu.RLock()
|
|
for w := range s.watchers {
|
|
w.Close()
|
|
}
|
|
s.wmu.RUnlock()
|
|
return
|
|
}
|
|
|
|
func (s *sven) fireEvent(event Event) {
|
|
s.wmu.RLock()
|
|
for w := range s.watchers {
|
|
if w.HasKey(event.Key) {
|
|
w.Handle(event)
|
|
}
|
|
}
|
|
s.wmu.RUnlock()
|
|
}
|
|
|
|
func (s *sven) load() (ver int64, err error) {
|
|
var (
|
|
v *version
|
|
cs []*content
|
|
)
|
|
if v, err = s.check(-1); err != nil {
|
|
log.Error("paladin: s.check(-1) error(%v)", err)
|
|
return
|
|
}
|
|
for i := 0; i < _maxLoadRetries; i++ {
|
|
if cs, err = s.config(v); err == nil {
|
|
all := make(map[string]*Value, len(cs))
|
|
for _, v := range cs {
|
|
all[v.Name] = &Value{val: v.Config, raw: v.Config}
|
|
}
|
|
s.values.Store(all)
|
|
return v.Version, nil
|
|
}
|
|
log.Error("paladin: s.config(%v) error(%v)", ver, err)
|
|
time.Sleep(s.backoff.Backoff(i))
|
|
}
|
|
return 0, err
|
|
}
|
|
|
|
func (s *sven) watchproc(ver int64) {
|
|
var retry int
|
|
for {
|
|
v, err := s.check(ver)
|
|
if err != nil {
|
|
if ecode.NotModified.Equal(err) {
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
log.Error("paladin: s.check(%d) error(%v)", ver, err)
|
|
retry++
|
|
time.Sleep(s.backoff.Backoff(retry))
|
|
continue
|
|
}
|
|
cs, err := s.config(v)
|
|
if err != nil {
|
|
log.Error("paladin: s.config(%v) error(%v)", ver, err)
|
|
retry++
|
|
time.Sleep(s.backoff.Backoff(retry))
|
|
continue
|
|
}
|
|
all := s.values.Load()
|
|
news := make(map[string]*Value, len(cs))
|
|
for _, v := range cs {
|
|
if _, ok := all[v.Name]; !ok {
|
|
go s.fireEvent(Event{Event: EventAdd, Key: v.Name, Value: v.Config})
|
|
} else if v.Config != "" {
|
|
go s.fireEvent(Event{Event: EventUpdate, Key: v.Name, Value: v.Config})
|
|
} else {
|
|
go s.fireEvent(Event{Event: EventRemove, Key: v.Name, Value: v.Config})
|
|
}
|
|
news[v.Name] = &Value{val: v.Config, raw: v.Config}
|
|
}
|
|
for k, v := range all {
|
|
if _, ok := news[k]; !ok {
|
|
news[k] = v
|
|
}
|
|
}
|
|
s.values.Store(news)
|
|
ver = v.Version
|
|
retry = 0
|
|
}
|
|
}
|
|
|
|
type version struct {
|
|
Version int64 `json:"version"`
|
|
Diffs []int64 `json:"diffs"`
|
|
}
|
|
|
|
type config struct {
|
|
Version int64 `json:"version"`
|
|
Content string `json:"content"`
|
|
Md5 string `json:"md5"`
|
|
}
|
|
|
|
type content struct {
|
|
Cid int64 `json:"cid"`
|
|
Name string `json:"name"`
|
|
Config string `json:"config"`
|
|
}
|
|
|
|
func (s *sven) check(ver int64) (v *version, err error) {
|
|
params := newParams()
|
|
params.Set("version", strconv.FormatInt(ver, 10))
|
|
params.Set("appoint", svenAppoint)
|
|
var res struct {
|
|
Code int `json:"code"`
|
|
Data *version `json:"data"`
|
|
}
|
|
uri := fmt.Sprintf(_apiCheck, svenHost, params.Encode())
|
|
if _debug {
|
|
fmt.Printf("paladin: check(%d) uri(%s)\n", ver, uri)
|
|
}
|
|
req, err := http.NewRequest("GET", uri, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
resp, err := s.httpCli.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
|
|
return
|
|
}
|
|
b, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if err = json.Unmarshal(b, &res); err != nil {
|
|
return
|
|
}
|
|
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
|
|
err = ec
|
|
return
|
|
}
|
|
if res.Data == nil {
|
|
err = errors.Errorf("paladin: http version is nil. params(%s)", params.Encode())
|
|
return
|
|
}
|
|
v = res.Data
|
|
return
|
|
}
|
|
|
|
func (s *sven) config(ver *version) (cts []*content, err error) {
|
|
ids, _ := json.Marshal(ver.Diffs)
|
|
params := newParams()
|
|
params.Set("version", strconv.FormatInt(ver.Version, 10))
|
|
params.Set("ids", string(ids))
|
|
var res struct {
|
|
Code int `json:"code"`
|
|
Data *config `json:"data"`
|
|
}
|
|
uri := fmt.Sprintf(_apiGet, svenHost, params.Encode())
|
|
if _debug {
|
|
fmt.Printf("paladin: config(%+v) uri(%s)\n", ver, uri)
|
|
}
|
|
req, err := http.NewRequest("GET", uri, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
resp, err := s.httpCli.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != http.StatusOK {
|
|
err = errors.Errorf("paladin: httpCli.GET(%s) error(%d)", params.Encode(), resp.StatusCode)
|
|
return
|
|
}
|
|
b, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if err = json.Unmarshal(b, &res); err != nil {
|
|
return
|
|
}
|
|
if !ecode.Int(res.Code).Equal(ecode.OK) || res.Data == nil {
|
|
err = errors.Errorf("paladin: http config is nil. params(%s) ecode(%d)", params.Encode(), res.Code)
|
|
return
|
|
}
|
|
if err = json.Unmarshal([]byte(res.Data.Content), &cts); err != nil {
|
|
return
|
|
}
|
|
for _, c := range cts {
|
|
if err = ioutil.WriteFile(path.Join(svenPath, c.Name), []byte(c.Config), 0644); err != nil {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func newParams() url.Values {
|
|
params := url.Values{}
|
|
params.Set("service", serviceName())
|
|
params.Set("build", svenVersion)
|
|
params.Set("token", svenToken)
|
|
params.Set("hostname", env.Hostname)
|
|
params.Set("ip", ipAddr())
|
|
return params
|
|
}
|
|
|
|
func ipAddr() string {
|
|
if env.IP != "" {
|
|
return env.IP
|
|
}
|
|
return xip.InternalIP()
|
|
}
|
|
|
|
func serviceName() string {
|
|
return fmt.Sprintf("%s_%s_%s", svenTreeid, env.DeployEnv, env.Zone)
|
|
}
|