Files
AiThingy/main.py

446 lines
17 KiB
Python

import json
import random
import secrets
import string
from datetime import datetime
from urllib.parse import parse_qs

import requests
from argon2 import PasswordHasher
from bson.errors import InvalidId
from bson.json_util import dumps, loads
from bson.objectid import ObjectId
from flask import Flask, jsonify, request, render_template, Response, redirect
from flask_cors import CORS, cross_origin
from ollama import chat
from pymongo import MongoClient
# Load the application settings (branding, OAuth credentials, MongoDB
# connection details, signup rules) from the JSON config file.
with open("config/settings.json", "r") as f:
    settings = json.load(f)
appName = settings["branding"]["name"]
# Argon2 hasher used to hash and verify account passwords.
ph = PasswordHasher()

# GitHub OAuth configuration and endpoints.
github_client_id = settings["github_oauth"]["client_id"]
github_client_secret = settings["github_oauth"]["client_secret"]
github_auth_endpoint = f"https://github.com/login/oauth/authorize?response_type=code&client_id={github_client_id}&scope=user"
github_token_endpoint = "https://github.com/login/oauth/access_token"
github_user_endpoint = "https://api.github.com/user"

# MongoDB connection details.
mongoUser = settings["mongo_user"]
mongoPassword = settings["mongo_password"]
mongoHost = settings["mongo_host"]
mongoPort = int(settings["mongo_port"])
mongoDatabase = settings["mongo_db"]
mongoUri = f'mongodb://{mongoUser}:{mongoPassword}@{mongoHost}:{mongoPort}/'
# Log only the host and port: the full URI embeds the database password and
# must never be written to stdout or log files.
print(f"Connecting to MongoDB at {mongoHost}:{mongoPort}")
client = MongoClient(mongoUri)
mydb = client[mongoDatabase]
chatCollection = mydb.chats
usersCollection = mydb.users

# Issue a command right away so connection problems are reported at startup.
# A failure is only logged; the application still starts.
try:
    client.server_info()
    print("Connected to MongoDB successfuly")
except Exception as e:
    print("Error connecting to MongoDB:", e)

app = Flask(__name__)
CORS(app)
def checkUserPermission(token, permission):
    """Validate an auth token and check a user-level permission.

    Args:
        token: Auth token string to look up in the users collection.
        permission: Name of the permission the user must hold, or ``True``
            to accept any user whose token is valid and not expired.

    Returns:
        ``(True, userId)`` on success, where ``userId`` is the user's ``_id``
        converted to a string; otherwise ``(False, reason)`` with a short
        human-readable reason.
    """
    # Only the matching token entry is projected, so tokens[0] is that token.
    projection = {"_id":1,"tokens":{"$elemMatch": {"token":token}}, "permissions":1}
    account = usersCollection.find_one({'tokens.token': token}, projection)
    if not account:
        return False, "Token doesn't exist"

    # ObjectId is converted to a plain string for the callers.
    account['_id'] = str(account['_id'])

    # Expiry is stored as a unix timestamp.
    stillValid = account['tokens'][0]['expiry'] > int(datetime.now().timestamp())
    if not stillValid:
        return False, "Token is expired"

    accountId = account['_id']
    if permission in account["permissions"] or permission == True:
        return True, accountId
    return False, "Incorrect permissions"
def checkChatPermission(token, chatId, permission):
    """Validate an auth token and check a permission on a specific chat.

    Args:
        token: Auth token string of the requesting user.
        chatId: String form of the chat's ``_id``.
        permission: Name of the chat permission required (the file uses
            "owner", "view", "message" and "edit"), or ``True`` to accept
            any user that has an entry in the chat's permissions.

    Returns:
        ``(True, userId)`` when access is granted, otherwise
        ``(False, reason)``. A malformed chat id, an unknown chat, or a user
        with no entry in the chat's permissions is denied rather than
        raising.
    """
    valid, userId = checkUserPermission(token, True)
    if not valid:
        return False, "Invalid Token"

    # ObjectId() raises on strings that are not valid ids; treat that the
    # same as a chat that does not exist.
    try:
        chatObjectId = ObjectId(chatId)
    except (InvalidId, TypeError):
        return False, "Chat doesn't exist"

    returnedChat = chatCollection.find_one({'_id': chatObjectId})
    if returnedChat is None:
        return False, "Chat doesn't exist"

    # Permissions are stored per user id; no entry means no access at all.
    userPermissions = returnedChat.get('permissions', {}).get(userId)
    if userPermissions is None:
        return False, "Incorrect permissions"

    if permission in userPermissions or permission == True:
        return True, userId
    return False, "Incorrect permissions"
