dev-resources.site
for different kinds of informations.
Subscription Model for News and Stocks via MuleSoft and Kafka
Hi guys, Its a pleasure to show you guys this application/model, where I use MuleSoft and Kafka to develop a fairly simple subscription model. Please feel free to try the code out, I will be linking the GitHub links at the bottom of the post.
I would like to enter this project in Hackathon as Everyday is an API day
Prerequisites:
- Anypoint Platform Account (Deploy Mule Applications)
- Apache Kafka Instance (Used Confluent Cloud for Kafka Cluster)
- Slack Account (This is the client for our application)
- The Guardian Open Platform Account (To get News Content Via API)
- Alpha Vantage API (Get Real time Stocks via API)
Idea Phase:
My Idea is to have a model that enable users (in our context slack users), to subscribe their own daily news snippets, stock information, such as latest quote for the stock etc., right from a single app, which in this case will be Slack.
Thought Process
My first thought was to directly integrate the APIs and have the data stored in database and poll it every 3hr so that the news stays updated. Database poll will be heavy for I/O operation. I had done some R&D, and settle with Apache Kafka, since it is Low Latency in I/O operations, High Throughput and we can use it to design an architecture for our application to make it real time.
Architecture:
Configurations
Note: If you want to use the configuration I did, please skip till Slack Configuration
Confluent Cloud Kafka Cluster
- Login/Register a free account in Confluent Cloud here.
- Create a Kafka Cluster.
- Create two topics:
newstock-subscribe
andnewstock-unsubscribe
- Create Kafka cluster keys to connect to kafka via MuleSoft Kafka Connector
- Go to the Tools and Client Config section of cluster page, there open clients tab. Here you can see various language snippets for necessary credentials. Make not of them.
Guardian API
- Visit this page.
- Register for Developer key. It will be sent to your registered email.
Alpha Vantage API
- Visit this page and enter the requested info. Key will be displayed once you submitted the form.
Slack Configuration (Required)
- Create a Slack Workspace and register if you have not
- Visit (here)[https://api.slack.com/apps] to create an app
- Once App is created, click on Slash Commands. Click on Create Command as Shown below
- The below popup will be opened. Here you need to provide the following
- Slash Command: /subscribe
- Request URL: (My Application's Cloudhub urls are located below)
- Short Description: Subscribe to feed
- Hints: [News/Stocks] [Search Term] (This must be same in your config too)
- The above step should be done for /unsubscribe and /list-subscriptions
- Once all the slash commands are created, go to OAuth & Permissions. Add bot scopes to the app
chat:write
as shown below - Last thing you must to is to install the app in workspace. Click on Install App in the same page (It is in side menu). Click on Install to Workspace.
- A popup opens, give a name to app and select the workspace and click OK. Give consent, then take a note of Generated Bot Token.
By this all the configuration are done. I know this is configuration heavy, but reduce UI/UX development time. This only works for few use cases anyways!
Code Walkthrough
The subscription model contains two applications one is a subscriber and another one is a publisher
Real-Time News Subscriber
This API acts as an Experience API to Slack slash commands. The RAML for this API is as follows
#%RAML 1.0
title: Real-Time News Subscriber
version: v1
/subscribe:
post:
description: Subscribe to streams from Slack via slash commands
body:
application/x-www-form-urlencoded:
type: object
responses:
200:
body:
application/json:
example: {"message": "success"}
/subscriptions:
post:
description: List streams a user is subscribed to.
body:
application/x-www-form-urlencoded:
type: object
responses:
200:
body:
application/json:
example: {"message": "success"}
/unsubscribe:
post:
description: Unsubscribe to streams from Slack via slash commands
body:
application/x-www-form-urlencoded:
type: object
responses:
200:
body:
application/json:
example: {"message": "success"}
Essentially here we have three endpoints, each listening to one Slack slash command.
- /subscribe -
/subscribe
slash command - /subscriptions -
/list-subscriptions
- /unsubscribe -
/unsubscribe
slash command
Mule Flows
- All the three endpoints are implemented similarly with different operations. These endpoints will capture slash commands.
- From the Slash command's payload, the flow creates a body for Kafka and slack acknowledgement.
Note, Slash commands should be acknowledged by our application in 3 seconds
- When I publish message in the main thread, the slash command ack timeout and slack throws error to user, indicating the slash command failed. To avoid this the payload is published to Kafka topic asynchronously.
Real Time News Publisher:
- In this application, the Kafka topics are listened by a consumer every 10 secs to make sure the data from topics are pulled. This is done by Kafka Message Listener, which pulls one message at a time similar to an event.
- The message received from topic will be routed based on some criteria based on operation, i.e. either subscribe or unsubscribe.
- The app doesn't need a RAML Spec as it is just a Kafka listener and a scheduler.
- If the user is subscribing, the subscription data is saved to ObjectStore, to get the stream the data on demand or by a cron job.
- Similarly if user is unsubscribing, the subscription info will be removed from ObjectStore.
- If the operation is list of subscriptions, the subscription data is fetched and send to slack. ( this operation is real time, as there is no poll needs to be run for it to finish)
- For other two operations, the scheduler will poll for every 5hrs, reads the objectstore for subscriptions, for each subscription, the call to Guardian's Content API or Alpha Vintage's Search API is called based on the stream user is subscribed to.
- Once data is retrieved from any API, a slack message is built and calls slack
chat.postMessage
endpoint and sends a message in slack.
Git URL:
bhavanichandra / Real-Time-News-Subscription-Model
Integration project to stream news and stock feeds from web to Slack via MuleSoft and Kafka. This project is for MuleSoft Hackathon 2020
Application Demo
- Real-Time News Scheduler Cloudhub Base URL: http://real-time-news-subscriber.us-e2.cloudhub.io/api/
- Real Time News Publisher Cloudhub URL: http://real-time-news-producer.us-e2.cloudhub.io/triggerstreams
- Configure the Slash commands with the url in step 1
- Once configuration is finished. Go to the slack channel and hit the command in the chat as follows:
- Click Enter, the app registers and sends you the notification/acknowledgement as shown below
- The App is configured to stream the data every 5 hours. I have added a manual trigger which an be used in this case, to manually stream data. The url is http://real-time-news-producer.us-e2.cloudhub.io/triggerstreams
- Now the data is pushed to slack to APP's DM a follows
Update: Few updates to application
- Added request validation for slash commands.
- For anyone who doesn't want to configure all the things above, you can just configure Slack and use the urls I provided above. Only change you would need is while calling the manual trigger provide the slack token as query param, app will use this if you specify otherwise it will use the one I had in properties.
The url is: http://real-time-news-producer.us-e2.cloudhub.io/triggerstreams?slackToken=SLACK_TOKEN
Video Demo
About the author
I'm an Integration Developer at Standav Labs Private Limited
Follow me at:
Email: [email protected] / [email protected]
Linkedin: https://www.linkedin.com/in/v-a-s-r-s-bhavani-chandra-vajapeyayajula-4b962691/
Download JAR files here
https://drive.google.com/drive/folders/1CVh5i3v5tVab6Hgmv3euPhBD2aJ2ft6-?usp=sharing
Featured ones: