package es

import (
	"context"
	"crypto/tls"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"regexp"
	"time"

	"github.com/olivere/elastic/v7"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"

	"iwarma.ru/console/correlator/config"
)

/*
For testing purposes a single-node Elasticsearch can be started with Docker, see
https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

Prepare:

	docker pull docker.elastic.co/elasticsearch/elasticsearch:7.12.0

Run:

	docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms2g -Xmx2g" docker.elastic.co/elasticsearch/elasticsearch:7.12.0
*/

// elasticHTTPSSchemeRe matches server URLs that start with the https scheme.
// It is compiled once instead of on every NewClient call.
var elasticHTTPSSchemeRe = regexp.MustCompile("^https://")

// Elastic encapsulates the elasticsearch client together with the logger and
// the context used for every request issued through it.
type Elastic struct {
	log    *log.Entry
	client *elastic.Client
	ctx    context.Context
}

// NewClient builds an elasticsearch client from the viper configuration
// (URL, basic-auth credentials, optional TLS verification bypass) and tries to
// connect, retrying up to config.ElasticRetryCount times with a pause of
// config.ElasticConnectionTimeout between attempts.
//
// At least one attempt is always made, even when the configured retry count is
// zero or negative. When every attempt fails, the returned error wraps the
// error of the last attempt.
func NewClient() (*elastic.Client, error) {
	url := viper.GetString(config.ElasticUrl)

	options := []elastic.ClientOptionFunc{
		elastic.SetURL(url),
		elastic.SetBasicAuth(
			viper.GetString(config.ElasticUsername),
			viper.GetString(config.ElasticPassword)),
	}

	if viper.GetBool(config.ElasticIgnoreSSLErrors) {
		// Certificate validation is switched off only on explicit request
		// through the configuration.
		transport := &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		}
		options = append(options, elastic.SetHttpClient(&http.Client{Transport: transport}))
		log.Info("Disable SSL validation")
	}

	if elasticHTTPSSchemeRe.MatchString(url) {
		log.Debug("Set HTTPS scheme")
		options = append(options, elastic.SetScheme("https"))
	}

	attempts := viper.GetInt(config.ElasticRetryCount)
	if attempts < 1 {
		attempts = 1
	}
	delay := viper.GetDuration(config.ElasticConnectionTimeout)

	log.Info("Connecting to elastic")

	var lastErr error
	for attempt := 1; attempt <= attempts; attempt++ {
		client, err := elastic.NewClient(options...)
		if err == nil {
			return client, nil
		}
		lastErr = err

		// No point in waiting when there is no attempt left.
		if attempt == attempts {
			break
		}

		log.Infof("No connection. Retry in %v. Attempt %v from %v", delay, attempt, attempts)
		time.Sleep(delay)
	}

	return nil, fmt.Errorf("can't connect to elasticsearch: %w", lastErr)
}

// NewElastic creates a new Elastic and connects it to the server.
// This function can be slow due to the elastic connection timeout and retries
// performed by NewClient. The error returned by NewClient is passed through
// unchanged so the underlying cause is preserved.
func NewElastic() (*Elastic, error) {
	client, err := NewClient()
	if err != nil {
		return nil, err
	}

	result := &Elastic{
		log:    log.WithField("part", "elastic"),
		client: client,
		ctx:    context.Background(),
	}
	result.log.Debug("Elastic client is ready")

	return result, nil
}

// CheckAndCreateIndex checks that every given index exists and creates the
// missing ones with a single shard, no replicas, and text+keyword mappings for
// sign_name, source_ip and destination_ip.
//
// It stops at the first failure and returns that error; an unacknowledged
// index creation is reported as an error too.
func (el *Elastic) CheckAndCreateIndex(index ...string) error {
	el.log.Debugf("Checking index %v", index)

	mapping := `{
	"settings": {
		"number_of_shards": 1,
		"number_of_replicas": 0
	},
	"mappings": {
		"properties": {
			"sign_name": {
				"type": "text",
				"fields": {
					"keyword": {"type": "keyword"}
				}
			},
			"source_ip": {
				"type": "text",
				"fields": {
					"keyword": {"type": "keyword"}
				}
			},
			"destination_ip": {
				"type": "text",
				"fields": {
					"keyword": {"type": "keyword"}
				}
			}
		}
	}
}`

	for _, cur := range index {
		exists, err := el.client.IndexExists(cur).Do(el.ctx)
		if err != nil {
			el.log.Errorf("%+v", err)
			return err
		}
		if exists {
			continue
		}

		created, err := el.client.CreateIndex(cur).BodyString(mapping).ErrorTrace(true).Do(el.ctx)
		if err != nil {
			el.log.Errorf("%+v", err)
			return err
		}
		if !created.Acknowledged {
			return fmt.Errorf("can't create index %v", cur)
		}
	}

	return nil
}

// Query runs an elasticsearch scroll query against index and streams the
// results.
//
// The first returned channel delivers the hits; the second one (buffered,
// capacity 1) carries at most one error: the first one encountered. Both
// channels are closed when the scroll is finished, so a consumer can range
// over the hits and read the error channel afterwards. The hits channel is
// unbuffered: the producing goroutine waits for the consumer (or for the
// context of el to be cancelled) on every hit.
func (el *Elastic) Query(index string, query elastic.Query) (chan *elastic.SearchHit, chan error) {
	hits := make(chan *elastic.SearchHit)
	errs := make(chan error, 1)
	scrollSize := viper.GetInt(config.ScrollSize)

	go func() {
		defer close(hits)
		defer close(errs)

		// report keeps only the first error and never blocks. A blocking send
		// could stall this goroutine forever (and keep hits open) when the
		// consumer drains hits before looking at errs.
		report := func(err error) {
			select {
			case errs <- err:
			default:
			}
		}

		scroll := el.client.Scroll(index).Query(query).Size(scrollSize)

		// Release the scroll however the loop below ends. Registered last, so
		// it runs before the channels are closed.
		defer func() {
			if err := scroll.Clear(el.ctx); err != nil {
				el.log.Errorf("%v", err)
				report(err)
			}
		}()

		// We need to call scroll.Do until we get an EOF.
		for {
			res, err := scroll.Do(el.ctx)
			if err == io.EOF {
				return
			}
			if err != nil {
				el.log.Errorf("Got error from scroll: %v", err)
				report(err)
				return
			}

			for _, hit := range res.Hits.Hits {
				select {
				case hits <- hit:
				case <-el.ctx.Done():
					// Stop the whole scroll: a bare break here would only
					// leave the select and continue with the next hit.
					report(el.ctx.Err())
					return
				}
			}
		}
	}()

	return hits, errs
}

// DeleteIndex deletes the given indices. It returns an error when the request
// fails or when the server does not acknowledge the deletion.
func (el *Elastic) DeleteIndex(index ...string) error {
	res, err := el.client.DeleteIndex(index...).Do(el.ctx)
	if err != nil {
		el.log.Errorf("Can't delete index %v: %v", index, err)
		return err
	}

	if !res.Acknowledged {
		el.log.Errorf("Can't delete index %v", index)
		return fmt.Errorf("can't delete index")
	}

	return nil
}

// CheckIndex reports whether the given index exists.
func (el *Elastic) CheckIndex(index string) (bool, error) {
	exists, err := el.client.IndexExists(index).Do(el.ctx)
	if err != nil {
		el.log.Errorf("Can't check index %v exist: %v", index, err)
		return false, err
	}

	return exists, nil
}

// NewBulkRequest returns a new, empty bulk service bound to the client.
func (el *Elastic) NewBulkRequest() *elastic.BulkService {
	return el.client.Bulk()
}

// ExecuteBulk executes the given bulk request with the context of el.
func (el *Elastic) ExecuteBulk(bulk *elastic.BulkService) (*elastic.BulkResponse, error) {
	return bulk.Do(el.ctx)
}

// NewBulkProcessor creates and starts a bulk processor configured with the
// given number of workers, the number of actions per bulk request, the flush
// interval, and whether statistics are collected.
func (el *Elastic) NewBulkProcessor(workers int, actions int, flush time.Duration, stats bool) (*elastic.BulkProcessor, error) {
	return el.client.BulkProcessor().
		Workers(workers).
		BulkActions(actions).
		FlushInterval(flush).
		Stats(stats).
		Do(el.ctx)
}

// DebugQuery prints the query to the log when config.ElasticLogQuery is set.
// The query is serialized to JSON and, when config.ElasticLogEncodeQuery is
// set, additionally base64-encoded. The work is done in a separate goroutine,
// so the call itself returns immediately.
func (el *Elastic) DebugQuery(index string, query elastic.Query) {
	if !viper.GetBool(config.ElasticLogQuery) {
		return
	}

	go func() {
		source, err := query.Source()
		if err != nil {
			el.log.Errorf("Got error while getting source: %v", err)
			return
		}

		raw, err := json.Marshal(source)
		if err != nil {
			el.log.Errorf("Got error while query marshal: %v", err)
			el.log.Debugf("Query origin: %v", source)
			return
		}

		payload := string(raw)
		if viper.GetBool(config.ElasticLogEncodeQuery) {
			payload = base64.StdEncoding.EncodeToString(raw)
		}

		el.log.WithFields(log.Fields{"query": payload, "index": index}).Info("Query ready")
	}()
}

// RefreshIndex refreshes the given indices. Every shard failure reported by
// the server is logged; if at least one shard failed, an error is returned.
func (el *Elastic) RefreshIndex(index ...string) error {
	res, err := el.client.Refresh(index...).Do(el.ctx)
	if err != nil {
		el.log.Errorf("Can't refresh index: %v", err)
		return err
	}

	// A response without shard information carries no failures to report.
	if res == nil || res.Shards == nil || res.Shards.Failed <= 0 {
		return nil
	}

	for _, cur := range res.Shards.Failures {
		if cur == nil {
			continue
		}

		reason, err := json.Marshal(cur.Reason)
		if err != nil {
			el.log.Errorf("Can't marshal index refresh reason: %v", err)
			el.log.Debugf("Reason: %v", cur.Reason)
			continue
		}

		el.log.Errorf("Error refresh index %v. Reason %v, status %v", cur.Index, string(reason), cur.Status)
	}

	return fmt.Errorf("got some errors")
}

// CountDocuments returns the number of documents in the given indices.
func (el *Elastic) CountDocuments(index ...string) (int64, error) {
	return el.client.Count(index...).Do(el.ctx)
}