# Newer
# Older
from azure.cosmos import PartitionKey
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from threading import Thread
import json
import uuid
import config
# Connection settings, all read from the local `config.settings` mapping.
HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = config.settings['database_id']
CONTAINER_ID = config.settings['container_id']
SERVICE_BUS_CONNECTION_STRING = config.settings['service_bus_connection_string']
QUEUE_NAME = config.settings['queue_name']

# One Service Bus client for the whole module; both the queue sender and the
# queue receiver used further down in this file are created from it.
servicebus_client = ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING)

# Initialize Cosmos DB client
client = cosmos_client.CosmosClient(
    HOST,
    {'masterKey': MASTER_KEY},
    user_agent="CosmosDBPythonQuickstart",
    user_agent_overwrite=True,
)

# Set up the database for this sample: try to create it, and fall back to a
# client for the existing database when it is already there.
try:
    database = client.create_database(id=DATABASE_ID)
    print('Database with id \'{0}\' created'.format(DATABASE_ID))
except exceptions.CosmosResourceExistsError:
    database = client.get_database_client(DATABASE_ID)
    print('Database with id \'{0}\' was found'.format(DATABASE_ID))

# Set up the container for this sample, using the same create-or-reuse pattern.
# NOTE(review): the container is partitioned on '/partitionKey', but
# Item.to_dict() emits only id/name/shop/cost -- confirm this is intended.
try:
    container = database.create_container(
        id=CONTAINER_ID,
        partition_key=PartitionKey(path='/partitionKey'),
    )
    print('Container with id \'{0}\' created'.format(CONTAINER_ID))
except exceptions.CosmosResourceExistsError:
    container = database.get_container_client(CONTAINER_ID)
    print('Container with id \'{0}\' was found'.format(CONTAINER_ID))
class Item:
    """A shop item with an id, a name, the shop it belongs to, and a cost.

    The values are stored exactly as given; no conversion or validation is
    performed.
    """

    def __init__(self, id: str, name: str, shop: str, cost: str) -> None:
        """Store the four item fields on the instance.

        Args:
            id: Identifier of the item (emitted under the ``"id"`` key).
            name: Item name.
            shop: Shop the item is associated with.
            cost: Item cost, kept in whatever form the caller supplies.
        """
        self.id = id
        self.name = name
        self.shop = shop
        self.cost = cost

    def __repr__(self) -> str:
        """Return a debug representation listing all four fields."""
        return (
            f"{type(self).__name__}(id={self.id!r}, name={self.name!r}, "
            f"shop={self.shop!r}, cost={self.cost!r})"
        )

    def to_dict(self) -> dict:
        """Return the item as a plain dict with keys id, name, shop and cost."""
        return {"id": self.id, "name": self.name, "shop": self.shop, "cost": self.cost}
def worker():
    """Drain the Service Bus queue into the Cosmos DB container.

    For every message received from ``QUEUE_NAME`` the body is decoded as
    JSON and written to ``container``; the message is completed only after
    the write, so a failed write leaves it on the queue.

    Runs for as long as the receiver keeps yielding messages, so it is meant
    to be used as a thread target rather than called inline.
    """
    # Only the receiver is scoped by `with`. The module-level client is also
    # used to create senders elsewhere in this file, so it is not entered
    # (and therefore not closed) here.
    with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME) as receiver:
        # Continuously poll the queue for new messages
        for message in receiver:
            # Decode JSON item data from the message
            try:
                item_data = json.loads(str(message))
            except json.JSONDecodeError as exc:
                # A malformed body can never succeed; park it in the
                # dead-letter queue instead of letting it kill the worker.
                receiver.dead_letter_message(
                    message,
                    reason="InvalidJSON",
                    error_description=str(exc),
                )
                continue

            # Add the decoded item to the Cosmos DB container. (The original
            # referenced `new_item`, which does not exist in this function.)
            try:
                container.create_item(body=item_data)
            except exceptions.CosmosResourceExistsError:
                # The same message was delivered again after the item had
                # already been stored; fall through and complete it.
                pass

            # Complete the message so that it's removed from the queue
            receiver.complete_message(message)
# Start the worker in a new thread. It is a daemon thread because `worker`
# loops over the queue receiver indefinitely; a non-daemon thread would keep
# the interpreter alive after the main thread has finished.
worker_thread = Thread(target=worker, daemon=True)
worker_thread.start()
# NOTE(review): no `def` line follows this decorator in the visible source, and
# `app`, `request`, `render_template` and `headings` are not imported or
# assigned anywhere in the visible file. The handler header and the Flask
# setup appear to have been lost and must be restored before this can run.
# NOTE(review): the body reads `request.form`, while the decorator below
# declares no `methods=` -- confirm which HTTP methods this view should accept.
#
# What the statements below do: build an Item from three submitted form
# fields plus a fresh UUID, put it on the Service Bus queue as JSON, then
# render index.html with (name, shop, cost) tuples of the items currently in
# the container.
@app.route("/")
# Each form field is coerced to str, so `cost` is kept as text.
name = str(request.form["name"])
shop = str(request.form["shop"])
cost = str(request.form["cost"])
# Random UUID rendered as a string; note the local name shadows the builtin `id`.
id = str(uuid.uuid4())
new_item = Item(id=id, name=name, shop=shop, cost=cost)
# Create a Service Bus message with the item data
message = ServiceBusMessage(json.dumps(new_item.to_dict()))
# Add the message to the Service Bus queue
# NOTE(review): this enters the module-level client that `worker` also uses;
# confirm whether leaving this `with` closes the client underneath the
# worker's receiver.
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
sender.send_messages(message)
# The item above was only queued, not written to the container here, so
# presumably it may be absent from this read until `worker` has stored it.
items = [item for item in container.read_all_items()]
items_data = [(item["name"], item["shop"], item["cost"]) for item in items]
return render_template("index.html", items=items_data, headings=headings)
# NOTE(review): these statements follow a `return` with no decorator or `def`
# in between in the visible source -- presumably the body of a separate view
# whose header is missing; restore it before running.
#
# Reads every item in the container and returns the list of
# "Name: ..., Shop: ..., Cost: ..." strings converted with str().
items = [item for item in container.read_all_items()]
items_data = [
f"Name: {item['name']}, Shop: {item['shop']}, Cost: {item['cost']}"
for item in items
]
return str(items_data)
# NOTE(review): no decorator or `def` precedes these statements in the visible
# source -- presumably the body of a "delete everything" view whose header is
# missing.
# NOTE(review): verify that the container client returned by azure-cosmos
# actually provides `delete_all_items()`; nothing in the visible source
# establishes that this method exists.
container.delete_all_items()
# Renders index.html without passing any template variables.
return render_template("index.html")
if __name__ == "__main__":
    # Local import: `os` is only needed by this entry point.
    import os

    # The server binds to every interface (0.0.0.0), so the interactive
    # debugger must not be enabled unconditionally: it lets a browser client
    # execute arbitrary code. Opt in explicitly with FLASK_DEBUG=1 (or "true")
    # when developing locally.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)