config.py
#Application configuration File
################################
import os

#Secret key that will be used by Flask for securely signing the session cookie
# and can be used for other security related needs.
#It is taken from the SECRET_KEY environment variable when that variable is set
# and not empty; the hard-coded placeholder is only a development fallback and
# should be overridden for any real deployment.
SECRET_KEY = os.environ.get('SECRET_KEY') or 'SECRET_KEY'
#######################################
#Minimum Number Of Tasks To Generate per batch
MIN_NBR_TASKS = 1
#Maximum Number Of Tasks To Generate per batch
MAX_NBR_TASKS = 100
#Time to wait (in seconds, passed to time.sleep) after producing each task
WAIT_TIME = 1
#Webhook endpoint Mapping to the listener (the consumer application)
WEBHOOK_RECEIVER_URL = 'http://localhost:5001/consumetasks'
#######################################
#URL of the REDIS Server, used as the Socket.IO message queue
BROKER_URL = 'redis://localhost:6379'
#######################################
tasks_producer.py
import json
import logging
import random
import time
import uuid

import requests
from faker import Faker
from faker.providers import BaseProvider

import config
# Define a TaskProvider
class TaskProvider(BaseProvider):
    """Faker provider that adds task-specific fake data."""

    def task_priority(self):
        """
        Pick a task priority at random.

        :return: one of 'Low', 'Moderate', 'Major' or 'Critical'
        """
        severity_levels = ['Low', 'Moderate', 'Major', 'Critical']
        # Draw through the provider's own random source rather than the global
        # ``random`` module, so that seeding the Faker instance also makes the
        # generated priorities reproducible.
        return self.random_element(severity_levels)
# Create the Faker instance used to generate the tasks.
# The 'en_US' locale makes it return data in English.
fakeTasks = Faker('en_US')
# Seed the Faker instance with a fixed value (0) to have the same results every time we run the program.
# NOTE: this only covers values drawn through Faker's own random generator.
fakeTasks.seed_instance(0)
# Assign the TaskProvider to the Faker instance so that fakeTasks.task_priority() becomes available
fakeTasks.add_provider(TaskProvider)
# Generate A Fake Task
def produce_task(batchid, taskid):
    """
    Compose the message describing one fake task.

    :param batchid: identifier of the batch the task belongs to
    :param taskid: identifier of the task
    :return: dict with the keys 'batchid', 'id', 'owner' and 'priority'
    """
    # The owner is generated before the priority, matching the key order below.
    owner = fakeTasks.unique.name()
    priority = fakeTasks.task_priority()
    return {
        'batchid': batchid,
        'id': taskid,
        'owner': owner,
        'priority': priority,
    }
def send_webhook(msg):
    """
    Send a webhook to the receiver configured in config.WEBHOOK_RECEIVER_URL.

    The task is serialized to JSON and sent with an HTTP POST. Delivery is
    best-effort: a failure is logged and reported as None instead of being
    raised, so that one failed delivery does not stop the caller.

    :param msg: task details (a JSON-serializable mapping)
    :return: the HTTP status code when the receiver accepted the webhook,
        None when the task could not be serialized or delivered
    """
    logger = logging.getLogger(__name__)
    try:
        # default is a function applied to objects that are not serializable = it converts them to str
        payload = json.dumps(msg, sort_keys=True, default=str)
    except (TypeError, ValueError) as err:
        logger.warning("Could not serialize the task for the webhook: %r", err)
        return None
    try:
        # Post a webhook message
        resp = requests.post(
            config.WEBHOOK_RECEIVER_URL,
            data=payload,
            headers={'Content-Type': 'application/json'},
            timeout=1.0,
        )
        # Raises an HTTPError if the receiver answered with an error status.
        resp.raise_for_status()
    except requests.exceptions.RequestException as err:
        # HTTPError, ConnectionError and Timeout are all subclasses of
        # RequestException, so a single handler covers them. Exceptions that
        # are not request failures (e.g. KeyboardInterrupt) are no longer swallowed.
        logger.warning("Webhook delivery to %s failed: %r", config.WEBHOOK_RECEIVER_URL, err)
        return None
    return resp.status_code
