Files
go-common/app/infra/databus/service/service.go
2019-04-22 18:49:16 +08:00

102 lines
2.2 KiB
Go

package service
import (
"context"
"time"
"go-common/app/infra/databus/conf"
"go-common/app/infra/databus/dao"
"go-common/app/infra/databus/model"
"go-common/library/log"
"go-common/library/stat/prom"
)
const (
_authUpdateInterval = 1 * time.Minute
)
// Service service instance
type Service struct {
dao *dao.Dao
// auth
auths map[string]*model.Auth
// the auth of cluster changed
clusterChan chan model.Auth
// stats prom
StatProm *prom.Prom
CountProm *prom.Prom
TimeProm *prom.Prom
}
// New new and return service
func New(c *conf.Config) (s *Service) {
s = &Service{
dao: dao.New(c),
// cluster
clusterChan: make(chan model.Auth, 5),
// stats prom
StatProm: prom.New().WithState("go_databus_state", []string{"role", "group", "topic", "partition"}),
// count prom: count consumer and producer partition speed
CountProm: prom.New().WithState("go_databus_counter", []string{"operation", "group", "topic"}),
TimeProm: prom.New().WithTimer("go_databus_timer", []string{"group"}),
}
s.fillAuth()
go s.proc()
return
}
// Ping check mysql connection
func (s *Service) Ping(c context.Context) error {
return s.dao.Ping(c)
}
// Close close mysql connection
func (s *Service) Close() {
if s.dao != nil {
s.dao.Close()
}
}
func (s *Service) proc() {
for {
s.fillAuth()
time.Sleep(_authUpdateInterval)
}
}
func (s *Service) fillAuth() (err error) {
auths, err := s.dao.Auth(context.Background())
if err != nil {
log.Error("service.fillAuth error(%v)", err)
return
}
var changed []*model.Auth
// check cluster change event
for group, nw := range auths {
old, ok := s.auths[group]
if !ok {
continue
}
if old.Cluster != nw.Cluster {
changed = append(changed, old)
log.Info("cluster changed group(%s) topic(%s) oldCluster(%s) newCluster(%s)", old.Group, old.Topic, old.Cluster, nw.Cluster)
}
}
s.auths = auths
for _, ch := range changed {
s.clusterChan <- *ch
}
return
}
// AuthApp check auth from cache
func (s *Service) AuthApp(group string) (a *model.Auth, ok bool) {
a, ok = s.auths[group]
return
}
// ClusterEvent return cluster change event
func (s *Service) ClusterEvent() (group <-chan model.Auth) {
return s.clusterChan
}