103 lines
3.8 KiB
Python
103 lines
3.8 KiB
Python
from datetime import datetime, UTC
|
|
from os import environ
|
|
from random import randbytes
|
|
from typing import Optional, Self
|
|
from pdb import set_trace as debugger
|
|
|
|
from bson.objectid import ObjectId
|
|
import pymongo
|
|
from pymongo import MongoClient
|
|
|
|
from roc_fnb.util.base64 import base64_encode
|
|
from roc_fnb.util import log
|
|
from roc_fnb.website.models.user import User, JwtUser
|
|
|
|
# Key length constant.
# NOTE(review): not referenced anywhere in the visible part of this module;
# its unit (bytes vs. bits) cannot be confirmed from here.
KEYLEN = 64

# Key material is read once, at import time, from PEM files resolved relative
# to the current working directory. Importing this module therefore fails
# (FileNotFoundError) when either file is missing.
# The encoding is pinned so the result does not depend on the host locale.
with open('private-key.pem', encoding='utf-8') as pk:
    PRIVATE_KEY = pk.read()
with open('public-key.pem', encoding='utf-8') as pk:
    PUBLIK_KEY = pk.read()

# Correctly spelled alias. The misspelled ``PUBLIK_KEY`` is kept because other
# modules may already import the constant under that name.
PUBLIC_KEY = PUBLIK_KEY
|
|
|
|
|
|
class Database(MongoClient):
    """MongoDB client with helpers for the ``users`` and ``invitations`` collections."""

    @classmethod
    def from_env(cls) -> Self:
        """Build a client from the ``DATABASE_HOST`` / ``DATABASE_PORT`` environment variables.

        Falls back to ``localhost`` and ``27017`` when a variable is unset.

        Raises:
            ValueError: if ``DATABASE_PORT`` is set but is not an integer.
        """
        return cls(
            host=environ.get('DATABASE_HOST', default='localhost'),
            port=int(environ.get('DATABASE_PORT', default=27017)),
        )

    @property
    def db(self) -> pymongo.database.Database:
        """Return the ``production`` database when ``ENV_MODE`` is ``'production'``, else ``development``.

        The environment variable is re-read on every access.
        """
        # An unset variable yields None, which never equals 'production',
        # so a single comparison covers both the "unset" and "other" cases.
        if environ.get('ENV_MODE') == 'production':
            return self.production
        return self.development

    def store_user(self, user: User) -> User:
        """Insert ``user.document`` into ``users``, set ``user._id`` to the new id, and return the user."""
        result = self.db.users.insert_one(user.document)
        user._id = result.inserted_id
        return user

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the user stored with the given email, or ``None`` if there is none.

        This does not imply authentication.
        """
        if result := self.db.users.find_one({'email': email}):
            return User(**result)
        return None

    def get_user_by_name(self, name: str) -> Optional[User]:
        """Return the user stored with the given name, or ``None`` if there is none."""
        if result := self.db.users.find_one({'name': name}):
            return User(**result)
        return None

    def delete_user(self, _id: ObjectId) -> None:
        """Delete the user document whose ``_id`` matches; a missing document is not reported."""
        self.db.users.delete_one({'_id': _id})

    def get_user_by_id(self, id: ObjectId) -> Optional[User]:
        """Return the user stored under the given ``_id``, or ``None`` if there is none."""
        # The parameter name shadows the builtin ``id`` but is part of the
        # public signature, so it is left as is.
        if user := self.db.users.find_one({'_id': id}):
            return User(**user)
        return None

    def invite_new_user(self, inviter: JwtUser | User) -> bytes:
        """Save a new invite code to the database and return its raw bytes.

        Args:
            inviter: the user issuing the invite; must already have an ``_id``.

        Returns:
            The 32 random bytes stored as ``invite_code``.

        Raises:
            ValueError: if ``inviter._id`` is falsy, i.e. the inviter is unsaved.
        """
        # Imported locally so this block does not depend on a file-level import.
        from secrets import token_bytes

        inviter_id = inviter._id
        if not inviter_id:
            raise ValueError('unsaved user cannot create an invite')

        # The invite code acts as a bearer credential for sign-up, so it must
        # come from a CSPRNG; ``random.randbytes`` is explicitly documented as
        # unsuitable for security tokens.
        invite_code = token_bytes(32)
        self.db.invitations.insert_one({
            'created_at': datetime.now(UTC).timestamp(),
            'invited_user_id': None,  # filled in once the invite is claimed
            'invite_code': invite_code,
            'invited_by': inviter_id,
        })
        return invite_code

    def store_new_invitee(self, invite_code: bytes, invitee: User) -> User:
        """Store the new user if the given invite_code exists and is not already used.

        Args:
            invite_code: raw invite code as returned by ``invite_new_user``.
            invitee: the user to store on success.

        Returns:
            ``invitee``, with ``_id`` set by ``store_user``.

        Raises:
            InvitationNotFound: if no invitation has this code.
            InvitationClaimed: if the invitation already has an ``invited_user_id``.
        """
        invite = self.db.invitations.find_one({'invite_code': invite_code})
        if not invite:
            raise InvitationNotFound(invite_code, invitee)
        if invite.get('invited_user_id') is not None:
            raise InvitationClaimed(invite, invitee)

        # NOTE(review): the lookup above and the claim below are separate
        # operations, so two concurrent sign-ups with the same code could both
        # pass the check. Confirm whether that matters for this deployment.
        self.store_user(invitee)
        self.db.invitations.update_one(
            {'_id': invite['_id']},
            {'$set': {'invited_user_id': invitee._id}},
        )
        return invitee
|
|
#
|
|
class InvitationClaimed(ValueError):
    """Raised when a sign-up presents an invite code that has already been used.

    Constructing the exception also emits an error entry through ``log.error``.

    Attributes:
        invitation: the invitation document that was already claimed.
        invitee: the user who attempted the sign-up.
        invitiee: misspelled alias of ``invitee``, kept for backward compatibility.
    """

    def __init__(self, invitation: dict, invitee: User):
        self.invitation = invitation
        self.invitee = invitee
        # Historical misspelling; existing callers may still read it.
        self.invitiee = invitee
        emsg = f'{invitee.name} tried to sign up with a used invite code: {base64_encode(invitation["invite_code"])}'
        log.error(emsg, invitation=self.invitation, invitee=invitee)
        super().__init__(emsg)
|
|
|
|
class InvitationNotFound(ValueError):
    """Raised when a sign-up presents an invite code that matches no invitation.

    Constructing the exception also emits an error entry through ``log.error``.

    Attributes:
        invitation_code: the rejected code, passed through ``base64_encode``.
        invitee: the user who attempted the sign-up.
        invitiee: misspelled alias of ``invitee``, kept for backward compatibility.
    """

    def __init__(self, invitation_code: bytes, invitee: User):
        # Stored in encoded form so the raw bytes never reach the message or log.
        self.invitation_code = base64_encode(invitation_code)
        self.invitee = invitee
        # Historical misspelling; existing callers may still read it.
        self.invitiee = invitee
        emsg = f'{invitee.name} tried to sign up with an invalid invite code: {self.invitation_code}'
        log.error(emsg, invitation_code=self.invitation_code, invitee=invitee)
        super().__init__(emsg)