153 lines
3.3 KiB
Go
153 lines
3.3 KiB
Go
|
package databus_test
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"go-common/library/naming/discovery"
|
||
|
"go-common/library/queue/databus"
|
||
|
xtime "go-common/library/time"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
pCfg = &databus.Config{
|
||
|
// Key: "0PvKGhAqDvsK7zitmS8t",
|
||
|
// Secret: "0PvKGhAqDvsK7zitmS8u",
|
||
|
// Group: "databus_test_group",
|
||
|
// Topic: "databus_test_topic",
|
||
|
Key: "dbe67e6a4c36f877",
|
||
|
Secret: "8c775ea242caa367ba5c876c04576571",
|
||
|
Group: "Test1-MainCommonArch-P",
|
||
|
Topic: "test1",
|
||
|
Action: "pub",
|
||
|
Name: "databus",
|
||
|
Proto: "tcp",
|
||
|
// Addr: "172.16.33.158:6205",
|
||
|
Addr: "172.18.33.50:6205",
|
||
|
Active: 10,
|
||
|
Idle: 5,
|
||
|
DialTimeout: xtime.Duration(time.Second),
|
||
|
WriteTimeout: xtime.Duration(time.Second),
|
||
|
ReadTimeout: xtime.Duration(time.Second),
|
||
|
IdleTimeout: xtime.Duration(time.Minute),
|
||
|
}
|
||
|
sCfg = &databus.Config{
|
||
|
// Key: "0PvKGhAqDvsK7zitmS8t",
|
||
|
// Secret: "0PvKGhAqDvsK7zitmS8u",
|
||
|
// Group: "databus_test_group",
|
||
|
// Topic: "databus_test_topic",
|
||
|
Key: "dbe67e6a4c36f877",
|
||
|
Secret: "8c775ea242caa367ba5c876c04576571",
|
||
|
Group: "Test1-MainCommonArch-S",
|
||
|
Topic: "test1",
|
||
|
Action: "sub",
|
||
|
Name: "databus",
|
||
|
Proto: "tcp",
|
||
|
// Addr: "172.16.33.158:6205",
|
||
|
Addr: "172.18.33.50:6205",
|
||
|
Active: 10,
|
||
|
Idle: 5,
|
||
|
DialTimeout: xtime.Duration(time.Second),
|
||
|
WriteTimeout: xtime.Duration(time.Second),
|
||
|
ReadTimeout: xtime.Duration(time.Second * 35),
|
||
|
IdleTimeout: xtime.Duration(time.Minute),
|
||
|
}
|
||
|
dCfg = &discovery.Config{
|
||
|
Nodes: []string{"172.18.33.50:7171"},
|
||
|
Key: "0c4b8fe3ff35a4b6",
|
||
|
Secret: "b370880d1aca7d3a289b9b9a7f4d6812",
|
||
|
Zone: "sh001",
|
||
|
Env: "uat",
|
||
|
}
|
||
|
)
|
||
|
|
||
|
type TestMsg struct {
|
||
|
Now int64 `json:"now"`
|
||
|
}
|
||
|
|
||
|
func testSub(t *testing.T, d *databus.Databus) {
|
||
|
for {
|
||
|
m, ok := <-d.Messages()
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
t.Logf("sub message: %s", string(m.Value))
|
||
|
if err := m.Commit(); err != nil {
|
||
|
t.Errorf("sub commit error(%v)\n", err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func testPub(t *testing.T, d *databus.Databus) {
|
||
|
// pub
|
||
|
m := &TestMsg{Now: time.Now().UnixNano()}
|
||
|
if err := d.Send(context.TODO(), "test", m); err != nil {
|
||
|
t.Errorf("d.Send(test) error(%v)", err)
|
||
|
} else {
|
||
|
t.Logf("pub message %v", m)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestDatabus(t *testing.T) {
|
||
|
d := databus.New(pCfg)
|
||
|
// pub
|
||
|
testPub(t, d)
|
||
|
testPub(t, d)
|
||
|
testPub(t, d)
|
||
|
d.Close()
|
||
|
|
||
|
// sub
|
||
|
d = databus.New(sCfg)
|
||
|
go testSub(t, d)
|
||
|
|
||
|
time.Sleep(time.Second * 15)
|
||
|
d.Close()
|
||
|
}
|
||
|
|
||
|
func TestDiscoveryDatabus(t *testing.T) {
|
||
|
d := databus.New(pCfg)
|
||
|
// pub
|
||
|
testPub(t, d)
|
||
|
testPub(t, d)
|
||
|
testPub(t, d)
|
||
|
d.Close()
|
||
|
|
||
|
// sub
|
||
|
d = databus.New(sCfg)
|
||
|
go testSub(t, d)
|
||
|
|
||
|
time.Sleep(time.Second * 15)
|
||
|
d.Close()
|
||
|
}
|
||
|
|
||
|
func BenchmarkPub(b *testing.B) {
|
||
|
d := databus.New(pCfg)
|
||
|
defer d.Close()
|
||
|
b.ResetTimer()
|
||
|
b.RunParallel(func(pb *testing.PB) {
|
||
|
for pb.Next() {
|
||
|
m := &TestMsg{Now: time.Now().UnixNano()}
|
||
|
if err := d.Send(context.TODO(), "test", m); err != nil {
|
||
|
b.Errorf("d.Send(test) error(%v)", err)
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func BenchmarkDiscoveryPub(b *testing.B) {
|
||
|
d := databus.New(pCfg)
|
||
|
defer d.Close()
|
||
|
b.ResetTimer()
|
||
|
b.RunParallel(func(pb *testing.PB) {
|
||
|
for pb.Next() {
|
||
|
m := &TestMsg{Now: time.Now().UnixNano()}
|
||
|
if err := d.Send(context.TODO(), "test", m); err != nil {
|
||
|
b.Errorf("d.Send(test) error(%v)", err)
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|