# The signup form now works, and the OAuth buttons are only visible if they have been enabled in the settings.json file.
# 373 lines, 15 KiB, Python
import json
import random
import secrets
import string
from datetime import datetime
from urllib.parse import parse_qs
from urllib.parse import quote_plus

import requests
from argon2 import PasswordHasher
from bson.objectid import ObjectId
from flask import Flask, jsonify, request, render_template
from flask import redirect
from pymongo import MongoClient
|
|
|
|
# Load the deployment configuration (branding, OAuth credentials, signup rules, database password).
with open("config/settings.json", "r", encoding="utf-8") as f:
    settings = json.load(f)

appName = settings["branding"]["name"]

# Argon2 hasher shared by the signup and login endpoints.
ph = PasswordHasher()

# GitHub OAuth credentials and endpoints.
github_client_id = settings["github_oauth"]["client_id"]
github_client_secret = settings["github_oauth"]["client_secret"]
github_auth_endpoint = f"https://github.com/login/oauth/authorize?response_type=code&client_id={github_client_id}&scope=user"
github_token_endpoint = "https://github.com/login/oauth/access_token"
github_user_endpoint = "https://api.github.com/user"

# MongoDB connection settings.
mongoUser = 'root'
mongoPassword = settings["mongo_password"]
mongoHost = 'localhost'
mongoPort = '27017'
mongoDatabase = 'database'
# The credentials are percent-escaped so that characters such as '@', ':' or '/'
# in the password cannot corrupt the URI. settings.json holds the raw password.
mongoUri = f'mongodb://{quote_plus(str(mongoUser))}:{quote_plus(str(mongoPassword))}@{mongoHost}:{mongoPort}/'

# Log only the target server: the full URI embeds the database password.
print(f"Connecting to MongoDB at {mongoHost}:{mongoPort}")

client = MongoClient(mongoUri)
mydb = client[mongoDatabase]
chatCollection = mydb.chats
usersCollection = mydb.users

# Force a round trip to the server so connection problems are reported at
# startup. A failure is only reported; the application still starts.
try:
    client.server_info()
    print("Connected to MongoDB successfuly")
except Exception as e:
    print("Error connecting to MongoDB:", e)

app = Flask(__name__)
|
|
|
|
# Chat Details Endpoint:
# Get or change details about a chat using the chatId
# Arguments: token (required), details (required), model, name
@app.route('/api/chat/<_id>/details', methods = ['GET', 'POST'])
def getChatHistory(_id):
    """Return (GET) or update (POST) one detail of the chat identified by ``_id``.

    JSON body fields:
        token:   auth token of the calling user (required).
        details: the detail to act on (required). GET accepts "history",
                 "users", "model" and "name"; POST accepts "model" and "name".
        model / name: the new value; POST requires the one named by ``details``.

    GET requires the caller to hold the "view" permission on the chat, POST
    requires "edit".

    Returns:
        A JSON response: the requested value (GET), "Success" (POST), or one
        of the error strings "User token is invalid", "Invalid Permissions"
        and "Invalid details".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    # Only plain strings are accepted as tokens: a JSON object here would be
    # interpreted by MongoDB as a query operator and could match any user.
    token = payload.get('token')
    if not isinstance(token, str):
        return jsonify("User token is invalid")

    # Find the user owning this token; the projection keeps only the matching token entry
    user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
    # Expiry is stored as a unix timestamp
    if not user or user['tokens'][0]['expiry'] <= int(datetime.now().timestamp()):
        return jsonify("User token is invalid")

    # Chat permissions are keyed by the string form of the user id
    userId = str(user['_id'])

    # The chat is loaded before branching on the HTTP method so that both the
    # GET and the POST path can check permissions against it. A malformed id
    # or an unknown chat is reported like a chat the caller may not access.
    if not ObjectId.is_valid(_id):
        return jsonify("Invalid Permissions")
    chatId = ObjectId(_id)
    returnedChat = chatCollection.find_one({'_id': chatId})
    if returnedChat is None:
        return jsonify("Invalid Permissions")

    allPermissions = returnedChat.get("permissions") or {}
    userPermissions = allPermissions.get(userId) or []
    details = payload.get('details')

    if request.method == 'GET':
        if "view" not in userPermissions:
            return jsonify("Invalid Permissions")
        # The auth token is deliberately not written to the log
        print("Chat " + _id + " has been found for user " + userId)
        # Map each detail type onto the chat field that holds it
        fieldForDetail = {
            "history": "messages",
            "users": "permissions",
            "model": "model",
            "name": "name",
        }
        if details not in fieldForDetail:
            return jsonify("Invalid details")
        return jsonify(returnedChat.get(fieldForDetail[details]))

    if "edit" not in userPermissions:
        return jsonify("Invalid Permissions")
    # Only the model and the name can be changed, and the new value must be supplied
    if details not in ("model", "name") or details not in payload:
        return jsonify("Invalid details")
    chatCollection.update_one({'_id': chatId}, { "$set": { details: payload[details] } })
    return jsonify("Success")
|
|
|
|
# Chat creation endpoint
# Create a new chat
# Arguments: token (required), name (required), model (required)
@app.route('/api/chat/create', methods = ['POST'])
def createChat():
    """Create an empty chat owned by the user identified by ``token``.

    JSON body fields:
        token: auth token of the calling user (required).
        name:  name of the new chat (required).
        model: model of the new chat (required).

    The caller needs the "createChat" user permission. The new chat grants
    its creator the "owner", "view", "message" and "edit" permissions.

    Returns:
        A JSON response: "Success", or one of the error strings
        "User token is invalid", "Incorrect permissions" and
        "Missing name or model".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    # Only plain strings are accepted as tokens: a JSON object here would be
    # interpreted by MongoDB as a query operator and could match any user.
    token = payload.get('token')
    if not isinstance(token, str):
        return jsonify("User token is invalid")

    # "permissions" must be part of the projection, otherwise the field is
    # not returned and the permission check below cannot work.
    user = usersCollection.find_one(
        {'tokens.token': token},
        {"_id":1,"permissions":1,"tokens":{"$elemMatch": {"token":token}}}
    )
    # Expiry is stored as a unix timestamp
    if not user or user['tokens'][0]['expiry'] <= int(datetime.now().timestamp()):
        return jsonify("User token is invalid")

    if "createChat" not in (user.get('permissions') or []):
        return jsonify("Incorrect permissions")

    if 'name' not in payload or 'model' not in payload:
        return jsonify("Missing name or model")

    # Chat permissions are keyed by the string form of the user id
    userId = str(user['_id'])
    chatCollection.insert_one(
        {
            "name":payload['name'],
            "model":payload['model'],
            "permissions": {
                userId:[
                    "owner",
                    "view",
                    "message",
                    "edit"
                ]
            },
            "messages": []
        }
    )
    return jsonify("Success")
