add user database code
This commit is contained in:
parent
f7b7918660
commit
15770f2879
13 changed files with 319 additions and 234 deletions
71
roc_fnb/website/database.py
Normal file
71
roc_fnb/website/database.py
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
from os import environ
|
||||
from typing import Optional, Self
|
||||
from pdb import set_trace as debugger
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
import pymongo
|
||||
from pymongo import MongoClient
|
||||
|
||||
from roc_fnb.website.models.user import User
|
||||
|
||||
KEYLEN = 64
|
||||
with open('private-key.pem') as pk:
|
||||
PRIVATE_KEY = pk.read()
|
||||
with open('public-key.pem') as pk:
|
||||
PUBLIK_KEY = pk.read()
|
||||
|
||||
|
||||
class Database(MongoClient):
|
||||
@classmethod
|
||||
def from_env(cls) -> Self:
|
||||
return cls(
|
||||
host=environ.get('DATABASE_HOST', default='localhost'),
|
||||
port=int(environ.get('DATABASE_PORT', default=27017))
|
||||
)
|
||||
|
||||
@property
|
||||
def db(self) -> pymongo.database.Database:
|
||||
if (env := environ.get('ENV_MODE')) and env == 'production':
|
||||
return self.production
|
||||
else:
|
||||
return self.development
|
||||
|
||||
def store_user(self, user: User) -> User:
|
||||
"""Store the given user in the database, set _id on it, and return it"""
|
||||
result = self.db.users.insert_one(user.document)
|
||||
user._id = result.inserted_id
|
||||
return user
|
||||
|
||||
def get_user_by_email(self, email: str) -> Optional[User]:
|
||||
"""
|
||||
Return the user associated with the given email.
|
||||
|
||||
This does not imply authentiation
|
||||
"""
|
||||
if result := self.db.users.find_one({'email': email}):
|
||||
return User(**result)
|
||||
return None
|
||||
|
||||
def get_user_by_name(self, name: str) -> Optional[User]:
|
||||
if result := self.db.users.find_one({'name': name}):
|
||||
return User(**result)
|
||||
return None
|
||||
|
||||
def delete_user(self, _id: ObjectId):
|
||||
self.db.users.delete_one({'_id': _id})
|
||||
|
||||
def get_user_by_id(self, id: ObjectId) -> Optional[User]:
|
||||
if user := self.db.users.find_one({'_id': id}):
|
||||
return User(**user)
|
||||
return None
|
||||
|
||||
def get_user_from_token(self, token: str) -> Optional[User]:
|
||||
"""
|
||||
Verify a user and retreive a their full profile from the database.
|
||||
|
||||
This is like User.verify_jwt except it also fetches fields from the
|
||||
database which are not present in the client-visible token.
|
||||
"""
|
||||
if jwt_user := User.verify_jwt(token):
|
||||
return self.get_user_by_id(jwt_user._id)
|
||||
return None
|
||||
0
roc_fnb/website/models/__init__.py
Normal file
0
roc_fnb/website/models/__init__.py
Normal file
85
roc_fnb/website/models/test_user.py
Normal file
85
roc_fnb/website/models/test_user.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
import json
|
||||
from random import randbytes
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
from pytest import fixture
|
||||
|
||||
from roc_fnb.util.base64 import base64_decode
|
||||
from roc_fnb.website.database import Database
|
||||
from roc_fnb.website.models.user import User
|
||||
|
||||
|
||||
@fixture
|
||||
def user() -> User:
|
||||
return User.create('test@t.co', 'name', 'monkey')
|
||||
|
||||
|
||||
@fixture
|
||||
def database() -> Database:
|
||||
return Database.from_env()
|
||||
|
||||
|
||||
def test_user_and_check_password(user):
|
||||
assert user.name == 'name'
|
||||
assert user.email == 'test@t.co'
|
||||
assert user._id is None
|
||||
assert user.check_password('monkey')
|
||||
|
||||
|
||||
def test_jwt(user):
|
||||
user._id = (_id := ObjectId(randbytes(12)))
|
||||
token = user.jwt
|
||||
header, payload, sig = (base64_decode(part.replace('.', ''))
|
||||
for part in token.split('.'))
|
||||
header = json.loads(header)
|
||||
payload = json.loads(payload)
|
||||
assert header['alg'] == 'RS256'
|
||||
assert header['typ'] == 'JWT'
|
||||
assert set(header.keys()) == {'alg', 'typ'}
|
||||
# Note that JWT contents are visible to the user: this can be useful but
|
||||
# must be done with caution
|
||||
assert payload['email'] == user.email
|
||||
assert payload['name'] == user.name
|
||||
assert ObjectId(base64_decode(payload['_id'])) == user._id == _id
|
||||
assert set(payload.keys()) == {'email', 'name', '_id', 'admin', 'moderator'}
|
||||
|
||||
result = user.verify_jwt(token)
|
||||
assert result.email == user.email
|
||||
assert result.name == user.name
|
||||
assert result._id == user._id == _id
|
||||
assert not result.admin
|
||||
assert not result.moderator
|
||||
|
||||
|
||||
def test_store_and_retreive(user: User, database: Database):
|
||||
try:
|
||||
database.store_user(user)
|
||||
assert user._id is not None
|
||||
retreived = database.get_user_by_email(user.email)
|
||||
assert retreived is not None
|
||||
assert retreived._id == user._id
|
||||
assert retreived == user
|
||||
finally:
|
||||
if id := user._id:
|
||||
database.delete_user(id)
|
||||
|
||||
|
||||
def test_store_and_retreive_by_id(user: User, database: Database):
|
||||
try:
|
||||
database.store_user(user)
|
||||
assert user._id is not None
|
||||
retreived = database.get_user_by_id(user._id)
|
||||
assert retreived == user
|
||||
finally:
|
||||
if id := user._id:
|
||||
database.delete_user(id)
|
||||
|
||||
def test_store_and_retreive_by_jwt(user: User, database: Database):
|
||||
try:
|
||||
token = database.store_user(user).jwt
|
||||
assert user._id is not None
|
||||
retreived = database.get_user_from_token(token)
|
||||
assert retreived == user
|
||||
finally:
|
||||
if id := user._id:
|
||||
database.delete_user(id)
|
||||
90
roc_fnb/website/models/user.py
Normal file
90
roc_fnb/website/models/user.py
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
from base64 import b64decode, b64encode
|
||||
from dataclasses import dataclass
|
||||
import json
|
||||
from random import randbytes
|
||||
from typing import Optional, Any
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
import scrypt
|
||||
import jwt
|
||||
|
||||
from roc_fnb.util.base64 import base64_encode, base64_decode
|
||||
|
||||
with open('private-key.pem') as file:
|
||||
PRIVATE_KEY = file.read()
|
||||
|
||||
with open('public-key.pem') as file:
|
||||
PUBLIC_KEY = file.read()
|
||||
|
||||
@dataclass
|
||||
class JwtUser:
|
||||
_id: ObjectId
|
||||
email: str
|
||||
name: str
|
||||
moderator: bool
|
||||
admin: bool
|
||||
|
||||
@dataclass
|
||||
class User:
|
||||
_id: Optional[ObjectId]
|
||||
email: str
|
||||
name: str
|
||||
password_hash: bytes
|
||||
salt: bytes
|
||||
moderator: bool
|
||||
admin: bool
|
||||
|
||||
@classmethod
|
||||
def create(cls, email: str, name: str, password: str|bytes, moderator: bool = False, admin: bool = False):
|
||||
"""Alternate constructor which hashes a given password"""
|
||||
salt = randbytes(32)
|
||||
password_hash = scrypt.hash(password, salt)
|
||||
return cls(_id=None,
|
||||
email=email,
|
||||
name=name,
|
||||
password_hash=password_hash,
|
||||
salt=salt,
|
||||
moderator=moderator,
|
||||
admin=admin)
|
||||
|
||||
@property
|
||||
def document(self):
|
||||
doc = {
|
||||
"email": self.email,
|
||||
"name": self.name,
|
||||
"password_hash": self.password_hash,
|
||||
"salt": self.salt,
|
||||
"moderator": self.moderator,
|
||||
"admin": self.admin,
|
||||
}
|
||||
if self._id is not None:
|
||||
doc['_id'] = self._id
|
||||
return doc
|
||||
|
||||
@property
|
||||
def public_fields(self):
|
||||
return {
|
||||
'_id': base64_encode(self._id.binary),
|
||||
"email": self.email,
|
||||
"name": self.name,
|
||||
"moderator": self.moderator,
|
||||
"admin": self.admin,
|
||||
}
|
||||
|
||||
def check_password(self, password: str) -> bool:
|
||||
return self.password_hash == scrypt.hash(password, self.salt)
|
||||
|
||||
@property
|
||||
def jwt(self) -> str:
|
||||
return jwt.encode(self.public_fields, PRIVATE_KEY, algorithm='RS256')
|
||||
|
||||
@staticmethod
|
||||
def verify_jwt(token: str) -> JwtUser:
|
||||
verified = jwt.decode(token, PUBLIC_KEY, verify=True, algorithms=['RS256'])
|
||||
return JwtUser(
|
||||
_id=ObjectId(base64_decode(verified['_id'])),
|
||||
name=verified['name'],
|
||||
email=verified['email'],
|
||||
moderator=verified['moderator'],
|
||||
admin=verified['admin'],
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue