Files
Outpost/db.py
2025-12-18 15:16:15 +00:00

236 lines
6.1 KiB
Python

import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from argon2 import PasswordHasher
import secrets
def logEvent(action, details, user_id, user_ip, user_agent, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
INSERT INTO logs (action, details, user_id, user_ip, user_agent)
VALUES (%s, %s, %s, %s, %s)
""", (action, details, user_id, user_ip, user_agent))
conn.commit()
cur.close()
conn.close()
except Exception as e:
print(f"Error logging event: {e}")
raise
def createGroup(name, parent, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
if parent:
cur.execute("SELECT id FROM groups WHERE name = %s", (parent,))
parent_record = cur.fetchone()
if parent_record:
parent_id = parent_record[0]
else:
parent_id = None
else:
parent_id = None
cur.execute("""
INSERT INTO groups (name, parent)
VALUES (%s, %s) RETURNING id
""", (name, parent_id))
group_id = cur.fetchone()[0]
conn.commit()
cur.close()
conn.close()
return group_id
except Exception as e:
print(f"Error creating group: {e}")
raise
def getGroupByName(name, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("SELECT id, name, parent, creation_date FROM groups WHERE name = %s", (name,))
group_record = cur.fetchone()
cur.close()
conn.close()
if group_record:
return {
"id": group_record[0],
"name": group_record[1],
"parent": group_record[2],
"creation_date": group_record[3]
}
else:
return None
except Exception as e:
print(f"Error retrieving group: {e}")
raise
def createUser(name, username, email, password, group, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
password_hash = PasswordHasher().hash(password)
cur.execute("""
INSERT INTO users (name, username, email, password_hash, group_id)
VALUES (%s, %s, %s, %s, %s) RETURNING id
""", (name, username, email, password_hash, group))
user_id = cur.fetchone()[0]
conn.commit()
cur.close()
conn.close()
return user_id
except Exception as e:
print(f"Error creating user: {e}")
raise
def createToken(user_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
# generate a secure random token id (URL-safe)
token_id = secrets.token_urlsafe(32)
cur.execute("""
INSERT INTO userTokens (id, owner_id)
VALUES (%s, %s) RETURNING id, creation_date, expiration_date
""", (token_id, user_id))
token_data = cur.fetchone()
conn.commit()
cur.close()
conn.close()
return token_data[0]
except Exception as e:
print(f"Error creating token: {e}")
raise
def loginUser(username, password, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("SELECT id, password_hash FROM users WHERE username = %s", (username,))
user_record = cur.fetchone()
cur.close()
conn.close()
if user_record:
user_id, password_hash = user_record
ph = PasswordHasher()
try:
ph.verify(password_hash, password)
return createToken(user_id, dbuser, dbpass, dbhost, dbname)
except:
return None
else:
return None
except Exception as e:
print(f"Error logging in user: {e}")
raise
def removeToken(token_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("DELETE FROM userTokens WHERE id = %s", (token_id,))
conn.commit()
cur.close()
conn.close()
return True
except Exception as e:
print(f"Error removing token: {e}")
return False
def verifyToken(token_id, dbuser, dbpass, dbhost, dbname):
try:
conn = psycopg2.connect(
host=dbhost,
user=dbuser,
password=dbpass,
database=dbname
)
cur = conn.cursor()
cur.execute("""
SELECT owner_id FROM userTokens
WHERE id = %s AND expiration_date > CURRENT_TIMESTAMP
""", (token_id,))
token_record = cur.fetchone()
cur.close()
conn.close()
if token_record:
return token_record[0]
else:
return None
except Exception as e:
print(f"Error verifying token: {e}")
return None