old_console/correlator/events/store.go
package events

import (
    "strconv"
    "sync"
    "time"

    "github.com/olivere/elastic/v7"
    log "github.com/sirupsen/logrus"
    "github.com/spf13/viper"
    "iwarma.ru/console/correlator/config"
    "iwarma.ru/console/correlator/es"
)

// UpdateType marks how an aggregated record should be flushed to Elasticsearch.
type UpdateType byte

// Type of aggregated record in store
const (
    typeNone   UpdateType = iota // already persisted, nothing to send
    typeNew                      // created in the current window, sent as an index request
    typeUpdate                   // changed in the current window, sent as an update request
)

// AggregatedEventItem Aggregated event with mark for update purpose
type AggregatedEventItem struct {
    AggregatedEvent Event
    Updated         UpdateType
}

// AggregatedEventStore Sync store for aggregated events
type AggregatedEventStore struct {
    mapMutex            sync.Mutex
    eventsMap           map[string]*AggregatedEventItem // aggregated events keyed by event hash
    aggregatedProcessor *elastic.BulkProcessor          // bulk writer for aggregated events
    normalizedProcessor *elastic.BulkProcessor          // bulk deleter for processed normalized events
    aggregatedWindow    TimeWindow
    client              *es.Elastic
    stat                Stat
    cl                  *log.Entry
}
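
// GetStat returns a pointer to the store statistics; the map mutex also guards the counters.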
func (store *AggregatedEventStore) GetStat() *Stat {
    store.mapMutex.Lock()
    defer store.mapMutex.Unlock()
    return &store.stat
}
func (store *AggregatedEventStore) GetClient() *es.Elastic {
    return store.client
}

// NewAggregatedEventStore Create new AggregatedEventStore.
// Panics if either bulk processor can't be started.
func NewAggregatedEventStore(el *es.Elastic, window TimeWindow) *AggregatedEventStore {
    result := new(AggregatedEventStore)
    result.eventsMap = make(map[string]*AggregatedEventItem)
    result.cl = log.WithField("part", "AggregatedEventStore")

    var err error

    // Prepare aggregated events bulk processor
    result.aggregatedProcessor, err = el.NewBulkProcessor(
        viper.GetInt(config.AggregatorUpdateWorkers),
        viper.GetInt(config.AggregatorBulkCount),
        viper.GetDuration(config.AggregatorBulkFlushInterval),
        viper.GetBool(config.Verbose))
    if err != nil {
        result.cl.Errorf("Can't start aggregated processor: %v", err)
        panic(err)
    }

    // Prepare normalized events bulk processor
    result.normalizedProcessor, err = el.NewBulkProcessor(
        viper.GetInt(config.AggregatorNormalizedWorkers),
        viper.GetInt(config.AggregatorNormalizerBulkCount),
        viper.GetDuration(config.AggregatorNormalizedBulkFlushInterval),
        viper.GetBool(config.Verbose))
    if err != nil {
        result.cl.Errorf("Can't start normalizer processor: %v", err)
        panic(err)
    }

    result.aggregatedWindow = window
    result.client = el

    return result
}

// UpdateWindow Update aggregated store window and clean internal aggregated map.
// Call this only after all bulk updates are finished, or you will lose events.
func (store *AggregatedEventStore) UpdateWindow(window TimeWindow) {
    // We need to send all pending updates before we can change the window
    store.SendUpdateBulk()

    // Now we can replace the window and reset the aggregation map
    store.mapMutex.Lock()
    defer store.mapMutex.Unlock()
    store.aggregatedWindow = window
    store.eventsMap = make(map[string]*AggregatedEventItem)
}

// GetWindow Get current aggregation window.
// Takes the map mutex because UpdateWindow replaces the window under the same lock.
func (store *AggregatedEventStore) GetWindow() TimeWindow {
    store.mapMutex.Lock()
    defer store.mapMutex.Unlock()
    return store.aggregatedWindow
}

// MapSize Get size of store
func (store *AggregatedEventStore) MapSize() int {
    store.mapMutex.Lock()
    defer store.mapMutex.Unlock()
    return len(store.eventsMap)
}

// TotalEventCount Get the total number of events processed by the store
func (store *AggregatedEventStore) TotalEventCount() uint64 {
    store.mapMutex.Lock()
    defer store.mapMutex.Unlock()
    return store.stat.EventsProcessed
}

// AddEvents Read input channel and store normalized events
func (store *AggregatedEventStore) AddEvents(input chan *Event) {
    for event := range input {
        startIter := time.Now()
        hash := event.Hash()

        // Parse the event timestamp once; fall back to "now" if it can't be parsed
        eventTimestamp, err := time.Parse(time.RFC822, event.GetString("event_timestamp"))
        if err != nil {
            eventTimestamp = time.Now()
        }

        store.mapMutex.Lock()
        store.stat.EventsProcessed++

        item, ok := store.eventsMap[hash]
        if ok {
            // Update existing record
            item.AggregatedEvent[EventCount] = item.AggregatedEvent.GetInt(EventCount) + 1
            item.AggregatedEvent[LastEvent] = eventTimestamp
            if item.Updated != typeNew {
                item.Updated = typeUpdate
            }
        } else {
            // Create new record
            event.SetValue(Hash, hash)
            event.SetValue(FirstEvent, eventTimestamp)
            event.SetValue(LastEvent, eventTimestamp)
            event.SetValue(EventCount, 1)
            event.SetValue(Created, time.Now().UTC())

            item = &AggregatedEventItem{
                AggregatedEvent: *event,
                Updated:         typeNew,
            }
            store.eventsMap[hash] = item
            store.stat.EventsAggregated++
        }

        store.stat.AverageIterationTime.Add(time.Since(startIter))
        store.mapMutex.Unlock()

        // Create bulk request to delete the processed normalized event
        store.normalizedProcessor.Add(elastic.NewBulkDeleteRequest().
            Index(event.GetString("index")).Id(event.GetString(EventID)))
    }
}

// SendUpdateBulk Send aggregated events to server
func (store *AggregatedEventStore) SendUpdateBulk() {
    store.mapMutex.Lock()

    // The window hash is the prefix of every aggregated document id
    windowHash := strconv.FormatUint(uint64(store.aggregatedWindow.Hash()), 10)

    var newCount, updateCount, unchangedCount int32
    for _, value := range store.eventsMap {
        value.AggregatedEvent.SetValue(AggregatedId, windowHash+"_"+value.AggregatedEvent.HashToString())

        switch value.Updated {
        case typeNew:
            request := elastic.NewBulkIndexRequest().Index(GetAggregatedIndexName()).Doc(value.AggregatedEvent).Id(value.AggregatedEvent.GetString(AggregatedId))
            value.Updated = typeNone
            store.aggregatedProcessor.Add(request)
            newCount++
        case typeUpdate:
            request := elastic.NewBulkUpdateRequest().Index(GetAggregatedIndexName()).Doc(value.AggregatedEvent).Id(value.AggregatedEvent.GetString(AggregatedId))
            value.Updated = typeNone
            store.aggregatedProcessor.Add(request)
            updateCount++
        default:
            unchangedCount++
        }
    }

    // Need to flush both bulk processors before the mutex is released
    err := store.normalizedProcessor.Flush()
    if err != nil {
        store.cl.Errorf("Can't send bulk update to normalized events: %v", err)
    }
    err = store.aggregatedProcessor.Flush()
    if err != nil {
        store.cl.Errorf("Can't send bulk update to aggregated events: %v", err)
    }

    store.mapMutex.Unlock()

    if viper.GetBool(config.Verbose) {
        cl := log.WithField("func", "SendUpdateBulk")
        cl.Debugf("New records: %d, Update records: %d, Total records: %d", newCount, updateCount, newCount+updateCount+unchangedCount)
    }
}
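
// A rough usage sketch of the store's lifecycle. Here esClient, currentWindow, and
// nextWindow are placeholders for however the correlator builds its es.Elastic client
// and TimeWindow values; they are not defined in this file:
//
//	store := NewAggregatedEventStore(esClient, currentWindow)
//	input := make(chan *Event)
//	go store.AddEvents(input)
//	// ... feed normalized events into input ...
//	store.SendUpdateBulk()         // flush pending aggregates to Elasticsearch
//	store.UpdateWindow(nextWindow) // flush again, then reset the aggregation map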