# Generate A Bunch Of Fake Tasks
def produce_bunch_tasks():
    """
    Generate a batch of fake tasks and send each one as a webhook.

    The batch size is drawn at random between config.MIN_NBR_TASKS and
    config.MAX_NBR_TASKS, and all tasks of the batch share one new UUID as
    batch id. After each task is sent the generator sleeps for
    config.WAIT_TIME and prints a progress line.

    :return: generator yielding (webhook status, batch size, task message)
    """
    batch_size = random.randint(config.MIN_NBR_TASKS, config.MAX_NBR_TASKS)
    batch_id = str(uuid.uuid4())
    task_index = 0
    while task_index < batch_size:
        task = produce_task(batch_id, task_index)
        status = send_webhook(task)
        time.sleep(config.WAIT_TIME)
        print(task_index, "out of ", batch_size, " -- Status", status, " -- Message = ", task)
        yield status, batch_size, task
        task_index += 1
if __name__ == "__main__":
    # Run a single batch from the command line. The generator does all the
    # work (sending and printing), so the yielded values are simply discarded.
    for _status, _batch_size, _task in produce_bunch_tasks():
        continue
init_producer.py
from flask import Flask
#Create a Flask instance
app = Flask(__name__)
#Load Flask configurations from config.py
# The configuration object is loaded first, so that SECRET_KEY already holds
# the value from config.py when it is read on the following line.
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']
app_producer.py
#Flask imports
from flask import Response, render_template
from init_producer import app
import tasks_producer
def stream_template(template_name, **context):
    """
    Render a template as a stream instead of as one complete string.

    :param template_name: name of the template to load from the Jinja environment
    :param context: variables made available to the template
    :return: the Jinja template stream, with buffering enabled
    """
    # Let Flask add its own template variables to the caller's context.
    app.update_template_context(context)
    template = app.jinja_env.get_template(template_name)
    stream = template.stream(context)
    # Buffer with a size of 5 rather than emitting every piece on its own.
    stream.enable_buffering(5)
    return stream
@app.route("/", methods=['GET'])
def index():
    """Serve the producer page (rendered without any task data)."""
    page = render_template('producer.html')
    return page
@app.route('/producetasks', methods=['POST'])
def producetasks():
    """
    Produce a batch of tasks and stream the producer page while they are generated.

    :return: a streamed Response whose template consumes the task generator
    """
    print("producetasks")
    tasks = tasks_producer.produce_bunch_tasks()
    body = stream_template('producer.html', data=tasks)
    return Response(body)
if __name__ == "__main__":
    # Start the producer application on localhost:5000 in debug mode.
    app.run(
        host="localhost",
        port=5000,
        debug=True,
    )
init_consumer.py
from flask import Flask
#Create a Flask instance
app = Flask(__name__)
#Load Flask configurations from config.py
# The configuration object is loaded first, so that SECRET_KEY already holds
# the value from config.py when it is read on the following line.
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']
#Setup the Flask SocketIO integration while mapping the Redis Server.
from flask_socketio import SocketIO
# BROKER_URL (read from the loaded configuration) is used as the message queue.
socketio = SocketIO(app,logger=True,engineio_logger=True,message_queue=app.config['BROKER_URL'])
app_consumer.py
#Flask imports
from flask import render_template, request,session
from flask_socketio import join_room
from init_consumer import app, socketio
import json
import uuid
#Render the assigned template file
@app.route("/", methods=['GET'])
def index():
    """Serve the consumer page."""
    page = render_template('consumer.html')
    return page
# Sending Message through the websocket
def send_message(event, namespace, room, message):
    """
    Emit a websocket event.

    :param event: name of the event to emit
    :param namespace: Socket.IO namespace the event is emitted on
    :param room: room the event is addressed to
    :param message: payload of the event
    """
    target = {'namespace': namespace, 'room': room}
    socketio.emit(event, message, **target)
