Skip to content
Snippets Groups Projects
Commit cb820fd4 authored by xkleman's avatar xkleman
Browse files

Use async Service Bus

parent 51939f86
No related branches found
No related tags found
No related merge requests found
......@@ -2,4 +2,6 @@ data
**/__pycache__/**
python.py
deploy*.sh
docker-compose.yml
\ No newline at end of file
docker-compose.yml
.vscode
app/config.py
\ No newline at end of file
......@@ -8,8 +8,6 @@ ENV FLASK_APP=app/flask_app.py
ENV FLASK_RUN_HOST=0.0.0.0
# ENV FLASK_ENV=development
COPY app/requirements.txt /app/requirements.txt
WORKDIR /app
......
import os
settings = {
'host': os.environ.get('ACCOUNT_HOST', 'https://xkleman.documents.azure.com:443/'),
'master_key': os.environ.get('ACCOUNT_KEY', 'IfMilYbrYTdhF1Ye4b5ybqkflDetoDryzYqkErbr0KGChGStKLhDo2nySZoN8EDxAkgXuUfhOPXvACDbkeukYA=='),
'database_id': os.environ.get('COSMOS_DATABASE', 'ToDoList'),
'container_id': os.environ.get('COSMOS_CONTAINER', 'Items'),
'service_bus_connection_string': os.environ.get('AZURE_SERVICE_BUS_CONN_STRING'),
'queue_name': os.environ.get('AZURE_SERVICE_BUS_QUEUE_NAME'),
}
\ No newline at end of file
from flask import Flask, render_template, request
from azure.cosmos import PartitionKey
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from threading import Thread
import json
import uuid
import config
HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = config.settings['database_id']
CONTAINER_ID = config.settings['container_id']
SERVICE_BUS_CONNECTION_STRING = config.settings['service_bus_connection_string']
QUEUE_NAME = config.settings['queue_name']
servicebus_client = ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING)
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = \
'postgresql://postgres:postgres@<backend_ip>:5432/my_flask_db'
# If you run postgres in container and app locally
# 'postgresql://postgres:postgres@localhost:5432/my_flask_db'
# Initialize Cosmos DB client
client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}, user_agent="CosmosDBPythonQuickstart", user_agent_overwrite=True)
# setup database for this sample
try:
database = client.create_database(id=DATABASE_ID)
print('Database with id \'{0}\' created'.format(DATABASE_ID))
except exceptions.CosmosResourceExistsError:
database = client.get_database_client(DATABASE_ID)
print('Database with id \'{0}\' was found'.format(DATABASE_ID))
# setup container for this sample
try:
container = database.create_container(id=CONTAINER_ID, partition_key=PartitionKey(path='/partitionKey'))
print('Container with id \'{0}\' created'.format(CONTAINER_ID))
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
except exceptions.CosmosResourceExistsError:
container = database.get_container_client(CONTAINER_ID)
print('Container with id \'{0}\' was found'.format(CONTAINER_ID))
from models import db, Item
class Item:
def __init__(self, id, name, shop, cost):
self.id = id
self.name = name
self.shop = shop
self.cost = cost
db.init_app(app)
def to_dict(self):
return {"id": self.id, "name": self.name, "shop": self.shop, "cost": self.cost}
with app.app_context():
db.create_all()
db.session.commit()
def worker():
# Initialize a receiver client
with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
with receiver:
# Continuously poll the queue for new messages
for message in receiver:
# Decode JSON item data from the message
item_data = json.loads(str(message))
# Create the new item
new_item = Item(**item_data)
@app.route('/')
# Add the new item to the Cosmos DB container
container.create_item(body=new_item.to_dict())
# Complete the message so that it's removed from the queue
receiver.complete_message(message)
# Start the worker in a new thread
worker_thread = Thread(target=worker)
worker_thread.start()
@app.route("/")
def main():
return render_template('index.html')
return render_template("index.html")
@app.route("/add", methods=['POST'])
@app.route("/add", methods=["POST"])
def add_item_from_form():
name = str(request.form['name'])
shop = str(request.form['shop'])
cost = str(request.form['cost'])
new_item = Item(name=name, shop=shop, cost=cost)
db.session.add(new_item)
db.session.commit()
data = Item.query.all()
items = ((item.name, item.shop, item.cost) for item in data)
print(type(items))
name = str(request.form["name"])
shop = str(request.form["shop"])
cost = str(request.form["cost"])
id = str(uuid.uuid4())
new_item = Item(id=id, name=name, shop=shop, cost=cost)
# Create a Service Bus message with the item data
message = ServiceBusMessage(json.dumps(new_item.to_dict()))
# Add the message to the Service Bus queue
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
sender.send_messages(message)
items = [item for item in container.read_all_items()]
items_data = [(item["name"], item["shop"], item["cost"]) for item in items]
headings = ("Name", "Shop", "Cost")
return render_template('index.html', items=items, headings=headings)
return render_template("index.html", items=items_data, headings=headings)
@app.route("/items", methods=['GET'])
@app.route("/items", methods=["GET"])
def get_items():
items = Item.query.all()
return str([str(item) + '\n' for item in items])
items = [item for item in container.read_all_items()]
items_data = [
f"Name: {item['name']}, Shop: {item['shop']}, Cost: {item['cost']}"
for item in items
]
return str(items_data)
@app.route("/delete")
def delete_items():
db.session.query(Item).delete()
return render_template('index.html')
container.delete_all_items()
return render_template("index.html")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True)
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Item(db.Model):
__tablename__ = "items"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String)
shop = db.Column(db.String)
cost = db.Column(db.String)
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from flask import current_app
class Item:
container_name = 'items'
def __init__(self, name, shop, cost):
self.id = None
self.name = name
self.shop = shop
self.cost = cost
self.cost = cost;
def save(self):
client = current_app.config['COSMOS_CLIENT']
database_name = current_app.config['COSMOS_DATABASE_NAME']
database = client.get_database_client(database_name)
container = database.get_container_client(self.container_name)
item = {
'name': self.name,
'shop': self.shop,
'cost': self.cost
}
if self.id:
try:
existing_item = container.read_item(item_id=self.id, partition_key=item['name'])
except CosmosResourceNotFoundError:
pass
else:
item['id'] = self.id
item['_etag'] = existing_item['_etag']
container.replace_item(item=item, item_id=self.id, partition_key=item['name'])
return
response = container.create_item(item=item)
self.id = response['id']
@classmethod
def get_all(cls):
client = current_app.config['COSMOS_CLIENT']
database_name = current_app.config['COSMOS_DATABASE_NAME']
database = client.get_database_client(database_name)
container = database.get_container_client(cls.container_name)
items = container.query_items(
query="SELECT * FROM items",
enable_cross_partition_query=True
)
return [Item(**item) for item in items]
def __repr__(self):
return "Name: {}, Shop: {}, Cost: {}\n".format(self.name, self.shop, self.cost)
\ No newline at end of file
@classmethod
def delete_all(cls):
client = current_app.config['COSMOS_CLIENT']
database_name = current_app.config['COSMOS_DATABASE_NAME']
database = client.get_database_client(database_name)
container = database.get_container_client(cls.container_name)
items = cls.get_all()
for item in items:
container.delete_item(item=item.id, partition_key=item.name)
Flask
Flask-SQLAlchemy
psycopg2-binary
\ No newline at end of file
psycopg2-binary
azure-core==1.1.1
azure-cosmos==4.1.0
certifi==2019.11.28
chardet==3.0.4
idna==2.8
requests==2.22.0
six==1.13.0
urllib3==1.25.7
virtualenv==16.7.9
virtualenv-clone==0.5.3
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment