398 lines
15 KiB
Python
398 lines
15 KiB
Python
from flask import Flask, jsonify, request, render_template, redirect
from pymongo import MongoClient
from bson.objectid import ObjectId
from bson.json_util import dumps, loads
from datetime import datetime
from argon2 import PasswordHasher
import os
import random
import secrets
import string
import json
import requests
from urllib.parse import parse_qs
|
|
|
|
# Load the application settings once at import time. JSON files are UTF-8,
# so the encoding is pinned instead of depending on the process locale.
with open("config/settings.json", "r", encoding="utf-8") as f:
    settings = json.load(f)

# Branding name handed to every rendered template.
appName = settings["branding"]["name"]

# Argon2 hasher shared by the signup (hash) and login (verify) endpoints.
ph = PasswordHasher()

# GitHub OAuth configuration: credentials come from settings, endpoints are fixed.
github_client_id = settings["github_oauth"]["client_id"]
github_client_secret = settings["github_oauth"]["client_secret"]
github_auth_endpoint = f"https://github.com/login/oauth/authorize?response_type=code&client_id={github_client_id}&scope=user"
github_token_endpoint = "https://github.com/login/oauth/access_token"
github_user_endpoint = "https://api.github.com/user"
|
|
|
|
# MongoDB connection parameters; only the password is read from settings.
mongoUser = 'root'
mongoPassword = settings["mongo_password"]
mongoHost = 'localhost'
mongoPort = '27017'
mongoDatabase = 'database'
mongoUri = f'mongodb://{mongoUser}:{mongoPassword}@{mongoHost}:{mongoPort}/'

# Log the connection target with the password masked: the full URI contains
# the database credentials and must not end up in stdout or log files.
print(f'mongodb://{mongoUser}:***@{mongoHost}:{mongoPort}/')

client = MongoClient(mongoUri)
mydb = client[mongoDatabase]
chatCollection = mydb.chats
usersCollection = mydb.users

# Query the server once at startup so connection problems are reported
# immediately; a failure is only logged, the app still starts.
try:
    client.server_info()
    print("Connected to MongoDB successfuly")
except Exception as e:
    print("Error connecting to MongoDB:", e)

app = Flask(__name__)
|
|
|
|
def checkUserPermission(token, permission):
    """Validate an auth token and, optionally, a user-level permission.

    Args:
        token: Auth token to look up in the users collection.
        permission: Name of a permission that must be present in the user's
            ``permissions`` list, or ``True`` to accept any valid token.

    Returns:
        ``(True, userId)`` with the user's id as a string when the token is
        valid and the permission check passes, otherwise ``(False, reason)``
        where ``reason`` is a short message.
    """
    # Tokens arrive from JSON bodies and cookies. Anything that is not a
    # plain string (e.g. {"$gt": ""}) would be interpreted by MongoDB as a
    # query operator and could match another user's token.
    if not isinstance(token, str) or not token:
        return False, "Token doesn't exist"

    # Find the user owning this token; $elemMatch projects only that token
    user = usersCollection.find_one(
        {'tokens.token': token},
        {"_id": 1, "tokens": {"$elemMatch": {"token": token}}, "permissions": 1},
    )
    if not user:
        return False, "Token doesn't exist"

    # Expiry is stored as a unix timestamp; a token without one is treated
    # as expired rather than raising.
    matchedTokens = user.get('tokens') or []
    now = int(datetime.now().timestamp())
    if not matchedTokens or matchedTokens[0].get('expiry', 0) <= now:
        return False, "Token is expired"

    # Convert _id to a string so callers can use it as a dictionary key
    userId = str(user['_id'])

    if permission is True or permission in (user.get("permissions") or []):
        return True, userId
    return False, "Incorrect permissions"
|
|
|
|
def checkChatPermission(token, chatId, permission):
    """Check that the token's owner holds ``permission`` on a chat.

    Args:
        token: Auth token identifying the user.
        chatId: Id of the chat document (string form of its ObjectId).
        permission: Chat-level permission name, e.g. ``"view"`` or ``"edit"``.

    Returns:
        ``(True, userId)`` when the user holds the permission on the chat,
        otherwise ``(False, reason)``.
    """
    valid, userId = checkUserPermission(token, True)
    if not valid:
        return False, "Invalid Token"

    # A malformed id or an unknown chat is reported like a missing
    # permission, so the response does not reveal which chats exist.
    if not ObjectId.is_valid(chatId):
        return False, "Incorrect permissions"
    returnedChat = chatCollection.find_one({'_id': ObjectId(chatId)})
    if returnedChat is None:
        return False, "Incorrect permissions"

    # Chat permissions are stored per user: {userId: [permission, ...]}.
    # Look the permission up in this user's list, not in the dict's keys.
    userPermissions = (returnedChat.get('permissions') or {}).get(userId, [])
    if permission in userPermissions:
        return True, userId
    return False, "Incorrect permissions"
|
|
|
|
# Chat List Endpoint:
|
|
# Get all the chats associated with a user
|
|
# Arguments: token (required)
|
|
@app.route('/api/user/chats', methods = ['GET'])
def getUserChats():
    """Return every chat the authenticated user may view.

    Reads ``token`` from the JSON request body.

    Returns:
        A JSON-encoded list of chat documents (with ``_id`` as a string),
        or the JSON string ``"User token is invalid"`` when the token is
        rejected.
    """
    # Get user auth token
    token = request.json['token']
    valid, userId = checkUserPermission(token, True)
    if not valid:
        # Every path must return a response; a view returning None makes
        # Flask answer with a 500 error.
        return jsonify("User token is invalid")

    chats = []
    for doc in chatCollection.find({'permissions.' + userId: "view"}):
        # ObjectId is not JSON serialisable, send it as a string
        if isinstance(doc.get('_id'), ObjectId):
            doc['_id'] = str(doc['_id'])
        chats.append(doc)

    return json.dumps(chats, indent=2)
|
|
|
|
# Chat Details Endpoint:
|
|
# Get or change details about a chat using the chatId
|
|
# Arguments: token (required), details (required), model, name
|
|
@app.route('/api/chat/<_id>/details', methods = ['GET', 'POST'])
def getChatHistory(_id):
    """Read (GET) or change (POST) one detail of a chat.

    JSON body: ``token`` and ``details`` are required. On POST the new value
    is read from the key named by ``details`` (``model`` or ``name``).

    GET accepts ``details`` of ``history``, ``users``, ``model`` or ``name``
    and requires the ``view`` permission; POST accepts ``model`` or ``name``
    and requires the ``edit`` permission.

    Returns:
        The requested value (GET) or ``"Success"`` (POST) as JSON, or one of
        the JSON strings ``"User token is invalid"``, ``"Invalid Permissions"``
        or ``"Invalid details"``.
    """
    # Get user auth token
    token = request.json['token']
    valid, userId = checkUserPermission(token, True)
    if not valid:
        return jsonify("User token is invalid")

    # Get the request details
    details = request.json['details']

    # Load the chat up front: both the GET and the POST branch need it for
    # the permission check. Malformed ids and unknown chats are answered
    # like a missing permission so chat existence is not disclosed.
    returnedChat = None
    if ObjectId.is_valid(_id):
        returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
    if returnedChat is None:
        return jsonify("Invalid Permissions")

    # Chat permissions are stored per user: {userId: [permission, ...]}
    userPermissions = (returnedChat.get("permissions") or {}).get(userId, [])

    # If the user is trying to GET data
    if request.method == 'GET':
        if "view" not in userPermissions:
            return jsonify("Invalid Permissions")
        # The auth token is deliberately not logged
        print("Chat " + _id + " has been found")

        # Map the requested detail to the document field that stores it
        readableFields = {
            "history": "messages",
            "users": "permissions",
            "model": "model",
            "name": "name",
        }
        field = readableFields.get(details) if isinstance(details, str) else None
        if field is None:
            return jsonify("Invalid details")
        if field not in returnedChat:
            return jsonify("Invalid Permissions")
        return jsonify(returnedChat[field])

    # Otherwise the user is trying to change data
    if "edit" not in userPermissions:
        return jsonify("Invalid Permissions")
    if details not in ("model", "name") or details not in request.json:
        return jsonify("Invalid details")
    chatCollection.update_one(
        {'_id': returnedChat['_id']},
        {"$set": {details: request.json[details]}},
    )
    return jsonify("Success")
|
|
|
|
# Chat creation endpoint
|
|
# Create a new chat
|
|
# Arguments: token (required), name (required), model (required)
|
|
@app.route('/api/chat/create', methods = ['POST'])
def createChat():
    """Create an empty chat owned by the authenticated user.

    JSON body: ``token``, ``name`` and ``model`` are all required. The token's
    owner must hold the ``createChat`` user permission.

    Returns:
        The JSON string ``"Success"`` once the chat is inserted, or
        ``"User token is invalid"`` when the permission check fails.
    """
    authorised, ownerId = checkUserPermission(request.json['token'], "createChat")
    if not authorised:
        return jsonify("User token is invalid")

    payload = request.json
    # The creator receives every chat-level permission on the new chat
    newChat = {
        "name": payload['name'],
        "model": payload['model'],
        "permissions": {
            ownerId: ["owner", "view", "message", "edit"],
        },
        "messages": [],
    }
    chatCollection.insert_one(newChat)
    return jsonify("Success")
|
|
|
|
# Signup page
|
|
# Returns html signup page
|
|
@app.route('/signup', methods = ['GET'])
def signup():
    """Serve the HTML signup page.

    Visitors without an ``auth_token`` cookie get the signup form. Visitors
    who already carry a token are redirected to the index page, which
    validates the token and shows either the home or the logout page.
    """
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template('signup.html', appName=appName, githubUrl=github_auth_endpoint, githublogin=settings["github_oauth"]["enabled"], oauthlogin=settings["oauth_login"])
    # A view must always return a response; returning None is a 500 in Flask
    return redirect('/')
|
|
|
|
# Index page
|
|
# If logged in return home menu (Or logout if token is expired),
|
|
# Otherwise return login screen
|
|
@app.route('/', methods = ['GET'])
def index():
    """Serve the landing page according to the ``auth_token`` cookie.

    Returns:
        The login page when no cookie is set, the home page when the token
        is valid, and the logout page when the token is rejected.
    """
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template('login.html', appName=appName, githubUrl=github_auth_endpoint, githublogin=settings["github_oauth"]["enabled"], oauthlogin=settings["oauth_login"])

    valid, _ = checkUserPermission(token, True)
    if valid:
        return render_template('home.html', appName=appName)
    # The rendered page has to be returned, otherwise the view yields None
    # and Flask responds with a 500 instead of the logout page.
    return render_template('logout.html', appName=appName)
|
|
|
|
# Login endpoint
|
|
# API backend for the login screen: checks the user's credentials and returns a token
|
|
# Arguments: username (required), password (required)
|
|
@app.route('/api/login', methods = ['POST'])
def handleLogin():
    """Verify a username/email and password and issue a new auth token.

    JSON body: ``username`` (matched against both the username and the email
    field, lower-cased) and ``password`` are required.

    Returns:
        The new token as a JSON string, or the JSON string
        ``"Incorrect username or password"`` on any failure.
    """
    try:
        username = request.json['username'].lower()
        password = request.json['password']
        user = usersCollection.find_one({"$or": [{"username": username}, {"email": username}]})
        # verify() raises on a wrong password or an unusable stored hash; an
        # unknown user (None) raises on the subscript. All of these end in
        # the same generic message so the cause is not disclosed.
        ph.verify(user["password"], password)
        userId = user['_id']
    except Exception:
        return jsonify("Incorrect username or password")

    # Session tokens are credentials: draw them from the secrets module,
    # not from the predictable random module. Length and alphabet unchanged.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens are valid for 31 days (2678400 seconds), stored as a unix timestamp
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id': userId}, {"$addToSet": {'tokens': {'token': newToken, 'expiry': newExpiry}}})
    return jsonify(newToken)
|
|
|
|
# Github callback endpoint
|
|
# Either logs the user in or returns the extra-details signup page
|
|
# Arguments: code (required) (provided by github)
|
|
@app.route('/api/github/authorized', methods = ['GET'])
def handleGithubLogin():
    """Handle the GitHub OAuth callback.

    Exchanges the ``code`` query parameter for a GitHub access token, reads
    the account's primary email and then either logs in the matching user or
    renders the extra-details signup page.

    Returns:
        A new auth token as a JSON string for an existing user, the
        ``oauthsignup.html`` page for an unknown email, or the JSON string
        ``"An error occured"`` when GitHub cannot be reached or does not
        return a token or a primary email.
    """
    code = request.args.get('code')
    try:
        tokenResponse = requests.post(
            github_token_endpoint,
            data=dict(
                client_id=github_client_id,
                client_secret=github_client_secret,
                code=code,
            ),
            timeout=10,
        )
        parsed = parse_qs(tokenResponse.content.decode("utf-8"))
        # A rejected code yields an error payload without access_token
        if "access_token" not in parsed:
            return jsonify("An error occured")
        token = parsed["access_token"][0]

        emailResponse = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {token}"),
            timeout=10,
        )
        email = None
        for entry in emailResponse.json():
            if entry["primary"] == True:
                email = entry["email"]
    except (requests.RequestException, ValueError, TypeError, KeyError):
        return jsonify("An error occured")
    if email is None:
        return jsonify("An error occured")

    # Look the account up by the email string (not the HTTP response object)
    user = usersCollection.find_one({"email": email})
    if user is None:
        # NOTE(review): the GitHub access token is handed to the template so
        # the signup form can post it back — confirm this exposure is intended.
        return render_template("oauthsignup.html", appName = appName, provider="github", accessToken=token)

    # Session tokens are credentials: use the secrets module
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens are valid for 31 days (2678400 seconds)
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id': user['_id']}, {"$addToSet": {'tokens': {'token': newToken, 'expiry': newExpiry}}})
    return jsonify(newToken)