# Create a unique session ID and store it within the application configuration
def initialize_params():
    """
    Store a unique session ID under app.config['uid'] if none is stored yet.

    The ID is used as the name of the websocket room.
    """
    # app.config is a dict: the key has to be tested with ``in``, because
    # hasattr() looks at attributes and never sees dictionary keys.
    if 'uid' not in app.config:
        sid = str(uuid.uuid4())
        app.config['uid'] = sid
        print("initialize_params - Session ID stored =", sid)


# Run the initialization once, when the module is loaded. The
# ``before_first_request`` hook used previously was removed in Flask 2.3, and
# running the setup code at start-up is its documented replacement.
initialize_params()
# Receive the webhooks and emit websocket events
@app.route('/consumetasks', methods=['POST'])
def consumetasks():
    """
    Receive a webhook and forward its JSON payload over the websocket.

    A non-empty payload is re-serialized to JSON and emitted as a 'msg' event
    on the '/collectHooks' namespace, to the room named after the stored uid.

    :return: the text 'OK'
    """
    payload = request.json if request.method == 'POST' else None
    if payload:
        print("Received Data = ", payload)
        send_message(
            event='msg',
            namespace='/collectHooks',
            room=app.config['uid'],
            message=json.dumps(payload),
        )
    return 'OK'
#Execute on connecting
@socketio.on('connect', namespace='/collectHooks')
def socket_connect():
    """Print the socket ID of a client that connected to the /collectHooks namespace."""
    client_sid = request.sid
    print('Client Connected To NameSpace /collectHooks - ', client_sid)
#Execute on disconnecting
@socketio.on('disconnect', namespace='/collectHooks')
def socket_disconnect():
    """Print the socket ID of a client that disconnected from the /collectHooks namespace."""
    # The handler is registered under the 'disconnect' event by the decorator.
    # It has its own function name so that it no longer redefines the
    # 'connect' handler's name at module level.
    print('Client disconnected From NameSpace /collectHooks - ', request.sid)
#Execute upon joining a specific room
@socketio.on('join_room', namespace='/collectHooks')
def on_room():
    """
    Put the calling socket into the room named after the stored session ID.

    Nothing happens when no session ID is stored in the application configuration.
    """
    # .get() instead of indexing: a client may emit 'join_room' before the uid
    # has been stored, and that must not raise a KeyError.
    uid = app.config.get('uid')
    if uid:
        room = str(uid)
        # Display message upon joining a room specific to the session previously stored.
        print(f"Socket joining room {room}")
        join_room(room)
#Execute upon encountering any error related to the websocket
@socketio.on_error_default
def error_handler(e):
    """
    Print an error raised by a websocket handler together with the event being handled.

    :param e: the error that was raised
    """
    report = f"socket error: {e}, {str(request.event)}"
    print(report)
#Run using port 5001
if __name__ == "__main__":
    # Start the consumer through socketio so that websocket support is enabled.
    socketio.run(
        app,
        host='localhost',
        port=5001,
        debug=True,
    )