|
|
|
|
# Signup page
# Returns html signup page
@app.route('/signup', methods = ['GET'])
def signup():
    """Render the signup page for visitors without an ``auth_token`` cookie.

    Visitors that already carry an ``auth_token`` cookie are redirected to
    the index page instead of receiving no response at all.

    Returns:
        The rendered ``signup.html`` template, or a redirect to ``/``.
    """
    token = request.cookies.get('auth_token', 'none')
    if token != 'none':
        return redirect('/')
    return render_template(
        'signup.html',
        appName=appName,
        githubUrl=github_auth_endpoint,
        githublogin=settings["github_oauth"]["enabled"],
        oauthlogin=settings["oauth_login"]
    )
|
|
|
|
# Index page
# If logged in return home menu (Or logout if token is expired),
# Otherwise return login screen
@app.route('/', methods = ['GET'])
def index():
    """Render the login, home or logout page depending on the ``auth_token`` cookie.

    Returns:
        ``login.html`` when no cookie is set, ``home.html`` when the cookie
        holds a known token that has not expired, and ``logout.html`` when
        the token is unknown or expired.
    """
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template(
            'login.html',
            appName=appName,
            githubUrl=github_auth_endpoint,
            githublogin=settings["github_oauth"]["enabled"],
            oauthlogin=settings["oauth_login"]
        )

    # The projection keeps only the token entry matching the cookie
    user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
    # Expiry is stored as a unix timestamp
    if user and user['tokens'][0]['expiry'] > int(datetime.now().timestamp()):
        return render_template('home.html', appName=appName)

    # Unknown or expired token: the logout page has to be returned, not just rendered
    return render_template('logout.html', appName=appName)
