Source code for nginx_ldap_auth.app.models

from typing import ClassVar, cast

import bonsai
from bonsai.errors import (
    AuthenticationError,
    LDAPError,
)
from bonsai.utils import escape_filter_exp
from pydantic import BaseModel

from nginx_ldap_auth.ldap import (
    TimeLimitedAIOConnectionPool,
    TimeLimitedAIOLDAPConnection,
)
from nginx_ldap_auth.logging import logger
from nginx_ldap_auth.settings import Settings
from nginx_ldap_auth.types import LDAPObject


[docs]class UserManager: """ Manage users in the LDAP directory. """ #: The model class for users model: ClassVar[type["User"]] def __init__(self) -> None: #: The application settings self.settings = Settings() #: The LDAP connection pool self.pool: TimeLimitedAIOConnectionPool | None = None
[docs] def client(self) -> bonsai.LDAPClient: """ Return a new LDAP client instance. If :py:attr:`nginx_ldap_auth.settings.Settings.ldap_starttls` is ``True``, the client will be configured to use TLS. """ client = bonsai.LDAPClient( cast("str", self.settings.ldap_uri), tls=self.settings.ldap_starttls ) client.set_cert_policy("try" if self.settings.ldap_validate_cert else "never") client.set_ca_cert(self.settings.ldap_ca_cert_name) client.set_ca_cert_dir(str(self.settings.ldap_ca_cert_dir)) client.ignore_referrals = self.settings.ldap_disable_referrals client.set_server_chase_referrals(not self.settings.ldap_disable_referrals) client.set_async_connection_class(TimeLimitedAIOLDAPConnection) return client
[docs] async def create_pool(self) -> None: """ Create the LDAP connection pool and save it as :py:attr:`pool`. """ client = self.client() if self.settings.ldap_binddn and self.settings.ldap_password: client.set_credentials( "SIMPLE", user=self.settings.ldap_binddn, password=self.settings.ldap_password, ) self.pool = TimeLimitedAIOConnectionPool( self.settings, client, minconn=self.settings.ldap_min_pool_size, maxconn=self.settings.ldap_max_pool_size, expires=self.settings.ldap_pool_connection_lifetime_seconds, ) await self.pool.open()
[docs] async def authenticate(self, username: str, password: str) -> bool: """ Authenticate a user against the LDAP server. If :py:attr:`nginx_ldap_auth.settings.Settings.ldap_user_basedn` is set, we will prepend the username with that value to create the DN to bind with like so: "{username}{ldap_user_base_dn}. Otherwise, we will use the value of :py:attr:`nginx_ldap_auth.settings.Settings.ldap_username_attribute` to create the DN as ``{username_attribute}={username},{ldap_basedn}``. Args: username: the username to authenticate password: the password to authenticate with Raises: LDAPError: if an error occurs while communicating with the LDAP server Returns: ``True`` if the user is authenticated, ``False`` otherwise """ if self.settings.ldap_user_basedn: # This is AD and we need to use the userPrincipalName dn = f"{username}{self.settings.ldap_user_basedn}" else: dn = ( f"{self.settings.ldap_username_attribute}={username}," f"{self.settings.ldap_basedn}" ) client = self.client() client.set_credentials("SIMPLE", user=dn, password=password) logger.info( "ldap.authenticate", dn=dn, uri=self.settings.ldap_uri, ) try: await client.connect(is_async=True) except AuthenticationError as e: logger.error( "ldap.authenticate.error.invalid_credentials", dn=dn, uri=self.settings.ldap_uri, exc_info=str(e), ) return False except LDAPError: logger.exception("ldap.authenticate.exception", uid=username) raise return True
[docs] async def exists(self, username: str) -> bool: """ Return ``True`` if the user exists in the LDAP directory, ``False`` otherwise. Args: username: the username to check Raises: LDAPError: if an error occurred while communicating with the LDAP server AuthenticationError: if the LDAP server rejects the credentials of :py:class:`nginx_ldap_auth.settings.Settings.ldap_binddn` and :py:class:`nginx_ldap_auth.settings.Settings.ldap_password` Returns: ``True`` if the user exists in the LDAP directory, ``False`` otherwise """ return await self.get(username) is not None
[docs] async def is_authorized( self, username: str, ldap_authorization_filter: str | None ) -> bool: """ Test whether the user is authorized to log in. This is done by performing an LDAP search using the filter specified in a header or :py:class:`nginx_ldap_auth.settings.Settings.ldap_authorization_filter`. If the value is ``None``, the user is considered authorized. Args: username: the username to check ldap_authorization_filter: LDAP authorization filter (optional) Raises: LDAPError: if an error occurred while communicating with the LDAP server AuthenticationError: if the LDAP server rejects the credentials of :py:class:`nginx_ldap_auth.settings.Settings.ldap_binddn` and :py:class:`nginx_ldap_auth.settings.Settings.ldap_password` Returns: ``True`` if the user is authorized to log in, ``False`` otherwise. """ if not self.pool: await self.create_pool() pool = cast("TimeLimitedAIOConnectionPool", self.pool) logger.debug( "ldap.is_authorized", username=username, ldap_authorization_filter=ldap_authorization_filter, ) if ldap_authorization_filter is None: return True try: async with pool.spawn() as conn: results = await conn.search( base=self.settings.ldap_basedn, scope=bonsai.LDAPSearchScope.SUBTREE, filter_exp=ldap_authorization_filter.format( username_attribute=self.settings.ldap_username_attribute, fullname_attribute=self.settings.ldap_full_name_attribute, username=escape_filter_exp(username), ), attrlist=[self.settings.ldap_username_attribute], ) except AuthenticationError: logger.error( "ldap.is_authorized.error.invalid_credentials", bind_dn=self.settings.ldap_binddn, ) raise except LDAPError: logger.exception( "ldap.is_authorized.exception", bind_dn=self.settings.ldap_binddn, username=username, ) raise return len(results) > 0
[docs] async def get(self, username: str) -> "User | None": """ Get a user from the LDAP directory, and return it as a :py:class:`User`. When getting the user, we will use the LDAP search filter specified in :py:class:`nginx_ldap_auth.settings.Settings.ldap_get_user_filter`. Args: username: the username for which to get user information Raises: LDAPError: if an error occurred while communicating with the LDAP server AuthenticationError: if the LDAP server rejects the credentials of :py:class:`nginx_ldap_auth.settings.Settings.ldap_binddn` and :py:class:`nginx_ldap_auth.settings.Settings.ldap_password` Returns: The user information as a :py:class:`User` instance, or ``None`` if the user is not returned by the LDAP search filter """ if not self.pool: await self.create_pool() pool = cast("TimeLimitedAIOConnectionPool", self.pool) try: async with pool.spawn() as conn: results = await conn.search( base=self.settings.ldap_basedn, scope=bonsai.LDAPSearchScope.SUBTREE, filter_exp=self.settings.ldap_get_user_filter.format( username_attribute=self.settings.ldap_username_attribute, fullname_attribute=self.settings.ldap_full_name_attribute, username=escape_filter_exp(username), ), attrlist=[ self.settings.ldap_username_attribute, self.settings.ldap_full_name_attribute, ], ) except AuthenticationError: logger.error( "ldap.get_user.error.invalid_credentials", bind_dn=self.settings.ldap_binddn, ) raise except LDAPError: logger.exception( "ldap.get_user.exception", bind_dn=self.settings.ldap_binddn, username=username, ) raise if results: if len(results) > 1: logger.warning( "ldap.get_user.error.multiple_results", bind_dn=self.settings.ldap_binddn, username=username, dns=";".join([r[0] for r in results]), ) return self.model.parse_ldap(results[0]) return None
[docs] async def cleanup(self) -> None: """ Close the LDAP connection pool. """ if self.pool: await self.pool.close()
class User(BaseModel):
    """
    Used to represent a user in the LDAP directory.

    It is constructed from the LDAP response, and is used to authenticate the
    user against the LDAP server.
    """

    #: The manager used to look up and authenticate users.
    objects: ClassVar["UserManager"] = UserManager()

    #: The username of the user.
    uid: str
    #: The full name of the user.  We really only use this for logging.
    full_name: str

    async def authenticate(self, password: str) -> bool:
        """
        Authenticate this user against the LDAP server.

        Args:
            password: the password to authenticate with

        Returns:
            ``True`` if the user is authenticated, ``False`` otherwise
        """
        return await self.objects.authenticate(self.uid, password)

    @classmethod
    def parse_ldap(cls, data: LDAPObject) -> "User":
        """
        Parse the LDAP response, and extract the uid and full name from the
        LDAP server to use in constructing this class.

        We use
        :py:attr:`nginx_ldap_auth.settings.Settings.ldap_username_attribute`
        to determine which LDAP attribute on ``data`` holds our
        :py:attr:`uid` value, and
        :py:attr:`nginx_ldap_auth.settings.Settings.ldap_full_name_attribute`
        to determine which LDAP attribute holds our :py:attr:`full_name`
        value.  Only the first value of each attribute is used.

        If the entry has no full-name attribute (or it has no values), the
        uid is used as the full name, since the full name is only used for
        logging and should not prevent a user from being loaded.

        Args:
            data: the raw LDAP data

        Raises:
            KeyError: if ``data`` has no username attribute
            IndexError: if the username attribute has no values

        Returns:
            A configured :py:class:`User` object
        """
        settings = Settings()
        uid = data[settings.ldap_username_attribute][0]
        try:
            full_name = data[settings.ldap_full_name_attribute][0]
        except (KeyError, IndexError):
            # Optional in the directory; fall back rather than fail the lookup.
            full_name = uid
        kwargs = {
            "uid": uid,
            "full_name": full_name,
        }
        logger.info("user.parse_ldap", **kwargs)
        return cls(**kwargs)
# Attach the model class to the manager only now that both classes exist:
# UserManager is defined first (User instantiates it as ``User.objects``), so
# it can only declare ``model`` as a forward reference and is given the real
# class here.
UserManager.model = User