package events import ( "github.com/olivere/elastic/v7" log "github.com/sirupsen/logrus" "github.com/spf13/viper" "iwarma.ru/console/correlator/config" "iwarma.ru/console/correlator/es" "strconv" "sync" "time" ) type UpdateType byte // Type of aggregated record in store const ( typeNone UpdateType = iota typeNew typeUpdate ) // AggregatedEventItem Aggregated event with mark for update purpose type AggregatedEventItem struct { AggregatedEvent Event Updated UpdateType } // AggregatedEventStore Sync store for aggregated events type AggregatedEventStore struct { mapMutex sync.Mutex eventsMap map[string]*AggregatedEventItem aggregatedProcessor *elastic.BulkProcessor normalizedProcessor *elastic.BulkProcessor aggregatedWindow TimeWindow client *es.Elastic stat Stat cl *log.Entry } func (store *AggregatedEventStore) GetStat() *Stat { store.mapMutex.Lock() defer store.mapMutex.Unlock() return &store.stat } func (store *AggregatedEventStore) GetClient() *es.Elastic { return store.client } // NewAggregatedEventStore Create new AggregatedEventStore func NewAggregatedEventStore(el *es.Elastic, window TimeWindow) *AggregatedEventStore { result := new(AggregatedEventStore) result.eventsMap = make(map[string]*AggregatedEventItem) result.cl = log.WithField("part", "AggregatedEventStore") var err error // Prepare aggregated events bulk processor result.aggregatedProcessor, err = el.NewBulkProcessor( viper.GetInt(config.AggregatorUpdateWorkers), viper.GetInt(config.AggregatorBulkCount), viper.GetDuration(config.AggregatorBulkFlushInterval), viper.GetBool(config.Verbose)) if err != nil { result.cl.Errorf("Can't start aggregated processor: %v", err) panic(err) } result.normalizedProcessor, err = el.NewBulkProcessor( viper.GetInt(config.AggregatorNormalizedWorkers), viper.GetInt(config.AggregatorNormalizerBulkCount), viper.GetDuration(config.AggregatorNormalizedBulkFlushInterval), viper.GetBool(config.Verbose)) if err != nil { result.cl.Errorf("Can't start normalizer processor: %v", err) panic(err) } result.aggregatedWindow = window result.client = el return result } // UpdateWindow Update aggregated store window and clean internal aggregated map // Must call this after all bulk update ready, or you loose events func (store *AggregatedEventStore) UpdateWindow(window TimeWindow) { // We need to send all updates before we can change window store.SendUpdateBulk() // Now we can update window store.mapMutex.Lock() defer store.mapMutex.Unlock() store.aggregatedWindow = window store.eventsMap = make(map[string]*AggregatedEventItem) } // GetWindow Get current aggregation window func (store *AggregatedEventStore) GetWindow() TimeWindow { return store.aggregatedWindow } // MapSize Get size of store func (store *AggregatedEventStore) MapSize() int { store.mapMutex.Lock() defer store.mapMutex.Unlock() return len(store.eventsMap) } // TotalEventCount Get total event processed by store func (store *AggregatedEventStore) TotalEventCount() uint64 { store.mapMutex.Lock() defer store.mapMutex.Unlock() return store.stat.EventsProcessed } // AddEvents Read input channel and store normalized events func (store *AggregatedEventStore) AddEvents(input chan *Event) { for event := range input { startIter := time.Now() hash := event.Hash() store.mapMutex.Lock() store.stat.EventsProcessed++ item, ok := store.eventsMap[hash] if ok { // update record item.AggregatedEvent[EventCount] = item.AggregatedEvent.GetInt(EventCount) + 1 var eventTimestamp time.Time eventTimestamp, err := time.Parse(time.RFC822, event.GetString("event_timestamp")) if err != nil { eventTimestamp = time.Now() } item.AggregatedEvent[LastEvent] = eventTimestamp if item.Updated != typeNew { item.Updated = typeUpdate } } else { var eventTimestamp time.Time eventTimestamp, err := time.Parse(time.RFC822, event.GetString("event_timestamp")) if err != nil { eventTimestamp = time.Now() } // Create new record event.SetValue(Hash, hash) event.SetValue(FirstEvent, eventTimestamp) event.SetValue(LastEvent, eventTimestamp) event.SetValue(EventCount, 1) event.SetValue(Created, time.Now().UTC()) item = &AggregatedEventItem{ AggregatedEvent: *event, Updated: typeNew, } store.eventsMap[hash] = item store.stat.EventsAggregated++ } store.stat.AverageIterationTime.Add(time.Since(startIter)) store.mapMutex.Unlock() // Create bulk request to delete processed normalized event store.normalizedProcessor.Add(elastic.NewBulkDeleteRequest(). Index(event.GetString("index")).Id(event.GetString(EventID))) } } // SendUpdateBulk Send aggregated events to server func (store *AggregatedEventStore) SendUpdateBulk() { windowHash := strconv.FormatUint(uint64(store.aggregatedWindow.Hash()), 10) store.mapMutex.Lock() var newCount, updateCount, allCount int32 for _, value := range store.eventsMap { value.AggregatedEvent.SetValue(AggregatedId, windowHash+"_"+value.AggregatedEvent.HashToString()) if value.Updated == typeNew { request := elastic.NewBulkIndexRequest().Index(GetAggregatedIndexName()).Doc(value.AggregatedEvent).Id(value.AggregatedEvent.GetString(AggregatedId)) value.Updated = typeNone store.aggregatedProcessor.Add(request) newCount++ } else if value.Updated == typeUpdate { request := elastic.NewBulkUpdateRequest().Index(GetAggregatedIndexName()).Doc(value.AggregatedEvent).Id(value.AggregatedEvent.GetString(AggregatedId)) value.Updated = typeNone store.aggregatedProcessor.Add(request) updateCount++ } else { allCount++ } } // Need to flush bulk processors before mutex unlock err := store.normalizedProcessor.Flush() if err != nil { store.cl.Errorf("Can't send bulk update to normalized events: %v", err) } err = store.aggregatedProcessor.Flush() if err != nil { store.cl.Errorf("Can't send bulk update to aggregated events: %v", err) } store.mapMutex.Unlock() if viper.GetBool(config.Verbose) { cl := log.WithField("func", "SendUpdateBulk") cl.Debugf("New records: %d, Update records: %d, Total records: %d", newCount, updateCount, allCount+newCount+updateCount) } }