Asynchronous Joins Using RabbitMQ


Don Omondi, Campus Discounts' founder and CTO, talks about their use case of fetching data from multiple databases and joining them using RabbitMQ RPCs in this new Write Stuff article.

Technology keeps evolving, and fast. Early last month, web browsing from mobile devices overtook browsing from desktop computers. For developers, this means optimizing the experience of our mobile users, and one of the best ways to do so is to minimize the number of network requests needed to fetch data. But fetching data in a single payload is not always simple, because much of today's data is unstructured.

It’s very common for apps to use multiple databases to store and query data. In addition, some apps need to query an external API, such as a weather or finance service, before displaying data to the end user. So how can we join all the needed data into a single response on demand? One way is to use RabbitMQ RPCs, and we can do so asynchronously by running the RPCs in parallel.

Remote Procedure Call (RPC)

Remote Procedure Call (RPC) is a pattern where you run a function on a remote computer and wait for the result. RabbitMQ documents how its implementation of RPC works and provides sample code in several programming languages on its website. Here is the structural overview.

We can use RabbitMQ RPC to send messages to remote servers with instructions to execute queries and respond with the results when done.
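To make the request/reply flow concrete, here is a minimal in-memory sketch of a single RPC round trip in the style of RabbitMQ's RPC tutorial: the client publishes a request carrying a reply_to queue name and a correlation_id, the server publishes its result to that queue under the same correlation_id, and the client picks out the matching reply. The Broker class, serveOne() and rpcCall() are hypothetical stand-ins (plain SplQueues, no network, PHP 7.4+), so this only illustrates the message flow, not RabbitMQ's or Thumper's API.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a broker: named FIFO queues, nothing more.
final class Broker
{
    /** @var array<string, SplQueue> */
    private array $queues = [];

    public function publish(string $queue, array $message): void
    {
        $this->queues[$queue] ??= new SplQueue();
        $this->queues[$queue]->enqueue($message);
    }

    public function consume(string $queue): ?array
    {
        $q = $this->queues[$queue] ?? null;
        return ($q === null || $q->isEmpty()) ? null : $q->dequeue();
    }
}

// Server side: take one request, run the handler, and publish the result to
// the request's reply_to queue under the same correlation_id.
function serveOne(Broker $broker, string $requestQueue, callable $handler): void
{
    $request = $broker->consume($requestQueue);
    if ($request === null) {
        return;
    }
    $broker->publish($request['reply_to'], [
        'correlation_id' => $request['correlation_id'],
        'body' => $handler($request['body']),
    ]);
}

// Client side: publish a request naming a private reply queue, then read that
// queue and return the reply whose correlation_id matches.
function rpcCall(Broker $broker, string $requestQueue, string $body, callable $runServer): ?string
{
    $replyTo = 'reply-' . bin2hex(random_bytes(4));
    $correlationId = bin2hex(random_bytes(8));
    $broker->publish($requestQueue, [
        'reply_to' => $replyTo,
        'correlation_id' => $correlationId,
        'body' => $body,
    ]);
    $runServer(); // a real server would be a separate process consuming the queue
    while (($reply = $broker->consume($replyTo)) !== null) {
        if ($reply['correlation_id'] === $correlationId) {
            return $reply['body'];
        }
    }
    return null; // no matching reply (a real client would keep waiting or time out)
}

$broker = new Broker();
$result = rpcCall($broker, 'discount', '42', function () use ($broker): void {
    serveOne($broker, 'discount', fn (string $id): string => "discount #$id");
});
echo $result, PHP_EOL; // discount #42
```

The correlation_id is what lets one reply queue serve many outstanding requests: the client never has to assume that the next message on the queue is the answer to its latest question.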

A Sample Use Case

Campus Discounts is a social network where students find and recommend discounts posted by businesses near campus. A discount post can have additional content such as comments, likes, and recommendations. When constructing a discount post’s UI, we show a few of the latest discounts in tabs below it. Our backend is written in PHP, which is synchronous by default, so building this view would normally mean fetching each piece of content one after the other.

<?php
// ………
   // Each call blocks until it returns, so the costs add up.
   $discount = $service->queryDiscount($discount_id); // ~ 5ms
   $comments = $service->queryComments($discount_id); // ~ 12ms per comment
   $likes = $service->queryLikes($discount_id); // ~ 12ms per like
   $recommendations = $service->queryRecommendations($discount_id); // ~ 10ms per recommendation
   $response = array(
     'discount' => $discount,
     'comments' => $comments,
     'likes' => $likes,
     'recommendations' => $recommendations
   );
   return $response;
// ………
?>

Here, PHP performs four blocking tasks one after another: it fetches the discount (about 5ms), then its comments, likes, and recommendations. For a discount with 10 comments, 10 likes, and 10 recommendations, that is 10 × 12 + 10 × 12 + 10 × 10 = 340ms, plus 5ms for the discount itself, or 345ms in total. Querying 10 discounts would take about 3.45 seconds. We can optimize this process by fetching the data asynchronously using parallel RabbitMQ RPCs.

In the image above, ‘C’ is the backend server that responds to the client’s API request; it is also the server that initiates the RPCs. Each ‘S’ is a server that processes remote queries received via RPC.

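With parallel RPCs, the client publishes one message per query, each tagged with a unique correlation ID and the name of the client's reply queue. RabbitMQ delivers each message to a server waiting for that kind of query, so the servers work on their parts simultaneously. Each server reads its message, performs the query it describes, and publishes the result to the reply queue under the same correlation ID. The client waits until a reply has arrived for every request and then uses the correlation IDs to assemble the replies into a single result. RabbitMQ itself only routes the messages; the joining happens on the client.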
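The fan-in step, waiting until every part has come back and keeping each reply under the ID of its request, can be sketched on its own. This assumes each reply carries the correlation ID of its request, as in RabbitMQ's RPC tutorial; ReplyCollector is a hypothetical class for illustration (PHP 7.4+), not Thumper's implementation, and the reply bodies are placeholders.

```php
<?php
declare(strict_types=1);

// Hypothetical fan-in helper: remembers which correlation IDs are still
// outstanding and stores each reply under its ID as it arrives.
final class ReplyCollector
{
    /** @var array<string, bool> correlation IDs still waiting for a reply */
    private array $pending = [];
    /** @var array<string, string> correlation ID => reply body */
    private array $replies = [];

    public function expect(string $correlationId): void
    {
        $this->pending[$correlationId] = true;
    }

    // Records a reply; returns true once nothing is outstanding.
    public function accept(string $correlationId, string $body): bool
    {
        if (isset($this->pending[$correlationId])) {
            unset($this->pending[$correlationId]);
            $this->replies[$correlationId] = $body;
        } // unknown or duplicate IDs are ignored
        return $this->pending === [];
    }

    public function replies(): array
    {
        return $this->replies;
    }
}

$collector = new ReplyCollector();
foreach (['discount_7', 'comments_7', 'likes_7', 'recommendations_7'] as $id) {
    $collector->expect($id);
}

// Replies come back in whatever order the servers finish.
$arrivals = [
    ['likes_7', '[...likes...]'],
    ['discount_7', '{...discount...}'],
    ['recommendations_7', '[...recommendations...]'],
    ['comments_7', '[...comments...]'],
];
foreach ($arrivals as [$id, $body]) {
    if ($collector->accept($id, $body)) {
        break; // every part is in, so the response can be assembled
    }
}
print_r($collector->replies());
```

Keying replies by ID rather than by arrival order is what makes the join safe: the comments reply is found under comments_7 no matter which server finished first.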

We now replace our blocking query tasks with a single batch of parallel RPC requests.

<?php
// ………
   $start = microtime(true); // time() only has one-second resolution
   $client = new Thumper\RpcClient($registry->getConnection());
   $client->initClient();
   // One request per query: addRequest(message body, server name, request ID).
   $client->addRequest($discount_id, 'discount', 'discount_' . $discount_id);
   $client->addRequest($discount_id, 'comments', 'comments_' . $discount_id);
   $client->addRequest($discount_id, 'likes', 'likes_' . $discount_id);
   $client->addRequest($discount_id, 'recommendations', 'recommendations_' . $discount_id);
   $replies = $client->getReplies(); // Blocks until every request has a reply, keyed by request ID.
   $discount = $replies['discount_' . $discount_id];
   $comments = $replies['comments_' . $discount_id];
   $likes = $replies['likes_' . $discount_id];
   $recommendations = $replies['recommendations_' . $discount_id];
   $total_time = (microtime(true) - $start) * 1000; // in ms: ~ 120ms, the slowest request (10 comments or likes)
   $response = array(
     'discount' => $discount,
     'comments' => $comments,
     'likes' => $likes,
     'recommendations' => $recommendations
   );
   return $response;
// ………
?>

$client->initClient() initializes the RabbitMQ RPC client, and each $client->addRequest() call adds one request to the batch, naming the server that should handle it and the request ID its reply will be stored under. $client->getReplies() blocks: no PHP code after that line runs until a reply has arrived for every request.

With RabbitMQ RPC, the total query time is now down to about 120ms, the time taken by the slowest request (fetching 10 comments or 10 likes), plus some messaging overhead. How awesome is that? But wait, there’s more.
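The estimate is easy to reproduce: run one after another, the requests cost the sum of their times; run in parallel, they cost roughly the time of the slowest one. A small sketch using the per-item figures from the first listing; broker round trips and serialization are not modeled, so the real parallel figure will be somewhat higher.

```php
<?php
declare(strict_types=1);

// Cost of each request in ms for a discount with 10 comments, 10 likes and
// 10 recommendations, using the per-item figures from the first listing.
$requestMs = [
    'discount' => 5,
    'comments' => 10 * 12,
    'likes' => 10 * 12,
    'recommendations' => 10 * 10,
];

// Sequential: each call waits for the previous one, so the costs add up.
$sequential = array_sum($requestMs);
// Parallel: all requests are in flight at once, so we wait for the slowest.
// Broker round trips and (de)serialization are not modeled.
$parallel = max($requestMs);

printf("sequential %dms, parallel ~%dms\n", $sequential, $parallel);
// sequential 345ms, parallel ~120ms
```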

For schema flexibility and horizontal scalability, discount comments and likes are embedded in their respective MongoDB documents. However, the records for the users and apps that created the content are stored in MariaDB (cached by Redis) and have to be fetched at runtime.

{
  "_id": 1,
  "comments": {
    "0": {
      "_id": 1,
      "message": "The first comment",
      "user_id": 1,
      "written_on": "some_date_string",
      "edited_on": "some_date_string",
      "status": 1,
      "via_app_id": 5,
      "total_replies": 0,
      "replies": []
    },
    "1": {
      "_id": 1,
      "message": "The second comment",
      "user_id": 1,
      "written_on": "some_date_string",
      "edited_on": "some_date_string",
      "status": 0, // deleted
      "via_app_id": 5,
      "total_replies": 0,
      "replies": []
    },
    "2": {
      "_id": 1,
      "message": "The third comment",
      "user_id": 4,
      "written_on": "some_date_string",
      "edited_on": "some_date_string",
      "status": 1,
      "via_app_id": 2,
      "total_replies": 0,
      "replies": []
    }
  }
}

This MongoDB document contains the comment details but only the IDs of the users and apps that wrote them. To get the actual usernames and app names, we have to loop through the comments and query for each one, which is very similar to the first problem we solved with RabbitMQ RPC.

<?php
// ………
   $commentsDoc = $mongoService->fetchDoc($discount_id); // ~ 16ms
   $comments = $commentsDoc['comments'];
   foreach ($comments as &$comment) { // by reference, so the added fields stick
     $comment['user'] = $mariaCachableService->queryUser($comment['user_id']); // ~ 5ms
     $comment['viaapp'] = array();
     $app_id = isset($comment['via_app_id']) ? $comment['via_app_id'] : null;
     if ($app_id) {
       $comment['viaapp'] = $mariaCachableService->queryApp($app_id); // ~ 5ms
     }
   }
   unset($comment); // break the reference left by foreach
   return $comments;
// ………
?>

Thus, fetching 10 discount comments could mean one query for the MongoDB document, 10 queries for the users, and another 10 if the comments were posted via apps: a maximum of 21 queries.
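The worst-case count follows directly from the loop. As a sketch, a hypothetical helper, sequentialQueryCount(), that counts the queries the sequential version issues for a given list of comments:

```php
<?php
declare(strict_types=1);

// Queries issued by the sequential loop: one MongoDB fetch, one user lookup
// per comment, and one app lookup per comment that has a via_app_id.
function sequentialQueryCount(array $comments): int
{
    $count = 1; // the MongoDB document
    foreach ($comments as $comment) {
        $count++; // queryUser()
        if (!empty($comment['via_app_id'])) {
            $count++; // queryApp()
        }
    }
    return $count;
}

// Worst case from the text: 10 comments, all posted via an app.
$comments = array_fill(0, 10, ['user_id' => 1, 'via_app_id' => 5]);
echo sequentialQueryCount($comments), PHP_EOL; // 21
```

The count grows linearly with the number of comments, and because each query waits for the previous one, so does the response time.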

Parallel RPCs Within Parallel RPCs

One of the best things about RPCs is that you can nest them. For example, our comments consumer, which is itself an RPC server, can act as an RPC client and send its own parallel requests to fetch the user and app behind each comment.

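<?php
// ………
   $start = microtime(true);
   $commentsDoc = $mongoService->fetchDoc($discount_id); // ~ 16ms
   $comments = $commentsDoc['comments'];
   $client = new Thumper\RpcClient($registry->getConnection());
   $client->initClient();
   // Comments often share users and apps; send each request ID only once,
   // since replies are keyed by request ID and a duplicate wastes a query.
   $requested = array();
   foreach ($comments as $comment) {
     $user_key = 'user_' . $comment['user_id'];
     if (!isset($requested[$user_key])) {
       $client->addRequest($comment['user_id'], 'user', $user_key);
       $requested[$user_key] = true;
     }
     $app_id = isset($comment['via_app_id']) ? $comment['via_app_id'] : null;
     if ($app_id && !isset($requested['app_' . $app_id])) {
       $client->addRequest($app_id, 'app', 'app_' . $app_id);
       $requested['app_' . $app_id] = true;
     }
   }
   $replies = $client->getReplies(); // blocks until every request has a reply
   $total_time = (microtime(true) - $start) * 1000; // in ms: ~ 21ms (16ms document + 5ms slowest user or app)
   foreach ($comments as &$comment) { // by reference, so the added fields stick
     $comment['user'] = $replies['user_' . $comment['user_id']];
     $comment['viaapp'] = array();
     $app_id = isset($comment['via_app_id']) ? $comment['via_app_id'] : null;
     if ($app_id) {
       $comment['viaapp'] = $replies['app_' . $app_id];
     }
   }
   unset($comment); // break the reference left by foreach
   return $comments;
?>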

We first loop through the comments to collect each user and app ID and add a request for it to the RabbitMQ RPC batch. We then wait for the replies. Finally, we loop through the comments again and attach the full user and app objects returned by the RPC calls.

With nested parallel RPCs, fetching all the data to display one discount takes approximately 21ms: about 16ms to fetch the MongoDB document plus about 5ms for the slowest of the parallel user and app lookups.
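The same arithmetic explains the nested case. Stages that depend on each other still add up (the document must be fetched before its lookups can start), but the lookups within a stage cost only as much as the slowest one. A sketch using the timings from the listings above for 10 comments, all posted via apps, again ignoring messaging overhead:

```php
<?php
declare(strict_types=1);

// Comment path for 10 comments, all posted via apps: ~16ms to fetch the
// MongoDB document, then ~5ms per user or app lookup.
$docMs = 16;
$lookupMs = array_fill(0, 20, 5); // 10 user lookups + 10 app lookups

// Sequential: the document, then every lookup one after another.
$sequential = $docMs + array_sum($lookupMs);
// Nested parallel: the document, then all lookups at once (slowest wins).
$nested = $docMs + max($lookupMs);

printf("sequential %dms, nested parallel ~%dms\n", $sequential, $nested);
// sequential 116ms, nested parallel ~21ms
```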

Sometimes, though, parallelism can be counterproductive. In the implementation above, should a discount_id result in a 404 Not Found error, we would still pay for sending out parallel RPCs to query for comments, likes, and recommendations that definitely do not exist. To avoid wasting resources, we first check whether the discount exists before sending out the parallel RPCs.
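The guard can be sketched as a function that runs a cheap existence check and fans out only when it passes. loadDiscount(), $exists and $fanOut are hypothetical names (PHP 7.4+ for the arrow function); the closures stand in for the real service call and the batch of RPC requests, and a counter records how many batches were sent.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch: a cheap existence check gates the parallel fan-out,
// so a missing discount costs one query instead of a batch of RPCs.
function loadDiscount(int $discountId, callable $exists, callable $fanOut): ?array
{
    if (!$exists($discountId)) {
        return null; // caller answers 404; no RPCs were sent
    }
    return $fanOut($discountId);
}

$rpcBatches = 0;
$fanOut = function (int $id) use (&$rpcBatches): array {
    $rpcBatches++; // stands in for publishing the comments/likes/recommendations requests
    return ['discount' => $id, 'comments' => [], 'likes' => [], 'recommendations' => []];
};
$exists = fn (int $id): bool => $id === 7; // pretend only discount 7 exists

var_dump(loadDiscount(404, $exists, $fanOut)); // NULL
echo $rpcBatches, PHP_EOL;                     // 0
loadDiscount(7, $exists, $fanOut);
echo $rpcBatches, PHP_EOL;                     // 1
```

The trade-off is one extra sequential step on the happy path in exchange for never paying for a whole batch of pointless queries on the 404 path.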

There are other gotchas as well, such as deciding when to open a connection and how long to keep it open, and making sure every request carries a unique correlation ID. At Campus Discounts, we decided to embed the aggregate results of each discount query, together with its comments, likes, and recommendations, into the corresponding Elasticsearch documents. This also gave us the added benefit of deeply nested searches. Nonetheless, the parallel RPC setup described here has greatly improved our overall Elasticsearch re-indexing times.
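One way to keep correlation IDs unique is to stop using readable keys such as 'user_1' as the IDs themselves and instead generate a random ID per request, keeping a map back to the readable key. This is a sketch under that assumption, not how Thumper assigns IDs; newCorrelationIds() and mapReplies() are hypothetical helpers, and the reply bodies are placeholders.

```php
<?php
declare(strict_types=1);

// Give every request a random correlation ID and remember which logical key
// it stands for. Readable keys like 'user_1' become labels only.
function newCorrelationIds(array $logicalKeys): array
{
    $ids = [];
    foreach ($logicalKeys as $key) {
        do {
            $id = bin2hex(random_bytes(8)); // 64 random bits, hex-encoded
        } while (isset($ids[$id]));
        $ids[$id] = $key;
    }
    return $ids; // correlation ID => logical key
}

// Translate replies (keyed by correlation ID) back to logical keys, dropping
// any reply this batch never asked for, e.g. a late reply from an earlier
// batch if a reply queue is reused.
function mapReplies(array $ids, array $repliesById): array
{
    $out = [];
    foreach ($repliesById as $id => $body) {
        if (isset($ids[$id])) {
            $out[$ids[$id]] = $body;
        }
    }
    return $out;
}

$ids = newCorrelationIds(['user_1', 'app_5']);
$replies = [];
foreach ($ids as $id => $key) {
    $replies[$id] = "payload for $key"; // what the servers would send back
}
$replies['stale-id'] = 'left over from an earlier batch';
print_r(mapReplies($ids, $replies));
```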

The Main Advantages:

  1. You can process a lot of data from different sources asynchronously and join it together with good response times.
  2. You can split your backend API servers: the beefy ones handle data processing and stay more secure and closer to your database servers (RPC servers), while the lighter ones respond to requests and can be geo-distributed closer to your users (RPC clients).
  3. Parallel processing improves resource utilization by reducing idle time.

The Main Disadvantages:

  1. Complex stack to set up, debug, and maintain.
  2. The real benefits only show up when there are enough individual queries, each taking long enough, to outweigh the messaging overhead.
  3. Parallel data processing can easily lead to unnecessary DB queries that a sequential flow would have avoided.

Conclusion

So there you have it: an asynchronous remote task processing system that returns aggregated results using RabbitMQ. Remember, RabbitMQ RPCs are not limited to DB queries. You can use them for anything whose result can be represented as a string, including calls to many external APIs, whether that is real-time sentiment analysis via Watson or financial data from Yahoo.
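Because message bodies are strings, a common approach is to JSON-encode structured results on the server and decode them on the client once the replies arrive. A minimal sketch; handleUserRequest() and its fake user record are hypothetical (PHP 7.3+ for JSON_THROW_ON_ERROR).

```php
<?php
declare(strict_types=1);

// A hypothetical RPC handler: receives the request body (a user ID as a
// string) and returns a structured result encoded as a JSON string.
function handleUserRequest(string $body): string
{
    $user = ['id' => (int) $body, 'username' => 'student' . $body];
    return json_encode($user, JSON_THROW_ON_ERROR);
}

$replyBody = handleUserRequest('4'); // what the server would publish
$user = json_decode($replyBody, true, 512, JSON_THROW_ON_ERROR); // client side
echo $replyBody, PHP_EOL; // {"id":4,"username":"student4"}
```

Throwing on encode or decode errors keeps a malformed reply from silently turning into null somewhere in the joined response.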

Don Omondi is a full-stack developer and the Founder and CTO of [Campus Discounts](https://campus-discounts.com/). Besides the typical coffee and code, he also loves old school music over a game of chess or checkers.

Attribution: Mike Wilson

This article is licensed with CC-BY-NC-SA 4.0 by Compose.