templates/producer.html
<!doctype html>
<html>
<head>
<title>Tasks Producer</title>
<style>
.content {
width: 100%;
}
.container{
max-width: none;
}
</style>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
</head>
<body class="container">
<div class="content">
<form method='post' id="produceTasksForm" action = "/producetasks">
<button style="height:20%;width:100%" type="submit" id="produceTasks">Produce Tasks</button>
</form>
</div>
<div class="content">
<div id="Messages" class="content" style="height:400px;width:100%; border:2px solid gray; overflow-y:scroll;"></div>
{# Emit one script element per produced task. `data` is the iterable of (rsp, total, msg) items handed to the template; each script appends one line to the Messages section. #}
{% for rsp,total, msg in data: %}
<script>
// The values below are filled in by the template engine before the script
// reaches the browser, so each of them arrives as a string.
var rsp = "{{ rsp }}";
var total = "{{ total }}";
var msg = "{{ msg }}";
// loop.index is the 1-based position of the current task in the loop.
var lineidx = "{{ loop.index }}";
//If the webhook request succeeds color it in blue else in red.
// Only the exact status '200' counts as success here.
if (rsp == '200') {
rsp = rsp.fontcolor("blue");
}
else {
rsp = rsp.fontcolor("red");
}
//Add the details of the generated task to the Messages section.
document.getElementById('Messages').innerHTML += "<br>" + lineidx + " out of " + total + " -- "+ rsp + " -- " + msg;
</script>
{% endfor %}
</div>
</body>
</html>
templates/consumer.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Tasks Consumer</title>
<link rel="stylesheet" href="{{url_for('static',filename='css/bootstrap.min.css')}}">
<link rel="stylesheet" href="{{url_for('static',filename='css/Chart.min.css')}}">
</head>
<body>
<div class="content">
<div id="Messages" class="content" style="height:200px;width:100%; border:1px solid gray; overflow-y:scroll;"></div>
</div>
<div class="container">
<div class="row">
<div class="col-12">
<div class="card">
<div class="card-body">
<canvas id="canvas"></canvas>
</div>
</div>
</div>
</div>
</div>
<!-- import the jquery library -->
<script src="{{ url_for('static',filename='js/jquery.min.js') }}"></script>
<!-- import the socket.io library -->
<script src="{{ url_for('static',filename='js/socket.io.js') }}"></script>
<!-- import the bootstrap library -->
<script src="{{ url_for('static',filename='js/bootstrap.min.js') }}"></script>
<!-- import the Chart library -->
<script src="{{ url_for('static',filename='js/Chart.min.js') }}"></script>
<script>
$(document).ready(function(){
const config = {
    //Bar chart with one bar per task priority
    type: 'bar',
    //Chart data: a single dataset holding the running count of each priority
    data: {
        labels: ['Low', 'Moderate', 'Major', 'Critical'],
        datasets: [
            {
                label: "Count Of Tasks",
                //One color per bar, in the same order as the labels
                backgroundColor: ['green', 'blue', 'yellow', 'red'],
                borderColor: 'rgb(255, 99, 132)',
                //All counters start at zero
                data: [0, 0, 0, 0],
                fill: false
            }
        ]
    },
    //Display options
    options: {
        responsive: true,
        title: {
            display: true,
            text: 'Tasks Priority Matrix'
        },
        tooltips: {
            mode: 'index',
            intersect: false
        },
        hover: {
            mode: 'nearest',
            intersect: true
        },
        scales: {
            xAxes: [
                {
                    display: true,
                    scaleLabel: {
                        display: true,
                        labelString: 'Priority'
                    }
                }
            ],
            yAxes: [
                {
                    display: true,
                    ticks: {
                        beginAtZero: true
                    },
                    scaleLabel: {
                        display: true,
                        labelString: 'Total'
                    }
                }
            ]
        }
    }
};
//Creating the bar chart on the 2d drawing context of the canvas element
const lineChart = new Chart(
    document.getElementById('canvas').getContext('2d'),
    config
);
//Reserved for websocket manipulation
var namespace = '/collectHooks';
//Connect back to the server that served this page. location.origin carries the
//page's own scheme, host and port, so the connection also works when the page
//is served over https or on a default port (the scheme used to be hard-coded
//to http and the host was read from the deprecated document.domain).
var url = location.origin + namespace;
var socket = io.connect(url);
//When connecting to the socket join the room
socket.on('connect', function() {
    socket.emit('join_room');
});
//When receiving a message
socket.on('msg', function(data) {
    var msg = JSON.parse(data);
    var details = 'Batch ID. = ' + msg.batchid + ' -- Task ID. = ' + msg.id + ' -- Owner = ' + msg.owner + ' -- Priority = ' + msg.priority;
    //The fields come from the webhook payload, so they are inserted as plain
    //text: building the element from an HTML string would let a crafted
    //payload inject markup or scripts into the page.
    var newLine = $('<li>').text(details);
    newLine.css("color","blue");
    $("#Messages").append(newLine);
    //Retrieve the index of the priority of the received message
    var lindex = config.data.labels.indexOf(msg.priority);
    //A priority that has no bar yields -1; skip it instead of writing a bogus
    //entry into the dataset.
    if (lindex !== -1) {
        //Increment the value of the priority of the received message
        config.data.datasets[0].data[lindex] += 1;
        //Update the chart
        lineChart.update();
    }
});
});
</script>
</body>
</html>