old_console/correlator/events/util.go
2024-11-02 14:12:45 +03:00

264 lines
8.3 KiB
Go

package events
import (
"fmt"
"github.com/google/uuid"
"github.com/olivere/elastic/v7"
log "github.com/sirupsen/logrus"
"iwarma.ru/console/correlator/es"
"time"
)
//func PrepareElastic() {
// viper.Set(config.ElasticUrl, "http://elasticsearch:9200")
// viper.Set(config.ElasticRetryCount, 1)
// viper.Set(config.ElasticUsername, "elastic")
// viper.Set(config.ElasticPassword, "changeme")
// viper.Set(config.Verbose, true)
// viper.Set(config.ElasticAggregatedIndexName, "test-aggregated")
// viper.Set(config.ElasticNormalizedIndexName, "test-normalized")
// viper.Set(config.AggregatorIterationDuration, time.Second*2)
// viper.Set(config.Threads, 10)
//}
func ClearIndex(el *es.Elastic, index ...string) error {
for _, cur := range index {
exists, err := el.CheckIndex(cur)
if err != nil {
log.Errorf("%+v", err)
return err
}
if exists {
err = el.DeleteIndex(cur)
if err != nil {
return err
}
}
}
return nil
}
func FillNormalizedEvents(index string, count int, el *es.Elastic) error {
bulk := el.NewBulkRequest()
for i := 0; i < count; i++ {
event := Event{
"timestamp": time.Time{},
"type": "test",
"event_timestamp": time.Time{},
"event_id": uuid.NewString(),
"event_severity": uint8(i),
"event_src_msg": fmt.Sprintf("Test message %v", i),
"event_protocol": "TCP",
"device_vendor": "TestDevice",
"device_product": "TestProduct",
"device_version": "1.0",
"device_action": "Test",
"device_timezone": "Europe/Moscow",
"message": "Test message",
"sign_id": fmt.Sprintf("%v", i),
"sign_category": "Test",
"sign_subcategory": "Test",
"application": "Test application",
"source_ip": "127.0.0.1",
"source_mac": "00:50:56:c0:00:08",
"source_host": "localhost",
"source_port": uint32(i),
"source_user": "root",
"source_timezone": "Europe/Moscow",
"source_software": "test server",
"source_action": "test_action",
"destination_ip": "127.0.0.1",
"destination_mac": "00:50:56:c0:00:10",
"destination_timezone": "Europe/Moscow",
"destination_software": "Apache server",
"destination_host": "localhost",
"destination_action": "connect",
"destination_port": uint32(i),
"destination_user": "user",
"cs1": uint32(i * 5),
"cs1Label": "Signature",
"cs2": uint32(i + 20),
"cs2Label": "line_number",
"object_type": "webserver",
}
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(index).Id(event.GetString(EventID)).Doc(event))
}
bulkResponse, err := el.ExecuteBulk(bulk)
if err != nil {
log.Errorf("Can't index documents: %v", err)
return err
}
if bulkResponse.Errors {
log.Errorf("Got errors from bulk requset: %v", bulkResponse.Failed())
return fmt.Errorf("bulk error")
}
if len(bulkResponse.Indexed()) != count {
log.Errorf("Bad bulk index count. Got %v, expect %v", len(bulkResponse.Indexed()), count)
return fmt.Errorf("bad bulk count")
}
// wait until elastic is ready
time.Sleep(time.Second)
return nil
}
// FillNormalizedEventsForAggregation All thise event will be aggregated in one
func FillNormalizedEventsForAggregation(index string, count int, el *es.Elastic) error {
bulk := el.NewBulkRequest()
for i := 0; i < count; i++ {
event := Event{
"timestamp": time.Time{},
"type": "test",
"event_timestamp": time.Time{},
"event_id": uuid.NewString(),
"event_severity": 5,
"event_src_msg": "Message",
"event_protocol": "TCP",
"device_vendor": "TestDevice",
"device_product": "TestProduct",
"device_version": "1.0",
"device_action": "Test",
"device_timezone": "Europe/Moscow",
"sign_id": "Test",
"sign_category": "Test",
"sign_subcategory": "Test",
"application": "Test application",
"source_ip": "127.0.0.1",
"source_mac": "00:50:56:c0:00:08",
"source_host": "localhost",
"source_port": 5000,
"message": "Suricata message",
"source_user": "root",
"source_timezone": "Europe/Moscow",
"source_software": "test server",
"source_action": "test_action",
"destination_ip": "127.0.0.1",
"destination_mac": "00:50:56:c0:00:10",
"destination_timezone": "Europe/Moscow",
"destination_software": "Apache server",
"destination_host": "localhost",
"destination_action": "connect",
"destination_port": 8000,
"destination_user": "user",
"cs1": 2563,
"cs1Label": "Signature",
"cs2": 12,
"cs2Label": "line_number",
"object_type": "webserver",
}
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(index).Id(event.GetString(EventID)).Doc(event))
}
bulkResponse, err := el.ExecuteBulk(bulk)
if err != nil {
log.Errorf("Can't index documents: %v", err)
return err
}
if bulkResponse.Errors {
log.Errorf("Got errors from bulk requset: %v", bulkResponse.Failed())
return fmt.Errorf("bulk error")
}
if len(bulkResponse.Indexed()) != count {
log.Errorf("Bad bulk index count. Got %v, expect %v", len(bulkResponse.Indexed()), count)
return fmt.Errorf("bad bulk count")
}
// wait until elastic is ready
time.Sleep(time.Second)
return nil
}
// FillAggregatedEvents These events must not bee aggregated
func FillAggregatedEvents(index string, count int, el *es.Elastic) error {
bulk := el.NewBulkRequest()
for i := 0; i < count; i++ {
hash := uuid.NewString()
event := Event{
Hash: hash,
FirstEvent: time.Now().UTC().Add(time.Second * (-10)),
LastEvent: time.Now().UTC(),
EventCount: 0,
Created: time.Now().UTC().Add(time.Second * (-10)),
Tags: nil,
AggregatedId: hash,
CeleryDone: false,
"index": index,
"timestamp": time.Time{},
"type": "test",
"event_timestamp": time.Time{},
"event_id": uuid.NewString(),
"event_severity": uint8(i),
"event_src_msg": fmt.Sprintf("Test message %v", i),
"event_protocol": "TCP",
"device_vendor": "TestDevice",
"device_product": "TestProduct",
"device_version": "1.0",
"device_action": "Test",
"device_timezone": "Europe/Moscow",
"message": "Test message",
"sign_id": fmt.Sprintf("%v", i),
"sign_category": "Test",
"sign_subcategory": "Test",
"application": "Test application",
"source_ip": "127.0.0.1",
"source_mac": "00:50:56:c0:00:08",
"source_host": "localhost",
"source_port": uint32(i),
"source_user": "root",
"source_timezone": "Europe/Moscow",
"source_software": "test server",
"source_action": "test_action",
"destination_ip": "127.0.0.1",
"destination_mac": "00:50:56:c0:00:10",
"destination_timezone": "Europe/Moscow",
"destination_software": "Apache server",
"destination_host": "localhost",
"destination_action": "connect",
"destination_port": uint32(i),
"destination_user": "user",
"cs1": uint32(i * 5),
"cs1Label": "Signature",
"cs2": uint32(i + 20),
"cs2Label": "line_number",
"object_type": "webserver",
}
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(index).Id(event.GetString(AggregatedId)).Doc(event))
}
bulkResponse, err := el.ExecuteBulk(bulk)
if err != nil {
log.Errorf("Can't index documents: %v", err)
return err
}
if bulkResponse.Errors {
log.Errorf("Got errors from bulk requset: %v", bulkResponse.Failed())
return fmt.Errorf("bulk error")
}
if len(bulkResponse.Indexed()) != count {
log.Errorf("Bad bulk index count. Got %v, expect %v", len(bulkResponse.Indexed()), count)
return fmt.Errorf("bad bulk count")
}
// wait until elastic is ready
time.Sleep(time.Second)
return nil
}