My last post talked about the importance of event-driven architecture and endorsed a hybrid architecture consisting of both RESTful and streaming APIs. In this post, I would like to walk you through a simple demo that shows the benefit of using a broker, such as Solace PubSub+, that can handle multiple open protocols and APIs.
Besides allowing microservices to publish and consume events in real-time, a major benefit of using an event broker is integration. A typical enterprise has all kinds of in-house and vendor products with different APIs and it can be extremely painful to get these applications to talk to each other.
A modern event broker must support multiple open APIs and protocols without requiring you to install new plug-ins or manage proxies. For example, Solace's PubSub+ event broker supports open protocols like AMQP, MQTT, REST, and WebSockets, and APIs in multiple languages, which means you can have one application publishing data via REST and downstream applications consuming that data via AMQP and/or WebSockets. And if you onboard a new vendor application tomorrow that supports only MQTT, you can easily use MQTT to stream data to the event broker.
Hybrid Architecture
In the previous post, we looked at our hybrid architecture where we are publishing data to the event broker via REST and the broker is responsible for translating that event to the protocol desired by the downstream services.
In our demo today, we will have the following components:
- Solace PubSub+ event broker
- Publishers
- REST publisher – simple REST POST command(s) via cURL
- Java publisher – using streaming API
- Consumers
- REST WebHooks – an endpoint will be invoked every time a message is received/enqueued.
- Java consumer – using Solace's SMF protocol via sdkperf
Here is what it will look like:
As you can see in the diagram, we have some objects created in the event broker as well. There are two types of delivery modes supported by PubSub+: direct and guaranteed. In direct messaging, there is no persistence; it is commonly used for high-throughput, low-latency use cases where you can afford some data loss. For critical data, where you want zero message loss, you need guaranteed messaging.
In this example, we will use guaranteed messaging and apply persistence via a queue. Our two publishers will publish messages to a well-defined topic with multiple levels (e.g. prices/java/stocks/{ticker}). This allows our subscribers to consume the messages via topic subscriptions mapped to queues. For example, we have created a queue called queue_java_consumer for our Java consumer and added a topic subscription: prices/>. This will allow messages published by both the REST and Java publishers to be enqueued in this queue.
Similarly, we have created a separate queue for our REST consumer called queue_rest_consumer with a different topic subscription: prices/rest/>. This means only messages published by the REST publisher will be enqueued in this queue.
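The topic matching the broker performs here can be illustrated with a small sketch. This is a simplification of my own: it covers the `>` wildcard (which, as the last level, matches one or more remaining levels) and whole-level `*`, but ignores Solace's prefix forms like `pri*`:

```python
def matches(subscription, topic):
    """Simplified Solace-style topic matching: '>' as the final level
    matches one or more remaining levels; '*' matches one whole level."""
    sub = subscription.split("/")
    levels = topic.split("/")
    for i, s in enumerate(sub):
        if s == ">":
            # '>' is only valid as the last level and needs at least one level to match
            return i == len(sub) - 1 and len(levels) > i
        if i >= len(levels) or (s != "*" and s != levels[i]):
            return False
    return len(levels) == len(sub)

print(matches("prices/>", "prices/java/stocks/aapl"))       # True  -> queue_java_consumer
print(matches("prices/rest/>", "prices/java/stocks/aapl"))  # False
print(matches("prices/rest/>", "prices/rest/stocks/aapl"))  # True  -> queue_rest_consumer
```

This is why messages from both publishers land in queue_java_consumer, while only the REST publisher's messages land in queue_rest_consumer.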
Queues
Before we begin setting up our publishers and consumers, let’s create the queues that will hold the messages based on topic subscriptions mapped to them.
We will create two queues:
- queue_java_consumer with topic prices/> mapped to it
- queue_rest_consumer with topic prices/rest/> mapped to it
You can create a queue by going to the Queues tab in the PubSub+ web UI and clicking on + Queue. Once the queue has been created, click on it and add the appropriate subscription.
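If you prefer scripting this setup over clicking through the UI, the broker also exposes the SEMP v2 management API. Here is a sketch, under the assumptions of a local broker, the `default` message VPN, SEMP on port 8080, and `admin/admin` credentials (adjust all of these for your environment); the field names follow SEMP v2's config API:

```python
import base64
import json
import urllib.request

SEMP_BASE = "http://localhost:8080/SEMP/v2/config/msgVpns/default"  # assumed host/VPN

def queue_requests(queue, topic):
    """Build the (url, body) pairs that create a queue and map a topic subscription to it."""
    return [
        (f"{SEMP_BASE}/queues",
         {"queueName": queue, "ingressEnabled": True, "egressEnabled": True,
          "permission": "consume"}),
        (f"{SEMP_BASE}/queues/{queue}/subscriptions",
         {"subscriptionTopic": topic}),
    ]

def apply_requests(pairs, user="admin", password="admin"):
    """Send the prepared requests to the broker (requires a running broker)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    for url, body in pairs:
        req = urllib.request.Request(
            url, data=json.dumps(body).encode(), method="POST",
            headers={"Content-Type": "application/json",
                     "Authorization": f"Basic {token}"})
        urllib.request.urlopen(req)

# With a broker running, the two queues from this demo would be created like so:
# apply_requests(queue_requests("queue_java_consumer", "prices/>"))
# apply_requests(queue_requests("queue_rest_consumer", "prices/rest/>"))
```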
Publishers
REST publisher
We can simply publish messages to the broker via cURL commands. You will need to execute a POST command against a URL which contains the host and port of the broker and the REST service running on the broker. It will also contain the topic address you want to publish the message to. For example, if I want to publish a message to a local broker running on my laptop to the topic prices/rest/stocks/aapl, here is the command I will run:
curl -X POST http://127.0.0.1:9000/prices/rest/stocks/aapl --header "Content-Type: application/json" -d '{"name":"aapl", "price":"130"}'
The URL that’s used contains the host, port, and topic address in this syntax: http(s)://<host>:<port>/<topic-address>
You can find more information about REST publishers here.
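The same publish can also be scripted in a few lines. A minimal sketch using only Python's standard library, with the host, port, and topic from the cURL example above:

```python
import json
import urllib.request

def build_publish_request(host, port, topic, payload):
    """Build a POST request for the broker's REST endpoint: http://<host>:<port>/<topic>."""
    return urllib.request.Request(
        f"http://{host}:{port}/{topic}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_publish_request("127.0.0.1", 9000, "prices/rest/stocks/aapl",
                            {"name": "aapl", "price": "130"})
# urllib.request.urlopen(req)  # uncomment with a broker running; 200 OK means accepted
print(req.full_url)  # http://127.0.0.1:9000/prices/rest/stocks/aapl
```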
Java publisher
Depending on the protocol you want to use (SMF, MQTT, AMQP), you can use the corresponding API. Solace has a command-line utility for demos and tests, called sdkperf, freely available for download here. It comes in different flavors based on the protocol you want to use. For our use case, I am going to use sdkperf-jcsmp, which uses Solace's SMF protocol. Here is a guide on how to use sdkperf.
Here is the command I will run to start our Java publisher, which will connect to our local broker and publish 10,000 messages at the rate of 1 message per second to the topic prices/java/stocks/aapl:
> bash sdkperf_java.sh -ptl="prices/java/stocks/aapl" -mt=persistent -mn=10000 -mr=1 -msa=100 -cip=localhost:55555
Consumers
REST consumer
Our REST consumer will be a webservice endpoint which is invoked every time our queue receives a message. We will first need a webservice which accepts POST commands.
Webservice
Let’s spin up a webservice which will expose a REST endpoint. This endpoint will just echo the arguments once invoked. Here is sample python code to spin up the webservice:
from flask import Flask, json, request

companies = [{"name": "ibm", "price": 134}, {"name": "msft", "price": 255}]

api = Flask(__name__)

@api.route('/', methods=['GET'])
def get_companies():
    return json.dumps(companies)

@api.route('/', methods=['POST'])
def post_companies():
    # Echo the received payload to the logs and back to the caller
    print(json.dumps(request.json))
    return json.dumps(request.json), 201

if __name__ == '__main__':
    api.run(host='0.0.0.0')
I ran the script on an EC2 instance and issued a GET request against port 5000, and here is the output:
[{"name": "ibm", "price": 134}, {"name": "msft", "price": 255}]
This means our webservice is working. Let's issue a POST command:
$ curl -X POST -H "Content-Type: application/json" -d '{"name":"nvda","price":"609"}' http://ec2-34-201-735.compute-1.amazonaws.com:5000
{"name": "nvda", "price": "609"}
Great, our POST command works as well.
REST consumer, queue binding, and RDP
As you can tell from the architecture diagram shown earlier, there is something different about our REST consumer. The reason is that we need to create some additional objects on the broker to set up our REST consumer: a REST consumer, a queue binding, and a REST delivery point (RDP).
A REST consumer with Solace is not just a microservice that polls the broker with GET commands. That wouldn't be appropriate given that Solace PubSub+ is all about event-driven architecture. Instead, PubSub+ utilizes REST WebHooks via POST commands so that updates are pushed in real-time.
When a publisher publishes a message to a topic, the message will be routed to the appropriate queues depending on the topic subscriptions mapped to them. For a REST endpoint to be invoked, we need to create a REST delivery point (RDP) on the broker. The RDP consists of a REST consumer, which contains information about the HTTP endpoint to invoke among other things, and a queue binding, which binds a queue to the consumer. This queue binding ensures that when a message is enqueued in the queue, the broker invokes the endpoint with the correct method (POST or PUT).
Creating an RDP
To create a REST delivery point on the broker, go to the PubSub+ web UI, click on the message VPN (i.e. default), and click on the Client Connections tab. On the following page, click on REST and then click on + Rest Delivery Point.
Give your RDP a name, enable it, and click on Apply.
Creating a REST consumer
Now that we have an RDP, click on it and then click on the REST Consumers tab. Then, click on + REST Consumer and give it a name. On the following page, enable the consumer and add the host/port of the endpoint you wish to invoke when a message is received. In our case, that would be http://ec2-34-201-77-35.compute-1.amazonaws.com:5000/.
Select the appropriate method to invoke. In our case, it is POST. There are a lot of other settings you can configure, but for now we will leave them at their defaults.
Creating a queue binding
Go to Client Connections > Queue Bindings and click on + Queue Binding. Select the queue_rest_consumer queue and click on Create.
Since our specific endpoint is /, set Post Request Target to that.
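As with the queues, these three objects can also be created through the SEMP v2 management API rather than the UI. A sketch of the request URLs and bodies only, with field names as I understand SEMP v2's config API; the RDP name `rdp_demo` is a hypothetical one of my own, and a local broker with the `default` VPN is assumed:

```python
SEMP_BASE = "http://localhost:8080/SEMP/v2/config/msgVpns/default"  # assumed host/VPN

def rdp_requests(rdp, queue, remote_host, remote_port, target="/"):
    """Build (url, body) pairs for an RDP, its REST consumer, and its queue binding."""
    base = f"{SEMP_BASE}/restDeliveryPoints"
    return [
        (base,
         {"restDeliveryPointName": rdp, "enabled": True}),
        (f"{base}/{rdp}/restConsumers",
         {"restConsumerName": f"{rdp}_consumer", "remoteHost": remote_host,
          "remotePort": remote_port, "enabled": True}),
        (f"{base}/{rdp}/queueBindings",
         {"queueBindingName": queue, "postRequestTarget": target}),
    ]

# The objects from this demo, expressed as SEMP v2 requests:
for url, body in rdp_requests("rdp_demo", "queue_rest_consumer",
                              "ec2-34-201-77-35.compute-1.amazonaws.com", 5000):
    print(url, body)
```

Each pair would be sent as an authenticated POST, the same way as when creating the queues earlier.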
All the necessary objects have now been created on the broker and we are ready to publish our first message via REST.
Java Consumer
We will be using sdkperf-jcsmp to spin up a Java consumer quickly via this command:
> bash sdkperf_java.sh -cip=localhost:55555 -sql=queue_java_consumer -md -pe
It will connect to our broker and bind to the queue_java_consumer queue we created earlier.
Running the Demo
Phew, everything is set and we are ready to run the demo. Best practice is to run the consumers first. Our REST consumer is already running so let’s run the Java consumer with the command shown above.
Then, we will start the Java publisher, which will start publishing to our topic prices/java/stocks/aapl.
As soon as you run the Java publisher, you will notice that your Java consumer will start picking up the messages.
We can also see that the messages are making it to our queue: queue_java_consumer.
Now, let’s publish a message via REST using cURL.
As you can see, we immediately see the output in the webservice logs. This means that the message was published to the broker on the topic prices/rest/stocks/aapl and was enqueued in the queue_rest_consumer queue. As soon as the message was enqueued, the configured endpoint was invoked, which resulted in the webservice logging that output.
Now, to make things interesting, I can change the topic that the Java publisher is publishing to so that those messages make it to the webservice as well. However, because I am not setting a JSON payload for my sdkperf Java publisher, the logged output might not be pretty. Let’s see what happens!
I will use the same Java publisher but change the topic to: prices/rest/stocks/aapl
As you can see in the above screenshot, the messages published by the Java publisher are not only making it to the Java consumer (as before) but also to the webservice!
Wrap-up
As you saw in the demo above, our architecture had two publishers and two consumers, using REST and streaming APIs. You can get more creative and add publishers and consumers that use MQTT or AMQP.
The beauty of using an event broker is that it allows you to decouple your microservices. You can easily modify your architecture tomorrow by adding new microservices or removing old ones and it would have no impact on downstream consumers as long as they get the messages they are interested in consuming.
As you evolve your architecture from a batch-driven monolith to an event-driven architecture consisting of microservices, remember that you might still need RESTful services (at least in the intermediate phase) and that means you need to pick a broker that supports multiple protocols.