settings.py
# URL for our broker used for connecting to the Kafka cluster
KAFKA_BROKER_URL = "localhost:9092"
# name of the topic hosting the transactions to be processed and requiring processing
TRANSACTIONS_TOPIC = "queuing.transactions"
# these 2 variables will control the amount of transactions automatically generated
TRANSACTIONS_PER_SECOND = float("2.0")
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND
# name of the topic hosting the legitimate transactions
LEGIT_TOPIC = "queuing.legit"
# name of the topic hosting the suspicious transactions
FRAUD_TOPIC = "queuing.fraud"
transactions.py
from random import choices, randint
from string import ascii_letters, digits
account_chars: str = digits + ascii_letters
def _random_account_id() -> str:
"""Return a random account number made of 12 characters"""
return "".join(choices(account_chars,k=12))
def _random_amount() -> float:
"""Return a random amount between 1.00 and 1000.00"""
return randint(100,1000000)/100
def create_random_transaction() -> dict:
"""Create a fake randomised transaction."""
return {
"source":_random_account_id()
,"target":_random_account_id()
,"amount":_random_amount()
,"currency":"EUR"
}
producer.py
import os
import json
from time import sleep
from kafka import KafkaProducer
# import initialization parameters
from settings import *
from transactions import create_random_transaction
if __name__ == "__main__":
producer = KafkaProducer(bootstrap_servers = KAFKA_BROKER_URL
#Encode all values as JSON
,value_serializer = lambda value: json.dumps(value).encode()
,)
while True:
transaction: dict = create_random_transaction()
producer.send(TRANSACTIONS_TOPIC, value= transaction)
print(transaction) #DEBUG
sleep(SLEEP_TIME)
detector.py
import os
import json
from kafka import KafkaConsumer, KafkaProducer
from settings import *
def is_suspicious(transaction: dict) -> bool:
"""Simple condition to determine whether a transaction is suspicious."""
return transaction["amount"] >= 900
if __name__ == "__main__":
consumer = KafkaConsumer(
TRANSACTIONS_TOPIC
,bootstrap_servers=KAFKA_BROKER_URL
,value_deserializer = lambda value: json.loads(value)
,
)
for message in consumer:
transaction: dict = message.value
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
print(topic,transaction) #DEBUG
appdetector.py
from flask import Flask, Response, stream_with_context, render_template, json, url_for
from kafka import KafkaConsumer
from settings import *
# create the flask object app
app = Flask(__name__)
def stream_template(template_name, **context):
print('template name =',template_name)
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
rv.enable_buffering(5)
return rv
def is_suspicious(transaction: dict) -> bool:
"""Determine whether a transaction is suspicious."""
return transaction["amount"] >= 900
# this router will render the template named index.html and will pass the following parameters to it:
# title and Kafka stream
@app.route('/')
def index():
def g():
consumer = KafkaConsumer(
TRANSACTIONS_TOPIC
, bootstrap_servers=KAFKA_BROKER_URL
, value_deserializer=lambda value: json.loads(value)
,
)
for message in consumer:
transaction: dict = message.value
topic = FRAUD_TOPIC if is_suspicious(transaction) else LEGIT_TOPIC
print(topic, transaction) # DEBUG
yield topic, transaction
return Response(stream_template('index.html', title='Fraud Detector / Kafka',data=g()))
if __name__ == "__main__":
app.run(host="localhost" , debug=True)
templates/index.html
<!doctype html>
<title> Send Javascript with template demo </title>
<html>
<head>
</head>
<body>
<div class="container">
<h1>{{title}}</h1>
</div>
<div id="data"></div>
{% for topic, transaction in data: %}
<script>
var topic = "{{ topic }}";
var transaction = "{{ transaction }}";
if (topic.search("fraud") > 0) {
topic = topic.fontcolor("red")
} else {
topic = topic.fontcolor("green")
}
document.getElementById('data').innerHTML += "<br>" + topic + " " + transaction;
</script>
{% endfor %}
</body>
</html>