Files
2019-04-22 18:49:16 +08:00

338 lines
7.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package watermark
import (
"bufio"
"context"
"crypto/md5"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"time"
"go-common/app/interface/main/creative/model/newcomer"
"go-common/app/interface/main/creative/model/watermark"
"go-common/app/service/main/account/model"
"go-common/library/ecode"
"go-common/library/log"
"go-common/library/net/metadata"
)
// Tip text attached to a watermark, plus the corner labels substituted into it.
var (
// wmTipFormat is the tip template; roughly "the original-content watermark
// will be shown at the %s corner of the video to prevent theft by others".
// The %s verb is filled with a label from wmTipMap.
wmTipFormat = "原创水印将展示在视频%s角以防他人盗用"
// wmTipMap maps a watermark position value to the corner label inserted
// into wmTipFormat. A position missing from the map yields an empty label.
wmTipMap = map[int]string{
1: "左上", // top-left
2: "右上", // top-right; the default position used by WaterMark
3: "左下", // bottom-left
4: "右下", // bottom-right
}
)
// WaterMark returns the watermark settings of user mid.
//
// When no record is stored, a default one (Pos 2, Ty 1) is synthesized. The
// tip text is always filled in from the position, and Uname is taken from the
// account profile when one is returned. If the profile lookup fails, the
// watermark built so far is returned together with the error.
func (s *Service) WaterMark(c context.Context, mid int64) (w *watermark.Watermark, err error) {
	w, err = s.wm.WaterMark(c, mid)
	if err != nil {
		log.Error("s.wm.WaterMark(%d) error(%v)", mid, err)
		return
	}
	if w == nil {
		// Nothing stored yet: fall back to the default position and type.
		w = &watermark.Watermark{MID: mid, Pos: 2, Ty: 1}
	}
	w.Tip = fmt.Sprintf(wmTipFormat, wmTipMap[int(w.Pos)])

	remoteIP := metadata.String(c, metadata.RemoteIP)
	profile, err := s.acc.Profile(c, mid, remoteIP)
	if err != nil {
		log.Error("s.acc.Profile(%d) error(%v)", mid, err)
		return
	}
	if profile != nil {
		w.Uname = profile.Name
	}
	return
}
// WaterMarkSet applies the watermark settings in wp for user wp.MID.
//
// The state, type and position are validated first; an invalid value yields
// the matching ecode error. When the requested state is open or preview, the
// watermark image is (re)generated: name and UID types are drawn locally via
// s.draw, the new-name type is produced by s.GenWm. A request that repeats
// the already-open settings returns early without regenerating or saving.
// Preview requests are never persisted. Otherwise the record is inserted (no
// existing row) or updated, and on success the newcomer task is notified if
// the stored watermark is open with a non-empty URL.
//
// The returned wm is the record that was built; several early exits return it
// with a nil error even though nothing was saved (missing profile, repeated
// settings, generator returned nil).
func (s *Service) WaterMarkSet(c context.Context, wp *watermark.WatermarkParam) (wm *watermark.Watermark, err error) {
	var (
		w   *watermark.Watermark
		wmm *watermark.Watermark
		pf  *model.Profile
	)
	mid, st, ty, pos, sync, ip := wp.MID, wp.State, wp.Ty, wp.Pos, wp.Sync, wp.IP
	if !watermark.IsState(st) {
		err = ecode.CreativeWaterMarkWrongState
		return
	}
	if !watermark.IsType(ty) {
		err = ecode.CreativeWaterMarkWrongType
		return
	}
	if !watermark.IsPos(pos) {
		err = ecode.CreativeWaterMarkWrongPosition // invalid position parameter
		return
	}
	if w, err = s.wm.WaterMark(c, mid); err != nil {
		log.Error("s.wm.Watermark(%d) error(%v)", mid, err)
		return
	}
	wm = &watermark.Watermark{
		MID:   mid,
		State: st,
		Ty:    ty,
		Pos:   pos,
		MTime: time.Now(),
	}
	if w != nil {
		// Carry over the fields of the existing record that this request
		// does not set directly.
		wm.ID = w.ID
		wm.Uname = w.Uname
		wm.URL = w.URL
		wm.MD5 = w.MD5
		wm.Info = w.Info
		wm.CTime = w.CTime
	}
	if st == watermark.StatOpen || st == watermark.StatPreview { // open or preview
		var (
			text   string
			isName bool
		)
		if ty == watermark.TypeName || ty == watermark.TypeNewName { // watermark with the user name below the logo
			if pf, err = s.acc.Profile(c, mid, ip); err != nil {
				log.Error("s.acc.Profile(%d) error(%v)", mid, err)
				return
			}
			if pf == nil {
				return
			}
			// Identical settings are already open and no forced sync was
			// requested: nothing to regenerate.
			if w != nil && w.State == watermark.StatOpen && ty == w.Ty && pos == w.Pos && w.Uname == pf.Name && sync == 0 {
				log.Info("repeat uname watermark")
				return
			}
			text = pf.Name
			wm.Uname = text
			isName = true
		} else if ty == watermark.TypeUID {
			if w != nil && w.State == watermark.StatOpen && ty == w.Ty && pos == w.Pos {
				log.Info("repeat uid watermark")
				return
			}
			text = strconv.FormatInt(mid, 10)
			isName = false
		}
		if ty == watermark.TypeName || ty == watermark.TypeUID { // old way: draw the image locally
			if wmm, err = s.draw(c, mid, text, isName); err != nil {
				log.Error("s.draw error(%v)", err)
				err = ecode.CreativeWaterMarkCreateFailed
				return
			}
			if wmm == nil {
				return
			}
			wm.Info, wm.URL, wm.MD5 = wmm.Info, wmm.URL, wmm.MD5
		} else if ty == watermark.TypeNewName { // new way: ask the generator API
			var gm *watermark.Watermark
			gm, err = s.GenWm(c, mid, wm.Uname, ip)
			if err != nil || gm == nil {
				return
			}
			wm.Info, wm.URL, wm.MD5 = gm.Info, gm.URL, gm.MD5
		}
	}
	if st == watermark.StatPreview { // a preview never touches the db
		return
	}
	if w == nil {
		wm.CTime = time.Now()
		_, err = s.wm.AddWaterMark(c, wm)
	} else {
		_, err = s.wm.UpWaterMark(c, wm)
	}
	if err != nil {
		// The write failed: report it and do not notify the task system on
		// the basis of whatever record was stored before.
		log.Error("s.wm.AddWaterMark/UpWaterMark mid(%d) error(%v)", mid, err)
		return
	}
	// The re-read is best effort: s.WaterMark logs its own failures, and a
	// missing result only skips the task notification below.
	res, _ := s.WaterMark(c, mid)
	if res != nil && res.State == 1 && res.URL != "" {
		s.p.TaskPub(mid, newcomer.MsgForWaterMark, newcomer.MsgFinishedCount)
	}
	return
}
// userInfoConsumer drains the user-info subscription until its message
// channel is closed. Every message is committed before it is processed, so a
// message that fails to decode or to apply is not redelivered by this loop.
// Only messages whose action is "update" are handed to s.update.
func (s *Service) userInfoConsumer() {
	defer s.wg.Done()
	ctx := context.TODO()
	for msg := range s.userInfoSub.Messages() {
		msg.Commit()
		s.userInfoMo++
		u := new(watermark.Msg)
		if err := json.Unmarshal(msg.Value, u); err != nil {
			log.Error("json.Unmarshal(%v) error(%v)", string(msg.Value), err)
			continue
		}
		if u.Action != "update" {
			continue
		}
		s.update(ctx, u)
		log.Info("userInfoConsumer key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
	}
	// The range loop only ends once the channel has been closed.
	log.Error("s.userInfoSub.Messages closed")
}
// update refreshes a user's stored watermark after a user-name change.
//
// Nothing happens when the old and new names are equal or when the user has
// no watermark record. For the name type the image is redrawn via s.draw,
// for the new-name type it is regenerated via s.GenWm; any other type keeps
// its image. In all three cases the record's Uname and MTime are updated and
// written back. A failed write is logged here because the consumer that calls
// this method does not inspect the returned error.
//
// NOTE(review): u.Old and u.New are dereferenced without a check; this
// assumes both are always populated in the message — confirm.
func (s *Service) update(c context.Context, u *watermark.Msg) (err error) {
	if u.Old.Uname == u.New.Uname {
		return
	}
	var w, wm *watermark.Watermark
	if w, err = s.wm.WaterMark(c, u.New.MID); err != nil {
		log.Error("s.wm.Watermark(%d) error(%v)", u.New.MID, err)
		return
	}
	if w == nil {
		return
	}
	log.Info("user mid(%d) origin data(%+v)", w.MID, w)
	uname := u.New.Uname
	if w.Ty == watermark.TypeName {
		wm, err = s.draw(c, w.MID, uname, true)
		if err != nil {
			log.Error("s.draw error(%v)", err)
			err = ecode.CreativeWaterMarkCreateFailed
			return
		}
		if wm == nil {
			return
		}
		// draw leaves MD5 empty when hashing the generated file failed.
		if wm.MD5 == "" {
			log.Error("md5Sum fail")
			err = ecode.CreativeWaterMarkCreateFailed
			return
		}
		w.Info, w.URL, w.MD5 = wm.Info, wm.URL, wm.MD5
	} else if w.Ty == watermark.TypeNewName { // new way: ask the generator API
		var gm *watermark.Watermark
		gm, err = s.GenWm(c, w.MID, uname, "")
		if err != nil || gm == nil {
			return
		}
		w.Info, w.URL, w.MD5 = gm.Info, gm.URL, gm.MD5
	}
	w.Uname = uname
	w.MTime = time.Now()
	if _, err = s.wm.UpWaterMark(c, w); err != nil {
		log.Error("s.wm.UpWaterMark(%d) error(%v)", w.MID, err)
		return
	}
	// Emitted only after the write succeeded.
	log.Info("user mid(%d) uname from (%s) to (%s) update data(%+v)", u.New.MID, u.Old.Uname, u.New.Uname, w)
	return
}
// draw renders a watermark image for mid containing text, uploads it, and
// returns a Watermark carrying the uploaded URL, the JSON image info and the
// file's MD5. The rendered file is removed when the function returns.
// It returns (nil, nil) when the renderer produces no image. MD5 is empty if
// hashing the file failed.
func (s *Service) draw(c context.Context, mid int64, text string, isUname bool) (w *watermark.Watermark, err error) {
	img, err := s.drawimg.Make(c, mid, text, isUname)
	if err != nil {
		log.Error("s.drawimg.Make error(%v)", err)
		return
	}
	if img == nil {
		return
	}
	path := img.File
	defer os.Remove(path)

	location, err := s.bfs.UploadByFile(c, path)
	if err != nil {
		log.Error("s.bfs.UploadByFile error(%v)", err)
		return
	}
	meta, err := ImageInfo(img.CanvasWidth, img.CanvasHeight)
	if err != nil {
		return
	}
	w = &watermark.Watermark{}
	w.URL = location
	w.Info = meta
	w.MD5 = MD5Sum(path)
	return
}
// MD5Sum returns the lowercase hex MD5 digest of the file at path file.
// It returns an empty string, after logging, when the file cannot be opened
// or read.
func MD5Sum(file string) string {
	fh, err := os.Open(file)
	if err != nil {
		log.Error("md5Sum os.Open error(%v)", err)
		return ""
	}
	defer fh.Close()

	digest := md5.New()
	if _, err = io.Copy(digest, bufio.NewReader(fh)); err != nil {
		log.Error("md5Sum io.Copy error(%v)", err)
		return ""
	}
	return fmt.Sprintf("%x", digest.Sum(nil))
}
// GenWm asks the watermark generator API for a watermark for mid/uname and
// converts the answer into a Watermark (URL, MD5 and JSON image info).
// It returns (nil, nil) when the API yields no result.
func (s *Service) GenWm(c context.Context, mid int64, uname, ip string) (wm *watermark.Watermark, err error) {
	generated, err := s.wm.GenWm(c, mid, uname, ip)
	if err != nil {
		log.Error("s.wm.GenWm error(%v)", err)
		return
	}
	if generated == nil {
		return
	}
	wm = new(watermark.Watermark)
	var meta string
	if meta, err = ImageInfo(generated.Width, generated.Height); err != nil {
		return
	}
	wm.URL = generated.Location
	wm.MD5 = generated.MD5
	wm.Info = meta
	return
}
// ImageInfo encodes the given width and height as the JSON form of
// watermark.Image and returns it as a string. A marshalling failure is
// logged and returned with an empty info.
func ImageInfo(width, height int) (info string, err error) {
	encoded, err := json.Marshal(&watermark.Image{Width: width, Height: height})
	if err != nil {
		log.Error("json.Marshal error(%v)", err)
		return
	}
	return string(encoded), nil
}
// AsyncWaterMarkSet queues wp for asynchronous processing without blocking.
// The request is dropped with a warning when the service is marked closed,
// and dropped with an error log when the queue cannot accept it right away.
func (s *Service) AsyncWaterMarkSet(wp *watermark.WatermarkParam) {
	if !s.closed {
		select {
		case s.wmChan <- wp:
		default:
			log.Error("AsyncWaterMarkSet chan is full data(%+v)", wp)
		}
		return
	}
	log.Warn("AsyncWaterMarkSet chan is closed")
}
// asyncWmSetProc applies queued watermark requests one at a time by calling
// WaterMarkSet with a background context. It returns once s.wmChan has been
// closed and drained; a receive on a closed channel never blocks, so looping
// on after close would spin the CPU forever.
func (s *Service) asyncWmSetProc() {
	c := context.Background()
	for v := range s.wmChan {
		log.Info("watermark set by async with data(%+v)", v)
		if _, err := s.WaterMarkSet(c, v); err != nil {
			log.Error("s.WaterMarkSet watermark err (%+v)", err)
		}
	}
}