Files
go-common/library/net/rpc/warden/balancer/wrr/test/base_test.go
2019-04-22 18:49:16 +08:00

299 lines
7.0 KiB
Go

package test
import (
"context"
"io"
"log"
"os"
"sync"
"testing"
"time"
"go-common/library/conf/env"
"go-common/library/naming"
"go-common/library/net/netutil/breaker"
"go-common/library/net/rpc/warden"
"go-common/library/net/rpc/warden/balancer/wrr"
pb "go-common/library/net/rpc/warden/proto/testproto"
"go-common/library/net/rpc/warden/resolver"
xtime "go-common/library/time"
"google.golang.org/grpc"
)
type testBuilder struct {
addrs []*naming.Instance
}
type testDiscovery struct {
mu sync.Mutex
b *testBuilder
id string
ch chan struct{}
}
func (b *testBuilder) Build(id string) naming.Resolver {
return &testDiscovery{id: id, b: b}
}
func (b *testBuilder) Scheme() string {
return "testbuilder"
}
func (d *testDiscovery) Fetch(ctx context.Context) (map[string][]*naming.Instance, bool) {
d.mu.Lock()
addrs := d.b.addrs
d.mu.Unlock()
if len(addrs) == 0 {
return nil, false
}
return map[string][]*naming.Instance{env.Zone: addrs}, true
}
func (d *testDiscovery) Watch() <-chan struct{} {
d.mu.Lock()
defer d.mu.Unlock()
if d.ch == nil {
d.ch = make(chan struct{}, 1)
}
return d.ch
}
func (d *testDiscovery) Close() error {
return nil
}
func (d *testDiscovery) Scheme() string {
return "discovery"
}
func (d *testDiscovery) set(addrs []*naming.Instance) {
d.mu.Lock()
defer d.mu.Unlock()
d.b.addrs = addrs
select {
case d.ch <- struct{}{}:
default:
return
}
}
func TestMain(m *testing.M) {
s1 := runServer(":18080")
s2 := runServer(":18081")
s3 := runServer(":18082")
b = &testBuilder{}
resolver.Register(b)
dis = b.Build("test_app").(*testDiscovery)
go func() {
time.Sleep(time.Millisecond * 10)
dis.set([]*naming.Instance{{
Addrs: []string{"grpc://127.0.0.1:18080"},
AppID: "test_app",
Metadata: map[string]string{"weight": "100"},
}, {
Addrs: []string{"grpc://127.0.0.1:18081"},
AppID: "test_app",
Metadata: map[string]string{"color": "red"},
}, {
Addrs: []string{"grpc://127.0.0.1:18082"},
AppID: "test_app",
}})
}()
c = newClient()
time.Sleep(time.Millisecond * 30)
ret := m.Run()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
s1.Shutdown(ctx)
s2.Shutdown(ctx)
s3.Shutdown(ctx)
os.Exit(ret)
}
type helloServer struct {
addr string
}
func (s *helloServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: s.addr}, nil
}
func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error {
for i := 0; i < 3; i++ {
in, err := ss.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
ret := &pb.HelloReply{Message: "Hello " + in.Name, Success: true}
err = ss.Send(ret)
if err != nil {
return err
}
}
return nil
}
func runServer(addr string) *warden.Server {
server := warden.NewServer(&warden.ServerConfig{Timeout: xtime.Duration(time.Second)})
pb.RegisterGreeterServer(server.Server(), &helloServer{addr: addr})
go func() {
err := server.Run(addr)
if err != nil {
panic("run server failed!" + err.Error())
}
}()
return server
}
// NewClient returns a new blank Client instance with a default client interceptor.
// opt can be used to add grpc dial options.
func newClient() (client pb.GreeterClient) {
c := warden.NewClient(&warden.ClientConfig{
Dial: xtime.Duration(time.Second * 10),
Timeout: xtime.Duration(time.Second * 10),
Breaker: &breaker.Config{
Window: xtime.Duration(3 * time.Second),
Sleep: xtime.Duration(3 * time.Second),
Bucket: 10,
Ratio: 0.3,
Request: 20,
},
},
grpc.WithBalancerName(wrr.Name),
)
conn, err := c.Dial(context.Background(), "discovery://authority/111")
if err != nil {
log.Fatalf("can't not connect: %v", err)
}
client = pb.NewGreeterClient(conn)
return
}
var b *testBuilder
var dis *testDiscovery
var c pb.GreeterClient
func TestBalancer(t *testing.T) {
testBalancerBasic(t)
testBalancerFailover(t)
testBalancerUpdateColor(t)
testBalancerUpdateScore(t)
}
func testBalancerBasic(t *testing.T) {
time.Sleep(time.Millisecond * 10)
var idx8080 int
var idx8082 int
for i := 0; i < 6; i++ {
resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
if err != nil {
t.Fatalf("testBalancerBasic: say hello failed!err:=%v", err)
}
if resp.Message == ":18082" {
idx8082++
} else if resp.Message == ":18080" {
idx8080++
}
}
if idx8080 != 3 {
t.Fatalf("testBalancerBasic: server 18080 response times should be 3")
}
if idx8082 != 3 {
t.Fatalf("testBalancerBasic: server 18082 response times should be 3")
}
}
func testBalancerFailover(t *testing.T) {
dis.set([]*naming.Instance{{
Addrs: []string{"grpc://127.0.0.1:18080"},
AppID: "test_app",
Metadata: map[string]string{"weight": "100"},
}, {
Addrs: []string{"grpc://127.0.0.1:18081"},
AppID: "test_app",
Metadata: map[string]string{"color": "red"},
}})
time.Sleep(time.Millisecond * 20)
var idx8080 int
var idx8082 int
for i := 0; i < 4; i++ {
resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
if err != nil {
t.Fatalf("testBalancerFailover: say hello failed!err:=%v", err)
}
if resp.Message == ":18082" {
idx8082++
} else if resp.Message == ":18080" {
idx8080++
}
}
if idx8080 != 4 {
t.Fatalf("testBalancerFailover: server 8080 response should be 4")
}
}
func testBalancerUpdateColor(t *testing.T) {
dis.set([]*naming.Instance{{
Addrs: []string{"grpc://127.0.0.1:18080"},
AppID: "test_app",
Metadata: map[string]string{"weight": "100"},
}, {
Addrs: []string{"grpc://127.0.0.1:18081"},
AppID: "test_app",
}})
time.Sleep(time.Millisecond * 30)
var idx8080 int
var idx8081 int
for i := 0; i < 4; i++ {
resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
if err != nil {
t.Fatalf("testBalancerUpdateColor: say hello failed!err:=%v", err)
}
if resp.Message == ":18081" {
idx8081++
} else if resp.Message == ":18080" {
idx8080++
}
}
if idx8080 != 2 {
t.Fatalf("testBalancerUpdateColor: server 8080 response should be 2")
}
if idx8081 != 2 {
t.Fatalf("testBalancerUpdateColor: server 8081 response should be 2")
}
}
func testBalancerUpdateScore(t *testing.T) {
dis.set([]*naming.Instance{{
Addrs: []string{"grpc://127.0.0.1:18080"},
AppID: "test_app",
Metadata: map[string]string{"weight": "100"},
}, {
Addrs: []string{"grpc://127.0.0.1:18081"},
AppID: "test_app",
Metadata: map[string]string{"weight": "300"},
}})
time.Sleep(time.Millisecond * 10)
var idx8080 int
var idx8081 int
for i := 0; i < 4; i++ {
resp, err := c.SayHello(context.Background(), &pb.HelloRequest{Age: 123, Name: "asdasd"})
if err != nil {
t.Fatalf("testBalancerUpdateScore: say hello failed!err:=%v", err)
}
if resp.Message == ":18081" {
idx8081++
} else if resp.Message == ":18080" {
idx8080++
}
}
if idx8080 != 1 {
t.Fatalf("testBalancerUpdateScore: server 8080 response should be 2")
}
if idx8081 != 3 {
t.Fatalf("testBalancerUpdateScore: server 8081 response should be 2")
}
}