102 lines
2.2 KiB
Go
102 lines
2.2 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"go-common/app/infra/databus/conf"
|
|
"go-common/app/infra/databus/dao"
|
|
"go-common/app/infra/databus/model"
|
|
"go-common/library/log"
|
|
"go-common/library/stat/prom"
|
|
)
|
|
|
|
const (
|
|
_authUpdateInterval = 1 * time.Minute
|
|
)
|
|
|
|
// Service service instance
|
|
type Service struct {
|
|
dao *dao.Dao
|
|
// auth
|
|
auths map[string]*model.Auth
|
|
// the auth of cluster changed
|
|
clusterChan chan model.Auth
|
|
// stats prom
|
|
StatProm *prom.Prom
|
|
CountProm *prom.Prom
|
|
TimeProm *prom.Prom
|
|
}
|
|
|
|
// New new and return service
|
|
func New(c *conf.Config) (s *Service) {
|
|
s = &Service{
|
|
dao: dao.New(c),
|
|
// cluster
|
|
clusterChan: make(chan model.Auth, 5),
|
|
// stats prom
|
|
StatProm: prom.New().WithState("go_databus_state", []string{"role", "group", "topic", "partition"}),
|
|
// count prom: count consumer and producer partition speed
|
|
CountProm: prom.New().WithState("go_databus_counter", []string{"operation", "group", "topic"}),
|
|
TimeProm: prom.New().WithTimer("go_databus_timer", []string{"group"}),
|
|
}
|
|
s.fillAuth()
|
|
go s.proc()
|
|
return
|
|
}
|
|
|
|
// Ping check mysql connection
|
|
func (s *Service) Ping(c context.Context) error {
|
|
return s.dao.Ping(c)
|
|
}
|
|
|
|
// Close close mysql connection
|
|
func (s *Service) Close() {
|
|
if s.dao != nil {
|
|
s.dao.Close()
|
|
}
|
|
}
|
|
|
|
func (s *Service) proc() {
|
|
for {
|
|
s.fillAuth()
|
|
time.Sleep(_authUpdateInterval)
|
|
}
|
|
}
|
|
|
|
func (s *Service) fillAuth() (err error) {
|
|
auths, err := s.dao.Auth(context.Background())
|
|
if err != nil {
|
|
log.Error("service.fillAuth error(%v)", err)
|
|
return
|
|
}
|
|
var changed []*model.Auth
|
|
// check cluster change event
|
|
for group, nw := range auths {
|
|
old, ok := s.auths[group]
|
|
if !ok {
|
|
continue
|
|
}
|
|
if old.Cluster != nw.Cluster {
|
|
changed = append(changed, old)
|
|
log.Info("cluster changed group(%s) topic(%s) oldCluster(%s) newCluster(%s)", old.Group, old.Topic, old.Cluster, nw.Cluster)
|
|
}
|
|
}
|
|
s.auths = auths
|
|
for _, ch := range changed {
|
|
s.clusterChan <- *ch
|
|
}
|
|
return
|
|
}
|
|
|
|
// AuthApp check auth from cache
|
|
func (s *Service) AuthApp(group string) (a *model.Auth, ok bool) {
|
|
a, ok = s.auths[group]
|
|
return
|
|
}
|
|
|
|
// ClusterEvent return cluster change event
|
|
func (s *Service) ClusterEvent() (group <-chan model.Auth) {
|
|
return s.clusterChan
|
|
}
|