|
|
|
|
# Login endpoint
# Api backend for login screen, check for user and returns token
# Arguments: username (required), password (required)
@app.route('/api/login', methods = ['POST'])
def handleLogin():
    """Check a username/email and password and issue a new auth token.

    JSON body fields:
        username: username or email address of the account (required).
        password: plain-text password (required).

    Returns:
        A JSON response holding the new token, or the string
        "Incorrect username or password" for any failed login.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username')
    password = payload.get('password')
    # Non-string values are rejected before they can reach the database query
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify("Incorrect username or password")

    username = username.lower()
    user = usersCollection.find_one({"$or":[{"username":username},{"email":username}]})
    if user is None:
        return jsonify("Incorrect username or password")

    try:
        loggedin = ph.verify(user["password"], password)
    except Exception:
        # Raised for a wrong password and for stored values that are not a
        # valid hash (OAuth signups store the placeholder "a").
        loggedin = False
    if not loggedin:
        return jsonify("Incorrect username or password")

    # Session tokens are credentials, so they come from the OS CSPRNG
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens stay valid for 31 days (2678400 seconds)
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':user['_id']}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
|
|
|
|
# Github callback endpoint
# Either logs in the user or returns extra details signup page
# Arguments: code (required) (provided by github)
@app.route('/api/github/authorized', methods = ['GET'])
def handleGithubLogin():
    """Finish the GitHub OAuth flow for the ``code`` query argument.

    The code is exchanged for a GitHub access token, which is used to read
    the primary email address of the GitHub account.

    Returns:
        A JSON response with a new auth token when an account with that
        email exists, the rendered ``oauthsignup.html`` page when it does
        not, or the JSON string "An error occured" when GitHub cannot be
        reached or does not supply a token or a primary email.
    """
    code = request.args.get('code')
    try:
        res = requests.post(
            github_token_endpoint,
            data=dict(
                client_id=github_client_id,
                client_secret=github_client_secret,
                code=code,
            ),
            timeout=10,
        )
        res = parse_qs(res.content.decode("utf-8"))
        # GitHub answers with an error instead of a token for a bad or reused code
        if "access_token" not in res:
            return jsonify("An error occured")
        token = res["access_token"][0]
        user_email = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {token}"),
            timeout=10,
        )
        emailEntries = user_email.json()
    except (requests.RequestException, ValueError):
        return jsonify("An error occured")

    # A successful reply is a list of address entries; error replies are not a list
    email = None
    if isinstance(emailEntries, list):
        for entry in emailEntries:
            if isinstance(entry, dict) and entry.get("primary") == True:
                email = entry.get("email")
    if not isinstance(email, str) or not email:
        return jsonify("An error occured")

    # Look the account up by the email address (not by the HTTP response
    # object). Form signups store the address lower-cased, so both spellings
    # are accepted.
    user = usersCollection.find_one({"email": {"$in": [email, email.lower()]}})
    if user is None:
        return render_template("oauthsignup.html", appName = appName, provider="github", accessToken=token)

    # Session tokens are credentials, so they come from the OS CSPRNG
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens stay valid for 31 days (2678400 seconds)
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':user['_id']}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
|
|
|
|
# Oauth2.0 Account signup endpoint
# Creates account using extra details and oauth access token
# Arguments: code (required) (access token provided by idp), username (required), signupcode, display (required)
@app.route('/api/oauthsignup', methods = ['POST'])
def handleOauthSignup():
    """Create an account for a GitHub user and issue its first auth token.

    JSON body fields:
        code:       GitHub access token used to read the primary email (required).
        username:   desired username, stored lower-cased (required).
        display:    display name (required).
        signupcode: signup code selecting a role (optional).

    The role is chosen according to ``settings["signup_mode"]``:
    "none" rejects the signup, "nocode" always uses the default role,
    "codeoptional" uses the role of a matching code or the default role,
    and "coderequired" rejects the signup without a matching code.

    Returns:
        A JSON response with the new auth token, or one of the strings
        "User already exists", "Signups have been disabled",
        "Code not found" and "An error occured".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify("An error occured")
    accessToken = payload.get('code')
    username = payload.get('username')
    if not isinstance(accessToken, str) or not isinstance(username, str) or 'display' not in payload:
        return jsonify("An error occured")
    username = username.lower()
    displayName = payload['display']
    # The signup code is optional, so a missing key must not fail the request
    accessCode = payload.get('signupcode')

    # Read the primary email address of the GitHub account
    try:
        user_email = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {accessToken}"),
            timeout=10,
        )
        emailEntries = user_email.json()
    except (requests.RequestException, ValueError):
        return jsonify("An error occured")
    email = None
    # A successful reply is a list of address entries; error replies are not a list
    if isinstance(emailEntries, list):
        for entry in emailEntries:
            if isinstance(entry, dict) and entry.get("primary") == True:
                email = entry.get("email")
    if not isinstance(email, str) or not email:
        return jsonify("An error occured")
    # Stored lower-cased, like the addresses written by the form signup
    email = email.lower()

    try:
        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username":username})
        sameEmail = usersCollection.count_documents({"email":email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        signupMode = settings["signup_mode"]
        if signupMode == "none":
            return jsonify("Signups have been disabled")
        if signupMode == "nocode":
            role = settings["default_role"]
        elif signupMode in ("codeoptional", "coderequired"):
            # If a code is configured more than once, the last entry wins
            matchingRoles = [entry["role"] for entry in settings["signup_codes"] if entry["code"] == accessCode]
            if matchingRoles:
                role = matchingRoles[-1]
            elif signupMode == "coderequired":
                return jsonify("Code not found")
            else:
                role = settings["default_role"]
        else:
            # Unknown signup mode: no role can be determined
            return jsonify("An error occured")

        # Session tokens are credentials, so they come from the OS CSPRNG
        alphabet = string.ascii_letters + string.digits
        newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
        creationDate = int(datetime.now().timestamp())
        # Tokens stay valid for 31 days (2678400 seconds)
        newExpiry = creationDate + 2678400

        # Create user together with its first token
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name":displayName,
                "username":username,
                "email":email,
                "permissions":settings["default_permissions"],
                "role":role,
                # Placeholder, not a password hash: OAuth accounts have no password
                "password":"a",
                "passkeys": [],
                "tokens": [{'token':newToken, 'expiry':newExpiry}],
                "creation_date": creationDate
            }
        )
    except Exception:
        return jsonify("An error occured")
    return jsonify(newToken)