# Message Generation Endpoint
# Generate a new message on a specific chat
# Arguments: token (required), chatId (required), message
@app.route('/api/chat/<_id>/generate', methods =['POST'])
@cross_origin()
def generateMessage(_id):
    """Stream a model reply to a new user message on a chat.

    Reads the auth token from the ``token`` header and the user message from
    the JSON body field ``message``. The requester must hold the "message"
    permission on the chat.

    Returns:
        A ``text/event-stream`` response. Each event carries the reply
        accumulated so far (not just the newest chunk). On an authorization
        failure the plain reason string is returned instead.
    """
    # A missing header is an authorization failure, not a server error.
    token = request.headers.get('token')
    if token is None:
        return "Invalid Token"

    # Generating a reply posts to the chat, so require "message", not "view".
    allowed, userId = checkChatPermission(token, _id, "message")
    if not allowed:
        # On failure the second tuple element is the reason string.
        return userId

    returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
    message = request.json['message']
    messages = returnedChat['messages']
    messages.append({'role':'user', 'content':message})
    # NOTE(review): neither the user message nor the generated reply is
    # written back to the chat document here — confirm whether history is
    # persisted elsewhere.
    stream = chat(
        model=returnedChat['model'],
        messages=messages,
        stream=True
    )

    def generateStream():
        response = ""
        for chunk in stream:
            content = chunk['message']['content']
            print(content, end='', flush=True)
            response += content
            # NOTE(review): a newline inside `response` splits the event for
            # spec-compliant SSE parsers; kept as-is because the client's
            # parsing is not visible from here.
            yield f"data: {response}\n\n"

    return Response(generateStream(), mimetype='text/event-stream')
# Chat List Endpoint:
# Get all the chats associated with a user
# Arguments: token (required)
@app.route('/api/user/chats', methods = ['GET'])
def getUserChats():
    """Return every chat the requesting user has the "view" permission on.

    Reads the auth token from the ``token`` header.

    Returns:
        A JSON string holding the list of chat documents, with each ``_id``
        converted to a string; or the JSON message "User token is invalid"
        when the token is missing, unknown or expired.
    """
    token = request.headers.get('token')
    if token is None:
        return jsonify("User token is invalid")

    valid, userId = checkUserPermission(token, True)
    if not valid:
        # Previously this path fell through and returned None, which Flask
        # rejects as an invalid response.
        return jsonify("User token is invalid")

    chats = []
    for doc in chatCollection.find({'permissions.' + userId : "view"}):
        # ObjectId is not JSON serializable.
        if isinstance(doc.get('_id'), ObjectId):
            doc['_id'] = str(doc['_id'])
        chats.append(doc)
    return json.dumps(chats, indent=2)
# Chat Details Endpoint:
# Get or change details about a chat using the chatId
# Arguments: token (required), details (required), model, name
@app.route('/api/chat/<_id>/details/<details>', methods = ['GET', 'POST'])
def getChatHistory(_id, details):
    """Read (GET) or change (POST) one detail of a chat.

    Reads the auth token from the ``token`` header.

    GET requires the "view" permission; ``details`` may be "history",
    "users", "model" or "name".
    POST requires the "edit" permission; ``details`` may be "model" or
    "name", and the new value is read from the JSON body field of the same
    name.

    Returns:
        The requested value as JSON (GET) or "Success" (POST);
        "User token is invalid" / "Invalid Permissions" on authorization
        failures; "Invalid detail type" with status 400 for an unsupported
        ``details`` value.
    """
    token = request.headers.get('token')
    if token is None:
        return jsonify("User token is invalid")

    valid, _ = checkChatPermission(token, _id, True)
    if not valid:
        return jsonify("User token is invalid")

    if request.method == 'GET':
        allowed, _ = checkChatPermission(token, _id, "view")
        if not allowed:
            return jsonify("Invalid Permissions")

        # Maps the URL detail name to the field stored on the chat document.
        readableFields = {
            "history": "messages",
            "users": "permissions",
            "model": "model",
            "name": "name",
        }
        if details not in readableFields:
            return jsonify("Invalid detail type"), 400

        returnedChat = chatCollection.find_one({'_id': ObjectId(_id)})
        # The auth token is a credential and is deliberately not logged.
        print("Chat " + _id + " has been found")
        return jsonify(returnedChat[readableFields[details]])

    # POST: changing a chat needs "edit", not merely "view".
    allowed, _ = checkChatPermission(token, _id, "edit")
    if not allowed:
        return jsonify("Invalid Permissions")

    if details not in ("model", "name"):
        # Do not report success for an update that was never performed.
        return jsonify("Invalid detail type"), 400

    newValue = request.json[details]
    chatCollection.update_one({'_id': ObjectId(_id)}, { "$set": { details: newValue } })
    return jsonify("Success")
# Chat creation endpoint
# Create a new chat
# Arguments: token (required), name (required), model (required)
@app.route('/api/chat/create', methods = ['POST'])
def createChat():
    """Create a new, empty chat owned by the requesting user.

    Reads the auth token from the ``token`` header and ``name`` / ``model``
    from the JSON body. The user must hold the "createChat" permission.

    Returns:
        JSON "Success" once the chat is inserted, or
        "User token is invalid" when the permission check fails.
    """
    token = request.headers['token']
    allowed, userId = checkUserPermission(token, "createChat")
    if not allowed:
        return jsonify("User token is invalid")

    body = request.json
    # The creator receives every chat permission.
    newChat = {
        "name": body['name'],
        "model": body['model'],
        "permissions": {userId: ["owner", "view", "message", "edit"]},
        "messages": [],
    }
    chatCollection.insert_one(newChat)
    return jsonify("Success")
# Signup page
# Returns html signup page
@app.route('/signup', methods = ['GET'])
def signup():
    """Serve the signup page.

    A visitor without an ``auth_token`` cookie gets the signup form. A
    visitor who already has the cookie is redirected to the index page
    (previously this path returned nothing, which Flask rejects).
    """
    token = request.cookies.get('auth_token', 'none')
    if token != 'none':
        return redirect('/')
    return render_template(
        'signup.html',
        appName=appName,
        githubUrl=github_auth_endpoint,
        githublogin=settings["github_oauth"]["enabled"],
        oauthlogin=settings["oauth_login"],
    )
# Index page
# If logged in return home menu (Or logout if token is expired),
# Otherwise return login screen
@app.route('/', methods = ['GET'])
def index():
    """Serve the landing page.

    Without an ``auth_token`` cookie the login page is rendered. With a
    valid token the home page is rendered; with an unknown or expired token
    the logout page is rendered.
    """
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template(
            'login.html',
            appName=appName,
            githubUrl=github_auth_endpoint,
            githublogin=settings["github_oauth"]["enabled"],
            oauthlogin=settings["oauth_login"],
        )

    valid, _ = checkUserPermission(token, True)
    if valid:
        return render_template('home.html', appName=appName)
    # The original omitted this `return`, so the view returned None.
    return render_template('logout.html', appName=appName)
@app.route('/chat/<_id>', methods = ['GET'])
def chatPage(_id):
    """Serve the page for a single chat.

    Without an ``auth_token`` cookie the login page is rendered. With a
    valid token the chat page is rendered for chat ``_id``; with an unknown
    or expired token the logout page is rendered.
    """
    token = request.cookies.get('auth_token', 'none')
    if token == 'none':
        return render_template(
            'login.html',
            appName=appName,
            githubUrl=github_auth_endpoint,
            githublogin=settings["github_oauth"]["enabled"],
            oauthlogin=settings["oauth_login"],
        )

    valid, _ = checkUserPermission(token, True)
    if valid:
        return render_template('chat.html', appName=appName, chatId=_id)
    # The original omitted this `return`, so the view returned None.
    return render_template('logout.html', appName=appName)
