Compare commits

5 Commits

Author SHA1 Message Date
dea4b79014 Fix APIs and intergrate chat permission checking 2025-09-03 10:33:23 +01:00
694947f225 Fixed chat permissions lookup function 2025-09-02 19:55:24 +01:00
08cc5dd165 Added chat list endpoint
Returns all chats that are associated with a user
2025-09-02 17:45:44 +01:00
ef577c11f7 Added function to check permissions on specific chat.
The checkChatPermission function checks if a use is allowed to perform an action in a chat
2025-09-02 17:30:14 +01:00
89f9b6d270 Moved permissions checking to a function
Permissions checking now happens in another function, code is now much cleaner
2025-08-31 11:22:41 +01:00

204
main.py
View File

@@ -1,6 +1,7 @@
from flask import Flask, jsonify, request, render_template
from pymongo import MongoClient
from bson.objectid import ObjectId
from bson.json_util import dumps, loads
from datetime import datetime
from argon2 import PasswordHasher
import random
@@ -44,15 +45,9 @@ except Exception as e:
app = Flask(__name__)
# Chat Details Endpoint:
# Get or change details about a chat using the chatId
# Arguments: token (required), details (required), model, name
@app.route('/api/chat/<_id>/details', methods = ['GET', 'POST'])
def getChatHistory(_id):
# Get user auth token
token = request.json['token']
def checkUserPermission(token, permission):
# Find the correct user token in user db
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}, "permissions":1})
# If the user exists, continue, otherwise return fail
if (user):
# Convert _id to a string, python doesn't like ObjectId()
@@ -61,44 +56,95 @@ def getChatHistory(_id):
if (user['tokens'][0]['expiry'] > int(datetime.now().timestamp())):
# Store the userId
userId = user['_id']
print(userId)
# Get the request details
details = request.json['details']
# If the user is trying to GET data
if (request.method == 'GET'):
# Get the chat from the chatId
returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
# Convert chatId into string
returnedChat['_id'] = str(returnedChat['_id'])
try:
returnedChat["permissions"][userId].index("view")
print("Chat " + _id + " has been found with token " + token)
# Check for detail type and return correct value from db
if (details == "history"):
return jsonify(returnedChat["messages"])
elif (details == "users"):
return jsonify(returnedChat["permissions"])
elif (details == "model"):
return jsonify(returnedChat["model"])
elif (details == "name"):
return jsonify(returnedChat["name"])
except:
return jsonify("Invalid Permissions")
if permission in user["permissions"]:
return True, userId
elif (permission == True):
return True, userId
else:
try:
returnedChat["permissions"][userId].index("edit")
# Check for the detail type and add data to db
if (details == "model"):
model = request.json['model']
chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { "model": model } })
if (details == "name"):
name = request.json['name']
chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { "name": name } })
return jsonify("Success")
except:
return jsonify("Invalid Permissions")
return False, "Incorrect permissions"
else:
return jsonify("User token is invalid")
return False, "Token is expired"
else:
return False, "Token doesn't exist"
def checkChatPermission(token, chatId, permission):
a, userId = checkUserPermission(token, True)
if (a == True):
# Get the chat from the chatId
returnedChat = chatCollection.find_one({'_id': ObjectId(chatId)})
# Convert chatId into string
returnedChat['_id'] = str(returnedChat['_id'])
if permission in returnedChat['permissions'][userId]:
return True, userId
elif (permission == True):
return True, userId
else:
return False, "Incorrect permissions"
else:
return False, "Invalid Token"
# Chat List Endpoint:
# Get all the chats associated with a user
# Arguments: token (required)
@app.route('/api/user/chats', methods = ['GET'])
def getUserChats():
# Get user auth token
token = request.json['token']
a, userId = checkUserPermission(token, True)
if (a == True):
returnedChats = list(chatCollection.find({'permissions.' + userId : "view"}))
chats = []
for doc in returnedChats:
if '_id' in doc and isinstance(doc['_id'], ObjectId):
doc['_id'] = str(doc['_id'])
chats.append(doc)
jsonChats = json.dumps(chats, indent=2)
return jsonChats
# Chat Details Endpoint:
# Get or change details about a chat using the chatId
# Arguments: token (required), details (required), model, name
@app.route('/api/chat/<_id>/details/<details>', methods = ['GET', 'POST'])
def getChatHistory(_id, details):
# Get user auth token
token = request.json['token']
a, userId = checkChatPermission(token, _id, True)
if (a == True):
# If the user is trying to GET data
if (request.method == 'GET'):
# Get the chat from the chatId
returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
# Convert chatId into string
returnedChat['_id'] = str(returnedChat['_id'])
# Get chat permissions
a, userId = checkChatPermission(token, _id, "view")
if (a == True):
print("Chat " + _id + " has been found with token " + token)
# Check for detail type and return correct value from db
if (details == "history"):
return jsonify(returnedChat["messages"])
elif (details == "users"):
return jsonify(returnedChat["permissions"])
elif (details == "model"):
return jsonify(returnedChat["model"])
elif (details == "name"):
return jsonify(returnedChat["name"])
else:
return jsonify("Invalid Permissions")
else:
a, userId = checkChatPermission(token, _id, "view")
if (a == True):
# Check for the detail type and add data to db
if (details == "model"):
model = request.json['model']
chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { "model": model } })
if (details == "name"):
name = request.json['name']
chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { "name": name } })
return jsonify("Success")
else:
return jsonify("Invalid Permissions")
else:
return jsonify("User token is invalid")
@@ -109,44 +155,28 @@ def getChatHistory(_id):
def createChat():
# Get user auth token
token = request.json['token']
# Find the correct user token in user db
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
# If the user exists, continue, otherwise return fail
if (user):
# Convert _id to a string, python doesn't like ObjectId()
user['_id'] = str(user['_id'])
# Check if the token expiry is after the current date (Using unix timestamp, other mongodb Date datatype is a pain to use in python)
if (user['tokens'][0]['expiry'] > int(datetime.now().timestamp())):
# Store the userId
userId = user['_id']
print(user)
print(user['permissions'])
if ("createChat" in user['permissions']):
print(userId)
name = request.json['name']
model = request.json['model']
chatCollection.insert_one(
{
"name":name,
"model":model,
"permissions": {
userId:[
"owner",
"view",
"message",
"edit"
]
},
"messages": [
]
}
)
return jsonify("Success")
else:
return jsonify("Incorrect permissions")
else:
return jsonify("User token is invalid")
a, userId = checkUserPermission(token, "createChat")
if (a == True):
name = request.json['name']
model = request.json['model']
chatCollection.insert_one(
{
"name":name,
"model":model,
"permissions": {
userId:[
"owner",
"view",
"message",
"edit"
]
},
"messages": [
]
}
)
return jsonify("Success")
else:
return jsonify("User token is invalid")
@@ -167,13 +197,9 @@ def index():
if (token == 'none'):
return render_template('login.html', appName=appName, githubUrl=github_auth_endpoint, githublogin=settings["github_oauth"]["enabled"], oauthlogin=settings["oauth_login"])
else:
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
if (user):
user['_id'] = str(user['_id'])
if (user['tokens'][0]['expiry'] > int(datetime.now().timestamp())):
a, userId = checkUserPermission(token, True)
if (a == True):
return render_template('home.html', appName=appName)
else:
render_template('logout.html', appName=appName)
else:
render_template('logout.html', appName=appName)
@@ -363,7 +389,7 @@ def handleSignup():
def logout():
token = request.cookies.get('auth_token', 'none')
try:
token = request.json['remove_token']
token = request.headers['remove-token']
except:
pass
user = usersCollection.update_one({'tokens.token': token}, {"$pull":{'tokens':{'token':token}}})