Discover Space on Twitter with Elasticsearch and Redis

Monitoring, filtering, and analyzing streaming data can be tedious without the right tools. In the second part of Discovering Space on Twitter with Elasticsearch, we'll add on Compose Redis to keep track of percolator query hits and unique users who tweet using sorted sets and HyperLogLog.

In the previous article, we used Twitter and the hashtag #NASA to monitor what people were tweeting about. Using Elasticsearch's percolator feature, we stored queries containing words that we wanted Elasticsearch to look for in each tweet. Once a hit was found, it would return the tweet with the query highlighted.

In this part, we'll go a little further and save the percolated tweets back into Elasticsearch with the username, tweet, and the time it was published using Node-RED. We'll then show how to use the Redis node in Node-RED to keep track of the percolated queries in a leaderboard using a sorted set that will count the number of times each percolator query has been hit. We'll also use Redis to count the number of unique users per percolated tweet using HyperLogLog.

First things first, let's speed up the flow of tweets coming into the application and do a little modification of the percolator query ...

Modifying and Fetching the Query Results

Where we left off in the previous article was displaying the highlighted portions of the tweets that were retrieved from the Elasticsearch percolator queries. In the application, between the Twitter and function nodes, we set up a delay node to only accept 30 tweets a minute. To analyze more data, let's delete the delay node and connect the Twitter and function nodes together so that we're analyzing a steady stream of tweets as they're published.

Next, open up that function node containing the percolator query and delete the portion where it says highlight. We won't need that. The query will now be:

msg.payload = {  
    "query": {     
        "percolate": {       
            "field": "query",       
            "document_type": "space_tweets",       
            "document": {
                "tweet": msg.payload
            }     
        }   
    }
}
return msg;  

The other change we'll make is within the next function node. That is the node that only returns tweets that are percolated by the indexed queries. We'll have to change some of the code because we are no longer expecting percolated tweets with highlighting. It will now look like the following:

if (msg.payload.hits.total > 0) {  
    var message = [];
    msg.payload = {
        username: msg.tweet.user.screen_name,
        tweets: message
    }
    node.send(msg);
}

Now, we'll need to add several pieces of code to this node to start feeding in information to Elasticsearch and Redis. So, let's start with creating objects from the percolated tweets to be saved as newly indexed documents in Elasticsearch.

Saving data to Elasticsearch

Using the function node above, we'll modify the msg.payload object to contain only the user associated with the tweet, the tweet, and the time it was published. We'll grab all of this from the msg.tweet object that's included with each tweet, and only tweets that have been percolated by the indexed queries will be accepted.

if (msg.payload.hits.total > 0) {  
    msg.payload = {
        username: msg.tweet.user.screen_name,
        tweet: msg.tweet.text,
        time: msg.tweet.created_at
    }
    node.send(msg);
}

Now, let's send msg.payload to Elasticsearch. Find another http request node and drag it onto the Node-RED canvas. The setup of this node is similar to the http request node we added in the previous article that sent an HTTP POST request to Elasticsearch.

However, this time, we'll send an HTTP POST request to the same index but append a type called "nasa_tweets" to the connection URI. We don't need to do any setup for the type since it will be dynamically created for us.

https://username:password@portal2-2.superior-elastic-search-2.compose-2.composedb.com:22222/space_exploration/nasa_tweets  

Connect the function node with the tweet payload to the http request node. Add a debug node to the http request node to view the HTTP POST response we'll get back from Elasticsearch. You'll know the tweet has been stored successfully when the statusCode is 201; you'll also see the document's meta-fields, such as the "nasa_tweets" _type and an automatically generated _id.

Using Kibana or the Compose Elasticsearch browser, you can query any tweet that you see in your debug panel to see what has been stored. For example, running the following query:

GET space_exploration/nasa_tweets/_search  
{
  "query": {
    "match": {
      "username": "Jourmix"
    }
  }
}

We'll get:

{
  ...
    "hits": [
      {
        "_index": "space_exploration",
        "_type": "nasa_tweets",
        "_id": "AV9w0JS-eauxgZ0U3PX0",
        "_score": 1.89712,
        "_source": {
          "username": "Jourmix",
          "tweet": "Dark Matter in a Simulated Universe  #space #nasa https://t.co/gxPyzTgPUt",
          "time": "Tue Oct 31 05:03:57 +0000 2017"
        }
      }
    ]
  }
}

That's all that's needed to save the percolated tweets back into Elasticsearch.

Adding Redis for Statistics

Using streaming tweets, there are a number of databases that we could choose from to aggregate the data and gain some insights about the number of documents hitting each percolator query and the number of unique users who are tweeting about #NASA.

For this, Redis is a very good choice because it's fast and efficient, especially for time-series data ingestion and processing. Also, it comes with the data structures we'll need, like sorted sets and HyperLogLog, to keep track of the tallies for queries hit and unique users.

To start using Redis in Node-RED you'll need to add the Redis node called node-red-contrib-redis via the Node-RED management palette. Also, you'll need a Compose Redis database, which we'll assume you've set up. To connect the Redis nodes to your Compose database, all you'll need is the Compose connection string that begins with redis:// provided on the Overview page of your Redis deployment.

Counting Unique Users with HyperLogLog

To count the number of unique users for each tweet that's percolated, we'll use what's called a HyperLogLog. What's a HyperLogLog? In a nutshell, it's an algorithm that approximates the number of unique items in a set.
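To get a feel for the semantics before wiring up the Redis nodes, here's a minimal sketch in JavaScript of what PFADD and PFCOUNT do, using an exact Set rather than a real HyperLogLog (the function names are ours, not a Redis API). The key difference: a real HyperLogLog uses a small, fixed amount of memory and returns an approximate count, while this Set grows with every new ID and is exact.

```javascript
// Exact unique-user counting with a Set, illustrating the semantics
// that Redis's PFADD/PFCOUNT approximate with far less memory.
function makeUserCounter() {
  const seen = new Set();
  return {
    // Mirrors PFADD: returns 1 if the ID was new, 0 otherwise.
    add(userId) {
      if (seen.has(userId)) return 0;
      seen.add(userId);
      return 1;
    },
    // Mirrors PFCOUNT: returns the number of unique IDs seen so far.
    count() {
      return seen.size;
    }
  };
}

const counter = makeUserCounter();
counter.add("1001");
counter.add("1002");
counter.add("1001"); // duplicate, not counted again
console.log(counter.count()); // 2
```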

To set up the Redis HyperLogLog command, drag the Redis command node onto the Node-RED canvas and double-click on it to start configuring it. Once it's opened, you can configure the server, name, topic, and select the Redis command you want to use from a command drop-down menu. To configure the connection, click on the pencil icon next to the Server field.

A new window will open; paste your Compose Redis connection string into the Host field. Then, delete all the text inside the Port, DB, and Password fields because all of that information is included in the connection string.

Once you've added that, click the Add button to add the connection. Back in the first window and in the Command field, find and select "PFADD" from the drop-down menu. "PFADD" is the command that Redis uses to create the HyperLogLog key and to insert data using that key. Then click Done.

The Redis node will be displayed on the canvas, but we need to connect it to a function node that will pass data in the format it requires. To do that, we'll add some code to the same function node we modified to create the msg.payload that's saved in Elasticsearch.

Open that node again and add the following code inside the If statement.

var redisUserCount = {payload:["users", msg.tweet.user.id]};  

Then, underneath the function window, change the Outputs field from "1" to "2" because we're returning not only the msg.payload for Elasticsearch but also the payload array in redisUserCount for Redis.

Redis will only accept an array with the HyperLogLog key as the first element, and the data that needs to be added as the second element. In this example, the key is users and the element is the Twitter ID of the user who published the tweet.

Now, to send both the msg.payload and redisUserCount to each of the two outputs, we'll need to place an array inside the node.send function.

node.send([msg, redisUserCount]);  

Inside the array, the order of each element is important because that's the order of the output. So, in this example, msg will be sent from the first output, while the payload from redisUserCount will be sent from the second.

After the function's been modified, click Done and connect the function node to the Redis command node.
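Putting the pieces together, the function node's logic at this stage can be sketched as a plain, testable function: given a percolator response and the original tweet, it returns the pair of messages that node.send would emit from outputs 1 and 2. (The function name routeTweet is ours for illustration; inside Node-RED you'd call node.send on the returned array instead of returning it.)

```javascript
// Sketch of the two-output function node: output 1 carries the tweet
// document for Elasticsearch, output 2 carries the PFADD payload for Redis.
function routeTweet(msg) {
  if (msg.payload.hits.total > 0) {
    // ["users", <id>] becomes: PFADD users <id>
    const redisUserCount = { payload: ["users", msg.tweet.user.id] };
    msg.payload = {
      username: msg.tweet.user.screen_name,
      tweet: msg.tweet.text,
      time: msg.tweet.created_at
    };
    // In the node itself: node.send([msg, redisUserCount]);
    return [msg, redisUserCount];
  }
  return null; // no percolator hits: send nothing
}
```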

You could deploy the nodes in Node-RED at this point, but we'll hold off because the output of this node only returns a 1 or 0, indicating whether or not the ID was successfully added. What we'll do now is get the HyperLogLog count from the users key every time an ID is updated. To do that, drag another function node onto the canvas, open it, and add the following code:

msg.payload = ['users'];  
return msg;  

Connect this function node to the output of the Redis node.

The purpose of this function node is simply to react to the Redis node's response. Every time an ID is added to the users key, this node sets msg.payload to an array containing only the users key, which tells the next Redis node to return the HyperLogLog count for that key.

To get that count, add another Redis command node onto the canvas and configure it using the same Redis server from the previous Redis command node. Then select "PFCOUNT" as the command.

After that, click Done. Add a debug node and connect all of the nodes together then click Deploy.

You'll get an error from Node-RED saying that you need to set up the port, database, and password fields for the Redis nodes. Just ignore it: all of your database information is already in the Redis connection string URI that we pasted into the Host field. Once it's deployed, you should see the users HyperLogLog count updating in real-time:

Creating a Query Leaderboard

The last part of the application will generate a real-time leaderboard of frequently hit percolator queries. To construct a leaderboard, we're going to use Redis's sorted set commands.

To identify whether a query is inside a Redis sorted set key, we'll use the ZSCORE command. Once we've determined whether a query is in the set, we'll either add it with a score of 1 using the ZADD command or, if it's already there, increment its score by 1 with the ZINCRBY command. Finally, we'll display the entire leaderboard in descending order by score using the ZREVRANGEBYSCORE command, with updates in real-time.
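The whole flow can be sketched in a few lines of JavaScript with an in-memory Map standing in for the Redis sorted set (the helper names like recordHit are ours, not Redis or Node-RED APIs):

```javascript
// In-memory sketch of the leaderboard flow the Redis nodes implement:
// ZSCORE to check membership, ZADD/ZINCRBY to add or bump a score,
// ZREVRANGEBYSCORE ... WITHSCORES to read back in descending order.
function makeLeaderboard() {
  const scores = new Map(); // member -> score

  return {
    // ZSCORE leaderboard member: null if absent, the score otherwise.
    zscore(member) {
      return scores.has(member) ? scores.get(member) : null;
    },
    // ZADD leaderboard 1 member, or ZINCRBY leaderboard 1 member.
    recordHit(member) {
      if (this.zscore(member) === null) {
        scores.set(member, 1);
      } else {
        scores.set(member, scores.get(member) + 1);
      }
    },
    // ZREVRANGEBYSCORE leaderboard +inf 0 WITHSCORES:
    // flat [member, score, member, score, ...] array, highest first.
    zrevrangebyscore() {
      return [...scores.entries()]
        .sort((a, b) => b[1] - a[1])
        .flatMap(([member, score]) => [member, score]);
    }
  };
}

const board = makeLeaderboard();
["mars", "space", "mars"].forEach(q => board.recordHit(q));
console.log(board.zrevrangebyscore()); // ["mars", 2, "space", 1]
```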

To start off, open the function node that we modified to return the Elasticsearch and Redis data. In that function, we'll need to update the Outputs field at the bottom of the function window to "3", since we'll now need three outputs: one for Elasticsearch and two for Redis. The code that we'll use to generate the output for the sorted set is:

var hits = msg.payload.hits.hits;  
var leaderboardItems = [];

for (var i = 0; i < hits.length; i++) {  
    var item = hits[i]._source.query.match.tweet;
    var check = ["leaderboard", item];  
    var addIncr = ["leaderboard", 1, item];
    leaderboardItems.push({payload:check, _payload: addIncr});
}

What the code does is look at the hits returned by the Elasticsearch percolator query.

var hits = msg.payload.hits.hits;  

Using the loop, we'll get each query that was returned by the percolator query and store it in a variable called item.

for (var i = 0; i < hits.length; i++) {  
    var item = hits[i]._source.query.match.tweet;

The next variable, check, holds the array that will be consumed by the Redis ZSCORE command to identify whether the query exists in the set. The format of the array is important: it asks Redis to look inside the "leaderboard" key for a query (i.e. item).

var check = ["leaderboard", item];  

The other variable, addIncr, is an array that'll be consumed by the Redis commands ZADD and ZINCRBY. These commands either add queries with a score of 1 to the sorted set if they're not present, or increment their score by 1 if they are. Again, the format here is important. The first element is the key, and the second element is the score that's attached to the third element, item, or the query.

var addIncr = ["leaderboard", 1, item];  

The arrays are then pushed into the leaderboardItems array, which is returned on the third output. Each element has two properties: payload, holding the check array, and _payload, holding the addIncr array. Since the payload property doesn't persist across nodes (the ZSCORE node overwrites it with its response), and we still need the addIncr array to decide whether to add or increment a query's score, we store it in a custom property, _payload, which does persist. It contains the same query as the payload, plus the score that will be attached when the query is added to the set or incremented.

var leaderboardItems = [];  
...
leaderboardItems.push({payload:check, _payload: addIncr});  

To set up the ZSCORE command to check if queries exist in the sorted set, drag onto the canvas another Redis command node. Use the same Server settings like the other Redis nodes, and then select "ZSCORE" from the Command field. Click Done when you're finished and connect this Redis node to the third output of the function node containing the code we added:

Now, add another function node onto the canvas. The code in this node will check whether the "ZSCORE" Redis command node returns a score or null, signaling whether the query is in the sorted set or not. Set the function to have two outputs: one that will increment a query's score if it's in the sorted set, and the other to add the query with a score of 1 if it is not. Then add the following code in the function window:

if (msg.payload !== null) {  
    msg.payload = msg._payload
    return [msg, null];
} else {
    msg.payload = msg._payload
    return [null, msg];
}

Using an If/Else statement, we set the msg.payload object to msg._payload in both branches; if we don't receive null from Redis, the message is returned to the first output, otherwise it's returned to the second. Now, connect this node to the output of the ZSCORE Redis node.

Next, drag two Redis command nodes onto the canvas. Use the same Server settings and for one node, select the "ZINCRBY" command, and for the other, select the "ZADD" command. The node that's set to "ZINCRBY", connect it to the first output of the function node, and connect the second output to the other Redis node set to "ZADD".

As the results are added to the sorted set, we'll have to retrieve the updated sorted set scores as they are updated in Redis. To do that, we'll need to add another function node to the canvas and pass in an array. The format of the array is similar to those we've created above. The first element is the "leaderboard" key, the second and third elements are the maximum and minimum scores we want to retrieve, and the "WITHSCORES" option is added so that Redis returns the queries and their scores. Place the following code in the function:

delete msg._payload;  
msg.payload = ["leaderboard", "+inf", 0, "WITHSCORES"];  
return msg;  

The code deletes the _payload property because we no longer need it. Then we create an array in the element format the command requires, assign it to msg.payload, and return the msg object for the next node to consume. After that, connect both of the previous Redis nodes to the input of this function node.

Lastly, we want to retrieve the leaderboard as it updates. Place another Redis command node onto the canvas, use the same Server settings as before, and set the command to "ZREVRANGEBYSCORE". Connect this node to the function node, and connect a debug node to this Redis node. The output we'll see is an array containing each query followed by its score. Every time a tweet is percolated by a query, that query's score increases and is reflected in the new output.
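Because the WITHSCORES reply arrives as a flat array of alternating members and scores, a small helper makes it easier to read in the debug panel or feed to a dashboard. This helper (our own, not a Node-RED or Redis API) pairs the flat reply up into objects:

```javascript
// Pair up a flat ZREVRANGEBYSCORE ... WITHSCORES reply:
// ["query1", "score1", "query2", "score2", ...] ->
// [{ query, score }, ...]. Redis returns scores as strings,
// so we convert them to numbers.
function pairWithScores(flat) {
  const pairs = [];
  for (let i = 0; i < flat.length; i += 2) {
    pairs.push({ query: flat[i], score: Number(flat[i + 1]) });
  }
  return pairs;
}

console.log(pairWithScores(["space", "12", "mars", "7"]));
// [ { query: 'space', score: 12 }, { query: 'mars', score: 7 } ]
```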

The insights gained from the leaderboard are not just in the numbers. We might use it to decide that a query like "space" shouldn't be used as a percolator query, since #NASA and "space" appear together so frequently that it would skew the leaderboard. Or, we might combine the leaderboard with the tweet timestamps to find out which queries are more likely to be hit on particular days or at particular times.

Summing up

This is a good jumping-off point for this data-filtering application. Over the last two articles, we've provided a small example of how you can use the power of Elasticsearch's percolator query to retrieve the data that's important to you before inserting it into your database. We also looked at how you can leverage the power of Redis to monitor how Elasticsearch is percolating data and to count the number of users tweeting about #NASA at any time. The possibilities to extend this application are endless, so try it out and see what else you can do with Compose Elasticsearch, Redis, and Node-RED.


Read more articles about Compose databases - use our Curated Collections Guide for articles on each database type. If you have any feedback about this or any other Compose article, drop the Compose Articles team a line at articles@compose.com. We're happy to hear from you.

attribution Pixabay

Abdullah Alger
Abdullah Alger is a former University lecturer who likes to dig into code, show people how to use and abuse technology, talk about GIS, and fish when the conditions are right. Coffee is in his DNA. Love this article? Head over to Abdullah Alger’s author page to keep reading.
