"""Flask front end that queues shop items on Azure Service Bus.

``POST /add`` serialises the submitted form as JSON and sends it to the
Service Bus queue named by ``config.settings['queue_name']``.  A background
thread (``worker``) receives those messages and stores each one as an item in
the Cosmos DB container named by ``config.settings['container_id']``.
"""
import json
import logging
import uuid
from threading import Thread

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos import PartitionKey
from azure.cosmos.partition_key import NonePartitionKeyValue, PartitionKey
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from flask import Flask, render_template, request

import config

logger = logging.getLogger(__name__)

HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = config.settings['database_id']
CONTAINER_ID = config.settings['container_id']
SERVICE_BUS_CONNECTION_STRING = config.settings['service_bus_connection_string']
QUEUE_NAME = config.settings['queue_name']

# Client used by the background worker only.  Request handlers create their
# own short-lived client: closing a ServiceBusClient also shuts down every
# sender/receiver it spawned, so sharing this one with the handlers would tear
# down the worker's receiver after the first request.
servicebus_client = ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING)

app = Flask(__name__)

# Initialize Cosmos DB client
client = cosmos_client.CosmosClient(
    HOST,
    {'masterKey': MASTER_KEY},
    user_agent="CosmosDBPythonQuickstart",
    user_agent_overwrite=True,
)

# Set up the database for this sample: create it, or reuse it if it exists.
try:
    database = client.create_database(id=DATABASE_ID)
    print(f"Database with id '{DATABASE_ID}' created")
except exceptions.CosmosResourceExistsError:
    database = client.get_database_client(DATABASE_ID)
    print(f"Database with id '{DATABASE_ID}' was found")

# Set up the container for this sample: create it, or reuse it if it exists.
try:
    container = database.create_container(
        id=CONTAINER_ID,
        partition_key=PartitionKey(path='/partitionKey'),
    )
    print(f"Container with id '{CONTAINER_ID}' created")
except exceptions.CosmosResourceExistsError:
    container = database.get_container_client(CONTAINER_ID)
    print(f"Container with id '{CONTAINER_ID}' was found")


class Item:
    """A shop item as stored in Cosmos DB and carried in queue messages."""

    def __init__(self, id, name, shop, cost):
        # ``id`` shadows the builtin, but it is part of the constructor's
        # keyword interface: queue payloads are expanded with ``Item(**data)``.
        self.id = id
        self.name = name
        self.shop = shop
        self.cost = cost

    def to_dict(self):
        """Return the item as a plain dict suitable for ``json.dumps``."""
        return {
            "id": self.id,
            "name": self.name,
            "shop": self.shop,
            "cost": self.cost,
        }


def _process_message(receiver, message):
    """Store one queue message in Cosmos DB and settle it on ``receiver``.

    Settlement rules:
      * payload is not valid JSON / not a valid ``Item`` -> dead-letter
        (retrying can never succeed);
      * item already exists (e.g. the message was redelivered after a
        successful write) -> complete;
      * any other Cosmos DB error -> abandon so the message is redelivered;
      * success -> complete.
    """
    try:
        new_item = Item(**json.loads(str(message)))
    except (json.JSONDecodeError, TypeError) as exc:
        logger.error("Dead-lettering malformed queue message: %s", exc)
        receiver.dead_letter_message(
            message,
            reason="InvalidPayload",
            error_description=str(exc),
        )
        return

    try:
        container.create_item(body=new_item.to_dict())
    except exceptions.CosmosResourceExistsError:
        # Must precede CosmosHttpResponseError (its base class).
        logger.info("Item %s already stored; completing message", new_item.id)
    except exceptions.CosmosHttpResponseError:
        logger.exception("Failed to store item %s; abandoning message", new_item.id)
        receiver.abandon_message(message)
        return

    # Complete the message so that it is removed from the queue.
    receiver.complete_message(message)


def worker():
    """Receive messages from the queue forever and store them as items.

    Intended to run on its own thread; iterating the receiver blocks while
    waiting for new messages.
    """
    with servicebus_client:
        with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME) as receiver:
            # Continuously poll the queue for new messages
            for message in receiver:
                _process_message(receiver, message)


# Start the worker in a new thread.  It is a daemon thread so that the
# never-ending receive loop does not keep the process alive at shutdown; an
# unsettled message is simply redelivered by Service Bus.
worker_thread = Thread(target=worker, daemon=True)
worker_thread.start()


@app.route("/")
def main():
    """Render the landing page."""
    return render_template("index.html")


@app.route("/add", methods=["POST"])
def add_item_from_form():
    """Queue a new item built from the submitted form and render the item list.

    The item is written to Cosmos DB asynchronously by ``worker``, so the list
    rendered in this response may not include it yet.
    """
    new_item = Item(
        id=str(uuid.uuid4()),
        name=str(request.form["name"]),
        shop=str(request.form["shop"]),
        cost=str(request.form["cost"]),
    )

    # Create a Service Bus message with the item data
    message = ServiceBusMessage(json.dumps(new_item.to_dict()))

    # Send through a per-request client; see the note on ``servicebus_client``.
    with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as sb_client:
        with sb_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
            sender.send_messages(message)

    items_data = [
        (item["name"], item["shop"], item["cost"])
        for item in container.read_all_items()
    ]
    headings = ("Name", "Shop", "Cost")
    return render_template("index.html", items=items_data, headings=headings)


@app.route("/items", methods=["GET"])
def get_items():
    """Return every stored item as the ``str()`` of a list of summary strings."""
    items_data = [
        f"Name: {item['name']}, Shop: {item['shop']}, Cost: {item['cost']}"
        for item in container.read_all_items()
    ]
    return str(items_data)


@app.route("/delete")
def delete_items():
    """Delete every item in the container and render the landing page."""
    # ContainerProxy has no bulk "delete all" call, so delete item by item.
    # The listing is materialised first so that paging is not affected by the
    # deletions.
    for item in list(container.read_all_items()):
        container.delete_item(
            item=item["id"],
            # Items written by this app carry no ``partitionKey`` property;
            # NonePartitionKeyValue addresses documents without one.
            partition_key=item.get("partitionKey", NonePartitionKeyValue),
        )
    return render_template("index.html")


if __name__ == "__main__":
    # NOTE(review): debug=True turns on Werkzeug's interactive debugger and
    # reloader; combined with host 0.0.0.0 the debugger is reachable from the
    # network.  Keep this for local development only.
    app.run(host="0.0.0.0", port=5000, debug=True)