Files
Outpost/db.py

425 lines
11 KiB
Python

import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from argon2 import PasswordHasher
import secrets
def logEvent(action, details, user_id, user_ip, user_agent, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
INSERT INTO logs (action, details, user_id, user_ip, user_agent)
VALUES (%s, %s, %s, %s, %s)
""", (action, details, user_id, user_ip, user_agent))
conn.commit()
cur.close()
conn.close()
except Exception as e:
print(f"Error logging event: {e}")
raise
def createGroup(name, parent, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
if parent:
cur.execute("SELECT id FROM groups WHERE name = %s", (parent,))
parent_record = cur.fetchone()
if parent_record:
parent_id = parent_record[0]
else:
parent_id = None
else:
parent_id = None
cur.execute("""
INSERT INTO groups (name, parent)
VALUES (%s, %s) RETURNING id
""", (name, parent_id))
group_id = cur.fetchone()[0]
conn.commit()
cur.close()
conn.close()
return group_id
except Exception as e:
print(f"Error creating group: {e}")
raise
def getGroupByName(name, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("SELECT id, name, parent, creation_date FROM groups WHERE name = %s", (name,))
group_record = cur.fetchone()
cur.close()
conn.close()
if group_record:
return {
"id": group_record[0],
"name": group_record[1],
"parent": group_record[2],
"creation_date": group_record[3]
}
else:
return None
except Exception as e:
print(f"Error retrieving group: {e}")
raise
def getServiceById(id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
print(id)
cur.execute("SELECT name, permissions, creation_date FROM services WHERE id = %s", (id,))
group_record = cur.fetchone()
cur.close()
conn.close()
if group_record:
return {
"name": group_record[0],
"permissions": group_record[1],
"creation_date": group_record[2]
}
else:
return None
except Exception as e:
print(f"Error retrieving group: {e}")
raise
def createUser(name, username, email, password, group, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
password_hash = PasswordHasher().hash(password)
cur.execute("""
INSERT INTO users (name, username, email, password_hash, group_id)
VALUES (%s, %s, %s, %s, %s) RETURNING id
""", (name, username, email, password_hash, group))
user_id = cur.fetchone()[0]
conn.commit()
cur.close()
conn.close()
return user_id
except Exception as e:
print(f"Error creating user: {e}")
raise
def createRequestToken(user_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
token_id = secrets.token_urlsafe(32)
cur.execute("""
INSERT INTO requestTokens (id, owner_id)
VALUES (%s, %s) RETURNING id, creation_date, expiration_date
""", (token_id, user_id))
token_data = cur.fetchone()
conn.commit()
cur.close()
conn.close()
return token_data[0]
except Exception as e:
print(f"Error creating token: {e}")
raise
def checkRequestToken(token_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
SELECT owner_id FROM requestTokens
WHERE id = %s AND expiration_date > CURRENT_TIMESTAMP
""", (token_id,))
token_record = cur.fetchone()
cur.close()
conn.close()
if token_record:
cur.execute("DELETE FROM requestTokens WHERE id = %s", (token_id,))
conn.commit()
return token_record[0]
else:
return None
except Exception as e:
print(f"Error verifying token: {e}")
return None
def getUserServiceData(user_id, service_id, record_key, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
SELECT permissions FROM services
WHERE id = %s
""", (service_id,))
service_perms = cur.fetchone()
print(service_perms)
cur.execute("""
SELECT value FROM userData
WHERE property = %s AND user_id = %s AND service_id = %s
""", (record_key, user_id, service_id))
service_record = cur.fetchone()
cur.close()
conn.close()
if service_record:
return service_record[0]
else:
return None
except Exception as e:
print(f"Error retrieving user service data: {e}")
raise
def createUserServiceData(key, value, service_id, user_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
INSERT INTO userData (user_id, service_id, property, value)
VALUES (%s, %s) RETURNING id
""", (user_id, service_id, key, value))
row = cur.fetchone()
conn.commit()
cur.close()
conn.close()
return "Success"
except Exception as e:
print(f"Error creating service: {e}")
raise
def createService(name, permissions, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
INSERT INTO services (name, permissions)
VALUES (%s, %s) RETURNING id, key
""", (name, permissions))
row = cur.fetchone()
serviceData = {
"id": row[0],
"key": row[1]
}
conn.commit()
cur.close()
conn.close()
return serviceData
except Exception as e:
print(f"Error creating service: {e}")
raise
def createToken(user_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
# generate a secure random token id (URL-safe)
token_id = secrets.token_urlsafe(32)
cur.execute("""
INSERT INTO userTokens (id, owner_id)
VALUES (%s, %s) RETURNING id, creation_date, expiration_date
""", (token_id, user_id))
token_data = cur.fetchone()
conn.commit()
cur.close()
conn.close()
return token_data[0]
except Exception as e:
print(f"Error creating token: {e}")
raise
def loginUser(username, password, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("SELECT id, password_hash FROM users WHERE username = %s", (username,))
user_record = cur.fetchone()
cur.close()
conn.close()
if user_record:
user_id, password_hash = user_record
ph = PasswordHasher()
try:
ph.verify(password_hash, password)
return createToken(user_id, dbuser, dbpass, dbhost, dbname)
except:
return None
else:
return None
except Exception as e:
print(f"Error logging in user: {e}")
raise
def removeToken(token_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("DELETE FROM userTokens WHERE id = %s", (token_id,))
conn.commit()
cur.close()
conn.close()
return True
except Exception as e:
print(f"Error removing token: {e}")
return False
def verifyToken(token_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
SELECT owner_id FROM userTokens
WHERE id = %s AND expiration_date > CURRENT_TIMESTAMP
""", (token_id,))
token_record = cur.fetchone()
cur.close()
conn.close()
if token_record:
return token_record[0]
else:
return None
except Exception as e:
print(f"Error verifying token: {e}")
return None