175 lines
3.5 KiB
Go
175 lines
3.5 KiB
Go
package events
|
|
|
|
import (
|
|
"crypto/sha512"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/olivere/elastic/v7"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/spf13/viper"
|
|
"iwarma.ru/console/correlator/config"
|
|
"sync"
|
|
)
|
|
|
|
type Event map[string]interface{}
|
|
|
|
// AddTag Add tags to event
|
|
func (event *Event) AddTag(ruleId string) []string {
|
|
cl := log.WithField("func", "AddTag")
|
|
tags := event.GetValue(Tags)
|
|
var result []string
|
|
if tags == nil {
|
|
cl.Debug("Received nil tags. Create empty slice")
|
|
tags = make([]string, 0)
|
|
}
|
|
_, ok := tags.([]string)
|
|
if !ok {
|
|
items := make([]string, 0)
|
|
for _, tag := range tags.([]interface{}) {
|
|
items = append(items, tag.(string))
|
|
}
|
|
result = append(items, ruleId)
|
|
event.SetValue(Tags, result)
|
|
} else {
|
|
result = append(tags.([]string), ruleId)
|
|
event.SetValue(Tags, result)
|
|
|
|
}
|
|
cl.Debugf("Return event tags -> %v", result)
|
|
return result
|
|
}
|
|
|
|
func (event *Event) GetValue(name string) interface{} {
|
|
val, ok := (*event)[name]
|
|
|
|
if ok {
|
|
return val
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (event *Event) GetString(name string) string {
|
|
val, ok := (*event)[name].(string)
|
|
|
|
if ok {
|
|
return val
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func (event *Event) GetInt(name string) int {
|
|
val, ok := (*event)[name].(int)
|
|
|
|
if ok {
|
|
return val
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
func (event *Event) SetValue(key string, value interface{}) {
|
|
(*event)[key] = value
|
|
}
|
|
|
|
//Frequently used fields are placed in constants
|
|
const (
|
|
Hash = "event_hash"
|
|
FirstEvent = "event_first"
|
|
LastEvent = "event_last"
|
|
EventCount = "event_count"
|
|
Created = "@created"
|
|
Tags = "rule_tags"
|
|
AggregatedId = "aggregated_id"
|
|
CeleryDone = "celery_done"
|
|
EventID = "event_id"
|
|
)
|
|
|
|
// Hash Calculate hash to check that two Normalized events are
|
|
// at the same aggregated event
|
|
func (event *Event) Hash() string {
|
|
cl := log.WithField("func", "PrepareFieldsString")
|
|
fieldsList := viper.GetStringSlice(config.AggregatedFields)
|
|
var fieldString string
|
|
for _, item := range fieldsList {
|
|
val := event.GetValue(item)
|
|
if val != nil {
|
|
fieldString += fmt.Sprintf("%v", val)
|
|
} else {
|
|
cl.Tracef("Bad filed name %v", item)
|
|
|
|
}
|
|
}
|
|
if len(fieldString) == 0 {
|
|
cl.Warn("Fields list is empty")
|
|
}
|
|
hash := sha512.Sum512_256([]byte(fmt.Sprintf("%v", fieldString)))
|
|
return hex.EncodeToString(hash[:])
|
|
}
|
|
|
|
func (event *Event) HashToString() string {
|
|
return event.GetString(Hash)
|
|
}
|
|
|
|
// ParseEvents Read query results from Elastic.Query and convert them to Event
|
|
func ParseEvents(raw chan *elastic.SearchHit, inputErrors chan error, threadCount int) (chan *Event, chan error) {
|
|
|
|
cl := log.WithField("func", "ParseNormalizedEvents")
|
|
cl.Trace("Start")
|
|
|
|
// We need at least one thread
|
|
if threadCount <= 0 {
|
|
threadCount = 1
|
|
}
|
|
|
|
results := make(chan *Event, threadCount)
|
|
errs := make(chan error, 1)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(threadCount)
|
|
|
|
for i := 0; i < threadCount; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
// Process all hits until raw channel is open
|
|
for hit := range raw {
|
|
if hit == nil {
|
|
continue
|
|
}
|
|
|
|
var item interface{}
|
|
var event Event
|
|
err := json.Unmarshal(hit.Source, &item)
|
|
event = item.(map[string]interface{})
|
|
if err != nil {
|
|
cl.Errorf("Can't read normlized event: %v", err)
|
|
cl.Debugf("Input value: %v", hit)
|
|
errs <- err
|
|
continue
|
|
}
|
|
|
|
// Add information about event's index
|
|
event["index"] = hit.Index
|
|
|
|
results <- &event
|
|
}
|
|
|
|
// Send up error if we have one
|
|
if err := <-inputErrors; err != nil {
|
|
errs <- err
|
|
}
|
|
}()
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(results)
|
|
close(errs)
|
|
cl.Trace("Finish")
|
|
}()
|
|
|
|
return results, errs
|
|
}
|