# Login endpoint
# API backend for the login screen: verifies the user's credentials and returns a new auth token
# Arguments: username (required), password (required)
@app.route('/api/login', methods = ['POST'])
def handleLogin():
    """Log a user in with username (or email) and password.

    Reads ``username`` and ``password`` from the JSON body. The ``username``
    value is lower-cased and matched against both the username and the
    email field.

    Returns:
        JSON with a freshly issued 100-character auth token, or the JSON
        message "Incorrect username or password".
    """
    try:
        username = request.json['username'].lower()
        password = request.json['password']
        user = usersCollection.find_one({"$or":[{"username":username},{"email":username}]})
        loggedin = ph.verify(user["password"], password)
        userId = user['_id']
    except Exception:
        # Deliberately broad: a missing field, an unknown user and a failed
        # hash verification must all yield the same message so the response
        # does not reveal which part was wrong.
        return jsonify("Incorrect username or password")

    if not loggedin:
        return jsonify("Incorrect username or password")

    # Auth tokens are credentials: draw them from a CSPRNG, not `random`.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens live for 2678400 seconds (31 days).
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':userId}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
# Github callback endpoint
# Either logs in the user or returns extra details signup page
# Arguments: code (required) (provided by github)
@app.route('/api/github/authorized', methods = ['GET'])
def handleGithubLogin():
    """GitHub OAuth callback: log the user in or start the OAuth signup.

    Exchanges the ``code`` query argument for a GitHub access token and
    reads the account's primary email address.

    Returns:
        JSON with a freshly issued auth token when a user with that email
        exists; otherwise the ``oauthsignup.html`` page so the visitor can
        complete an account. "GitHub authorization failed" with status 400
        when no access token or primary email could be obtained.
    """
    code = request.args.get('code')
    res = requests.post(
        github_token_endpoint,
        data=dict(
            client_id=github_client_id,
            client_secret=github_client_secret,
            code=code,
        ),
        timeout=10,
    )
    res = parse_qs(res.content.decode("utf-8"))
    # An invalid or reused code yields a response without an access token.
    if "access_token" not in res:
        return jsonify("GitHub authorization failed"), 400
    token = res["access_token"][0]

    user_email = requests.get(
        "https://api.github.com/user/emails",
        headers=dict(Authorization=f"Bearer {token}"),
        timeout=10,
    )
    entries = user_email.json()
    email = None
    # Anything other than a list of entries is treated as "no email found".
    if isinstance(entries, list):
        for entry in entries:
            if entry.get("primary") == True:
                email = entry["email"]
    if email is None:
        return jsonify("GitHub authorization failed"), 400

    # Look up by the email address. The original passed the HTTP response
    # object here, so the lookup always failed and existing users were sent
    # to the signup page.
    user = usersCollection.find_one({"email":email})
    if user is None:
        return render_template("oauthsignup.html", appName = appName, provider="github", accessToken=token)

    # Auth tokens are credentials: draw them from a CSPRNG, not `random`.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens live for 2678400 seconds (31 days).
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':user['_id']}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
# Oauth2.0 Account signup endpoint
# Creates account using extra details and oauth access token
# Arguments: code (required) (access token provided by idp), username (required), signupcode, display (required)
@app.route('/api/oauthsignup', methods = ['POST'])
def handleOauthSignup():
    """Create an account from a GitHub access token plus extra details.

    JSON body fields: ``code`` (GitHub access token, required), ``username``
    (required), ``display`` (required) and ``signupcode`` (optional; only
    consulted in the "codeoptional" and "coderequired" signup modes).

    Returns:
        JSON with a freshly issued auth token on success, or one of the JSON
        messages "User already exists", "Signups have been disabled",
        "Code not found" or "An error occured".
    """
    try:
        user_email = requests.get(
            "https://api.github.com/user/emails",
            headers=dict(Authorization=f"Bearer {request.json['code']}"),
            timeout=10,
        )
        email = None
        for i in user_email.json():
            if i["primary"] == True:
                email = i["email"]
        if email is None:
            return jsonify("An error occured")

        # Set user details
        username = request.json['username'].lower()
        # Placeholder value, not a password hash — presumably so password
        # login can never succeed for OAuth-only accounts; TODO confirm.
        passwordHash = "a"
        creationDate = int(datetime.now().timestamp())
        accessCode = request.json.get('signupcode')
        displayName = request.json['display']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username":username})
        sameEmail = usersCollection.count_documents({"email":email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Resolve the role from the configured signup mode
        signupMode = settings["signup_mode"]
        if signupMode == "none":
            return jsonify("Signups have been disabled")
        if signupMode == "nocode":
            role = settings["default_role"]
        elif signupMode in ("codeoptional", "coderequired"):
            codeFound = False
            role = None
            for entry in settings["signup_codes"]:
                if entry["code"] == accessCode:
                    codeFound = True
                    role = entry["role"]
            if not codeFound:
                if signupMode == "coderequired":
                    return jsonify("Code not found")
                role = settings["default_role"]
        else:
            # Unknown signup mode: refuse instead of creating a role-less user.
            return jsonify("An error occured")

        # Create user
        userId = ObjectId()
        usersCollection.insert_one(
            {
                "_id": userId,
                "name":displayName,
                "username":username,
                "email":email,
                "permissions":settings["default_permissions"],
                "role":role,
                "password":passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate
            }
        )
    except Exception:
        # Any missing field, GitHub failure or database error is reported
        # to the client with the same generic message.
        return jsonify("An error occured")

    # Log the new user in straight away. Auth tokens are credentials: draw
    # them from a CSPRNG, not `random`.
    alphabet = string.ascii_letters + string.digits
    newToken = ''.join(secrets.choice(alphabet) for _ in range(100))
    # Tokens live for 2678400 seconds (31 days).
    newExpiry = int(datetime.now().timestamp()) + 2678400
    usersCollection.update_one({'_id':userId}, {"$addToSet":{'tokens':{'token':newToken, 'expiry':newExpiry}}})
    return jsonify(newToken)
# Api signup endpoint
# Create account with user details and get permission based on access code
# Arguments: username (required), email (required), password (required), access_code, displayname (required)
@app.route('/api/signup', methods = ['POST'])
def handleSignup():
    """Create an account from a username, email and password.

    JSON body fields: ``username``, ``email``, ``password`` and
    ``displayname`` (all required) and ``access_code`` (optional; only
    consulted in the "codeoptional" and "coderequired" signup modes).

    Returns:
        JSON "Success" once the user is created, or one of the JSON messages
        "User already exists", "Signups have been disabled",
        "Code not found" or "An error occured".
    """
    try:
        # Set user details
        username = request.json['username'].lower()
        email = request.json['email'].lower()
        password = request.json['password']
        passwordHash = ph.hash(password)
        creationDate = int(datetime.now().timestamp())
        accessCode = request.json.get('access_code')
        displayName = request.json['displayname']

        # Check if details are taken
        sameUsername = usersCollection.count_documents({"username":username})
        sameEmail = usersCollection.count_documents({"email":email})
        if sameUsername != 0 or sameEmail != 0:
            return jsonify("User already exists")

        # Resolve the role from the configured signup mode
        signupMode = settings["signup_mode"]
        if signupMode == "none":
            return jsonify("Signups have been disabled")
        if signupMode == "nocode":
            role = settings["default_role"]
        elif signupMode in ("codeoptional", "coderequired"):
            codeFound = False
            role = None
            for entry in settings["signup_codes"]:
                if entry["code"] == accessCode:
                    codeFound = True
                    role = entry["role"]
            if not codeFound:
                if signupMode == "coderequired":
                    return jsonify("Code not found")
                role = settings["default_role"]
        else:
            # Unknown signup mode: refuse instead of creating a role-less user.
            return jsonify("An error occured")

        # Create user
        usersCollection.insert_one(
            {
                "_id": ObjectId(),
                "name":displayName,
                "username":username,
                "email":email,
                "permissions":settings["default_permissions"],
                "role":role,
                "password":passwordHash,
                "passkeys": [],
                "tokens": [],
                "creation_date": creationDate
            }
        )
    except Exception:
        # Any missing field or database error is reported to the client
        # with the same generic message.
        return jsonify("An error occured")

    # The original fell off the end here and returned None, so a successful
    # signup was answered with a server error.
    return jsonify("Success")
# Logout endpoint
# Logs out user and removes token from db
# Arguments: auth_token (cookie) (required)
@app.route('/logout', methods = ['GET'])
def logout():
    """Log the user out by removing a token from the database.

    The token to remove is taken from the ``remove-token`` header when it is
    present, otherwise from the ``auth_token`` cookie.

    Returns:
        The rendered ``logout.html`` page, whether or not a matching token
        was found.
    """
    cookieToken = request.cookies.get('auth_token', 'none')
    # The header, when sent, takes precedence over the cookie.
    token = request.headers.get('remove-token', cookieToken)
    usersCollection.update_one({'tokens.token': token}, {"$pull":{'tokens':{'token':token}}})
    return render_template('logout.html', appName=appName)
if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution, so it is opt-in through an optional "debug"
    # key in the settings file instead of being always on.
    app.run(debug=bool(settings.get("debug", False)))