|
|
|
|
# Oauth2.0 Account signup endpoint
|
|
# Creates account using extra details and oauth access token
|
|
# Arguments: code (required) (access token provided by idp), username (required), signupcode, display (required)
|
|
@app.route('/api/oauthsignup', methods = ['POST'])
def handleOauthSignup():
    """Create an account from a GitHub access token plus extra details.

    JSON body: ``code`` (GitHub access token), ``username`` and ``display``
    are required; ``signupcode`` is optional unless ``signup_mode`` is
    ``coderequired``.

    Returns:
        A new auth token as a JSON string on success, otherwise one of the
        JSON strings ``"User already exists"``, ``"Signups have been disabled"``,
        ``"Code not found"`` or ``"An error occured"``.
    """
    try:
        emailResponse = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {request.json['code']}"),
            timeout=10,
        )
        email = None
        for entry in emailResponse.json():
            if entry["primary"] == True:
                email = entry["email"]
        if email is None:
            return jsonify("An error occured")

        # Set user details
        username = request.json['username'].lower()
        # Placeholder stored in the password field for OAuth accounts.
        # NOTE(review): presumably chosen because it is not a valid Argon2
        # hash, so password login cannot succeed — confirm.
        passwordHash = "a"
        creationDate = int(datetime.now().timestamp())
        accessCode = request.json.get('signupcode')
        displayName = request.json['display']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username": username})
        sameEmail = usersCollection.count_documents({"email": email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        signupMode = settings["signup_mode"]
        if signupMode == "none":
            return jsonify("Signups have been disabled")
        elif signupMode == "nocode":
            role = settings["default_role"]
        elif signupMode in ("codeoptional", "coderequired"):
            # When several entries share a code, the last one wins
            matchingRoles = [entry["role"] for entry in settings["signup_codes"] if entry["code"] == accessCode]
            if matchingRoles:
                role = matchingRoles[-1]
            elif signupMode == "coderequired":
                return jsonify("Code not found")
            else:
                role = settings["default_role"]
        else:
            # Unrecognised signup_mode value in the settings file
            return jsonify("An error occured")

        # Create user; the id is generated here so no re-query is needed
        userId = ObjectId()
        usersCollection.insert_one(
            {
                "_id": userId,
                "name": displayName,
                "username": username,
                "email": email,
                "permissions": settings["default_permissions"],
                "role": role,
                "password": passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate,
            }
        )
    except Exception:
        return jsonify("An error occured")

    # Log the new user in straight away. Session tokens are credentials, so
    # they are drawn from the secrets module.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens are valid for 31 days (2678400 seconds)
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id': userId}, {"$addToSet": {'tokens': {'token': newToken, 'expiry': newExpiry}}})
    return jsonify(newToken)
|
|
|
|
# Api signup endpoint
|
|
# Create account with user details and get permission based on access code
|
|
# Arguments: username (required), email (required), password (required), access_code, displayname (required)
|
|
@app.route('/api/signup', methods = ['POST'])
def handleSignup():
    """Create a password-based account.

    JSON body: ``username``, ``email``, ``password`` and ``displayname`` are
    required; ``access_code`` is optional unless ``signup_mode`` is
    ``coderequired``. The role is taken from the matching signup code or
    from ``default_role``.

    Returns:
        The JSON string ``"Success"`` once the user is stored, otherwise one
        of ``"User already exists"``, ``"Signups have been disabled"``,
        ``"Code not found"`` or ``"An error occured"``.
    """
    try:
        # Set user details
        username = request.json['username'].lower()
        email = request.json['email'].lower()
        password = request.json['password']
        creationDate = int(datetime.now().timestamp())
        accessCode = request.json.get('access_code')
        displayName = request.json['displayname']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username": username})
        sameEmail = usersCollection.count_documents({"email": email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        signupMode = settings["signup_mode"]
        if signupMode == "none":
            return jsonify("Signups have been disabled")
        elif signupMode == "nocode":
            role = settings["default_role"]
        elif signupMode in ("codeoptional", "coderequired"):
            # When several entries share a code, the last one wins
            matchingRoles = [entry["role"] for entry in settings["signup_codes"] if entry["code"] == accessCode]
            if matchingRoles:
                role = matchingRoles[-1]
            elif signupMode == "coderequired":
                return jsonify("Code not found")
            else:
                role = settings["default_role"]
        else:
            # Unrecognised signup_mode value in the settings file
            return jsonify("An error occured")

        # Hash only after every cheap rejection has passed: Argon2 is
        # deliberately expensive and should not run for refused signups.
        passwordHash = ph.hash(password)

        # Create user
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name": displayName,
                "username": username,
                "email": email,
                "permissions": settings["default_permissions"],
                "role": role,
                "password": passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate,
            }
        )
    except Exception:
        return jsonify("An error occured")

    # A view must return a response; without this a successful signup
    # produced a 500 even though the user had been created.
    return jsonify("Success")
|
|
|
|
# Logout endpoint
|
|
# Logs out user and removes token from db
|
|
# Arguments: auth_token (cookie) (required)
|
|
@app.route('/logout', methods = ['GET'])
def logout():
    """Remove an auth token from the database and serve the logout page.

    The token comes from the ``auth_token`` cookie; a string ``remove_token``
    in an optional JSON body takes precedence over the cookie.
    """
    token = request.cookies.get('auth_token', 'none')

    # silent=True yields None instead of raising when there is no JSON body.
    body = request.get_json(silent=True)
    # Only accept a plain string: any other JSON value would be interpreted
    # by MongoDB as a query operator and could remove other users' tokens.
    if isinstance(body, dict) and isinstance(body.get('remove_token'), str):
        token = body['remove_token']

    usersCollection.update_one({'tokens.token': token}, {"$pull": {'tokens': {'token': token}}})
    return render_template('logout.html', appName=appName)
|
|
|
|
if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution from the browser. It is therefore opt-in:
    # set FLASK_DEBUG=1 in the environment for local development.
    debugEnabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")
    app.run(debug = debugEnabled)