264 lines
8.3 KiB
Go
264 lines
8.3 KiB
Go
package events
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/google/uuid"
|
|
"github.com/olivere/elastic/v7"
|
|
log "github.com/sirupsen/logrus"
|
|
"iwarma.ru/console/correlator/es"
|
|
"time"
|
|
)
|
|
|
|
//func PrepareElastic() {
|
|
// viper.Set(config.ElasticUrl, "http://elasticsearch:9200")
|
|
// viper.Set(config.ElasticRetryCount, 1)
|
|
// viper.Set(config.ElasticUsername, "elastic")
|
|
// viper.Set(config.ElasticPassword, "changeme")
|
|
// viper.Set(config.Verbose, true)
|
|
// viper.Set(config.ElasticAggregatedIndexName, "test-aggregated")
|
|
// viper.Set(config.ElasticNormalizedIndexName, "test-normalized")
|
|
// viper.Set(config.AggregatorIterationDuration, time.Second*2)
|
|
// viper.Set(config.Threads, 10)
|
|
//}
|
|
|
|
func ClearIndex(el *es.Elastic, index ...string) error {
|
|
for _, cur := range index {
|
|
exists, err := el.CheckIndex(cur)
|
|
if err != nil {
|
|
log.Errorf("%+v", err)
|
|
return err
|
|
}
|
|
|
|
if exists {
|
|
err = el.DeleteIndex(cur)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func FillNormalizedEvents(index string, count int, el *es.Elastic) error {
|
|
bulk := el.NewBulkRequest()
|
|
|
|
for i := 0; i < count; i++ {
|
|
event := Event{
|
|
"timestamp": time.Time{},
|
|
"type": "test",
|
|
"event_timestamp": time.Time{},
|
|
"event_id": uuid.NewString(),
|
|
"event_severity": uint8(i),
|
|
"event_src_msg": fmt.Sprintf("Test message %v", i),
|
|
"event_protocol": "TCP",
|
|
"device_vendor": "TestDevice",
|
|
"device_product": "TestProduct",
|
|
"device_version": "1.0",
|
|
"device_action": "Test",
|
|
"device_timezone": "Europe/Moscow",
|
|
"message": "Test message",
|
|
"sign_id": fmt.Sprintf("%v", i),
|
|
"sign_category": "Test",
|
|
"sign_subcategory": "Test",
|
|
"application": "Test application",
|
|
"source_ip": "127.0.0.1",
|
|
"source_mac": "00:50:56:c0:00:08",
|
|
"source_host": "localhost",
|
|
"source_port": uint32(i),
|
|
"source_user": "root",
|
|
"source_timezone": "Europe/Moscow",
|
|
"source_software": "test server",
|
|
"source_action": "test_action",
|
|
"destination_ip": "127.0.0.1",
|
|
"destination_mac": "00:50:56:c0:00:10",
|
|
"destination_timezone": "Europe/Moscow",
|
|
"destination_software": "Apache server",
|
|
"destination_host": "localhost",
|
|
"destination_action": "connect",
|
|
"destination_port": uint32(i),
|
|
"destination_user": "user",
|
|
"cs1": uint32(i * 5),
|
|
"cs1Label": "Signature",
|
|
"cs2": uint32(i + 20),
|
|
"cs2Label": "line_number",
|
|
"object_type": "webserver",
|
|
}
|
|
|
|
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(index).Id(event.GetString(EventID)).Doc(event))
|
|
}
|
|
|
|
bulkResponse, err := el.ExecuteBulk(bulk)
|
|
if err != nil {
|
|
log.Errorf("Can't index documents: %v", err)
|
|
return err
|
|
}
|
|
|
|
if bulkResponse.Errors {
|
|
log.Errorf("Got errors from bulk requset: %v", bulkResponse.Failed())
|
|
return fmt.Errorf("bulk error")
|
|
}
|
|
|
|
if len(bulkResponse.Indexed()) != count {
|
|
log.Errorf("Bad bulk index count. Got %v, expect %v", len(bulkResponse.Indexed()), count)
|
|
return fmt.Errorf("bad bulk count")
|
|
}
|
|
|
|
// wait until elastic is ready
|
|
time.Sleep(time.Second)
|
|
|
|
return nil
|
|
}
|
|
|
|
// FillNormalizedEventsForAggregation All thise event will be aggregated in one
|
|
func FillNormalizedEventsForAggregation(index string, count int, el *es.Elastic) error {
|
|
bulk := el.NewBulkRequest()
|
|
|
|
for i := 0; i < count; i++ {
|
|
|
|
event := Event{
|
|
"timestamp": time.Time{},
|
|
"type": "test",
|
|
"event_timestamp": time.Time{},
|
|
"event_id": uuid.NewString(),
|
|
"event_severity": 5,
|
|
"event_src_msg": "Message",
|
|
"event_protocol": "TCP",
|
|
"device_vendor": "TestDevice",
|
|
"device_product": "TestProduct",
|
|
"device_version": "1.0",
|
|
"device_action": "Test",
|
|
"device_timezone": "Europe/Moscow",
|
|
"sign_id": "Test",
|
|
"sign_category": "Test",
|
|
"sign_subcategory": "Test",
|
|
"application": "Test application",
|
|
"source_ip": "127.0.0.1",
|
|
"source_mac": "00:50:56:c0:00:08",
|
|
"source_host": "localhost",
|
|
"source_port": 5000,
|
|
"message": "Suricata message",
|
|
"source_user": "root",
|
|
"source_timezone": "Europe/Moscow",
|
|
"source_software": "test server",
|
|
"source_action": "test_action",
|
|
"destination_ip": "127.0.0.1",
|
|
"destination_mac": "00:50:56:c0:00:10",
|
|
"destination_timezone": "Europe/Moscow",
|
|
"destination_software": "Apache server",
|
|
"destination_host": "localhost",
|
|
"destination_action": "connect",
|
|
"destination_port": 8000,
|
|
"destination_user": "user",
|
|
"cs1": 2563,
|
|
"cs1Label": "Signature",
|
|
"cs2": 12,
|
|
"cs2Label": "line_number",
|
|
"object_type": "webserver",
|
|
}
|
|
|
|
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(index).Id(event.GetString(EventID)).Doc(event))
|
|
}
|
|
|
|
bulkResponse, err := el.ExecuteBulk(bulk)
|
|
if err != nil {
|
|
log.Errorf("Can't index documents: %v", err)
|
|
return err
|
|
}
|
|
|
|
if bulkResponse.Errors {
|
|
log.Errorf("Got errors from bulk requset: %v", bulkResponse.Failed())
|
|
return fmt.Errorf("bulk error")
|
|
}
|
|
|
|
if len(bulkResponse.Indexed()) != count {
|
|
log.Errorf("Bad bulk index count. Got %v, expect %v", len(bulkResponse.Indexed()), count)
|
|
return fmt.Errorf("bad bulk count")
|
|
}
|
|
|
|
// wait until elastic is ready
|
|
time.Sleep(time.Second)
|
|
|
|
return nil
|
|
}
|
|
|
|
// FillAggregatedEvents These events must not bee aggregated
|
|
func FillAggregatedEvents(index string, count int, el *es.Elastic) error {
|
|
bulk := el.NewBulkRequest()
|
|
|
|
for i := 0; i < count; i++ {
|
|
hash := uuid.NewString()
|
|
event := Event{
|
|
Hash: hash,
|
|
FirstEvent: time.Now().UTC().Add(time.Second * (-10)),
|
|
LastEvent: time.Now().UTC(),
|
|
EventCount: 0,
|
|
Created: time.Now().UTC().Add(time.Second * (-10)),
|
|
Tags: nil,
|
|
AggregatedId: hash,
|
|
CeleryDone: false,
|
|
"index": index,
|
|
"timestamp": time.Time{},
|
|
"type": "test",
|
|
"event_timestamp": time.Time{},
|
|
"event_id": uuid.NewString(),
|
|
"event_severity": uint8(i),
|
|
"event_src_msg": fmt.Sprintf("Test message %v", i),
|
|
"event_protocol": "TCP",
|
|
"device_vendor": "TestDevice",
|
|
"device_product": "TestProduct",
|
|
"device_version": "1.0",
|
|
"device_action": "Test",
|
|
"device_timezone": "Europe/Moscow",
|
|
"message": "Test message",
|
|
"sign_id": fmt.Sprintf("%v", i),
|
|
"sign_category": "Test",
|
|
"sign_subcategory": "Test",
|
|
"application": "Test application",
|
|
"source_ip": "127.0.0.1",
|
|
"source_mac": "00:50:56:c0:00:08",
|
|
"source_host": "localhost",
|
|
"source_port": uint32(i),
|
|
"source_user": "root",
|
|
"source_timezone": "Europe/Moscow",
|
|
"source_software": "test server",
|
|
"source_action": "test_action",
|
|
"destination_ip": "127.0.0.1",
|
|
"destination_mac": "00:50:56:c0:00:10",
|
|
"destination_timezone": "Europe/Moscow",
|
|
"destination_software": "Apache server",
|
|
"destination_host": "localhost",
|
|
"destination_action": "connect",
|
|
"destination_port": uint32(i),
|
|
"destination_user": "user",
|
|
"cs1": uint32(i * 5),
|
|
"cs1Label": "Signature",
|
|
"cs2": uint32(i + 20),
|
|
"cs2Label": "line_number",
|
|
"object_type": "webserver",
|
|
}
|
|
|
|
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(index).Id(event.GetString(AggregatedId)).Doc(event))
|
|
}
|
|
|
|
bulkResponse, err := el.ExecuteBulk(bulk)
|
|
if err != nil {
|
|
log.Errorf("Can't index documents: %v", err)
|
|
return err
|
|
}
|
|
|
|
if bulkResponse.Errors {
|
|
log.Errorf("Got errors from bulk requset: %v", bulkResponse.Failed())
|
|
return fmt.Errorf("bulk error")
|
|
}
|
|
|
|
if len(bulkResponse.Indexed()) != count {
|
|
log.Errorf("Bad bulk index count. Got %v, expect %v", len(bulkResponse.Indexed()), count)
|
|
return fmt.Errorf("bad bulk count")
|
|
}
|
|
|
|
// wait until elastic is ready
|
|
time.Sleep(time.Second)
|
|
|
|
return nil
|
|
}
|