Files
AiThingy/main.py
Hugo H 716a4c6148 Fixed signups and oauth buttons now only show if enabled
The signup form now works and the oauth buttons are only visible if they have been enabled in the settings.json file
2025-08-26 12:16:22 +01:00

373 lines
15 KiB
Python

import json
import random
import secrets
import string
from datetime import datetime
from urllib.parse import parse_qs
from urllib.parse import quote_plus

import requests
from argon2 import PasswordHasher
from bson.errors import InvalidId
from bson.objectid import ObjectId
from flask import Flask, jsonify, request, render_template
from flask import redirect
from pymongo import MongoClient
# Load the application configuration once, at import time.
with open("config/settings.json", "r", encoding="utf-8") as f:
    settings = json.load(f)
appName = settings["branding"]["name"]
ph = PasswordHasher()

# GitHub OAuth configuration (client credentials come from settings.json).
github_client_id = settings["github_oauth"]["client_id"]
github_client_secret = settings["github_oauth"]["client_secret"]
github_auth_endpoint = f"https://github.com/login/oauth/authorize?response_type=code&client_id={github_client_id}&scope=user"
github_token_endpoint = "https://github.com/login/oauth/access_token"
github_user_endpoint = "https://api.github.com/user"

# MongoDB connection details.
mongoUser = 'root'
mongoPassword = settings["mongo_password"]
mongoHost = 'localhost'
mongoPort = '27017'
mongoDatabase = 'database'
# Credentials are percent-escaped so that characters such as '@', ':' or '/'
# in the password cannot corrupt the connection string.
mongoUri = f'mongodb://{quote_plus(str(mongoUser))}:{quote_plus(str(mongoPassword))}@{mongoHost}:{mongoPort}/'
# Log where we are connecting to, but never the password itself.
print(f'mongodb://{mongoUser}:***@{mongoHost}:{mongoPort}/')
client = MongoClient(mongoUri)
mydb = client[mongoDatabase]
chatCollection = mydb.chats
usersCollection = mydb.users
# Ask the server for its info so that connection problems show up at startup
# rather than on the first request. A failure is reported but not fatal.
try:
    client.server_info()
    print("Connected to MongoDB successfuly")
except Exception as e:
    print("Error connecting to MongoDB:", e)

app = Flask(__name__)
# Chat Details Endpoint:
# Get or change details about a chat using the chatId
# Arguments (JSON body): token (required), details (required), model, name
@app.route('/api/chat/<_id>/details', methods = ['GET', 'POST'])
def getChatHistory(_id):
    """Read (GET) or update (POST) a single detail of the chat with id ``_id``.

    GET needs the "view" permission and accepts ``details`` of "history",
    "users", "model" or "name". POST needs the "edit" permission and accepts
    ``details`` of "model" or "name", with the new value supplied under the
    key of the same name.

    Returns a JSON response: the requested value, "Success", or one of the
    error strings "User token is invalid", "Chat not found",
    "Invalid Permissions" or "Invalid details".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    # The token must be a plain string: a JSON object here would be passed to
    # MongoDB as a query operator and could match an arbitrary user.
    token = payload.get('token')
    if not isinstance(token, str):
        return jsonify("User token is invalid")

    # $elemMatch projects only the matching token, so tokens[0] is that token.
    user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
    if not user:
        return jsonify("User token is invalid")
    # Expiry is stored as a unix timestamp.
    if user['tokens'][0]['expiry'] <= int(datetime.now().timestamp()):
        return jsonify("User token is invalid")
    # Chat permissions are keyed by the string form of the user's ObjectId.
    userId = str(user['_id'])

    # Load the chat up front: both the GET and the POST branch need it for
    # the permission check.
    try:
        chatId = ObjectId(_id)
    except InvalidId:
        return jsonify("Chat not found")
    returnedChat = chatCollection.find_one({'_id': chatId})
    if returnedChat is None:
        return jsonify("Chat not found")
    grants = (returnedChat.get("permissions") or {}).get(userId) or []

    details = payload.get('details')

    if request.method == 'GET':
        if "view" not in grants:
            return jsonify("Invalid Permissions")
        print("Chat " + _id + " has been found")
        # Maps the requested detail to the chat document field holding it.
        fieldByDetail = {
            "history": "messages",
            "users": "permissions",
            "model": "model",
            "name": "name",
        }
        if not isinstance(details, str) or details not in fieldByDetail:
            return jsonify("Invalid details")
        return jsonify(returnedChat.get(fieldByDetail[details]))

    # POST: update a single editable field.
    if "edit" not in grants:
        return jsonify("Invalid Permissions")
    if details not in ("model", "name"):
        return jsonify("Invalid details")
    newValue = payload.get(details)
    if not isinstance(newValue, str):
        return jsonify("Invalid details")
    chatCollection.update_one({'_id': chatId}, { "$set": { details: newValue } })
    return jsonify("Success")
# Chat creation endpoint
# Create a new chat
# Arguments (JSON body): token (required), name (required), model (required)
@app.route('/api/chat/create', methods = ['POST'])
def createChat():
    """Create an empty chat owned by the user the token belongs to.

    The user must hold the "createChat" permission. The new chat grants its
    creator the "owner", "view", "message" and "edit" permissions.

    Returns a JSON response: "Success", "User token is invalid",
    "Incorrect permissions" or "Missing name or model".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    # The token must be a plain string: a JSON object here would be passed to
    # MongoDB as a query operator and could match an arbitrary user.
    token = payload.get('token')
    if not isinstance(token, str):
        return jsonify("User token is invalid")

    # "permissions" has to be part of the projection, otherwise the
    # permission check below has nothing to look at.
    user = usersCollection.find_one(
        {'tokens.token': token},
        {"_id":1,"permissions":1,"tokens":{"$elemMatch": {"token":token}}}
    )
    if not user:
        return jsonify("User token is invalid")
    # Expiry is stored as a unix timestamp.
    if user['tokens'][0]['expiry'] <= int(datetime.now().timestamp()):
        return jsonify("User token is invalid")

    if "createChat" not in (user.get('permissions') or []):
        return jsonify("Incorrect permissions")

    name = payload.get('name')
    model = payload.get('model')
    if not isinstance(name, str) or not isinstance(model, str):
        return jsonify("Missing name or model")

    # Chat permissions are keyed by the string form of the user's ObjectId.
    userId = str(user['_id'])
    chatCollection.insert_one(
        {
            "name":name,
            "model":model,
            "permissions": {
                userId:[
                    "owner",
                    "view",
                    "message",
                    "edit"
                ]
            },
            "messages": []
        }
    )
    return jsonify("Success")
