go-common/library/log/agent.go

220 lines
4.6 KiB
Go

package log
import (
"context"
"fmt"
stdlog "log"
"net"
"strconv"
"sync"
"time"
"go-common/library/conf/env"
"go-common/library/log/internal"
"go-common/library/net/metadata"
"go-common/library/net/trace"
xtime "go-common/library/time"
)
const (
_agentTimeout = xtime.Duration(20 * time.Millisecond)
_mergeWait = 1 * time.Second
_maxBuffer = 10 * 1024 * 1024 // 10mb
_defaultChan = 2048
_defaultAgentConfig = "unixpacket:///var/run/lancer/collector_tcp.sock?timeout=100ms&chan=1024"
)
var (
_logSeparator = []byte("\u0001")
_defaultTaskIDs = map[string]string{
env.DeployEnvFat1: "000069",
env.DeployEnvUat: "000069",
env.DeployEnvPre: "000161",
env.DeployEnvProd: "000161",
}
)
// AgentHandler agent struct.
type AgentHandler struct {
c *AgentConfig
msgs chan []core.Field
waiter sync.WaitGroup
pool sync.Pool
enc core.Encoder
batchSend bool
filters map[string]struct{}
}
// AgentConfig agent config.
type AgentConfig struct {
TaskID string
Buffer int
Proto string `dsn:"network"`
Addr string `dsn:"address"`
Chan int `dsn:"query.chan"`
Timeout xtime.Duration `dsn:"query.timeout"`
}
// NewAgent a Agent.
func NewAgent(ac *AgentConfig) (a *AgentHandler) {
if ac == nil {
ac = parseDSN(_agentDSN)
}
if len(ac.TaskID) == 0 {
ac.TaskID = _defaultTaskIDs[env.DeployEnv]
}
a = &AgentHandler{
c: ac,
enc: core.NewJSONEncoder(core.EncoderConfig{
EncodeTime: core.EpochTimeEncoder,
EncodeDuration: core.SecondsDurationEncoder,
}, core.NewBuffer(0)),
}
a.pool.New = func() interface{} {
return make([]core.Field, 0, 16)
}
if ac.Chan == 0 {
ac.Chan = _defaultChan
}
a.msgs = make(chan []core.Field, ac.Chan)
if ac.Timeout == 0 {
ac.Timeout = _agentTimeout
}
if ac.Buffer == 0 {
ac.Buffer = 100
}
a.waiter.Add(1)
// set fixed k/v into enc buffer
KV(_appID, c.Family).AddTo(a.enc)
KV(_deplyEnv, env.DeployEnv).AddTo(a.enc)
KV(_instanceID, c.Host).AddTo(a.enc)
KV(_zone, env.Zone).AddTo(a.enc)
if a.c.Proto == "unixpacket" {
a.batchSend = true
}
go a.writeproc()
return
}
func (h *AgentHandler) data() []core.Field {
return h.pool.Get().([]core.Field)
}
func (h *AgentHandler) free(f []core.Field) {
f = f[0:0]
h.pool.Put(f)
}
// Log log to udp statsd daemon.
func (h *AgentHandler) Log(ctx context.Context, lv Level, args ...D) {
if args == nil {
return
}
f := h.data()
for i := range args {
f = append(f, args[i])
}
if t, ok := trace.FromContext(ctx); ok {
if s, ok := t.(fmt.Stringer); ok {
f = append(f, KV(_tid, s.String()))
} else {
f = append(f, KV(_tid, fmt.Sprintf("%s", t)))
}
}
if caller := metadata.String(ctx, metadata.Caller); caller != "" {
f = append(f, KV(_caller, caller))
}
if color := metadata.String(ctx, metadata.Color); color != "" {
f = append(f, KV(_color, color))
}
if cluster := metadata.String(ctx, metadata.Cluster); cluster != "" {
f = append(f, KV(_cluster, cluster))
}
if metadata.Bool(ctx, metadata.Mirror) {
f = append(f, KV(_mirror, true))
}
select {
case h.msgs <- f:
default:
}
}
// writeproc write data into connection.
func (h *AgentHandler) writeproc() {
var (
conn net.Conn
err error
count int
quit bool
)
buf := core.NewBuffer(2048)
defer h.waiter.Done()
taskID := []byte(h.c.TaskID)
tick := time.NewTicker(_mergeWait)
for {
select {
case d := <-h.msgs:
if d == nil {
quit = true
goto DUMP
}
if buf.Len() >= _maxBuffer {
buf.Reset() // avoid oom
}
now := time.Now()
buf.Write(taskID)
buf.Write([]byte(strconv.FormatInt(now.UnixNano()/1e6, 10)))
h.enc.Encode(buf, d...)
h.free(d)
if h.batchSend {
buf.Write(_logSeparator)
if count++; count < h.c.Buffer && buf.Len() < _maxBuffer {
continue
}
}
case <-tick.C:
}
if conn == nil || err != nil {
if conn, err = net.DialTimeout(h.c.Proto, h.c.Addr, time.Duration(h.c.Timeout)); err != nil {
stdlog.Printf("net.DialTimeout(%s:%s) error(%v)\n", h.c.Proto, h.c.Addr, err)
continue
}
}
DUMP:
if conn != nil && buf.Len() > 0 {
count = 0
if _, err = conn.Write(buf.Bytes()); err != nil {
stdlog.Printf("conn.Write(%d bytes) error(%v)\n", buf.Len(), err)
conn.Close()
} else {
// only succeed reset buffer, let conn reconnect.
buf.Reset()
}
}
if quit {
if conn != nil && err == nil {
conn.Close()
}
return
}
}
}
// Close close the connection.
func (h *AgentHandler) Close() (err error) {
h.msgs <- nil
h.waiter.Wait()
return nil
}
// SetFormat .
func (h *AgentHandler) SetFormat(string) {
// discard setformat
}