Elasticsearch, Bulk Uploading and the High-Level Java REST Client - Part 1


In this short series of articles, we'll take a practical look at bulk uploading data to Elasticsearch and at using the relatively new High-Level Java REST Client as a platform for our uploads.

Bulk uploading data into Elasticsearch is a common way for developers to seed their search databases. It can be hard to get good upload performance, though, which is where the Bulk API comes in. In this short series, we'll look at two approaches to writing bulk uploading applications and harnessing bulk operations. But first, let's talk about Java clients for Elasticsearch.

Java, REST and Elasticsearch

Being written in Java, Elasticsearch has always had native support for the language. When we say native, we mean native; Java clients would often talk the internode protocol of Elasticsearch. But as Elasticsearch has evolved, the transport client, as it's known, has fallen out of favor. Other languages conversed with Elasticsearch through the REST API, and Java was off to join them as a first-class REST citizen.

Up until relatively recently, there was only a low-level Java REST client for Elasticsearch, which meant almost, but not quite, having to write raw REST requests. The low-level Java REST client helped out a bit, though, and it is the foundation stone of the next Java client.

With the release of Elasticsearch 6, Elastic also made the High-Level Java REST client generally available. This gives a much more rounded API for a developer to work with and, being built on top of the low-level REST client, it makes it easy to drop down to that API too. The High-Level Java REST Client is the way forward for Java/Elasticsearch users, so let's put it to work.

Uploading for analysis

For our example, we're going to use the Enron Email dataset, which we've converted into a line-delimited JSON file. You can, of course, use any large dataset you have to hand; we just happen to have this LD-JSON file ready for testing.
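Each line of a line-delimited JSON file is one complete JSON document. Purely for illustration (these field names are made up, not the dataset's actual schema), a line might look like this:

    {"sender":"jane.doe@enron.com","recipient":"john.roe@enron.com","subject":"Re: Q3 numbers","body":"..."}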

What we want to do is, given an appropriate connection string, log into Elasticsearch, create an index for the dataset, populate it and exit. Let's start with BulkUpload.java, the first incarnation of our bulk uploader.

The Code

If you're like us, you'll want to see the code first. It's available in the GitHub repository compose-ex/elasticsearchupload. The code is prepared with Maven handling dependencies and is configured for Java 9.
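If you'd rather set up a project from scratch than clone the repository, the essential dependency is the High-Level REST client itself, which pulls in the low-level client transitively. A sketch of the Maven coordinates, assuming an Elasticsearch 6.x release (check the repository's pom.xml for the exact version it pins):

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.2.2</version>
    </dependency>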

Getting connected

We'll skip past the import preamble and start with getting that connection string and getting connected.
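For reference, COMPOSE_ELASTICSEARCH_URL is expected to hold a URL with the credentials embedded in it, something along these lines (the host and port here are made up):

    https://user:password@portal0000-0.example-deployment.compose.direct:10364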

    public static void main(String[] args) {
        URL url = null;

        try {
            // The connection string lives in an environment variable
            url = new URL(System.getenv("COMPOSE_ELASTICSEARCH_URL"));
        } catch (MalformedURLException me) {
            System.err.println("COMPOSE_ELASTICSEARCH_URL not present or malformed");
            System.exit(1);
        }

        // Break the URL down into the parts the client builder will need
        String host = url.getHost();
        int port = url.getPort();
        String user = url.getUserInfo().split(":")[0];
        String password = url.getUserInfo().split(":")[1];
        String protocol = url.getProtocol();

Here we take the environment variable and parse it into its essential components. It's housekeeping but it's important as we go to connect. One thing we need is a basic CredentialsProvider with that user and password so we can authenticate.

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(user, password));

We can now create the RestHighLevelClient for Elasticsearch. It takes as a parameter the RestClient, the low-level version, which it'll use for all its interactions with the database. You aren't seeing double...

        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(host, port, protocol))
                        .setHttpClientConfigCallback(httpClientBuilder ->
                                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));

This creates the low-level REST client with the host, port and protocol settings we parsed out earlier and sets it up so it calls on our credentials provider to log in. We are ready to start talking to our Elasticsearch database.
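Incidentally, the same builder can also tune timeouts through a request config callback, which can matter when a large bulk request takes a while to come back. A sketch with illustrative values:

    RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost(host, port, protocol))
                    .setHttpClientConfigCallback(httpClientBuilder ->
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider))
                    .setRequestConfigCallback(requestConfigBuilder ->
                            requestConfigBuilder.setConnectTimeout(5000)  // 5s to connect (illustrative)
                                    .setSocketTimeout(60000)));           // 60s for a reply (illustrative)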

Index Creation

Ironically, the first thing we want to do with the High-Level REST client is the one thing it can't yet do: check whether an index exists so we can decide whether to create it. The feature is coming in a future version, but until then, we can make do. It is simple to reach into the high-level client and get the low-level client it's using.

        String indexName = "enron";
        Response response = client.getLowLevelClient().performRequest("HEAD", "/" + indexName);
        int statusCode = response.getStatusLine().getStatusCode();

Once we have the low-level client, we do a REST "HEAD" operation on our named index and get the status code back. If it comes back with a 404, we know it's not found and we need to create it.

        if (statusCode == 404) {
            CreateIndexRequest cireq = new CreateIndexRequest(indexName);
            CreateIndexResponse ciresp = client.indices().create(cireq);
            System.out.println("Created index");
        } else {
            System.out.println("Index exists");
        }

Creating the index gets us back to the high-level client with a CreateIndexRequest which we send off to the database's indices API. With the index in place, it's time to start uploading.
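If you don't want to rely on dynamic mapping, the CreateIndexRequest can also carry settings and mappings before it's sent. A minimal sketch, assuming the 6.x API (the values and the one-field mapping are illustrative; Settings comes from org.elasticsearch.common.settings):

    CreateIndexRequest cireq = new CreateIndexRequest(indexName);
    // Illustrative settings; one shard is plenty for a test index
    cireq.settings(Settings.builder()
            .put("index.number_of_shards", 1)
            .put("index.number_of_replicas", 0));
    // An explicit mapping for the "mail" type, supplied as JSON
    cireq.mapping("mail", "{\"properties\":{\"subject\":{\"type\":\"text\"}}}", XContentType.JSON);
    CreateIndexResponse ciresp = client.indices().create(cireq);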

Bulking up

The Bulk API uses a BulkRequest object as a container for requests. Whenever the program decides it is ready, it can send that bulk request through to the server. Let's create that and some housekeeping variables for it.

        BulkRequest request = new BulkRequest();
        int count = 0;
        int batch = 15000;

        BufferedReader br = new BufferedReader(new FileReader("enron.json"));

        String line;

We've also taken the opportunity to open up our newline-delimited JSON file. We are about to dive into our bulk uploading loop.

        while ((line = br.readLine()) != null) {
            request.add(new IndexRequest(indexName, "mail").source(line, XContentType.JSON));
            count++;

We read a line of JSON from our file and then we add a request to our bulk request. In this case, it's an IndexRequest which will insert our new record. We set the source to the read line and the content type to JSON and... well that's it for this particular record. The count gets bumped up by one too.
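As an aside, if your documents carry their own identifiers, the 6.x IndexRequest constructor will also take an explicit document ID, which makes reruns overwrite rather than duplicate. A sketch, with a hypothetical docId:

    // docId is hypothetical here; in practice you'd parse it from the document
    String docId = "mail-" + count;
    request.add(new IndexRequest(indexName, "mail", docId).source(line, XContentType.JSON));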

Sending the Bulk

Now it's time to see if we should send the bulk request.

            if (count % batch == 0) {
                BulkResponse bulkresp = client.bulk(request);

If our count modulo our batch size is 0, it's time to send. The sending is simple enough in that we just present the request to the bulk API on the database. But there are still possible errors to handle. For that, we have this:

                if (bulkresp.hasFailures()) {
                    for (BulkItemResponse bulkItemResponse : bulkresp) {
                        if (bulkItemResponse.isFailed()) {
                            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                            System.out.println("Error " + failure.toString());
                        }
                    }
                }
                System.out.println("Uploaded " + count + " so far");
                request = new BulkRequest();
            }
        }
        }

If there are any problems with the batch, the response's hasFailures() method will let you know quickly. If there are, we can iterate over the BulkItemResponses, which reveal the response to each update. If any of them returns true to isFailed(), we can unpack the failure and respond to it as appropriate. Here, well, we just print it out for reference.
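If you wanted to act on a failure rather than just log it, the failure object carries the detail to do so. A sketch of pulling out the useful parts, assuming the 6.x response API:

    if (bulkItemResponse.isFailed()) {
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
        // The failing document's id, its status and the server's reason
        System.out.println("Failed id " + failure.getId()
                + " with status " + failure.getStatus()
                + ": " + bulkItemResponse.getFailureMessage());
    }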

Whatever happens, we make a new BulkRequest and carry on uploading until we run out of lines and JSON documents in our source file, at which point we fall out of the loop.

Finishing up

We may, though, at this point have an unsent bulk request, so we should check for that by looking at numberOfActions() on the remaining request. If there are any still to do, we send them in the same way as we did previously. And yes, we could DRY out this code, but we are looking to keep the example easy to follow.

        if (request.numberOfActions() > 0) {
            BulkResponse bulkresp = client.bulk(request);
            if (bulkresp.hasFailures()) {
                // ... handle failures as before ...
            }
        }

        System.out.println("Total uploaded: " + count);
        client.close();

After printing out the final total uploaded, we close the client and we are done. This is the simplest bulk uploading code we'll cover. It does assume that we're only bulk uploading, and its handling of failures is minimal. But it does work.
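To make good on that DRY remark from earlier, the send-and-report logic could be pulled out into a small helper; a hypothetical sketch:

    // Hypothetical helper: send a batch and print any per-item failures
    private static void sendBulk(RestHighLevelClient client, BulkRequest request) throws IOException {
        BulkResponse bulkresp = client.bulk(request);
        if (bulkresp.hasFailures()) {
            for (BulkItemResponse bulkItemResponse : bulkresp) {
                if (bulkItemResponse.isFailed()) {
                    System.out.println("Error " + bulkItemResponse.getFailure().toString());
                }
            }
        }
    }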

Wrapping up

We've covered the simple case, but there are still things that may concern a developer. Is there an easier way to batch things up without counting? Can we centralize the handling of update failures? In the next part, we'll look at an alternative approach available in the High-Level REST Client and put that to work.


