package rules import ( "encoding/json" "errors" "fmt" "iwarma.ru/console/correlator/es" "iwarma.ru/console/correlator/events" "sync" "time" log "github.com/sirupsen/logrus" "github.com/olivere/elastic/v7" "github.com/spf13/viper" "iwarma.ru/console/correlator/config" ) // Rule is a correlator rule type Rule struct { Id string `json:"id"` Name string `json:"name"` Description string `json:"description"` Multi bool `json:"multi"` Depth time.Duration `json:"depth"` Predicate Predicate `json:"predicat"` Actions []Action `json:"actions"` } // GetRangeQuery Create range query for selected depth func (rule Rule) GetRangeQuery() (*elastic.BoolQuery, error) { if int(rule.Depth) == 0 { return nil, errors.New("have empty duration") } return elastic.NewBoolQuery().Should( elastic.NewRangeQuery("event_first").Gte(time.Now().UTC().Add(-rule.Depth)).Lte(time.Now().UTC()), elastic.NewRangeQuery("event_last").Gte(time.Now().UTC().Add(-rule.Depth)).Lte(time.Now().UTC())), nil } // Do Apply rule func (rule Rule) Do(client *es.Elastic) (*[]*events.Event, error) { cl := log.WithFields(log.Fields{"name": rule.Name, "id": rule.Id}) cl.Debug("Starting rule") defer cl.Debug("Done rule") if len(rule.Id) == 0 { return nil, errors.New("need rule ID to be set") } if rule.Depth == 0 { return nil, errors.New("bad duration value") } // Don't process same event twice tagsQuery := elastic.NewMatchQuery("rule_tags", rule.Id) // Some types of predicates need additional processing query, err := rule.GetRangeQuery() if err != nil { cl.Error(err) return nil, err } query.Must(rule.Predicate).MustNot(tagsQuery) // Show query if user want client.DebugQuery(events.GetAggregatedIndexName(), query) aggregatedEvents := make([]*events.Event, 0) // Query for events raw, errs1 := client.Query(events.GetAggregatedIndexName(), query) aggregated, errs2 := events.ParseEvents(raw, errs1, viper.GetInt(config.Threads)) cl.Debug("Start reading results") for event := range aggregated { aggregatedEvents = append(aggregatedEvents, event) } cl.Debug("Finish reading results") for err := range errs2 { cl.Debug("Got errors") return nil, err } cl.Debugf("Done getting events. Got %v events", len(aggregatedEvents)) if len(aggregatedEvents) == 0 { cl.Debug("No aggregated events suitable for rule") return nil, nil } cl.Debug("Starting actions") var wg sync.WaitGroup wg.Add(len(rule.Actions)) actionErrs := make(chan error, len(rule.Actions)) // Apply actions for _, cur := range rule.Actions { go func(action Action) { actionLog := cl.WithField("multi", rule.Multi) defer wg.Done() actionLog.Tracef("Start action %v", action.GetType()) defer actionLog.Tracef("Finish action %v", action.GetType()) if rule.Multi { actionLog.Debugf("Sending %v events to action", len(aggregatedEvents)) err := action.Perform(&aggregatedEvents) if err != nil { actionErrs <- err } } else { slice := aggregatedEvents[0:1] actionLog.Debugf("Sending %v events to action", len(slice)) err := action.Perform(&slice) if err != nil { actionErrs <- err } } }(cur) } wg.Wait() close(actionErrs) for err := range actionErrs { if err != nil { cl.Errorf("%v", err) return nil, err } } return &aggregatedEvents, nil } //################################################### // Json serializer/deserializer //################################################### func (rule Rule) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { Id string `json:"id"` Name string `json:"name"` Description string `json:"description"` Depth string `json:"depth"` Predicat Predicate `json:"predicat"` Multi bool `json:"multi"` Actions *[]Action `json:"actions"` }{ Id: rule.Id, Name: rule.Name, Description: rule.Description, Depth: rule.Depth.String(), Predicat: rule.Predicate, Actions: &rule.Actions, }) } func (rule *Rule) UnmarshalJSON(b []byte) error { alias := &struct { Id string `json:"id"` Name string `json:"name"` Description string `json:"description"` Depth string `json:"depth"` Predicat Predicate `json:"predicat"` Multi bool `json:"multi"` Actions []interface{} `json:"actions"` }{} err := json.Unmarshal(b, alias) if err != nil { log.Debugf("Can't unmarshall rule: %v", string(b)) return err } rule.Depth, err = time.ParseDuration(alias.Depth) if err != nil { log.Debugf("Can't parse depth: %v", alias.Depth) return err } //https://pkg.go.dev/github.com/mitchellh/mapstructure?tab=doc rule.Id = alias.Id rule.Name = alias.Name rule.Description = alias.Description rule.Predicate = alias.Predicat rule.Multi = alias.Multi for _, cur := range alias.Actions { var item Action switch curType := cur.(map[string]interface{})["type"].(string); curType { case HttpActionType: item = &HttpAction{} case SyslogActionType: item = &SyslogAction{} case IncidentActionType: item = &IncidentAction{} case BashActionType: item = &BashAction{} case ExecActionType: item = &ExecAction{} case AssetActionType: item = &AssetAction{} case FirewallActionType: item = &FirewallAction{} default: return fmt.Errorf("can't parse action with type %v", curType) } err = item.ParseInterface(cur) if err != nil { cl := log.WithFields(log.Fields{"type": cur.(map[string]interface{})["type"].(string)}) cl.Debugf("Can't parse interface: %v", err) return err } rule.Actions = append(rule.Actions, item) } return nil }