package databusutil

import (
	"runtime"
	"sync"
	"time"

	"go-common/library/queue/databus"
	xtime "go-common/library/time"
)

const (
	_stateStarted = 1
	_stateClosed  = 2
)

// Config is the base configuration for creating a new Group.
type Config struct {
	// Size is the maximum number of messages merged into one batch.
	Size int
	// Num is the number of merge goroutines.
	Num int
	// Ticker is the interval at which buffered merges are flushed when no
	// new message arrives.
	Ticker xtime.Duration
	// Chan is the buffer size of each merge chan and of the done chan.
	Chan int
}

// fix fills in sane defaults for any unset field.
func (c *Config) fix() {
	if c.Size <= 0 {
		c.Size = 1024
	}
	if int64(c.Ticker) <= 0 {
		c.Ticker = xtime.Duration(time.Second * 5)
	}
	if c.Num <= 0 {
		c.Num = runtime.GOMAXPROCS(0)
	}
	if c.Chan <= 0 {
		c.Chan = 1024
	}
}

type message struct {
	next   *message // next message in arrival order
	data   *databus.Message
	object interface{} // deserialized payload returned by New
	done   bool        // set once the message's batch has been handled
}

// Group consumes messages from a databus channel, deserializes them, merges
// them into batches, and commits offsets strictly in arrival order.
type Group struct {
	c          *Config
	head, last *message
	state      int
	mu         sync.Mutex

	mc  []chan *message // merge chan
	dc  chan []*message // done chan
	qc  chan struct{}   // quit chan
	msg <-chan *databus.Message

	// New deserializes a raw message into an object; on error the message
	// is skipped.
	New func(msg *databus.Message) (interface{}, error)
	// Split routes a message to one of the Num merge goroutines; it must
	// return a non-negative value.
	Split func(msg *databus.Message, data interface{}) int
	// Do handles a merged batch of objects.
	Do func(msgs []interface{})

	pool *sync.Pool
}

// NewGroup creates a new group.
// NOTE: m must be non-nil; receiving on a nil channel blocks forever, so the
// group would never consume anything.
func NewGroup(c *Config, m <-chan *databus.Message) *Group {
	if c == nil {
		c = new(Config)
	}
	c.fix()
	g := &Group{
		c:   c,
		msg: m,

		mc: make([]chan *message, c.Num),
		dc: make(chan []*message, c.Chan),
		qc: make(chan struct{}),

		pool: &sync.Pool{
			New: func() interface{} {
				return new(message)
			},
		},
	}
	for i := 0; i < c.Num; i++ {
		g.mc[i] = make(chan *message, c.Chan)
	}
	return g
}

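// Usage sketch (illustrative only): the subscriber sub, the payload type row,
// and the handler bodies below are assumptions, not part of this package.
// Assign New, Split, and Do before calling Start; the goroutines read them
// without synchronization.
//
//	g := databusutil.NewGroup(nil, sub.Messages()) // sub: a databus subscriber (assumed)
//	g.New = func(msg *databus.Message) (interface{}, error) {
//		r := new(row) // hypothetical payload type
//		if err := json.Unmarshal(msg.Value, r); err != nil {
//			return nil, err // message is skipped
//		}
//		return r, nil
//	}
//	g.Split = func(msg *databus.Message, data interface{}) int {
//		return int(data.(*row).ID) // must be non-negative
//	}
//	g.Do = func(objs []interface{}) {
//		// batch-write objs to storage
//	}
//	g.Start()
//	defer g.Close()
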
// Start starts the group. It is safe for concurrent use by multiple goroutines.
func (g *Group) Start() {
	g.mu.Lock()
	if g.state == _stateStarted {
		g.mu.Unlock()
		return
	}
	g.state = _stateStarted
	g.mu.Unlock()
	go g.consumeproc()
	for i := 0; i < g.c.Num; i++ {
		go g.mergeproc(g.mc[i])
	}
	go g.commitproc()
}

// Close closes the group. It is safe for concurrent use by multiple goroutines.
func (g *Group) Close() (err error) {
	g.mu.Lock()
	if g.state == _stateClosed {
		g.mu.Unlock()
		return
	}
	g.state = _stateClosed
	g.mu.Unlock()
	close(g.qc)
	return
}

func (g *Group) message() *message {
	return g.pool.Get().(*message)
}

func (g *Group) freeMessage(m *message) {
	*m = message{}
	g.pool.Put(m)
}

// consumeproc reads raw messages, deserializes them via New, appends them to
// the in-order list used for committing, and dispatches each to the merge
// chan chosen by Split.
func (g *Group) consumeproc() {
	var (
		ok  bool
		err error
		msg *databus.Message
	)
	for {
		select {
		case <-g.qc:
			return
		case msg, ok = <-g.msg:
			if !ok {
				g.Close()
				return
			}
		}
		m := g.message()
		m.data = msg
		if m.object, err = g.New(msg); err != nil {
			g.freeMessage(m)
			continue
		}
		// append to the arrival-order list; the head is the first message
		// eligible to commit
		g.mu.Lock()
		if g.head == nil {
			g.head = m
			g.last = m
		} else {
			g.last.next = m
			g.last = m
		}
		g.mu.Unlock()
		g.mc[g.Split(m.data, m.object)%g.c.Num] <- m
	}
}

// mergeproc collects messages from one merge chan into a batch, invokes Do
// when the batch reaches Size or the ticker fires, then hands the processed
// messages to commitproc via the done chan.
func (g *Group) mergeproc(mc <-chan *message) {
	ticker := time.NewTicker(time.Duration(g.c.Ticker))
	defer ticker.Stop()
	msgs := make([]interface{}, 0, g.c.Size)
	marks := make([]*message, 0, g.c.Size)
	for {
		select {
		case <-g.qc:
			return
		case msg := <-mc:
			msgs = append(msgs, msg.object)
			marks = append(marks, msg)
			if len(msgs) < g.c.Size {
				continue
			}
		case <-ticker.C:
		}
		if len(msgs) > 0 {
			g.Do(msgs)
			msgs = make([]interface{}, 0, g.c.Size)
		}
		if len(marks) > 0 {
			g.dc <- marks
			marks = make([]*message, 0, g.c.Size)
		}
	}
}

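// A worked example of the ordering guarantee implemented below: if messages
// 1, 2 and 3 arrive on one partition and the batch holding 2 and 3 finishes
// before the batch holding 1, nothing is committed yet; once 1's batch
// finishes, the head scan walks past all three and commits only the newest
// offset (message 3's) for that partition.
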
// commitproc marks finished messages as done and, walking from the head of
// the arrival list, commits the newest committable offset per partition.
func (g *Group) commitproc() {
	commits := make(map[int32]*databus.Message)
	for {
		select {
		case <-g.qc:
			return
		case done := <-g.dc:
			for _, d := range done {
				d.done = true
			}
			// advance past every leading done message, remembering the
			// last message seen per partition
			g.mu.Lock()
			for g.head != nil && g.head.done {
				cur := g.head
				commits[cur.data.Partition] = cur.data
				g.head = cur.next
				g.freeMessage(cur)
			}
			g.mu.Unlock()
			for k, m := range commits {
				m.Commit()
				delete(commits, k)
			}
		}
	}
}