"""HTTP backend for the chat application.

Exposes JSON API endpoints (chat listing, chat details, chat creation,
login, signup, GitHub OAuth) and a few HTML pages, backed by MongoDB.
Configuration is read from ``config/settings.json`` at import time.
"""

from flask import Flask, jsonify, redirect, request, render_template
from pymongo import MongoClient
from bson.objectid import ObjectId
from bson.json_util import dumps, loads
from datetime import datetime
from argon2 import PasswordHasher
import random
import secrets
import string
import json
import requests
from urllib.parse import parse_qs

with open("config/settings.json", "r") as f:
    settings = json.load(f)

appName = settings["branding"]["name"]

ph = PasswordHasher()

github_client_id = settings["github_oauth"]["client_id"]
github_client_secret = settings["github_oauth"]["client_secret"]
github_auth_endpoint = f"https://github.com/login/oauth/authorize?response_type=code&client_id={github_client_id}&scope=user"
github_token_endpoint = "https://github.com/login/oauth/access_token"
github_user_endpoint = "https://api.github.com/user"

# Lifetime of a freshly issued auth token, in seconds (31 days).
TOKEN_LIFETIME_SECONDS = 2678400
# Auth tokens are TOKEN_LENGTH characters drawn from TOKEN_ALPHABET.
TOKEN_LENGTH = 100
TOKEN_ALPHABET = string.ascii_letters + string.digits
# Upper bound for outgoing HTTP calls so a stalled GitHub request cannot
# block a worker forever.
REQUEST_TIMEOUT_SECONDS = 10

# Maps the <details> URL segment to the chat document field it exposes.
CHAT_DETAIL_FIELDS = {
    "history": "messages",
    "users": "permissions",
    "model": "model",
    "name": "name",
}
# Detail types that may be changed through a POST request.
EDITABLE_CHAT_DETAILS = ("model", "name")

mongoUser = settings["mongo_user"]
mongoPassword = settings["mongo_password"]
mongoHost = settings["mongo_host"]
mongoPort = int(settings["mongo_port"])
mongoDatabase = settings["mongo_db"]
# NOTE(review): credentials are interpolated verbatim; if they may contain
# characters such as '@' or ':' they must be percent-escaped - confirm how
# they are stored in settings.json before changing this.
mongoUri = f'mongodb://{mongoUser}:{mongoPassword}@{mongoHost}:{mongoPort}/'
# Only host and port are logged: the full URI embeds the database password.
print(f"Connecting to MongoDB at {mongoHost}:{mongoPort}")
client = MongoClient(mongoUri)
mydb = client[mongoDatabase]
chatCollection = mydb.chats
usersCollection = mydb.users

try:
    client.server_info()
    print("Connected to MongoDB successfuly")
except Exception as e:
    print("Error connecting to MongoDB:", e)

app = Flask(__name__)


def _issueToken(userId):
    """Create a new auth token for ``userId``, store it and return it.

    The token is generated with the ``secrets`` module because it is a
    bearer credential; it expires TOKEN_LIFETIME_SECONDS from now.
    """
    newToken = ''.join(secrets.choice(TOKEN_ALPHABET) for _ in range(TOKEN_LENGTH))
    newExpiry = int(datetime.now().timestamp()) + TOKEN_LIFETIME_SECONDS
    usersCollection.update_one(
        {'_id': userId},
        {"$addToSet": {'tokens': {'token': newToken, 'expiry': newExpiry}}},
    )
    return newToken


def _resolveSignupRole(accessCode):
    """Work out the role for a new account from the configured signup mode.

    Returns ``(role, None)`` on success or ``(None, message)`` when the
    signup must be rejected with ``message``.
    """
    mode = settings["signup_mode"]
    if mode == "none":
        return None, "Signups have been disabled"
    if mode == "nocode":
        return settings["default_role"], None
    if mode not in ("codeoptional", "coderequired"):
        # Unknown mode in the configuration: refuse rather than guess a role.
        return None, "An error occured"

    codeFound = False
    role = None
    # No early exit: when several entries share a code, the last one wins.
    for entry in settings["signup_codes"]:
        if entry["code"] == accessCode:
            codeFound = True
            role = entry["role"]
    if codeFound:
        return role, None
    if mode == "codeoptional":
        return settings["default_role"], None
    return None, "Code not found"