# Signup page
# Returns html signup page, or sends the visitor to the index page if they
# already carry an auth_token cookie
@app.route('/signup', methods = ['GET'])
def signup():
    """Render the signup form for visitors without an ``auth_token`` cookie.

    Visitors that already have the cookie are redirected to ``/``, which
    validates the token and decides between the home and logout pages.
    """
    token = request.cookies.get('auth_token', 'none')
    if token != 'none':
        # Previously this path returned nothing, which Flask reports as a
        # server error.
        return redirect('/')
    return render_template(
        'signup.html',
        appName=appName,
        githubUrl=github_auth_endpoint,
        githublogin=settings["github_oauth"]["enabled"],
        oauthlogin=settings["oauth_login"],
    )
# Index page
# If logged in return home menu (Or logout if token is expired),
# Otherwise return login screen
@app.route('/', methods = ['GET'])
def index():
    """Render the login, home or logout page depending on the auth cookie.

    * no ``auth_token`` cookie          -> login.html
    * cookie matches an unexpired token -> home.html
    * unknown or expired token          -> logout.html
    """
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template(
            'login.html',
            appName=appName,
            githubUrl=github_auth_endpoint,
            githublogin=settings["github_oauth"]["enabled"],
            oauthlogin=settings["oauth_login"],
        )

    # $elemMatch projects only the matching token, so tokens[0] is that token.
    user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
    # Expiry is stored as a unix timestamp.
    if user and user['tokens'][0]['expiry'] > int(datetime.now().timestamp()):
        return render_template('home.html', appName=appName)

    # Unknown or expired token. The rendered page must actually be returned;
    # without the return Flask answers with a server error.
    return render_template('logout.html', appName=appName)
# Login endpoint
# Api backend for login screen, check for user and returns token
# Arguments (JSON body): username (required) (username or email), password (required)
@app.route('/api/login', methods = ['POST'])
def handleLogin():
    """Verify a username/email + password pair and issue a session token.

    Returns a JSON response: the new 100-character token on success,
    otherwise "Incorrect username or password".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username')
    password = payload.get('password')
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify("Incorrect username or password")

    # Accounts are looked up by the lower-cased identifier, which may be
    # either the username or the email address.
    username = username.lower()
    user = usersCollection.find_one({"$or":[{"username":username},{"email":username}]})
    if user is None:
        return jsonify("Incorrect username or password")

    try:
        # verify() raises on a mismatch and on stored values that are not a
        # valid hash; any failure here means the login is rejected.
        ph.verify(user["password"], password)
    except Exception:
        return jsonify("Incorrect username or password")

    # Session tokens are credentials, so draw them from the OS CSPRNG.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens are valid for 31 days (2678400 seconds).
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':user['_id']}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
# Github callback endpoint
# Either logs in the user or returns extra details signup page
# Arguments (query string): code (required) (provided by github)
@app.route('/api/github/authorized', methods = ['GET'])
def handleGithubLogin():
    """Finish the GitHub OAuth flow.

    Exchanges ``code`` for a GitHub access token and reads the account's
    primary email address. If a local user with that email exists, a new
    session token is issued and returned as JSON. Otherwise the OAuth signup
    page is rendered so the visitor can complete registration.

    Returns "An error occured" as JSON when GitHub cannot be reached or does
    not supply an access token or a primary email.
    """
    code = request.args.get('code')
    if not code:
        return jsonify("An error occured")

    try:
        res = requests.post(
            github_token_endpoint,
            data=dict(
                client_id=github_client_id,
                client_secret=github_client_secret,
                code=code,
            ),
            timeout=10,
        )
        # parse_qs assumes a form-encoded token response, as the original did.
        tokenFields = parse_qs(res.content.decode("utf-8"))
        token = tokenFields["access_token"][0]
        emailRes = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {token}"),
            timeout=10,
        )
        email = next((i["email"] for i in emailRes.json() if i["primary"]), None)
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError):
        # Covers network failures, a rejected code (no access_token in the
        # reply) and error payloads that are not a list of email entries.
        return jsonify("An error occured")
    if not isinstance(email, str):
        return jsonify("An error occured")

    # Look the account up by the email string (not by the HTTP response
    # object, which can never match a stored user).
    user = usersCollection.find_one({"email":email})
    if user is None:
        return render_template("oauthsignup.html", appName = appName, provider="github", accessToken=token)

    # Session tokens are credentials, so draw them from the OS CSPRNG.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens are valid for 31 days (2678400 seconds).
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':user['_id']}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
# Oauth2.0 Account signup endpoint
# Creates account using extra details and oauth access token
# Arguments (JSON body): code (required) (access token provided by idp), username (required), signupcode, display (required)
@app.route('/api/oauthsignup', methods = ['POST'])
def handleOauthSignup():
    """Create an account for a GitHub-authenticated visitor and log them in.

    The email address is read from GitHub using the supplied access token.
    The role is chosen from ``settings["signup_mode"]``:

    * "none"         -> signups are refused
    * "nocode"       -> everyone gets the default role
    * "codeoptional" -> a matching signup code selects the role, otherwise
                        the default role is used
    * "coderequired" -> a matching signup code is mandatory

    Returns a JSON response: the new session token on success, otherwise
    "User already exists", "Signups have been disabled", "Code not found" or
    "An error occured".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    accessToken = payload.get('code')
    username = payload.get('username')
    displayName = payload.get('display')
    # The signup code is optional; a missing code simply matches nothing.
    accessCode = payload.get('signupcode')
    if not all(isinstance(v, str) for v in (accessToken, username, displayName)):
        return jsonify("An error occured")
    username = username.lower()

    # Ask GitHub for the primary email of the account behind the token.
    try:
        emailRes = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {accessToken}"),
            timeout=10,
        )
        email = next((i["email"] for i in emailRes.json() if i["primary"]), None)
    except (requests.RequestException, KeyError, TypeError, ValueError):
        return jsonify("An error occured")
    if not isinstance(email, str):
        return jsonify("An error occured")

    try:
        # Check if details are taken
        if usersCollection.count_documents({"username":username}) != 0:
            return jsonify("User already exists")
        if usersCollection.count_documents({"email":email}) != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        mode = settings["signup_mode"]
        if mode == "none":
            return jsonify("Signups have been disabled")
        if mode == "nocode":
            role = settings["default_role"]
        elif mode in ("codeoptional", "coderequired"):
            matchingRoles = [c["role"] for c in settings["signup_codes"] if c["code"] == accessCode]
            if matchingRoles:
                # If a code is listed more than once, the last entry wins.
                role = matchingRoles[-1]
            elif mode == "codeoptional":
                role = settings["default_role"]
            else:
                return jsonify("Code not found")
        else:
            # Unrecognised signup_mode in settings.json.
            return jsonify("An error occured")

        # Session tokens are credentials, so draw them from the OS CSPRNG.
        alphabet = string.ascii_letters + string.digits
        newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
        now = int(datetime.now().timestamp())
        # Tokens are valid for 31 days (2678400 seconds).
        newExpiry = now + 2678400

        # Create user, already carrying the first session token so no
        # follow-up lookup and update are needed.
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name":displayName,
                "username":username,
                "email":email,
                "permissions":settings["default_permissions"],
                "role":role,
                # Placeholder, not a real password hash. Presumably this keeps
                # password login unusable for OAuth accounts; confirm intent.
                "password":"a",
                "passkeys": [],
                "tokens": [{'token':newToken, 'expiry':newExpiry}],
                "creation_date": now
            }
        )
    except Exception as e:
        # Keep the endpoint's best-effort contract (a JSON error rather than
        # a 500), but do not hide the cause from the server log.
        print("OAuth signup failed:", e)
        return jsonify("An error occured")
    return jsonify(newToken)
# Api signup endpoint
# Create account with user details and get permission based on access code
# Arguments (JSON body): username (required), email (required), password (required), access_code, displayname (required)
@app.route('/api/signup', methods = ['POST'])
def handleSignup():
    """Create a password-based account.

    The role is chosen from ``settings["signup_mode"]``:

    * "none"         -> signups are refused
    * "nocode"       -> everyone gets the default role
    * "codeoptional" -> a matching access code selects the role, otherwise
                        the default role is used
    * "coderequired" -> a matching access code is mandatory

    Returns a JSON response: "Success", "User already exists",
    "Signups have been disabled", "Code not found" or "An error occured".
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username')
    email = payload.get('email')
    password = payload.get('password')
    displayName = payload.get('displayname')
    # The access code is optional; a missing code simply matches nothing.
    accessCode = payload.get('access_code')
    if not all(isinstance(v, str) for v in (username, email, password, displayName)):
        return jsonify("An error occured")
    # Usernames and emails are stored lower-cased; login lower-cases the
    # identifier it looks up.
    username = username.lower()
    email = email.lower()

    try:
        # Check if details are taken
        if usersCollection.count_documents({"username":username}) != 0:
            return jsonify("User already exists")
        if usersCollection.count_documents({"email":email}) != 0:
            return jsonify("User already exists")

        # Check for appropriate role
        mode = settings["signup_mode"]
        if mode == "none":
            return jsonify("Signups have been disabled")
        if mode == "nocode":
            role = settings["default_role"]
        elif mode in ("codeoptional", "coderequired"):
            matchingRoles = [c["role"] for c in settings["signup_codes"] if c["code"] == accessCode]
            if matchingRoles:
                # If a code is listed more than once, the last entry wins.
                role = matchingRoles[-1]
            elif mode == "codeoptional":
                role = settings["default_role"]
            else:
                return jsonify("Code not found")
        else:
            # Unrecognised signup_mode in settings.json.
            return jsonify("An error occured")

        # Create user
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name":displayName,
                "username":username,
                "email":email,
                "permissions":settings["default_permissions"],
                "role":role,
                "password":ph.hash(password),
                "passkeys": [],
                "tokens": [],
                "creation_date": int(datetime.now().timestamp())
            }
        )
    except Exception as e:
        # Keep the endpoint's best-effort contract (a JSON error rather than
        # a 500), but do not hide the cause from the server log.
        print("Signup failed:", e)
        return jsonify("An error occured")
    # A view must return a response; without this the account was created
    # but the client received a server error.
    return jsonify("Success")
# Logout endpoint
# Logs out user and removes token from db
# Arguments: auth_token (cookie), remove_token (JSON body, optional, overrides the cookie)
@app.route('/logout', methods = ['GET'])
def logout():
    """Delete a session token from the database and render the logout page.

    The token to delete is ``remove_token`` from the JSON body when it is
    supplied as a string, otherwise the ``auth_token`` cookie. The logout
    page is rendered whether or not a token was found.
    """
    token = request.cookies.get('auth_token', 'none')

    # A JSON body is optional on this GET route; silent=True yields None
    # instead of raising when there is none.
    payload = request.get_json(silent=True)
    if isinstance(payload, dict):
        override = payload.get('remove_token')
        # Only accept plain strings: a JSON object would be passed to
        # MongoDB as a query operator and could remove other users' tokens.
        if isinstance(override, str):
            token = override

    # 'none' is the "no cookie" placeholder, so there is nothing to remove.
    if token != 'none':
        usersCollection.update_one({'tokens.token': token}, {"$pull":{'tokens':{'token':token}}})
    return render_template('logout.html', appName=appName)
def _runDevServer():
    """Start Flask's built-in server with debug mode switched on."""
    app.run(debug=True)


# Only start the server when this file is executed directly, not on import.
if __name__ == "__main__":
    _runDevServer()