Elasticsearch, Bulk Uploading and the High-Level Java REST Client - Part 2

Learn how to use the BulkProcessor to bulk up all your Elasticsearch updates in this second part of our bulk upload series. Then learn about one neat trick that'll really speed up your uploads.

In the first part of this series, we put together a bulk upload application which uses the High-Level Java REST client for Elasticsearch. We created batches of inserts and, when the count was high enough, we sent off the bulk request and checked the results for errors. It's a little tedious to manage those requests and counts even in a short example, so just imagine how complex it could get in a big production application. That's where the BulkProcessor comes in.

The BulkProcessor

The BulkProcessor is another option in the High-Level Java REST client, but its job is to batch up and manage a queue of database requests. You write your code so that it just sends its index, delete and other requests to an instance of the BulkProcessor and it will accumulate them until there's enough to form a bulk request.

For our uploader, that makes the main code path super-simple. We create a BulkProcessor, we read from our data file, we wrap each line up in an IndexRequest as a JSON document and we add that to the BulkProcessor...

    // Build a BulkProcessor; its defaults decide when a batch is big enough to send
    BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build();

    BufferedReader br = new BufferedReader(new FileReader("enron.json"));

    String line;

    // Each line of the file is one JSON document; wrap it in an IndexRequest and let the processor queue it
    while ((line = br.readLine()) != null) {
        bulkProcessor.add(new IndexRequest(indexName, "mail").source(line, XContentType.JSON));
    }

You can find this code in the repository as BulkProcessorUpload.java. And we're done...

Wait till done

Well, not quite. We don't know if we've processed everything yet. The BulkProcessor has a count of items to hit before it sends anything off, so the last items are probably still waiting to go. What we need to do is get it to send those last items, make sure they were processed and then close the connection. The good news is there's one call to do all that, awaitClose(). Call it, telling it how long to wait, and it will stop all the scheduled uploads and flush the current batch out to the server. It'll then wait till that is done or it times out.

    boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

    if(!terminated) {
        System.out.println("Some requests have not been processed");
    }

awaitClose() returns true if it finished correctly or false if it timed out without confirming things.

Listening to the BulkProcessor

You may have noticed that we're missing something. That something is how we check the results of the bulk upload. Back when we created the BulkProcessor, we handed over a parameter, listener. That's a BulkProcessor.Listener and it's there to let you intervene as appropriate in your queue management.

Counting the uploaded

Let's look at the one we're using in our example:

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        int count = 0;

        @Override
        public void beforeBulk(long l, BulkRequest bulkRequest) {
            count = count + bulkRequest.numberOfActions();
            System.out.println("Uploaded " + count + " so far");
        }

There are three methods you can override in a BulkProcessor.Listener and the first is the simplest. The beforeBulk method is called just before the BulkProcessor sends the current request. We're using it here to count how many records we've uploaded in total. We get the count of new records with the numberOfActions() method on the BulkRequest.

Managing failure

Next up we have un-exceptional errors to process.

    @Override
    public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
        if (bulkResponse.hasFailures()) {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    System.out.println("Error " + failure.toString());
                }
            }
        }
    }

The first afterBulk method is called after a bulk batch has been sent to the server and the server's response is received. Both the original bulkRequest and the new bulkResponse are handed over for post-processing.

We use the same style of processing that we used in our previous example; checking for failures with hasFailures() and stepping through the responses if there were any. We're only printing out the errors, but it would be possible to re-queue updates if needed. Another assumption we are making is that our uploader is the only generator of bulk requests and that they are all index requests. In a fuller application, the listener would see all the different queued requests, including deletes and updates. That means your application would have to look at what type of operation each one was, with getOpType(), to see whether it was an index, update or delete.
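As a rough sketch of that idea, you could branch on the DocWriteRequest.OpType enum inside the response loop; the handling per type here is purely illustrative, not part of the example application:

    DocWriteRequest.OpType opType = bulkItemResponse.getOpType();
    switch (opType) {
        case INDEX:
        case CREATE:
            // a failed index/create - we could re-queue the original document here
            break;
        case UPDATE:
            // a failed update - handle separately if updates matter to your application
            break;
        case DELETE:
            // a failed delete
            break;
    }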

Managing the unexpected

So now we've handled the expected failures, but what about the unexpected ones, the exceptions? Well, there's a second version of afterBulk which is invoked with the original bulk request and the exception that was thrown while trying to process it. There are all sorts of strategies for handling this kind of potentially terminal problem. In our example, we're just going to print out that it happened:

    @Override
    public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
        System.out.println("Big errors " + throwable.toString());
    }
    };

With the listener taking care of the pre and post-processing of the queue, we're done.

Code and hints

The code for this is in the BulkProcessorUpload.java file in the repository. It performs at roughly the same speed as our previous example but, code-wise, it's a much cleaner and more flexible foundation to develop against. The BulkProcessor is also easier to configure. In our example, we've just let its defaults do the work - up to 1000 actions or 5MB of data triggers a bulk send.

Those settings can be configured with the .setBulkActions() and .setBulkSize() methods of the BulkProcessor's builder, or disabled completely. There's also an option for timed flushing of the queue with .setFlushInterval(). Finally, there are options to control the number of concurrent requests in flight and to set the backoff policy for when there are retryable problems. Read more about the BulkProcessor in the documentation.
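As a sketch of how those options fit together on the builder (the values here are purely illustrative, not tuning advice), the configuration might look like this:

    BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener)
            .setBulkActions(10000)                               // send after 10,000 actions rather than the default 1,000
            .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB)) // or after 10MB of data rather than 5MB
            .setFlushInterval(TimeValue.timeValueSeconds(10))    // or after 10 seconds, whichever comes first
            .setConcurrentRequests(1)                            // allow one bulk request in flight while the next accumulates
            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // retry rejected bulks up to 3 times
            .build();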

Replica Tricks

There is one final twist to this tale of bulk uploading. Did you know that when you are writing a lot of data to Elasticsearch, the chances are that it is being replicated in the cluster as you write? That slows things down a bit, and one common piece of advice for Elasticsearch is, when bulk uploading, to turn off replication and turn it back on when you are done. How much faster? Well, in this unscientific test (i.e. any test which traverses the internet), we saw a 25% improvement in bulk upload times.

So let's show you how to do this. We'll set the number of replicas to 0 when we create the index, and when we're done, we'll set it back to 1. The first change has to come when we make the CreateIndexRequest.

    CreateIndexRequest cireq = new CreateIndexRequest(indexName);
    cireq.settings(Settings.builder().put("index.number_of_replicas", 0));
    CreateIndexResponse ciresp = client.indices().create(cireq);

The cireq.settings() line is the new code, and it changes the settings on the create request to set the number_of_replicas to 0.

The actual bulk upload carries on as before but, just before we close things down, we need to reset that number_of_replicas. Here's where there's another gap in the current High-Level Java REST client, and we have to drop down to the low-level client again.

Before doing that, we need to prepare our settings:

    String jsonString = "{ \"index\": { \"number_of_replicas\": 1 } }";
    HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
    Map<String, String> params = Collections.emptyMap();

We create a string with the JSON of our replica setting command and then encode that as an HTTP entity. We also need an empty parameters map because the next thing we are going to call is the low-level client's performRequest() method, which has a parameters map parameter that must be supplied, even if it's empty. Let's make that call:

    Response resetresponse = client.getLowLevelClient().performRequest("PUT", "/" + indexName + "/_settings", params, entity);

We get the low-level client from the high-level client and use that to call performRequest(). In this case, it's a "PUT" operation on the "/enron/_settings" endpoint with no parameters and a body that contains the JSON setting. You can, of course, check the response to ensure it has run correctly. Once that setting has been made, the Elasticsearch nodes will begin their work of creating replicas to match the new value.
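If you do want to verify the call succeeded, a minimal sketch is to look at the HTTP status code on the low-level client's response:

    if (resetresponse.getStatusLine().getStatusCode() != 200) {
        System.out.println("Failed to reset number_of_replicas: " + resetresponse.getStatusLine());
    }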

The modified code for this example is in BulkProcessorUploadNoReplicas.java within the repository for reference. Remember that when there are no replicas, your data is more fragile on the servers as there is only one copy. If you're prepared to work with that, then it's an option worth considering.

Closing Down

In this short series, we have looked at bulk uploading through the Bulk API, both unassisted and assisted by the BulkProcessor. We also looked at adjusting replication for faster uploading. There was one more thing: where in the past we've recommended that users heavily, if temporarily, scale up their Elasticsearch deployments, using the Bulk API and Elasticsearch 6 resulted in us only needing to scale up to a capacity that was sufficient for the final data.

With future revisions of the Elasticsearch High-Level Java REST Client looking to fill out the API coverage, it looks like it'll be an excellent replacement for the soon-to-be-retired Transport Client, and you can get going with it now. And remember, if you don't have an Elasticsearch database to hand, it's a matter of minutes to sign up for a Compose free trial and get one for 30 days.

