Added auth and basic chat system
This commit is contained in:
358
main.py
Normal file
358
main.py
Normal file
@@ -0,0 +1,358 @@
|
||||
from flask import Flask, jsonify, request, render_template
|
||||
from pymongo import MongoClient
|
||||
from bson.objectid import ObjectId
|
||||
from datetime import datetime
|
||||
from argon2 import PasswordHasher
|
||||
import random
|
||||
import string
|
||||
import json
|
||||
import requests
|
||||
from urllib.parse import parse_qs
|
||||
|
||||
with open("config/settings.json", "r") as f:
|
||||
settings = json.load(f)
|
||||
|
||||
appName = settings["branding"]["name"]
|
||||
|
||||
ph = PasswordHasher()
|
||||
|
||||
github_client_id = settings["github_oauth"]["client_id"]
|
||||
github_client_secret = settings["github_oauth"]["client_secret"]
|
||||
github_auth_endpoint = f"https://github.com/login/oauth/authorize?response_type=code&client_id={github_client_id}&scope=user"
|
||||
github_token_endpoint = "https://github.com/login/oauth/access_token"
|
||||
github_user_endpoint = "https://api.github.com/user"
|
||||
|
||||
mongoUser = 'root'
|
||||
mongoPassword = '39zj6bNT5gaXbmuOBAYn5pZRO'
|
||||
mongoHost = 'localhost'
|
||||
mongoPort = '27017'
|
||||
mongoDatabase = 'database'
|
||||
mongoUri = f'mongodb://{mongoUser}:{mongoPassword}@{mongoHost}:{mongoPort}/'
|
||||
|
||||
print(mongoUri)
|
||||
|
||||
client = MongoClient(mongoUri)
|
||||
mydb = client[mongoDatabase]
|
||||
chatCollection = mydb.chats
|
||||
usersCollection = mydb.users
|
||||
|
||||
try:
|
||||
client.server_info()
|
||||
print("Connected to MongoDB successfuly")
|
||||
except Exception as e:
|
||||
print("Error connecting to MongoDB:", e)
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
# Chat Details Endpoint:
|
||||
# Gets details about a chat using the chatId
|
||||
# Arguments: token (required), details (required), model, name
|
||||
@app.route('/api/chat/<_id>/details', methods = ['GET', 'POST'])
|
||||
def getChatHistory(_id):
|
||||
# Get user auth token
|
||||
token = request.json['token']
|
||||
# Find the correct user token in user db
|
||||
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
|
||||
# If the user exists, continue, otherwise return fail
|
||||
if (user):
|
||||
# Convert _id to a string, python doesn't like ObjectId()
|
||||
user['_id'] = str(user['_id'])
|
||||
# Check if the token expiry is after the current date (Using unix timestamp, other mongodb Date datatype is a pain to use in python)
|
||||
if (user['tokens'][0]['expiry'] > int(datetime.now().timestamp())):
|
||||
# Store the userId
|
||||
userId = user['_id']
|
||||
print(userId)
|
||||
# Get the request details
|
||||
details = request.json['details']
|
||||
# If the user is trying to GET data
|
||||
if (request.method == 'GET'):
|
||||
# Get the chat from the chatId
|
||||
returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
|
||||
# Convert chatId into string
|
||||
returnedChat['_id'] = str(returnedChat['_id'])
|
||||
print("Chat " + _id + " has been found with token " + token)
|
||||
# Check for detail type and return correct value from db
|
||||
if (details == "history"):
|
||||
return jsonify(returnedChat["messages"])
|
||||
elif (details == "users"):
|
||||
return jsonify(returnedChat["permissions"])
|
||||
elif (details == "model"):
|
||||
return jsonify(returnedChat["model"])
|
||||
elif (details == "name"):
|
||||
return jsonify(returnedChat["name"])
|
||||
else:
|
||||
# Check for the detail type and add data to db
|
||||
if (details == "model"):
|
||||
model = request.json['model']
|
||||
chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { "model": model } })
|
||||
if (details == "name"):
|
||||
name = request.json['name']
|
||||
chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { "name": name } })
|
||||
return jsonify("Success")
|
||||
else:
|
||||
return jsonify("User token is invalid")
|
||||
else:
|
||||
return jsonify("User token is invalid")
|
||||
|
||||
# Chat creation endpoint
|
||||
# Create a new chat
|
||||
# Arguments: token (required), name (required), model (required)
|
||||
@app.route('/api/chat/create', methods = ['POST'])
|
||||
def createChat():
|
||||
token = request.json['token']
|
||||
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}, "permissions":1})
|
||||
if (user):
|
||||
user['_id'] = str(user['_id'])
|
||||
if (user['tokens'][0]['expiry'] > int(datetime.now().timestamp())):
|
||||
userId = user['_id']
|
||||
print(user)
|
||||
print(user['permissions'])
|
||||
if ("createChat" in user['permissions']):
|
||||
print(userId)
|
||||
name = request.json['name']
|
||||
model = request.json['model']
|
||||
chatCollection.insert_one(
|
||||
{
|
||||
"name":name,
|
||||
"model":model,
|
||||
"permissions": {
|
||||
userId:[
|
||||
"owner",
|
||||
"view",
|
||||
"message"
|
||||
]
|
||||
},
|
||||
"messages": [
|
||||
|
||||
]
|
||||
}
|
||||
)
|
||||
return jsonify("Success")
|
||||
else:
|
||||
return jsonify("Incorrect permissions")
|
||||
else:
|
||||
return jsonify("User token is invalid")
|
||||
else:
|
||||
return jsonify("User token is invalid")
|
||||
|
||||
# Signup page
|
||||
# Returns html signup page
|
||||
@app.route('/signup', methods = ['GET'])
|
||||
def signup():
|
||||
pass
|
||||
|
||||
# Index page
|
||||
# If logged in return home menu (Or logout if token is expired),
|
||||
# Otherwise return login screen
|
||||
@app.route('/', methods = ['GET'])
|
||||
def index():
|
||||
token = request.cookies.get('auth_token', 'none')
|
||||
if (token == 'none'):
|
||||
return render_template('login.html', appName=appName, githubUrl=github_auth_endpoint)
|
||||
else:
|
||||
user = usersCollection.find_one({'tokens.token': token}, {"_id":1,"tokens":{"$elemMatch": {"token":token}}})
|
||||
if (user):
|
||||
user['_id'] = str(user['_id'])
|
||||
if (user['tokens'][0]['expiry'] > int(datetime.now().timestamp())):
|
||||
return render_template('home.html', appName=appName)
|
||||
else:
|
||||
render_template('logout.html', appName=appName)
|
||||
else:
|
||||
render_template('logout.html', appName=appName)
|
||||
|
||||
# Login endpoint
|
||||
# Api backend for login screen, check for user and returns token
|
||||
# Arguments: username (required), password (required)
|
||||
@app.route('/api/login', methods = ['POST'])
|
||||
def handleLogin():
|
||||
try:
|
||||
username = request.json['username'].lower()
|
||||
user = usersCollection.find_one({"$or":[{"username":username},{"email":username}]})
|
||||
passwordHash = user["password"]
|
||||
password = request.json['password']
|
||||
loggedin = ph.verify(passwordHash, password)
|
||||
userId = user['_id']
|
||||
except:
|
||||
return jsonify("Incorrect username or password")
|
||||
if (loggedin):
|
||||
newToken = ''.join(random.choices(string.ascii_letters + string.digits, k=100))
|
||||
newExpiry = int(datetime.now().timestamp())
|
||||
newExpiry = newExpiry + 2678400
|
||||
usersCollection.update_one({'_id':userId}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
|
||||
return jsonify(newToken)
|
||||
|
||||
# Github callback endpoint
|
||||
# Either logs in the user or returns extra details signup page
|
||||
# Arguments: code (required) (provided by github)
|
||||
@app.route('/api/github/authorized', methods = ['GET'])
|
||||
def handleGithubLogin():
|
||||
code = request.args.get('code')
|
||||
res = requests.post(
|
||||
github_token_endpoint,
|
||||
data=dict(
|
||||
client_id=github_client_id,
|
||||
client_secret=github_client_secret,
|
||||
code=code,
|
||||
),
|
||||
)
|
||||
res = parse_qs(res.content.decode("utf-8"))
|
||||
token = res["access_token"][0]
|
||||
user_email = requests.get("https://api.github.com/user/emails", headers=dict(Authorization=f"Bearer {token}"))
|
||||
for i in user_email.json():
|
||||
if(i["primary"]==True):
|
||||
email = i["email"]
|
||||
try:
|
||||
user = usersCollection.find_one({"email":user_email})
|
||||
userId = user['_id']
|
||||
newToken = ''.join(random.choices(string.ascii_letters + string.digits, k=100))
|
||||
newExpiry = int(datetime.now().timestamp())
|
||||
newExpiry = newExpiry + 2678400
|
||||
usersCollection.update_one({'_id':userId}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
|
||||
return jsonify(newToken)
|
||||
except:
|
||||
return render_template("oauthsignup.html", appName = appName, provider="github", accessToken=token)
|
||||
|
||||
# Oauth2.0 Account signup endpoint
|
||||
# Creates account using extra details and oauth access token
|
||||
# Arguments: code (required) (access token provided by idp), username (required), signupcode, display (required)
|
||||
@app.route('/api/oauthsignup', methods = ['POST'])
|
||||
def handleOauthSignup():
|
||||
try:
|
||||
user_email = requests.get("https://api.github.com/user/emails", headers=dict(Authorization=f"Bearer {request.json['code']}"))
|
||||
for i in user_email.json():
|
||||
if(i["primary"]==True):
|
||||
email = i["email"]
|
||||
# Set user details
|
||||
username = request.json['username'].lower()
|
||||
passwordHash = "a"
|
||||
creationDate = int(datetime.now().timestamp())
|
||||
accessCode = request.json['signupcode']
|
||||
displayName = request.json['display']
|
||||
|
||||
# Check if details are taken
|
||||
sameUsername = usersCollection.count_documents({"username":username})
|
||||
sameEmail = usersCollection.count_documents({"email":email})
|
||||
if (sameUsername != 0 ) or ( sameEmail != 0):
|
||||
return jsonify("User already exists")
|
||||
|
||||
# Check for appropriate role
|
||||
codeFound = False
|
||||
if (settings["signup_mode"] == "none"):
|
||||
return jsonify("Signups have been disabled")
|
||||
elif (settings["signup_mode"] == "codeoptional"):
|
||||
for i in settings["signup_codes"]:
|
||||
if (i["code"] == accessCode):
|
||||
codeFound = True
|
||||
role = i["role"]
|
||||
if (codeFound == False):
|
||||
role = settings["default_role"]
|
||||
elif (settings["signup_mode"] == "nocode"):
|
||||
role = settings["default_role"]
|
||||
elif (settings["signup_mode"] == "coderequired"):
|
||||
for i in settings["signup_codes"]:
|
||||
if (i["code"] == accessCode):
|
||||
codeFound = True
|
||||
role = i["role"]
|
||||
if (codeFound == False):
|
||||
return jsonify("Code not found")
|
||||
|
||||
# Create user
|
||||
usersCollection.insert_one(
|
||||
{
|
||||
"_id": ObjectId(),
|
||||
"name":displayName,
|
||||
"username":username,
|
||||
"email":email,
|
||||
"permissions":settings["default_permissions"],
|
||||
"role":role,
|
||||
"password":passwordHash,
|
||||
"passkeys": [],
|
||||
"tokens": [],
|
||||
"creation_date": creationDate
|
||||
}
|
||||
)
|
||||
except:
|
||||
return jsonify("An error occured")
|
||||
user = usersCollection.find_one({"email":email})
|
||||
userId = user['_id']
|
||||
newToken = ''.join(random.choices(string.ascii_letters + string.digits, k=100))
|
||||
newExpiry = int(datetime.now().timestamp())
|
||||
newExpiry = newExpiry + 2678400
|
||||
usersCollection.update_one({'_id':userId}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
|
||||
return jsonify(newToken)
|
||||
|
||||
# Api signup endpoint
|
||||
# Create account with user details and get permission based on access code
|
||||
# Arguments: username (required), email, password (required), access_code, displayname (required)
|
||||
@app.route('/api/signup', methods = ['POST'])
|
||||
def handleSignup():
|
||||
try:
|
||||
# Set user details
|
||||
username = request.json['username'].lower()
|
||||
email = request.json['email'].lower()
|
||||
password = request.json['password']
|
||||
passwordHash = ph.hash(password)
|
||||
creationDate = int(datetime.now().timestamp())
|
||||
accessCode = request.json['access_code']
|
||||
displayName = request.json['displayname']
|
||||
|
||||
# Check if details are taken
|
||||
sameUsername = usersCollection.count_documents({"username":username})
|
||||
sameEmail = usersCollection.count_documents({"email":email})
|
||||
if (sameUsername != 0 ) or ( sameEmail != 0):
|
||||
return jsonify("User already exists")
|
||||
|
||||
# Check for appropriate role
|
||||
codeFound = False
|
||||
if (settings["signup_mode"] == "none"):
|
||||
return jsonify("Signups have been disabled")
|
||||
elif (settings["signup_mode"] == "codeoptional"):
|
||||
for i in settings["signup_codes"]:
|
||||
if (i["code"] == accessCode):
|
||||
codeFound = True
|
||||
role = i["role"]
|
||||
if (codeFound == False):
|
||||
role = settings["default_role"]
|
||||
elif (settings["signup_mode"] == "nocode"):
|
||||
role = settings["default_role"]
|
||||
elif (settings["signup_mode"] == "coderequired"):
|
||||
for i in settings["signup_codes"]:
|
||||
if (i["code"] == accessCode):
|
||||
codeFound = True
|
||||
role = i["role"]
|
||||
if (codeFound == False):
|
||||
return jsonify("Code not found")
|
||||
|
||||
# Create user
|
||||
usersCollection.insert_one(
|
||||
{
|
||||
"_id": ObjectId(),
|
||||
"name":displayName,
|
||||
"username":username,
|
||||
"email":email,
|
||||
"permissions":settings["default_permissions"],
|
||||
"role":role,
|
||||
"password":passwordHash,
|
||||
"passkeys": [],
|
||||
"tokens": [],
|
||||
"creation_date": creationDate
|
||||
}
|
||||
)
|
||||
except:
|
||||
return jsonify("An error occured")
|
||||
|
||||
# Logout endpoint
|
||||
# Logs out user and removes token from db
|
||||
# Arguments: auth_token (cookie) (required)
|
||||
@app.route('/logout', methods = ['GET'])
|
||||
def logout():
|
||||
token = request.cookies.get('auth_token', 'none')
|
||||
try:
|
||||
token = request.json['remove_token']
|
||||
except:
|
||||
pass
|
||||
user = usersCollection.update_one({'tokens.token': token}, {"$pull":{'tokens':{'token':token}}})
|
||||
return render_template('logout.html', appName=appName)
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug = True)
|
||||
Reference in New Issue
Block a user