How Python Flask uses webhooks: Learn how to create streaming applications with real-time charts by using webhooks with the help of Flask, Redis, SocketIO, and other libraries in Python, including some examples of webhook usage.
Introduction to the tutorial on how to use webhooks
Webhooks can be thought of as an API that is driven by events rather than requests. A webhook is a service that allows one program to send data to another program as soon as a specific event occurs, rather than one application making a request to another application to receive a response.
Webhooks are sometimes referred to as reverse APIs because the communication is initiated by the application that sent the data, not the one that receives it. As web services become more interconnected, webhooks are looking at more actions as a lightweight solution for enabling real-time notifications and data updates without having to develop a full API.
Webhooks often act as messengers for smaller data. They help send messages, alerts, notifications, and real-time information from server-side applications to client applications.
For example, let’s say you want your app to be notified when a tweet that mentions an account and contains a specific hashtag is published. Unlike your app constantly asking Twitter for new posts that meet these criteria, it makes more sense for Twitter to only send notifications to your app when such an event occurs.
That’s the purpose of webhooks, without having to request data repeatedly (polling mechanism), the receiving application can sit down and get what it needs without having to send repeated requests to another system.
Webhooks can open up a lot of possibilities:
- You can use webhooks to connect your payment gateway with your email marketing software to send emails to users when a payment bounces.
- You can use webhooks to sync customer data across other applications. For example, if a user changes his email address, you can make sure that the change is also reflected in your CRM.
- You can also use webhooks to send information about events to an external database or data warehouse, such as Amazon’s Redshift or Google Big Query, for further analysis.
Range
In this tutorial, we’ll lay the groundwork for a webhooks-based streaming app with multiple components:
- A webhooks generator that simulates an internal or external service emitting tasks to a pre-configured webhook endpoint.
- Webhook listeners that receive notification messages for these events/tasks. Once received, these tickets are presented and converted into bar charts to generate valuable insights. Charts reduce the complexity of the data and make it easier for any user to understand.
We’ll be using several components such as Redis, Flask, SocketIO, and ChartJS to develop beautiful visualization tools for these components.
How Python Flask uses webhooks: processing flowcharts
Prerequisite
Depending on our requirements, the following components come into play:
- Redis is an open-source, high-level key-value store that is a suitable solution for building high-performance, scalable web applications. Redis has three main characteristics that set it apart:
- Redis keeps its database entirely in memory and uses only disk for persistence.
- Compared to many other key-value data stores, Redis has a relatively rich set of data types.
- Redis can replicate data to any number of slaves. Installing Redis is outside the scope of this tutorial, but you can check out this tutorial to install it on Windows.
- Socket.IO is a JavaScript library for real-time web applications. It enables real-time, bi-directional communication between web clients and servers. It has two parts: a client-side library that runs in the browser and a server-side library.
- Faker is a Python package that generates fake data for you. Whether you need to bootstrap a database, create beautiful XML documents, populate persistence to stress test it, or anonymize data fetched from a production service, Fake is the right choice for you.
- ChartJS is an open-source Javascript library that allows you to draw different types of charts using HTML5 canvas elements. HTML5 elements provide a simple yet powerful way to draw graphics using Javascript. The library supports 8 different types of graphics: Line, Bar, Donuts, Pie, Radar, Polar Regions, Bubbles, and Scatter.
- Flask is a miniature web framework written in Python.
If this tutorial piqued your interest and made you want to dive into the code right away, you can check out this repository to see the code used in this article.
Related: Using Celery’s asynchronous tasks in Python.
Webhook Usage Tutorial: Settings
Setting up a package is very simple and straightforward. Of course, you’ll need to install Python 3 on your system, and it’s highly recommended to set up a virtual environment where we’ll install the required libraries:
$ pip install Faker==8.2.0 Flask==1.1.2 Flask-SocketIO==5.0.1 redis==3.5.3 requests==2.25.1
By the end of this tutorial, our folder structure will look like this:
Let’s start writing the actual code. First, let’s define the configuration parameters for our application:config.py
#Application configuration File
################################
#Secret key that will be used by Flask for securely signing the session cookie
# and can be used for other security related needs
SECRET_KEY = 'SECRET_KEY'
#######################################
#Minimum Number Of Tasks To Generate
MIN_NBR_TASKS = 1
#Maximum Number Of Tasks To Generate
MAX_NBR_TASKS = 100
#Time to wait when producing tasks
WAIT_TIME = 1
#Webhook endpoint Mapping to the listener
WEBHOOK_RECEIVER_URL = 'http://localhost:5001/consumetasks'
#######################################
#Map to the REDIS Server Port
BROKER_URL = 'redis://localhost:6379'
#######################################
Next, create an initialization file for our task and webhooks producer:init_producer.py
# init_producer.py
from flask import Flask
#Create a Flask instance
app = Flask(__name__)
#Load Flask configurations from config.py
app.secret_key = app.config['SECRET_KEY']
app.config.from_object("config")
Now let’s write the code needed to generate the task using the Faker module:
# tasks_producer.py
import random
from faker.providers import BaseProvider
from faker import Faker
import config
import time
import requests
import json
import uuid
# Define a TaskProvider
class TaskProvider(BaseProvider):
def task_priority(self):
severity_levels = [
'Low', 'Moderate', 'Major', 'Critical'
]
return severity_levels[random.randint(0, len(severity_levels)-1)]
# Create a Faker instance and seeding to have the same results every time we execute the script
# Return data in English
fakeTasks = Faker('en_US')
# Seed the Faker instance to have the same results every time we run the program
fakeTasks.seed_instance(0)
# Assign the TaskProvider to the Faker instance
fakeTasks.add_provider(TaskProvider)
# Generate A Fake Task
def produce_task(batchid, taskid):
# Message composition
message = {
'batchid': batchid, 'id': taskid, 'owner': fakeTasks.unique.name(), 'priority': fakeTasks.task_priority()
# ,'raised_date':fakeTasks.date_time_this_year()
# ,'description':fakeTasks.text()
}
return message
def send_webhook(msg):
"""
Send a webhook to a specified URL
:param msg: task details
:return:
"""
try:
# Post a webhook message
# default is a function applied to objects that are not serializable = it converts them to str
resp = requests.post(config.WEBHOOK_RECEIVER_URL, data=json.dumps(
msg, sort_keys=True, default=str), headers={'Content-Type': 'application/json'}, timeout=1.0)
# Returns an HTTPError if an error has occurred during the process (used for debugging).
resp.raise_for_status()
except requests.exceptions.HTTPError as err:
#print("An HTTP Error occurred",repr(err))
pass
except requests.exceptions.ConnectionError as err:
#print("An Error Connecting to the API occurred", repr(err))
pass
except requests.exceptions.Timeout as err:
#print("A Timeout Error occurred", repr(err))
pass
except requests.exceptions.RequestException as err:
#print("An Unknown Error occurred", repr(err))
pass
except:
pass
else:
return resp.status_code
# Generate A Bunch Of Fake Tasks
def produce_bunch_tasks():
"""
Generate a Bunch of Fake Tasks
"""
n = random.randint(config.MIN_NBR_TASKS, config.MAX_NBR_TASKS)
batchid = str(uuid.uuid4())
for i in range(n):
msg = produce_task(batchid, i)
resp = send_webhook(msg)
time.sleep(config.WAIT_TIME)
print(i, "out of ", n, " -- Status", resp, " -- Message = ", msg)
yield resp, n, msg
if __name__ == "__main__":
for resp, total, msg in produce_bunch_tasks():
pass
How does Python Flask use webhooks? The above code leverages the Faker module to create a fictitious stream of random tasks and sends a webhook for each generated task to the endpoint we defined earlier in the config file.WEBHOOK_RECEIVER_URL
config.py
The number of tasks generated in each batch will be a random number controlled by the threshold and defined in .MIN_NBR_TASKS
MAX_NBR_TASKS
config.py
The webhook JSON message consists of the following attributes: , , , and .batchid
taskid
owner
priority
Each batch of tasks generated will be identified by a unique reference named .batchid
Task priority will be limited to pre-selected options: Low, Medium, High, and Severe.
Webhook usage example: The main purpose of the above code is a function, which is a generator that produces the following:produce_bunch_tasks()
- The status of the emitted webhook.
- The total number of tasks generated.
- The resulting webhook message.
Before digging any further, let’s test our program:tasks_producer.py
$ python tasks_producer.py
You should see an output similar to the following:
Now let’s build a Flask application that simulates a service generation task:
#app_producer.py
from flask import Response, render_template
from init_producer import app
import tasks_producer
def stream_template(template_name, **context):
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
rv.enable_buffering(5)
return rv
@app.route("/", methods=['GET'])
def index():
return render_template('producer.html')
@app.route('/producetasks', methods=['POST'])
def producetasks():
print("producetasks")
return Response(stream_template('producer.html', data= tasks_producer.produce_bunch_tasks() ))
if __name__ == "__main__":
app.run(host="localhost",port=5000, debug=True)
In this FLASK application, we define two main routes:
"/"
: Rendering Template Web Pages (producer.html)"/producetasks"
: Invoke the function and stream the resulting task to the Flask application.produce_bunch_tasks()
The server sends a Server Send Event (SSE), which is a server push mechanism where clients are notified whenever a new event occurs on the server. Next, we’ll define the template file:producer.html
<!doctype html>
<html>
<head>
<title>Tasks Producer</title>
<style>
.content {
width: 100%;
}
.container{
max-width: none;
}
</style>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
</head>
<body class="container">
<div class="content">
<form method='post' id="produceTasksForm" action = "/producetasks">
<button style="height:20%x;width:100%" type="submit" id="produceTasks">Produce Tasks</button>
</form>
</div>
<div class="content">
<div id="Messages" class="content" style="height:400px;width:100%; border:2px solid gray; overflow-y:scroll;"></div>
{% for rsp,total, msg in data: %}
<script>
var rsp = "{{ rsp }}";
var total = "{{ total }}";
var msg = "{{ msg }}";
var lineidx = "{{ loop.index }}";
//If the webhook request succeeds color it in blue else in red.
if (rsp == '200') {
rsp = rsp.fontcolor("blue");
}
else {
rsp = rsp.fontcolor("red");
}
//Add the details of the generated task to the Messages section.
document.getElementById('Messages').innerHTML += "<br>" + lineidx + " out of " + total + " -- "+ rsp + " -- " + msg;
</script>
{% endfor %}
</div>
</body>
</html>
Three variables are passed to this template file:
total
: indicates the total number of tasks generated.status
: Indicates the status of the scheduled webhook.msg
: webhook JSON message.
The template file contains a Javascript that traverses the received streams and displays them when a webhook/task is received.
Now that our program is ready, let’s test it out and check the resulting output:
$ python app_producer.py
Access the link to run the Flask instance, press the button Produce Tasks, and you’ll see a continuous stream of random tasks that are automatically generated, as shown in the screen below:http://localhost:5000
You’ll notice that the response status of the scheduled webhook is equal and shown in red, indicating that the destination could not be reached. Later, when we activate the task consumer, you’ll outline that the response status of the assigned webhook is equal to and shown in blue, indicating a successful arrival at the webhook endpoint.None
200
How does Python Flask use webhooks? Now, let’s create the initialization file for our task consumer/handler:
# init_consumer.py
from flask import Flask
#Create a Flask instance
app = Flask(__name__)
#Load Flask configurations from config.py
app.secret_key = app.config['SECRET_KEY']
app.config.from_object("config")
#Setup the Flask SocketIO integration while mapping the Redis Server.
from flask_socketio import SocketIO
socketio = SocketIO(app,logger=True,engineio_logger=True,message_queue=app.config['BROKER_URL'])
Next, let’s build a Flask application to handle assigned webhooks/tasks. The first step in dealing with webhooks is to build a custom endpoint. This endpoint needs to receive the data via a POST request and confirm that it was successfully received:
#app_consumer.py
from flask import render_template, request,session
from flask_socketio import join_room
from init_consumer import app, socketio
import json
import uuid
#Render the assigned template file
@app.route("/", methods=['GET'])
def index():
return render_template('consumer.html')
# Sending Message through the websocket
def send_message(event, namespace, room, message):
# print("Message = ", message)
socketio.emit(event, message, namespace=namespace, room=room)
# Registers a function to be run before the first request to this instance of the application
# Create a unique session ID and store it within the application configuration file
@app.before_first_request
def initialize_params():
if not hasattr(app.config,'uid'):
sid = str(uuid.uuid4())
app.config['uid'] = sid
print("initialize_params - Session ID stored =", sid)
# Receive the webhooks and emit websocket events
@app.route('/consumetasks', methods=['POST'])
def consumetasks():
if request.method == 'POST':
data = request.json
if data:
print("Received Data = ", data)
roomid = app.config['uid']
var = json.dumps(data)
send_message(event='msg', namespace='/collectHooks', room=roomid, message=var)
return 'OK'
#Execute on connecting
@socketio.on('connect', namespace='/collectHooks')
def socket_connect():
# Display message upon connecting to the namespace
print('Client Connected To NameSpace /collectHooks - ', request.sid)
#Execute on disconnecting
@socketio.on('disconnect', namespace='/collectHooks')
def socket_connect():
# Display message upon disconnecting from the namespace
print('Client disconnected From NameSpace /collectHooks - ', request.sid)
#Execute upon joining a specific room
@socketio.on('join_room', namespace='/collectHooks')
def on_room():
if app.config['uid']:
room = str(app.config['uid'])
# Display message upon joining a room specific to the session previously stored.
print(f"Socket joining room {room}")
join_room(room)
#Execute upon encountering any error related to the websocket
@socketio.on_error_default
def error_handler(e):
# Display message on error.
print(f"socket error: {e}, {str(request.event)}")
#Run using port 5001
if __name__ == "__main__":
socketio.run(app,host='localhost', port=5001,debug=True)
Webhook Usage Tutorial: In a nutshell, we did the following:
- We’ve added a function that runs once before the first request to the application and is ignored on subsequent requests. In this function, we create a unique session ID and store it in a configuration file, which will be used to assign an exclusive space to each user when handling web socket communication.
@app.before_first_request
- We define a webhook listener that receives JSON data via a POST request, and once received, it emits a web socket event at the same time.
"/consumetasks"
- In order to effectively manage our connections via the network socker:
- We’ll set the value for the namespace (which is used to separate the server logic with a single shared connection).
/collectHooks
- We will assign a dedicated room to each user session (a room is a subdivision or subchannel of a namespace).
- We’ll set the value for the namespace (which is used to separate the server logic with a single shared connection).
Webhook usage example: Once we’ve done all of this, let’s write the front-end code for our web application, create and copy the following code in a folder:consumer.html
templates
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Tasks Consumer</title>
<link rel="stylesheet" href="{{url_for('static',filename='css/bootstrap.min.css')}}">
<link rel="stylesheet" href="{{url_for('static',filename='css/Chart.min.css')}}">
</head>
<body>
<div class="content">
<div id="Messages" class="content" style="height:200px;width:100%; border:1px solid gray; overflow-y:scroll;"></div>
</div>
<div class="container">
<div class="row">
<div class="col-12">
<div class="card">
<div class="card-body">
<canvas id="canvas"></canvas>
</div>
</div>
</div>
</div>
</div>
<!-- import the jquery library -->
<script src="{{%20url_for('static',filename='js/jquery.min.js')%20}}"></script>
<!-- import the socket.io library -->
<script src="{{%20url_for('static',filename='js/socket.io.js')%20}}"></script>
<!-- import the bootstrap library -->
<script src="{{%20url_for('static',filename='js/bootstrap.min.js')%20}}"></script>
<!-- import the Chart library -->
<script src="{{%20url_for('static',filename='js/Chart.min.js')%20}}"></script>
<script>
$(document).ready(function(){
const config = {
//Type of the chart - Bar Chart
type: 'bar',
//Data for our chart
data: {
labels: ['Low','Moderate','Major','Critical'],
datasets: [{
label: "Count Of Tasks",
//Setting a color for each bar
backgroundColor: ['green','blue','yellow','red'],
borderColor: 'rgb(255, 99, 132)',
data: [0,0,0,0],
fill: false,
}],
},
//Configuration options
options: {
responsive: true,
title: {
display: true,
text: 'Tasks Priority Matrix'
},
tooltips: {
mode: 'index',
intersect: false,
},
hover: {
mode: 'nearest',
intersect: true
},
scales: {
xAxes: [{
display: true,
scaleLabel: {
display: true,
labelString: 'Priority'
}
}],
yAxes: [{
display: true
,ticks: {
beginAtZero: true
}
,scaleLabel: {
display: true,
labelString: 'Total'
}
}]
}
}
};
const context = document.getElementById('canvas').getContext('2d');
//Creating the bar chart
const lineChart = new Chart(context, config);
//Reserved for websocket manipulation
var namespace='/collectHooks';
var url = 'http://' + document.domain + ':' + location.port + namespace;
var socket = io.connect(url);
//When connecting to the socket join the room
socket.on('connect', function() {
socket.emit('join_room');
});
//When receiving a message
socket.on('msg' , function(data) {
var msg = JSON.parse(data);
var newLine = $('<li>'+ 'Batch ID. = ' + msg.batchid + ' -- Task ID. = ' + msg.id + ' -- Owner = ' + msg.owner + ' -- Priority = ' + msg.priority +'</li>');
newLine.css("color","blue");
$("#Messages").append(newLine);
//Retrieve the index of the priority of the received message
var lindex = config.data.labels.indexOf(msg.priority);
//Increment the value of the priority of the received message
config.data.datasets[0].data[lindex] += 1;
//Update the chart
lineChart.update();
});
});
</script>
</body>
</html>
How does Python Flask use webhooks? The above template includes the following:
- A portion of the message that displays the details of the task or webhook received.
- A bar chart showing the total number of tasks received by each priority in the entire web socket event. The steps to build a chart are as follows:
- Place a canvas element where the diagram is displayed.
- Specify the priority in the label property, which indicates the name of the instance to be compared.
- Initialize a dataset property, which defines an array of objects, each containing the data we want to compare.
- Whenever a new webhook is transferred and received over a web socket, the bar chart is updated synchronously.
Now let’s test our program, follow these steps:
- Open the terminal and run: Copy
app_producer.py
$ python app_producer.py
- Start the Redis server and make sure that the Redis instance is running on TCP port 6479.
- Open another terminal and run: Copy
app_consumer.py
$ python app_consumer.py
- Open your browser and access the link to visualize the task producer:
http://localhost:5000
Click the Generate Task button, a batch of tasks will be automatically generated and gradually displayed on the screen, as shown below:
Webhook usage example: Now open another tab in the browser and visit to visualize the task consumer, the task will gradually appear in the message section and the bar chart will automatically update whenever the webhook is received:http://localhost:5001
When you hover over any chart bar, a tooltip showing the total number of tasks is displayed:
How Python Flask uses webhooks
Summary of the webhook usage tutorial
Webhooks are an important part of the web, and they are becoming more and more popular. They allow your applications to exchange data instantly and seamlessly.
While webhooks are similar to APIs, they all play different roles, each with their own unique use case. Hopefully, this article has expanded your understanding and remembered that the key to getting the most out of webhooks is knowing when they’re the right choice for your application.