
Performing Bulk Operations on Elasticsearch Databases


Elasticsearch is a powerful database technology that exposes its query functionality through a RESTful API. However, when it comes to updating and reindexing, Elasticsearch has no built-in functionality for performing these operations in bulk, so tasks like these require writing your own script composed of various API requests. Additionally, each request carries overhead, so we want to reduce the number of requests as much as possible. The Elasticsearch team recognized this and provided the scan and scroll search options and the bulk API for exactly these situations. This post discusses performing mass updates and reindexing using both of these APIs. We will use Python here for simplicity, but the same logic applies in any language.

Scan and Scroll

Before discussing the update or bulk API calls, we should look at the scan and scroll options in Elasticsearch. Passing the search_type=scan parameter in your search API call makes Elasticsearch ignore sorting and ranking when returning the results of your query. This speeds up the whole process by allowing Elasticsearch to simply dump all matching results in no particular order. The scroll parameter (which is mandatory for scan search types) defines how long Elasticsearch should keep the search context alive while waiting for the next scroll request; a scroll of 1m means it will wait at most one minute for the next request before clearing its data about the search. An example API endpoint using scan and scroll would be:

'http://localhost:9200/logstash*/_search?search_type=scan&scroll=1m'

The first call to that endpoint with a POSTed query returns a "_scroll_id" field (among other data about the query, such as the total number of hits). It does not, however, return any of the actual hits. To start receiving hits, one must extract the "_scroll_id" value and send it as POST data to the scroll endpoint (http://localhost:9200/_search/scroll), which then returns batches of hits matching the query. Performing an operation like this in Python looks like:

import requests
import json

# Define url and query (swap "index" for whatever index you're working with)
url = 'http://localhost:9200/index/_search?search_type=scan&scroll=1m'

# Example query that gets results between Aug 20 and 22
# Note that we define what fields to grab; this is important
query = """
{
    "fields": ["_id", "_type", "_index"],
    "query": {
        "range" : {
            "@timestamp" : {
                "gte" : "2015-08-20T00:00:00.000Z",
                "lte" : "2015-08-22T00:00:00.000Z"
            }
        }
    }
}
"""

# Make the initial request, posting the query data; this returns
# a scroll ID but no hits
r = requests.post(url, data=query)

# Extract the scroll ID
data = json.loads(r.text)
scrollId = data["_scroll_id"]

# Make a new request using the scroll ID to fetch the first batch of hits
url = 'http://localhost:9200/_search/scroll?scroll=1m'
r = requests.post(url, data=scrollId)

# And extract the hits
data = json.loads(r.text)
hits = data["hits"]["hits"]

This example makes the first request to get the scroll ID, extracts it, and then makes a second request to actually begin receiving hits. Note that the query uses the "fields" array to grab only the fields we need; this is important, as you want to reduce unnecessary JSON parsing as much as possible. Now we can loop around this second request, extracting results until we receive no more hits.
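For reference, each hit in that list is a small JSON object carrying the document's metadata. With the "fields" list above, a single hit would look something like this (the values here are made up):

{
    "_index" : "index1",
    "_type" : "type1",
    "_id" : "AU9zX2liiGmuOkzQAnm5",
    "_score" : 0.0
}

This metadata is exactly what the bulk API needs to address a document, which is what the next section covers.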

The Bulk API

Elasticsearch's bulk API is relatively easy to learn. Essentially, you compile a newline-separated list of JSON objects, each describing the type of operation and the document to perform it on. If an operation requires more data than that (for example, an update needs the new field values), you include this data as a line directly under the operation. This means an update would look like:

{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

This translates to updating the field "field2" in the document with an ID of "1", a type of "type1", and the index "index1". The update will add "field2" if it does not already exist or modify its contents if it does. Any further operations we need are appended to the list. For example, two updates would look like:

{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }
{ "update" : {"_id" : "2", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field3" : "value3"} }

We would then post this string to:

'http://localhost:9200/_bulk'

And receive back a list of operation results, one for each update (success, failure, etc.).
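A bulk response has roughly the following shape (a sketch with made-up values; the exact fields vary by Elasticsearch version):

{
    "took" : 30,
    "errors" : false,
    "items" : [
        { "update" : { "_index" : "index1", "_type" : "type1", "_id" : "1", "_version" : 2, "status" : 200 } }
    ]
}

To emulate this sort of functionality in code we would use: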

# Start with an empty bulk payload ("result" here is a single hit
# taken from the scroll results above)
poststring = ""

# Generate the update action line from the hit's metadata
updatecall = '{ "update" : { "_index" : "' + result["_index"] + '", "_type" : "' + result["_type"] + '", "_id" : "' + result["_id"] + '" } }'

# Generate the update data
updatedata = '{ "doc" : {"field" : "test"} }'

# Append both lines to the bulk payload
poststring += updatecall + "\n" + updatedata + "\n"

# Send the payload to the bulk API
url = 'http://localhost:9200/_bulk'
r = requests.post(url, data=poststring)

Putting it all Together

Now that we know how the scan and scroll search options, the bulk API, and the bulk update command work, we can put it all together into a loop that takes in results and updates them accordingly. For example:

import requests
import json

# Define url and query (swap "index" for whatever index you're working with)
url = 'http://localhost:9200/index/_search?search_type=scan&scroll=1m'

# Example query that gets results between Aug 20 and 22
# Note that we define what fields to grab; this is important
query = """
{
    "fields": ["_id", "_type", "_index"],
    "query": {
        "range" : {
            "@timestamp" : {
                "gte" : "2015-08-20T00:00:00.000Z",
                "lte" : "2015-08-22T00:00:00.000Z"
            }
        }
    }
}
"""

# Make the initial request, posting the query data
r = requests.post(url, data=query)

# Extract the scroll ID
data = json.loads(r.text)
scrollId = data["_scroll_id"]

# Make a new request using the scroll ID to fetch the first batch of hits
url = 'http://localhost:9200/_search/scroll?scroll=1m'
r = requests.post(url, data=scrollId)

# Extract the hits and the updated scroll ID for the next request
data = json.loads(r.text)
scrollId = data["_scroll_id"]
hits = data["hits"]["hits"]

# The bulk payload we build up for each batch
poststring = ""

# Loop for all hits
while len(hits) > 0:
    for result in hits:
        # Generate the update action line from the hit's metadata
        updatecall = '{ "update" : { "_index" : "' + result["_index"] + '", "_type" : "' + result["_type"] + '", "_id" : "' + result["_id"] + '" } }'

        # Generate the update data
        updatedata = '{ "doc" : {"field" : "test"} }'

        # Append both lines to the bulk payload
        poststring += updatecall + "\n" + updatedata + "\n"

    # Send the batch of updates to the bulk API
    url = 'http://localhost:9200/_bulk'
    r = requests.post(url, data=poststring)

    # Request the next batch of hits using the most recent scroll ID
    url = 'http://localhost:9200/_search/scroll?scroll=1m'
    r = requests.post(url, data=scrollId)

    # Extract the new hits and scroll ID, and reset the bulk payload
    data = json.loads(r.text)
    scrollId = data["_scroll_id"]
    hits = data["hits"]["hits"]
    poststring = ""

This program loops over the hits returned from the search, generating an update call for each, posts those changes to the bulk API, and requests the next list of hits until there are no more results to process. This demonstrates the basic logic of working with Elasticsearch changes in bulk. A similar strategy can be used for reindexing: take each result, delete it with the bulk API delete command, and then index the same document into a different index. I hope this has helped you understand Elasticsearch in better depth!
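As a rough sketch of that reindexing idea, the bulk payload for moving a single document pairs a delete action with an index action, with the document body on the line below the index action (all names and values here are made up for illustration; your query would also need to fetch the document's "_source" so you have a body to re-insert):

{ "delete" : { "_index" : "index1", "_type" : "type1", "_id" : "1" } }
{ "index" : { "_index" : "index2", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1", "field2" : "value2" }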

Date: Aug 25 2015