def _fetchGithubPrimaryEmail(accessToken):
    """Return the primary email of the GitHub account, or None.

    None is returned when the request fails, the response is not the
    expected list of email entries, or no entry is marked primary.
    """
    try:
        response = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {accessToken}"),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        entries = response.json()
    except (requests.RequestException, ValueError):
        return None
    # On failure GitHub answers with a JSON object instead of a list.
    if not isinstance(entries, list):
        return None
    email = None
    for entry in entries:
        if isinstance(entry, dict) and entry.get("primary") == True:
            email = entry.get("email")
    return email


def checkUserPermission(token, permission):
    """Validate an auth token and optionally a user-level permission.

    Arguments:
        token: the auth token presented by the client.
        permission: a permission name that must be present in the user's
            ``permissions``, or ``True`` to accept any valid token.

    Returns ``(True, userId)`` with the user id as a string on success,
    otherwise ``(False, reason)``.
    """
    # The token comes from client-controlled JSON. Anything but a string
    # (e.g. {"$ne": ""}) would be interpreted by MongoDB as a query operator
    # and match an arbitrary user, so it is rejected up front.
    if not isinstance(token, str):
        return False, "Token doesn't exist"

    # Find the user owning the token; $elemMatch keeps only that token entry
    user = usersCollection.find_one(
        {'tokens.token': token},
        {"_id": 1, "tokens": {"$elemMatch": {"token": token}}, "permissions": 1},
    )
    if not user:
        return False, "Token doesn't exist"

    # Expiry is stored as a unix timestamp
    if user['tokens'][0]['expiry'] <= int(datetime.now().timestamp()):
        return False, "Token is expired"

    # Convert _id to a string, ObjectId is not JSON friendly
    userId = str(user['_id'])
    userPermissions = user.get("permissions") or []
    if permission in userPermissions or permission == True:
        return True, userId
    return False, "Incorrect permissions"


def checkChatPermission(token, chatId, permission):
    """Validate a token and a permission on a specific chat.

    Arguments:
        token: the auth token presented by the client.
        chatId: the chat's id as a string.
        permission: a permission name that must be listed for the user in
            the chat's ``permissions``, or ``True`` to accept any valid
            token once the chat exists.

    Returns ``(True, userId)`` on success, otherwise ``(False, reason)``.
    """
    a, userId = checkUserPermission(token, True)
    if not a:
        return False, "Invalid Token"

    # A malformed id would make ObjectId() raise; treat it as a missing chat.
    if not ObjectId.is_valid(chatId):
        return False, "Chat doesn't exist"
    returnedChat = chatCollection.find_one({'_id': ObjectId(chatId)})
    if returnedChat is None:
        return False, "Chat doesn't exist"

    # Users without an entry on this chat simply hold no permissions.
    chatPermissions = (returnedChat.get('permissions') or {}).get(userId, [])
    if permission in chatPermissions or permission == True:
        return True, userId
    return False, "Incorrect permissions"


# Chat List Endpoint:
# Get all the chats associated with a user
# Arguments: token (required)
@app.route('/api/user/chats', methods = ['GET'])
def getUserChats():
    """Return, as a JSON string, every chat the user may view."""
    # Get user auth token
    token = request.json['token']
    a, userId = checkUserPermission(token, True)
    if not a:
        return jsonify("User token is invalid")

    chats = []
    for doc in chatCollection.find({'permissions.' + userId : "view"}):
        # ObjectId cannot be serialised by json.dumps
        if isinstance(doc.get('_id'), ObjectId):
            doc['_id'] = str(doc['_id'])
        chats.append(doc)
    return json.dumps(chats, indent=2)


