1: 背景
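由於需要將 MongoDB 的相關鏈路數據上報至 opentelemetry-collector,根據 mongo-driver 文檔說明,只需初始化 CommandMonitor 並實現 Started、Succeeded、Failed 三個回調函數,即可捕獲相關鏈路數據。由於 Started 與 Succeeded/Failed 在不同的回調中觸發,需要以 (ConnectionID, RequestID) 為 key 將進行中的 span 暫存於緩存,待命令結束時再取出並結束該 span。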
2: 要求
· 線程安全
· 支持設置最大緩存數量,達到最大數量後按 LRU 策略淘汰數據(當前實現中 Get 不調整順序,淘汰依據為寫入/更新順序)
· 支持設置緩存數據的最長存活時間,超過存活時間的緩存數據將被清理
· 支持設置數據從緩存中淘汰時的回調處理策略
· 數據衝突(key 已存在)時,支持更新數據位置以及過期時間
3: 代碼實現
package mongodriver
import (
	"container/list"
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/event"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
)
// FIFOCache is a concurrency-safe cache that evicts entries in insertion
// order. Re-setting an existing key via Set moves it back to the newest
// position. Entries older than a configured TTL can optionally be swept
// periodically.
type FIFOCache struct {
	// MaxEntries caps the number of cached entries; 0 means unbounded.
	MaxEntries int
	// OnEvictedCallback, when non-nil, is invoked for each entry removed from
	// the cache. It receives the removal reason and whether the remover asked
	// for the value to be auto-finished.
	OnEvictedCallback func(key Key, value interface{}, reason string, autoFinished bool)

	ll    *list.List                    // entries, newest at the front and oldest at the back
	cache map[interface{}]*list.Element // key -> list element holding an *Entry

	lock        sync.RWMutex  // guards ll and cache
	ttlInMinute int32         // entry lifetime in minutes; <= 0 disables expiry
	done        chan struct{} // closed by Close to stop the expiry goroutine
}
// Reasons reported to OnEvictedCallback describing why an entry left the cache.
const (
	// RemoveReasonExpired marks entries swept because they outlived the TTL.
	RemoveReasonExpired = "EXPIRED"
	// RemoveReasonExplicit marks a normal, explicit removal through Remove.
	RemoveReasonExplicit = "EXPLICIT"
	// RemoveReasonSize marks entries evicted because the cache exceeded MaxEntries.
	RemoveReasonSize = "MAX_SIZE"
	// RemoveReasonCleanAll marks entries dropped by ClearAll.
	RemoveReasonCleanAll = "CLEAN_ALL"
)

// defaultEvictedSpanKey is the span attribute key under which the default
// eviction callback records the removal reason.
const defaultEvictedSpanKey = "evicted"
// Key is the type of cache keys. Keys are stored in a Go map, so the dynamic
// value must be comparable.
type Key interface{}

// Entry is the payload held by each list element of a FIFOCache.
type Entry struct {
	key   Key         // key the entry is indexed under; needed to drop the map slot on eviction
	value interface{} // the cached value
	ttl   time.Time   // absolute expiry deadline; zero when expiry is disabled
}
// NewFIFOCache creates a concurrency-safe cache that evicts entries in
// insertion/refresh order.
//
// maxEntries bounds the number of cached entries. 0 disables the bound, and a
// negative value panics. ttlInMinute is the maximum lifetime of an entry in
// minutes; 0 (or a negative value) disables expiry. Do not configure very
// small TTLs; if one seems necessary, reconsider whether this cache fits the
// use case.
//
// When ttlInMinute > 0, a background goroutine sweeps expired entries every
// 10 seconds until Close is called.
//
// The default OnEvictedCallback ends values that implement trace.Span and tags
// them with the eviction reason. It does this only when the removal requests
// auto-finishing.
func NewFIFOCache(maxEntries int, ttlInMinute int32) *FIFOCache {
	if maxEntries < 0 {
		// 0 is accepted and means "unbounded" (see Set), so only negatives are rejected.
		panic("maxEntries must not be negative")
	}
	cache := &FIFOCache{
		MaxEntries:        maxEntries,
		OnEvictedCallback: endEvictedSpan,
		ll:                list.New(),
		cache:             make(map[interface{}]*list.Element, maxEntries),
		ttlInMinute:       ttlInMinute,
		done:              make(chan struct{}),
	}
	if ttlInMinute > 0 {
		go cache.expireLoop(10 * time.Second)
	}
	return cache
}

// endEvictedSpan is the default OnEvictedCallback. When autoFinished is true
// and value is a trace.Span, it records the eviction reason on the span and
// ends it. Otherwise it does nothing.
func endEvictedSpan(_ Key, value interface{}, reason string, autoFinished bool) {
	if !autoFinished {
		return
	}
	span, ok := value.(trace.Span)
	if !ok {
		return
	}
	span.SetAttributes(attribute.String(defaultEvictedSpanKey, reason))
	span.End()
}

// expireLoop sweeps expired entries every interval until c.done is closed.
// A single Ticker is reused instead of allocating a timer per iteration, and
// it is stopped when the loop exits.
func (c *FIFOCache) expireLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
			return
		case <-ticker.C:
			c.cleanInterval(RemoveReasonExpired)
		}
	}
}
// Set stores value under key.
//
// If key is already present, several things happen:
//   - its value is overwritten;
//   - the entry moves to the front of the list (the newest position);
//   - when TTL is enabled, its expiry deadline is pushed out by a full TTL.
//
// The previously stored value is replaced WITHOUT OnEvictedCallback being
// invoked for it.
//
// A new key is inserted at the front. If that makes the cache exceed
// MaxEntries (only when MaxEntries > 0), the oldest entries are evicted with
// reason RemoveReasonSize until the bound holds again.
func (c *FIFOCache) Set(key Key, value interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()

	var expireAt time.Time // zero value means "never expires"
	if c.ttlInMinute > 0 {
		expireAt = time.Now().Add(time.Minute * time.Duration(c.ttlInMinute))
	}

	if element, ok := c.cache[key]; ok {
		c.ll.MoveToFront(element)
		if entry, ok := element.Value.(*Entry); ok {
			// entry is a pointer, so updating it in place updates the list element.
			entry.value = value
			if c.ttlInMinute > 0 {
				entry.ttl = expireAt
			}
		}
		return
	}

	c.cache[key] = c.ll.PushFront(&Entry{key: key, value: value, ttl: expireAt})

	// Loop rather than evicting once, so that lowering the exported MaxEntries
	// field at runtime is honoured on the next insert. A non-positive bound
	// means "unbounded". Without this guard, a negative bound would evict
	// every entry, including the one just inserted.
	for c.MaxEntries > 0 && c.ll.Len() > c.MaxEntries {
		c.removeOldest(RemoveReasonSize)
	}
}
// Get returns the value stored under key and whether it was present.
//
// Lookups do not change the entry's position or expiry. Eviction order is
// therefore insertion/refresh order, not access order.
func (c *FIFOCache) Get(key Key) (value interface{}, ok bool) {
	// Get only reads shared state, so a shared read lock lets concurrent
	// lookups proceed in parallel instead of serialising on the write lock.
	c.lock.RLock()
	defer c.lock.RUnlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}
	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}
// Remove deletes key from the cache. It returns the value that was stored and
// ok=true when the key existed.
//
// reason is forwarded to OnEvictedCallback (see the RemoveReason* constants).
// autoFinish is forwarded as the callback's autoFinished flag. The default
// callback uses this flag to decide whether to End() a span value.
func (c *FIFOCache) Remove(key Key, reason string, autoFinish bool) (value interface{}, ok bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	element, hit := c.cache[key]
	if !hit {
		return nil, false
	}
	c.removeElement(element, reason, autoFinish)

	// list.Remove leaves element.Value intact, so the entry is still readable here.
	entry, isEntry := element.Value.(*Entry)
	if !isEntry {
		return nil, false
	}
	return entry.value, true
}
// removeOldest evicts the entry at the back of the list (the oldest one), if
// there is one, and asks the eviction callback to auto-finish it.
// The caller must hold c.lock.
func (c *FIFOCache) removeOldest(reason string) {
	if oldest := c.ll.Back(); oldest != nil {
		c.removeElement(oldest, reason, true)
	}
}
// removeElement unlinks e from the list, deletes its map slot and notifies
// OnEvictedCallback (when set) with reason and autoFinished.
// The caller must hold c.lock.
func (c *FIFOCache) removeElement(e *list.Element, reason string, autoFinished bool) {
	c.ll.Remove(e)

	entry, isEntry := e.Value.(*Entry)
	if !isEntry {
		return
	}
	delete(c.cache, entry.key)

	if notify := c.OnEvictedCallback; notify != nil {
		notify(entry.key, entry.value, reason, autoFinished)
	}
}
// Len reports the number of cached entries. It is safe for concurrent use.
func (c *FIFOCache) Len() int {
	// Take the read lock: Set/Remove mutate c.ll, and ClearAll replaces both
	// c.ll and c.cache, so an unlocked read here would be a data race.
	c.lock.RLock()
	defer c.lock.RUnlock()
	if c.cache == nil {
		// Zero-value FIFOCache: ll is nil as well, and (*list.List).Len would
		// dereference it.
		return 0
	}
	return c.ll.Len()
}
// ClearAll drops every entry. When OnEvictedCallback is set, it is called
// once per entry with RemoveReasonCleanAll and autoFinished=true. The
// callbacks run in map iteration order, which is unspecified.
func (c *FIFOCache) ClearAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if notify := c.OnEvictedCallback; notify != nil {
		for _, element := range c.cache {
			entry, isEntry := element.Value.(*Entry)
			if !isEntry {
				continue
			}
			notify(entry.key, entry.value, RemoveReasonCleanAll, true)
		}
	}

	c.cache = make(map[interface{}]*list.Element)
	c.ll = list.New()
}
// Close stops the background expiry goroutine started by NewFIFOCache.
//
// It is safe to call more than once; calls after the first are no-ops.
// Close does not remove cached entries; use ClearAll for that.
func (c *FIFOCache) Close() {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.done == nil {
		// Zero-value cache (not built by NewFIFOCache): there is no goroutine
		// to stop, and close(nil) would panic.
		return
	}
	// Only Close closes done, and it does so while holding c.lock, so this
	// check-then-close cannot race with another Close.
	select {
	case <-c.done:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(c.done)
	}
}
// cleanInterval evicts expired entries, oldest first. Each eviction passes
// reason to the eviction callback with autoFinished=true.
//
// Set stamps every entry with now+TTL and moves it to the front. Deadlines
// therefore grow from back to front, and the sweep stops at the first entry
// that has not expired or has no deadline. The write lock is held for the
// whole sweep, so avoid calling this frequently.
func (c *FIFOCache) cleanInterval(reason string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for oldest := c.ll.Back(); oldest != nil; oldest = c.ll.Back() {
		entry, isEntry := oldest.Value.(*Entry)
		if !isEntry || entry.ttl.IsZero() || !time.Now().After(entry.ttl) {
			return
		}
		c.removeElement(oldest, reason, true)
	}
}
// TracerKey is the instrumentation name used to obtain the tracer that
// creates MongoDB command spans.
const TracerKey = "gitlab.daliqc.cn/apm/apm-sdk/sdk-go/sql/mongodriver"

// Defaults for the span cache created by NewMonitor.
const (
	// defaultTTLInMinute is the maximum time, in minutes, a span may stay
	// cached before the expiry sweep removes it.
	defaultTTLInMinute int32 = 10
	// defaultMaxEntries caps the cache at 100,000 spans. The original author
	// estimated roughly 133MB at this size. NOTE(review): this figure is
	// unverified.
	defaultMaxEntries = 10 * 10000
)
// SpanKey identifies an in-flight command. The same (ConnectionID, RequestID)
// pair is read from the Started event and from the matching Succeeded/Failed
// event, which lets the monitor pair the two up in its span cache.
type SpanKey struct {
	ConnectionID string // driver connection identifier, presumably "host:port[-N]" (TODO confirm)
	RequestID    int64  // request id of the command on that connection
}
// monitor backs the mongo-driver command event callbacks. Each started span
// is held in cache until its command finishes.
type monitor struct {
	cache *FIFOCache // in-flight spans keyed by SpanKey
}
// NewMonitor creates a new mongodb event CommandMonitor.
//
// In-flight spans are held in a FIFOCache bounded to defaultMaxEntries
// entries. Cached spans are swept after defaultTTLInMinute minutes. Because
// size/expiry evictions end the span, spans whose command never reports
// completion are eventually ended rather than accumulating.
//
// NOTE(review): the cache's background goroutine is never stopped, because
// nothing in the returned monitor exposes Close.
func NewMonitor() *event.CommandMonitor {
	m := &monitor{
		cache: NewFIFOCache(defaultMaxEntries, defaultTTLInMinute),
	}
	cm := &event.CommandMonitor{}
	cm.Started = m.Started
	cm.Succeeded = m.Succeeded
	cm.Failed = m.Failed
	return cm
}
// Started opens a client span for a command that is about to be sent. The
// span is parked in the cache, keyed by (ConnectionID, RequestID), until the
// matching Succeeded/Failed event arrives.
//
// The span is named "<collection>.<command>" when a collection can be
// determined, and "<command>" otherwise.
func (m *monitor) Started(ctx context.Context, evt *event.CommandStartedEvent) {
	host, port := m.peerInfo(evt)
	attrs := []attribute.KeyValue{
		semconv.DBSystemMongoDB,
		semconv.DBOperation(evt.CommandName),
		semconv.DBName(evt.DatabaseName),
		semconv.NetPeerName(host),
		semconv.NetPeerPort(port),
		semconv.NetTransportTCP,
	}

	name := evt.CommandName
	// Best effort: a command without a collection still gets a span.
	if collection, err := m.extractCollection(evt); err == nil && collection != "" {
		name = collection + "." + name
		attrs = append(attrs, semconv.DBMongoDBCollection(collection))
	}

	tracer := otel.GetTracerProvider().Tracer(TracerKey)
	_, span := tracer.Start(ctx, name,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(attrs...),
	)

	m.cache.Set(SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID}, span)
}
// Succeeded ends the span of a command that completed successfully.
func (m *monitor) Succeeded(ctx context.Context, evt *event.CommandSucceededEvent) {
	finished := &evt.CommandFinishedEvent
	m.Finished(ctx, finished, nil)
}

// Failed ends the span of a failed command. The span is given an error status
// that carries the driver's failure message.
func (m *monitor) Failed(ctx context.Context, evt *event.CommandFailedEvent) {
	failure := fmt.Errorf("%s", evt.Failure)
	m.Finished(ctx, &evt.CommandFinishedEvent, failure)
}
// Finished closes the span that Started opened for the same
// (ConnectionID, RequestID) pair. A non-nil err sets an error status on the
// span before it is ended. Events whose span is no longer cached (already
// expired or evicted) are ignored.
func (m *monitor) Finished(ctx context.Context, evt *event.CommandFinishedEvent, err error) {
	key := SpanKey{ConnectionID: evt.ConnectionID, RequestID: evt.RequestID}

	// autoFinish=false: the span is ended below, not by the eviction callback.
	cached, found := m.cache.Remove(key, RemoveReasonExplicit, false)
	if !found {
		return
	}

	if span, isSpan := cached.(trace.Span); isSpan {
		if err != nil {
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
	}
}
// extractCollection returns the name of the collection a command targets.
//
// For most commands (find, insert, update, delete, ...), the collection is
// the string value of the command document's first element, whose key equals
// the command name. getMore is the exception: its first element holds the
// cursor id, and the collection travels in a separate "collection" field.
//
// A non-nil error is returned whenever no collection name can be determined.
// Examples are a first element that does not match the command name, or a
// non-string value such as a database-level `aggregate: 1`.
func (m *monitor) extractCollection(evt *event.CommandStartedEvent) (string, error) {
	elt, err := evt.Command.IndexErr(0)
	if err != nil {
		return "", err
	}
	key, err := elt.KeyErr()
	if err != nil {
		return "", err
	}
	if key != evt.CommandName {
		return "", fmt.Errorf("collection name not found")
	}

	var v bson.RawValue
	if key == "getMore" {
		v, err = evt.Command.LookupErr("collection")
	} else {
		v, err = elt.ValueErr()
	}
	if err != nil {
		return "", err
	}
	if v.Type != bsontype.String {
		// Previously this returned ("", nil). That looked like success with an
		// empty name; it is now reported the same way as "not found".
		return "", fmt.Errorf("collection name not found")
	}
	return v.StringValue(), nil
}
// peerInfo derives the peer host name and port from the driver's connection
// ID.
//
// The ID is assumed to look like "<address>[-<counter>]". Examples are
// "localhost:27017[-5]" and, for IPv6, "[::1]:27017[-3]". NOTE(review):
// confirm this format against the driver version in use. For a bracketed IPv6
// literal, the brackets are stripped from the returned host name. The port
// falls back to 27017, MongoDB's default, when it is missing or not a valid
// port number.
func (m *monitor) peerInfo(evt *event.CommandStartedEvent) (hostname string, port int) {
	addr := evt.ConnectionID

	// Drop the trailing "[-N]" counter. Searching for "[-" rather than the
	// first '[' keeps bracketed IPv6 literals intact. Cutting at the first '['
	// turns "[::1]:27017[-3]" into an empty host name.
	if idx := strings.LastIndex(addr, "[-"); idx >= 0 {
		addr = addr[:idx]
	}

	hostname, port = addr, 27017
	portStr := ""
	switch {
	case strings.HasPrefix(addr, "["):
		// Bracketed IPv6 literal: "[ip]" or "[ip]:port".
		if end := strings.IndexByte(addr, ']'); end > 0 {
			hostname = addr[1:end]
			portStr = strings.TrimPrefix(addr[end+1:], ":")
		}
	case strings.Count(addr, ":") == 1:
		// Plain "host:port". An unbracketed address with several colons is a
		// bare IPv6 literal without a port, so it is kept whole.
		idx := strings.IndexByte(addr, ':')
		hostname, portStr = addr[:idx], addr[idx+1:]
	}

	// Keep the default when the port is absent or malformed, rather than
	// silently reporting port 0 on a parse failure.
	if p, err := strconv.Atoi(portStr); err == nil && p > 0 && p <= 65535 {
		port = p
	}
	return hostname, port
}