308 lines
7.3 KiB
Go
308 lines
7.3 KiB
Go
package es
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"time"
|
|
|
|
"github.com/olivere/elastic/v7"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/spf13/viper"
|
|
"iwarma.ru/console/correlator/config"
|
|
)
|
|
|
|
/*
|
|
For testing purpose https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html
|
|
Prepare docker pull docker.elastic.co/elasticsearch/elasticsearch:7.12.0
|
|
Run docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms2g -Xmx2g" docker.elastic.co/elasticsearch/elasticsearch:7.12.0
|
|
*/
|
|
|
|
// Elastic Struct to encapsulate elastic context
|
|
type Elastic struct {
|
|
log *log.Entry
|
|
client *elastic.Client
|
|
ctx context.Context
|
|
}
|
|
|
|
func NewClient() (*elastic.Client, error) {
|
|
var elasticClient *elastic.Client
|
|
options := make([]elastic.ClientOptionFunc, 2)
|
|
options[0] = elastic.SetURL(viper.GetString(config.ElasticUrl))
|
|
options[1] = elastic.SetBasicAuth(
|
|
viper.GetString(config.ElasticUsername),
|
|
viper.GetString(config.ElasticPassword))
|
|
|
|
if viper.GetBool(config.ElasticIgnoreSSLErrors) {
|
|
transport := &http.Transport{
|
|
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
|
}
|
|
httpClient := &http.Client{Transport: transport}
|
|
options = append(options, elastic.SetHttpClient(httpClient))
|
|
log.Info("Disable SSL validation")
|
|
}
|
|
|
|
gotHttps, err := regexp.MatchString("^https://", viper.GetString(config.ElasticUrl))
|
|
if err != nil {
|
|
log.Errorf("Can't parse elasticsearch url: %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
if gotHttps {
|
|
log.Debug("Set HTTPS scheme")
|
|
options = append(options, elastic.SetScheme("https"))
|
|
}
|
|
|
|
log.Info("Connecting to elastic")
|
|
for i := 0; i < viper.GetInt(config.ElasticRetryCount); i++ {
|
|
client, err := elastic.NewClient(options...)
|
|
if err == nil {
|
|
elasticClient = client
|
|
break
|
|
}
|
|
|
|
log.Infof("No connection. Retry in %v. Attempt %v from %v",
|
|
viper.GetDuration(config.ElasticConnectionTimeout),
|
|
i+1,
|
|
viper.GetInt(config.ElasticRetryCount))
|
|
|
|
time.Sleep(viper.GetDuration(config.ElasticConnectionTimeout))
|
|
} // End elastic connection
|
|
if elasticClient == nil {
|
|
return nil, fmt.Errorf("can't connect to elasticsearch")
|
|
}
|
|
return elasticClient, nil
|
|
}
|
|
|
|
// NewElastic Create new Elastic and connect it to server
|
|
// This function can be slow due to elastic connection timeout
|
|
func NewElastic() (*Elastic, error) {
|
|
result := &Elastic{
|
|
log: log.WithField("part", "elastic"),
|
|
ctx: context.Background(),
|
|
}
|
|
client, err := NewClient()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't connect to elasticsearch")
|
|
}
|
|
|
|
result.client = client
|
|
|
|
result.log.Debug("Elastic client is ready")
|
|
return result, nil
|
|
}
|
|
|
|
// CheckAndCreateIndex Check that index exist and create if not
|
|
func (el *Elastic) CheckAndCreateIndex(index ...string) error {
|
|
el.log.Debugf("Checking index %v", index)
|
|
mapping := `{
|
|
"settings":{
|
|
"number_of_shards":1,
|
|
"number_of_replicas":0
|
|
},
|
|
"mappings":{
|
|
"properties": {
|
|
"sign_name": {
|
|
"type": "text",
|
|
"fields": {
|
|
"keyword": {"type": "keyword"}
|
|
}
|
|
},
|
|
"source_ip": {
|
|
"type": "text",
|
|
"fields": {
|
|
"keyword": {"type": "keyword"}
|
|
}
|
|
},
|
|
"destination_ip": {
|
|
"type": "text",
|
|
"fields": {
|
|
"keyword": {"type": "keyword"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}`
|
|
for _, cur := range index {
|
|
exists, err := el.client.IndexExists(cur).Do(el.ctx)
|
|
if err != nil {
|
|
el.log.Errorf("%+v", err)
|
|
return err
|
|
}
|
|
|
|
if !exists {
|
|
createIndex, err := el.client.CreateIndex(cur).BodyString(mapping).ErrorTrace(true).Do(el.ctx)
|
|
if err != nil {
|
|
el.log.Errorf("%+v", err)
|
|
return err
|
|
}
|
|
|
|
if !createIndex.Acknowledged {
|
|
return fmt.Errorf("can't create index %v", cur)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Query Run elasticsearch query
|
|
func (el *Elastic) Query(index string, query elastic.Query) (chan *elastic.SearchHit, chan error) {
|
|
hits := make(chan *elastic.SearchHit)
|
|
errs := make(chan error, 1)
|
|
scrollSize := viper.GetInt(config.ScrollSize)
|
|
go func() {
|
|
defer close(hits)
|
|
defer close(errs)
|
|
|
|
scroll := el.client.Scroll(index).Query(query)
|
|
scroll.Size(scrollSize)
|
|
|
|
// We need to call scroll.Do until we got an EOF
|
|
for {
|
|
res, err := scroll.Do(el.ctx)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
if err != nil {
|
|
el.log.Errorf("Got error from scroll: %v", err)
|
|
errs <- err
|
|
break
|
|
}
|
|
|
|
for _, hit := range res.Hits.Hits {
|
|
select {
|
|
case hits <- hit:
|
|
case <-el.ctx.Done():
|
|
{
|
|
errs <- el.ctx.Err()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Need to clear scroll
|
|
err := scroll.Clear(el.ctx)
|
|
if err != nil {
|
|
el.log.Errorf("%v", err)
|
|
errs <- err
|
|
}
|
|
}()
|
|
|
|
return hits, errs
|
|
}
|
|
|
|
func (el *Elastic) DeleteIndex(index ...string) error {
|
|
res, err := el.client.DeleteIndex(index...).Do(el.ctx)
|
|
if err != nil {
|
|
el.log.Errorf("Can't delete index %v: %v", index, err)
|
|
return err
|
|
}
|
|
|
|
if !res.Acknowledged {
|
|
el.log.Errorf("Can't delete index %v", index)
|
|
return fmt.Errorf("can't delete index")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (el *Elastic) CheckIndex(index string) (bool, error) {
|
|
res, err := el.client.IndexExists(index).Do(el.ctx)
|
|
if err != nil {
|
|
el.log.Errorf("Can't check index %v exist: %v", index, err)
|
|
return false, err
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (el *Elastic) NewBulkRequest() *elastic.BulkService {
|
|
return el.client.Bulk()
|
|
}
|
|
|
|
func (el *Elastic) ExecuteBulk(bulk *elastic.BulkService) (*elastic.BulkResponse, error) {
|
|
return bulk.Do(el.ctx)
|
|
}
|
|
|
|
func (el *Elastic) NewBulkProcessor(workers int, actions int, flush time.Duration, stats bool) (*elastic.BulkProcessor, error) {
|
|
processor := el.client.BulkProcessor()
|
|
processor.Workers(workers)
|
|
processor.BulkActions(actions)
|
|
processor.FlushInterval(flush)
|
|
processor.Stats(stats)
|
|
|
|
return processor.Do(el.ctx)
|
|
}
|
|
|
|
// DebugQuery Print query to log
|
|
func (el *Elastic) DebugQuery(index string, query elastic.Query) {
|
|
if viper.GetBool(config.ElasticLogQuery) {
|
|
go func() {
|
|
source, err := query.Source()
|
|
if err != nil {
|
|
el.log.Errorf("Got error while getting source: %v", err)
|
|
return
|
|
}
|
|
|
|
bytes, err := json.Marshal(source)
|
|
if err != nil {
|
|
el.log.Errorf("Got error while query marshal: %v", err)
|
|
el.log.Debugf("Query origin: %v", source)
|
|
return
|
|
}
|
|
|
|
if viper.GetBool(config.ElasticLogEncodeQuery) {
|
|
encoded := base64.StdEncoding.EncodeToString(bytes)
|
|
clog := el.log.WithFields(log.Fields{"query": encoded, "index": index})
|
|
clog.Info("Query ready")
|
|
} else {
|
|
clog := el.log.WithFields(log.Fields{"query": string(bytes), "index": index})
|
|
clog.Info("Query ready")
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (el *Elastic) RefreshIndex(index ...string) error {
|
|
res, err := el.client.Refresh(index...).Do(el.ctx)
|
|
if err != nil {
|
|
el.log.Errorf("Can't refresh index: %v", err)
|
|
return err
|
|
}
|
|
|
|
var hasErrors bool
|
|
|
|
if res.Shards.Failed > 0 {
|
|
hasErrors = true
|
|
for _, cur := range res.Shards.Failures {
|
|
if cur != nil {
|
|
reason, err := json.Marshal(cur.Reason)
|
|
if err != nil {
|
|
el.log.Errorf("Can't marshal index refresh reason: %v", err)
|
|
el.log.Debugf("Reason: %v", cur.Reason)
|
|
continue
|
|
}
|
|
el.log.Errorf("Error refresh index %v. Reason %v, status %v", cur.Index, string(reason), cur.Status)
|
|
}
|
|
}
|
|
}
|
|
|
|
if hasErrors {
|
|
return fmt.Errorf("got some errors")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (el *Elastic) CountDocuments(index ...string) (int64, error) {
|
|
return el.client.Count(index...).Do(el.ctx)
|
|
}
|