old_console/correlator/rules/rule.go
2024-11-02 14:12:45 +03:00

222 lines
5.7 KiB
Go

package rules
import (
"encoding/json"
"errors"
"fmt"
"iwarma.ru/console/correlator/es"
"iwarma.ru/console/correlator/events"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/olivere/elastic/v7"
"github.com/spf13/viper"
"iwarma.ru/console/correlator/config"
)
// Rule describes one correlation rule: which aggregated events it selects
// and which actions are performed when at least one event matches.
//
// Rule has its own MarshalJSON/UnmarshalJSON methods, in which Depth travels
// as a duration string (for example "5m0s") rather than as a number.
type Rule struct {
	// Id identifies the rule. Do refuses to run when it is empty and uses it
	// to exclude events whose "rule_tags" field already matches this rule.
	Id string `json:"id"`

	// Name is attached to the log entries written while the rule runs.
	Name string `json:"name"`

	// Description is free-form text that is only stored and serialized.
	Description string `json:"description"`

	// Multi chooses what each action receives: every matched event when
	// true, only the first matched event when false.
	Multi bool `json:"multi"`

	// Depth is how far back from the current time the rule looks for events.
	// A zero value is rejected by both GetRangeQuery and Do.
	Depth time.Duration `json:"depth"`

	// Predicate is added to the search as a "must" clause by Do.
	Predicate Predicate `json:"predicat"`

	// Actions are performed concurrently by Do once events have been found.
	Actions []Action `json:"actions"`
}
// GetRangeQuery builds the time-window part of the rule's search: a bool
// query that matches events whose "event_first" or "event_last" timestamp
// lies within the last rule.Depth, measured from the current UTC time.
//
// At least one of the two range clauses is explicitly required to match.
// Without that, Elasticsearch treats "should" clauses as optional as soon as
// a caller adds a "must" or "filter" clause to the returned query, and the
// time window would then only influence scoring instead of filtering.
//
// It returns an error when rule.Depth is zero.
func (rule Rule) GetRangeQuery() (*elastic.BoolQuery, error) {
	// Compare the duration itself: converting to int first would truncate
	// the 64-bit value on 32-bit platforms.
	if rule.Depth == 0 {
		return nil, errors.New("have empty duration")
	}

	// Take a single timestamp so both clauses describe exactly the same window.
	to := time.Now().UTC()
	from := to.Add(-rule.Depth)

	return elastic.NewBoolQuery().
		Should(
			elastic.NewRangeQuery("event_first").Gte(from).Lte(to),
			elastic.NewRangeQuery("event_last").Gte(from).Lte(to),
		).
		MinimumNumberShouldMatch(1), nil
}
// Do runs the rule once against the aggregated events index.
//
// The search combines the time window from GetRangeQuery with rule.Predicate
// as a "must" clause and excludes events whose "rule_tags" field already
// matches rule.Id, so that the same event is not processed twice. Matching
// events are parsed using the number of workers configured under
// config.Threads.
//
// When events are found, all rule.Actions are performed concurrently. Each
// action receives every matched event if rule.Multi is set, otherwise only
// the first one.
//
// Returns:
//   - (nil, error) if rule.Id is empty, rule.Depth is zero, the range query
//     cannot be built, reading the events reported an error, or any action
//     failed. Every action error is logged; the first one received is returned.
//   - (nil, nil) if no event matched.
//   - a pointer to all matched events otherwise (also when rule.Multi is false).
func (rule Rule) Do(client *es.Elastic) (*[]*events.Event, error) {
	cl := log.WithFields(log.Fields{"name": rule.Name, "id": rule.Id})
	cl.Debug("Starting rule")
	defer cl.Debug("Done rule")

	if len(rule.Id) == 0 {
		return nil, errors.New("need rule ID to be set")
	}
	if rule.Depth == 0 {
		return nil, errors.New("bad duration value")
	}

	// Don't process same event twice
	tagsQuery := elastic.NewMatchQuery("rule_tags", rule.Id)

	query, err := rule.GetRangeQuery()
	if err != nil {
		cl.Error(err)
		return nil, err
	}
	query.Must(rule.Predicate).MustNot(tagsQuery)

	// Resolve the index name once so the logged query and the executed query
	// are guaranteed to target the same index.
	index := events.GetAggregatedIndexName()

	// Show query if user want
	client.DebugQuery(index, query)

	// Query for events
	raw, errs1 := client.Query(index, query)
	aggregated, errs2 := events.ParseEvents(raw, errs1, viper.GetInt(config.Threads))

	cl.Debug("Start reading results")
	aggregatedEvents := make([]*events.Event, 0)
	for event := range aggregated {
		aggregatedEvents = append(aggregatedEvents, event)
	}
	cl.Debug("Finish reading results")

	// Only the first reported error matters: the run is abandoned on it.
	// NOTE(review): the remaining errors are not drained here; confirm that
	// the producers behind errs2 cannot block on an unread channel.
	if parseErr, ok := <-errs2; ok {
		cl.Debug("Got errors")
		return nil, parseErr
	}

	cl.Debugf("Done getting events. Got %v events", len(aggregatedEvents))
	if len(aggregatedEvents) == 0 {
		cl.Debug("No aggregated events suitable for rule")
		return nil, nil
	}

	cl.Debug("Starting actions")
	var wg sync.WaitGroup
	wg.Add(len(rule.Actions))
	// One slot per action, so a failing action never blocks on send.
	actionErrs := make(chan error, len(rule.Actions))

	// Apply actions
	for _, cur := range rule.Actions {
		go func(action Action) {
			defer wg.Done()

			actionLog := cl.WithField("multi", rule.Multi)
			actionLog.Tracef("Start action %v", action.GetType())
			defer actionLog.Tracef("Finish action %v", action.GetType())

			batch := &aggregatedEvents
			if !rule.Multi {
				// Capacity is limited to one element so that an action which
				// appends to its input cannot overwrite the second event in
				// the shared backing array.
				first := aggregatedEvents[0:1:1]
				batch = &first
			}

			actionLog.Debugf("Sending %v events to action", len(*batch))
			if err := action.Perform(batch); err != nil {
				actionErrs <- err
			}
		}(cur)
	}
	wg.Wait()
	close(actionErrs)

	// Log every failure so none is silently lost, but keep returning the
	// first one received, as callers have always seen.
	var firstErr error
	for actionErr := range actionErrs {
		cl.Errorf("%v", actionErr)
		if firstErr == nil {
			firstErr = actionErr
		}
	}
	if firstErr != nil {
		return nil, firstErr
	}

	return &aggregatedEvents, nil
}
//###################################################
// JSON serializer/deserializer
//###################################################
// MarshalJSON encodes the rule as a JSON object with the keys "id", "name",
// "description", "depth", "predicat", "multi" and "actions".
//
// Depth is written in its time.Duration string form (for example "5m0s"),
// which is the form UnmarshalJSON parses back with time.ParseDuration.
func (rule Rule) MarshalJSON() ([]byte, error) {
	return json.Marshal(&struct {
		Id          string    `json:"id"`
		Name        string    `json:"name"`
		Description string    `json:"description"`
		Depth       string    `json:"depth"`
		Predicat    Predicate `json:"predicat"`
		Multi       bool      `json:"multi"`
		Actions     *[]Action `json:"actions"`
	}{
		Id:          rule.Id,
		Name:        rule.Name,
		Description: rule.Description,
		Depth:       rule.Depth.String(),
		Predicat:    rule.Predicate,
		// Multi must be copied explicitly; leaving it out serializes every
		// rule with "multi": false regardless of its real value.
		Multi:   rule.Multi,
		Actions: &rule.Actions,
	})
}
// UnmarshalJSON decodes a rule from the JSON layout written by MarshalJSON.
//
// "depth" must be a string accepted by time.ParseDuration. Every element of
// "actions" must be a JSON object whose string field "type" names one of the
// known action types; the matching action value is created and filled in
// through its ParseInterface method.
//
// The receiver is modified only when the whole document was decoded
// successfully, and previously held actions are replaced rather than
// extended, so a Rule value can safely be reused for several documents.
//
// An error is returned when the JSON is malformed, the depth cannot be
// parsed, an action is not an object, its type is missing, not a string or
// unknown, or the action itself rejects its data.
func (rule *Rule) UnmarshalJSON(b []byte) error {
	alias := &struct {
		Id          string        `json:"id"`
		Name        string        `json:"name"`
		Description string        `json:"description"`
		Depth       string        `json:"depth"`
		Predicat    Predicate     `json:"predicat"`
		Multi       bool          `json:"multi"`
		Actions     []interface{} `json:"actions"`
	}{}

	if err := json.Unmarshal(b, alias); err != nil {
		log.Debugf("Can't unmarshall rule: %v", string(b))
		return err
	}

	depth, err := time.ParseDuration(alias.Depth)
	if err != nil {
		log.Debugf("Can't parse depth: %v", alias.Depth)
		return err
	}

	// Actions are decoded by hand because the concrete type depends on the
	// "type" field. A possible alternative for this step:
	// https://pkg.go.dev/github.com/mitchellh/mapstructure?tab=doc
	//
	// The slice stays nil when there are no actions so that an absent or
	// empty list keeps serializing the way it did before.
	var actions []Action
	for _, cur := range alias.Actions {
		// Checked assertions: malformed input must produce an error, not a panic.
		fields, ok := cur.(map[string]interface{})
		if !ok {
			return fmt.Errorf("can't parse action: expected JSON object, got %T", cur)
		}
		curType, ok := fields["type"].(string)
		if !ok {
			return fmt.Errorf("can't parse action: type must be a string, got %v", fields["type"])
		}

		var item Action
		switch curType {
		case HttpActionType:
			item = &HttpAction{}
		case SyslogActionType:
			item = &SyslogAction{}
		case IncidentActionType:
			item = &IncidentAction{}
		case BashActionType:
			item = &BashAction{}
		case ExecActionType:
			item = &ExecAction{}
		case AssetActionType:
			item = &AssetAction{}
		case FirewallActionType:
			item = &FirewallAction{}
		default:
			return fmt.Errorf("can't parse action with type %v", curType)
		}

		if err := item.ParseInterface(cur); err != nil {
			cl := log.WithFields(log.Fields{"type": curType})
			cl.Debugf("Can't parse interface: %v", err)
			return err
		}
		actions = append(actions, item)
	}

	// Everything parsed; commit to the receiver in one go.
	rule.Id = alias.Id
	rule.Name = alias.Name
	rule.Description = alias.Description
	rule.Depth = depth
	rule.Predicate = alias.Predicat
	rule.Multi = alias.Multi
	rule.Actions = actions

	return nil
}