old_console/correlator/events/events.go
2024-11-02 14:12:45 +03:00

175 lines
3.5 KiB
Go

package events
import (
"crypto/sha512"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/olivere/elastic/v7"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"iwarma.ru/console/correlator/config"
"sync"
)
type Event map[string]interface{}
// AddTag Add tags to event
func (event *Event) AddTag(ruleId string) []string {
cl := log.WithField("func", "AddTag")
tags := event.GetValue(Tags)
var result []string
if tags == nil {
cl.Debug("Received nil tags. Create empty slice")
tags = make([]string, 0)
}
_, ok := tags.([]string)
if !ok {
items := make([]string, 0)
for _, tag := range tags.([]interface{}) {
items = append(items, tag.(string))
}
result = append(items, ruleId)
event.SetValue(Tags, result)
} else {
result = append(tags.([]string), ruleId)
event.SetValue(Tags, result)
}
cl.Debugf("Return event tags -> %v", result)
return result
}
func (event *Event) GetValue(name string) interface{} {
val, ok := (*event)[name]
if ok {
return val
}
return nil
}
func (event *Event) GetString(name string) string {
val, ok := (*event)[name].(string)
if ok {
return val
}
return ""
}
func (event *Event) GetInt(name string) int {
val, ok := (*event)[name].(int)
if ok {
return val
}
return 0
}
func (event *Event) SetValue(key string, value interface{}) {
(*event)[key] = value
}
//Frequently used fields are placed in constants
const (
Hash = "event_hash"
FirstEvent = "event_first"
LastEvent = "event_last"
EventCount = "event_count"
Created = "@created"
Tags = "rule_tags"
AggregatedId = "aggregated_id"
CeleryDone = "celery_done"
EventID = "event_id"
)
// Hash Calculate hash to check that two Normalized events are
// at the same aggregated event
func (event *Event) Hash() string {
cl := log.WithField("func", "PrepareFieldsString")
fieldsList := viper.GetStringSlice(config.AggregatedFields)
var fieldString string
for _, item := range fieldsList {
val := event.GetValue(item)
if val != nil {
fieldString += fmt.Sprintf("%v", val)
} else {
cl.Tracef("Bad filed name %v", item)
}
}
if len(fieldString) == 0 {
cl.Warn("Fields list is empty")
}
hash := sha512.Sum512_256([]byte(fmt.Sprintf("%v", fieldString)))
return hex.EncodeToString(hash[:])
}
func (event *Event) HashToString() string {
return event.GetString(Hash)
}
// ParseEvents Read query results from Elastic.Query and convert them to Event
func ParseEvents(raw chan *elastic.SearchHit, inputErrors chan error, threadCount int) (chan *Event, chan error) {
cl := log.WithField("func", "ParseNormalizedEvents")
cl.Trace("Start")
// We need at least one thread
if threadCount <= 0 {
threadCount = 1
}
results := make(chan *Event, threadCount)
errs := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(threadCount)
for i := 0; i < threadCount; i++ {
go func() {
defer wg.Done()
// Process all hits until raw channel is open
for hit := range raw {
if hit == nil {
continue
}
var item interface{}
var event Event
err := json.Unmarshal(hit.Source, &item)
event = item.(map[string]interface{})
if err != nil {
cl.Errorf("Can't read normlized event: %v", err)
cl.Debugf("Input value: %v", hit)
errs <- err
continue
}
// Add information about event's index
event["index"] = hit.Index
results <- &event
}
// Send up error if we have one
if err := <-inputErrors; err != nil {
errs <- err
}
}()
}
go func() {
wg.Wait()
close(results)
close(errs)
cl.Trace("Finish")
}()
return results, errs
}