124 lines
3.1 KiB
Go
124 lines
3.1 KiB
Go
package service
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"go-common/app/infra/canal/model"
|
|
|
|
pb "github.com/pingcap/tidb-tools/tidb_binlog/slave_binlog_proto/go-binlog"
|
|
)
|
|
|
|
// lower case column field type in mysql
|
|
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
|
|
// for numeric type: int bigint smallint tinyint float double decimal bit
|
|
// for string type: text longtext mediumtext char tinytext varchar
|
|
// blob longblog mediumblog binary tinyblob varbinary
|
|
// enum set
|
|
// for json type: json
|
|
|
|
// for text and char type, string_value is set
|
|
// for blob and binary type, bytes_value is set
|
|
// for enum, set, uint64_value is set
|
|
// for json, bytes_value is set
|
|
|
|
func tidbMakeData(m *msg) (data *model.Data, err error) {
|
|
action := m.mu.GetType()
|
|
if (action != pb.MutationType_Insert) && (action != pb.MutationType_Delete) && (action != pb.MutationType_Update) {
|
|
err = errInvalidAction
|
|
return
|
|
}
|
|
data = &model.Data{
|
|
Action: strings.ToLower(action.String()),
|
|
Table: m.table,
|
|
}
|
|
var keys []string
|
|
switch action {
|
|
case pb.MutationType_Insert, pb.MutationType_Delete:
|
|
var values = m.mu.GetRow().GetColumns()
|
|
for i, c := range m.columns {
|
|
for _, key := range m.keys {
|
|
if c.Name == key {
|
|
keys = append(keys, columnToString(values[i]))
|
|
break
|
|
}
|
|
}
|
|
if m.ignore[c.Name] {
|
|
continue
|
|
}
|
|
if data.New == nil {
|
|
data.New = make(map[string]interface{}, len(m.columns))
|
|
}
|
|
if strings.Contains(c.GetMysqlType(), "binary") {
|
|
data.New[c.Name] = base64.StdEncoding.EncodeToString(values[i].GetBytesValue())
|
|
continue
|
|
}
|
|
data.New[c.Name] = columnToValue(values[i])
|
|
}
|
|
case pb.MutationType_Update:
|
|
if m.mu.Row == nil || m.mu.ChangeRow == nil {
|
|
err = errInvalidUpdate
|
|
return
|
|
}
|
|
var oldValues = m.mu.GetChangeRow().GetColumns()
|
|
var newValues = m.mu.GetRow().GetColumns()
|
|
for i, c := range m.columns {
|
|
for _, key := range m.keys {
|
|
if c.Name == key {
|
|
keys = append(keys, columnToString(newValues[i]))
|
|
break
|
|
}
|
|
}
|
|
if m.ignore[c.Name] {
|
|
continue
|
|
}
|
|
if data.New == nil {
|
|
data.New = make(map[string]interface{}, len(m.columns))
|
|
}
|
|
if data.Old == nil {
|
|
data.Old = make(map[string]interface{}, len(m.columns))
|
|
}
|
|
if strings.Contains(c.GetMysqlType(), "binary") {
|
|
data.Old[c.Name] = base64.StdEncoding.EncodeToString(oldValues[i].GetBytesValue())
|
|
data.New[c.Name] = base64.StdEncoding.EncodeToString(newValues[i].GetBytesValue())
|
|
continue
|
|
}
|
|
data.Old[c.Name] = columnToValue(oldValues[i])
|
|
data.New[c.Name] = columnToValue(newValues[i])
|
|
}
|
|
}
|
|
if len(keys) == 0 {
|
|
data.Key = columnToString(m.mu.GetRow().GetColumns()[0])
|
|
} else {
|
|
data.Key = strings.Join(keys, ",")
|
|
}
|
|
if data.New == nil && data.Old == nil {
|
|
data = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
func columnToValue(c *pb.Column) interface{} {
|
|
if c.GetIsNull() {
|
|
return nil
|
|
}
|
|
if c.Int64Value != nil {
|
|
return c.GetInt64Value()
|
|
}
|
|
if c.Uint64Value != nil {
|
|
return c.GetUint64Value()
|
|
}
|
|
if c.DoubleValue != nil {
|
|
return c.GetDoubleValue()
|
|
}
|
|
if c.StringValue != nil {
|
|
return c.GetStringValue()
|
|
}
|
|
return c.GetBytesValue()
|
|
}
|
|
|
|
func columnToString(c *pb.Column) string {
|
|
return fmt.Sprint(columnToValue(c))
|
|
}
|