# Chat Details Endpoint:
# Get or change details about a chat using the chatId
# Arguments: token (required), details (required), model, name
@app.route('/api/chat/<_id>/details/<details>', methods = ['GET', 'POST'])
def getChatHistory(_id, details):
    """Read (GET) or update (POST) one detail of a chat.

    ``details`` is one of "history", "users", "model" or "name" for GET and
    "model" or "name" for POST. Reading needs the chat's "view" permission,
    changing needs "edit".
    """
    # Get user auth token
    token = request.json['token']
    a, userId = checkUserPermission(token, True)
    if not a:
        return jsonify("User token is invalid")

    requiredPermission = "view" if request.method == 'GET' else "edit"
    allowed, userId = checkChatPermission(token, _id, requiredPermission)
    if not allowed:
        return jsonify("Invalid Permissions")

    # If the user is trying to GET data
    if request.method == 'GET':
        field = CHAT_DETAIL_FIELDS.get(details)
        if field is None:
            return jsonify("Invalid detail type")
        returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
        if returnedChat is None:
            # The chat vanished between the permission check and this read.
            return jsonify("Invalid Permissions")
        # The token is deliberately left out of the log line.
        print("Chat " + _id + " has been found")
        return jsonify(returnedChat[field])

    # Otherwise store the new value for the requested detail
    if details not in EDITABLE_CHAT_DETAILS:
        return jsonify("Invalid detail type")
    newValue = request.json[details]
    chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { details: newValue } })
    return jsonify("Success")


# Chat creation endpoint
# Create a new chat
# Arguments: token (required), name (required), model (required)
@app.route('/api/chat/create', methods = ['POST'])
def createChat():
    """Create an empty chat owned by the calling user."""
    # Get user auth token
    token = request.json['token']
    a, userId = checkUserPermission(token, "createChat")
    if not a:
        return jsonify("User token is invalid")

    name = request.json['name']
    model = request.json['model']
    chatCollection.insert_one(
        {
            "name": name,
            "model": model,
            "permissions": {
                userId: [
                    "owner",
                    "view",
                    "message",
                    "edit"
                ]
            },
            "messages": [
            ]
        }
    )
    return jsonify("Success")


# Signup page
# Returns html signup page
@app.route('/signup', methods = ['GET'])
def signup():
    """Render the signup page, or send logged-in visitors to the index."""
    token = request.cookies.get('auth_token', 'none')
    if token != 'none':
        # Already holding a token: the index page decides between the home
        # and logout screens depending on whether it is still valid.
        return redirect('/')
    return render_template('signup.html', appName=appName, githubUrl=github_auth_endpoint, githublogin=settings["github_oauth"]["enabled"], oauthlogin=settings["oauth_login"])


# Index page
# If logged in return home menu (Or logout if token is expired),
# Otherwise return login screen
@app.route('/', methods = ['GET'])
def index():
    """Render the login, home or logout page depending on the auth cookie."""
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template('login.html', appName=appName, githubUrl=github_auth_endpoint, githublogin=settings["github_oauth"]["enabled"], oauthlogin=settings["oauth_login"])

    a, userId = checkUserPermission(token, True)
    if a:
        return render_template('home.html', appName=appName)
    return render_template('logout.html', appName=appName)


# Login endpoint
# Api backend for login screen, check for user and returns token
# Arguments: username (required), password (required)
@app.route('/api/login', methods = ['POST'])
def handleLogin():
    """Check username/email and password and return a new auth token."""
    try:
        # .lower() also guarantees the value used in the query is a string
        username = request.json['username'].lower()
        user = usersCollection.find_one({"$or":[{"username":username},{"email":username}]})
        passwordHash = user["password"]
        password = request.json['password']
        # ph.verify raises on a mismatch or an unusable stored hash
        loggedin = ph.verify(passwordHash, password)
        userId = user['_id']
    except Exception:
        # Unknown user, missing field or wrong password: same answer for all
        return jsonify("Incorrect username or password")

    if not loggedin:
        return jsonify("Incorrect username or password")
    return jsonify(_issueToken(userId))