|
|
|
|
# Api signup endpoint
# Create account with user details and get permission based on access code
# Arguments: username (required), email, password (required), access_code, displayname (required)
@app.route('/api/signup', methods = ['POST'])
def handleSignup():
    """Create a password-based account.

    JSON body fields:
        username:    desired username, stored lower-cased (required).
        email:       email address, stored lower-cased (must be a string).
        password:    plain-text password, stored as an Argon2 hash (required).
        displayname: display name (required).
        access_code: signup code selecting a role (optional).

    The role is chosen according to ``settings["signup_mode"]``:
    "none" rejects the signup, "nocode" always uses the default role,
    "codeoptional" uses the role of a matching code or the default role,
    and "coderequired" rejects the signup without a matching code.

    Returns:
        A JSON response: "Success", or one of the strings
        "User already exists", "Signups have been disabled",
        "Code not found" and "An error occured".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify("An error occured")
    username = payload.get('username')
    email = payload.get('email')
    password = payload.get('password')
    if not isinstance(username, str) or not isinstance(email, str) or not isinstance(password, str):
        return jsonify("An error occured")
    if 'displayname' not in payload:
        return jsonify("An error occured")

    try:
        # Set user details
        username = username.lower()
        email = email.lower()
        passwordHash = ph.hash(password)
        creationDate = int(datetime.now().timestamp())
        # The access code is optional, so a missing key must not fail the request
        accessCode = payload.get('access_code')
        displayName = payload['displayname']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username":username})
        sameEmail = usersCollection.count_documents({"email":email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        signupMode = settings["signup_mode"]
        if signupMode == "none":
            return jsonify("Signups have been disabled")
        if signupMode == "nocode":
            role = settings["default_role"]
        elif signupMode in ("codeoptional", "coderequired"):
            # If a code is configured more than once, the last entry wins
            matchingRoles = [entry["role"] for entry in settings["signup_codes"] if entry["code"] == accessCode]
            if matchingRoles:
                role = matchingRoles[-1]
            elif signupMode == "coderequired":
                return jsonify("Code not found")
            else:
                role = settings["default_role"]
        else:
            # Unknown signup mode: no role can be determined
            return jsonify("An error occured")

        # Create user
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name":displayName,
                "username":username,
                "email":email,
                "permissions":settings["default_permissions"],
                "role":role,
                "password":passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate
            }
        )
    except Exception:
        return jsonify("An error occured")
    # A view must return a response; report the successful signup
    return jsonify("Success")
|
|
|
|
# Logout endpoint
# Logs out user and removes token from db
# Arguments: auth_token (cookie) (required)
@app.route('/logout', methods = ['GET'])
def logout():
    """Remove an auth token from the database and render the logout page.

    The token comes from the ``auth_token`` cookie. A JSON body with a string
    ``remove_token`` field takes precedence over the cookie.

    Returns:
        The rendered ``logout.html`` template, whether or not a token was removed.
    """
    token = request.cookies.get('auth_token', 'none')

    # The JSON body is optional; silent=True yields None when there is none.
    # Only a plain string may replace the cookie token: a JSON object would
    # be interpreted by MongoDB as a query operator.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict) and isinstance(payload.get('remove_token'), str):
        token = payload['remove_token']

    usersCollection.update_one({'tokens.token': token}, {"$pull":{'tokens':{'token':token}}})
    return render_template('logout.html', appName=appName)
|
|
|
|
if __name__ == '__main__':
    import os

    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server run code on it. It is therefore opt-in:
    # set FLASK_DEBUG=1 for local development.
    debugEnabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")
    app.run(debug = debugEnabled)