# Github callback endpoint
# Either logs in the user or returns extra details signup page
# Arguments: code (required) (provided by github)
@app.route('/api/github/authorized', methods = ['GET'])
def handleGithubLogin():
    """Exchange the GitHub code for an access token and log the user in.

    When no account matches the GitHub primary email, the extra-details
    signup page is rendered instead.
    """
    code = request.args.get('code')
    try:
        res = requests.post(
            github_token_endpoint,
            data=dict(
                client_id=github_client_id,
                client_secret=github_client_secret,
                code=code,
            ),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException:
        return jsonify("An error occured")
    res = parse_qs(res.content.decode("utf-8"))
    # GitHub answers with error=... instead of access_token for a bad code
    if "access_token" not in res:
        return jsonify("An error occured")
    token = res["access_token"][0]

    # Look the account up by the email address itself
    email = _fetchGithubPrimaryEmail(token)
    user = usersCollection.find_one({"email": email}) if email is not None else None
    if user is None:
        return render_template("oauthsignup.html", appName = appName, provider="github", accessToken=token)
    return jsonify(_issueToken(user['_id']))


# Oauth2.0 Account signup endpoint
# Creates account using extra details and oauth access token
# Arguments: code (required) (access token provided by idp), username (required), signupcode, display (required)
@app.route('/api/oauthsignup', methods = ['POST'])
def handleOauthSignup():
    """Create an account from a GitHub access token and return an auth token."""
    try:
        email = _fetchGithubPrimaryEmail(request.json['code'])
        if email is None:
            return jsonify("An error occured")

        # Set user details
        username = request.json['username'].lower()
        # Placeholder that is not a valid argon2 hash, so password login
        # can never succeed for an oauth-only account.
        passwordHash = "a"
        creationDate = int(datetime.now().timestamp())
        accessCode = request.json.get('signupcode')
        displayName = request.json['display']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username":username})
        sameEmail = usersCollection.count_documents({"email":email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        role, error = _resolveSignupRole(accessCode)
        if error is not None:
            return jsonify(error)

        # Create user
        userId = ObjectId()
        usersCollection.insert_one(
            {
                "_id": userId,
                "name": displayName,
                "username": username,
                "email": email,
                "permissions": settings["default_permissions"],
                "role": role,
                "password": passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate
            }
        )
    except Exception:
        return jsonify("An error occured")

    return jsonify(_issueToken(userId))


# Api signup endpoint
# Create account with user details and get permission based on access code
# Arguments: username (required), email, password (required), access_code, displayname (required)
@app.route('/api/signup', methods = ['POST'])
def handleSignup():
    """Create a password-based account; returns "Success" when it was created."""
    try:
        # Set user details
        username = request.json['username'].lower()
        email = request.json['email'].lower()
        password = request.json['password']
        passwordHash = ph.hash(password)
        creationDate = int(datetime.now().timestamp())
        accessCode = request.json.get('access_code')
        displayName = request.json['displayname']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username":username})
        sameEmail = usersCollection.count_documents({"email":email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        role, error = _resolveSignupRole(accessCode)
        if error is not None:
            return jsonify(error)

        # Create user
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name": displayName,
                "username": username,
                "email": email,
                "permissions": settings["default_permissions"],
                "role": role,
                "password": passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate
            }
        )
    except Exception:
        return jsonify("An error occured")

    # A view must return a response; the client logs in separately.
    return jsonify("Success")


# Logout endpoint
# Logs out user and removes token from db
# Arguments: auth_token (cookie) (required)
@app.route('/logout', methods = ['GET'])
def logout():
    """Remove the caller's token from the database and show the logout page."""
    cookieToken = request.cookies.get('auth_token', 'none')
    # A 'remove-token' header, when present, takes precedence over the cookie
    token = request.headers.get('remove-token', cookieToken)
    usersCollection.update_one({'tokens.token': token}, {"$pull":{'tokens':{'token':token}}})
    return render_template('logout.html', appName=appName)


if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive debugger; make sure
    # this entry point is only used for local development.
    app.run(debug = True)