Source code for debusine.db.models.assets

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for assets."""

import enum
from collections.abc import Generator, Iterable
from typing import Any, Self, TYPE_CHECKING, TypeAlias, cast

from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db import models
from django.db.models import Q
from django.db.models.constraints import CheckConstraint, UniqueConstraint

from debusine.assets import (
    AssetCategory,
    BaseAssetDataModel,
    BasicAPTAuthenticationData,
    SigningKeyData,
    asset_data_model,
)
from debusine.db.constraints import JsonDataUniqueConstraint
from debusine.db.context import context
from debusine.db.models import permissions
from debusine.db.models.permissions import (
    Allow,
    PermissionUser,
    Role,
    enforce,
    permission_check,
    permission_filter,
)
from debusine.db.models.scopes import Scope
from debusine.db.models.workspaces import Workspace
from debusine.tasks.models import WorkerType
from debusine.utils.typing_utils import copy_signature_from

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object


class AssetRoles(permissions.Roles, permissions.RoleBase, enum.ReprEnum):
    """Available roles for an Asset."""

    OWNER = "owner"


AssetRoles.setup()


class AssetUsageRoleBase(permissions.RoleBase):
    """AssetUsage role implementation."""

    implied_by_scope_roles: frozenset[Scope.Roles]
    implied_by_workspace_roles: frozenset[Workspace.Roles]
    implied_by_asset_usage_roles: frozenset["AssetUsageRoles"]

    def _setup(self) -> None:
        """Set up implications for a newly constructed role."""
        implied_by_scope_roles: set[Scope.Roles] = set()
        implied_by_workspace_roles: set[Workspace.Roles] = set()
        implied_by_asset_usage_roles: set[AssetUsageRoles] = {
            cast(AssetUsageRoles, self)
        }
        for i in self.implied_by:
            match i:
                case Workspace.Roles():
                    implied_by_scope_roles |= i.implied_by_scope_roles
                    implied_by_workspace_roles |= i.implied_by_workspace_roles
                case Role():
                    # Resolve a role passed during class definition into its
                    # enum instance
                    role = self.__class__(i.value)
                    implied_by_scope_roles |= role.implied_by_scope_roles
                    implied_by_workspace_roles |= (
                        role.implied_by_workspace_roles
                    )
                    implied_by_asset_usage_roles |= (
                        role.implied_by_asset_usage_roles
                    )
                case _:
                    raise ImproperlyConfigured(
                        f"AssetUsage roles do not support implications by {i!r}"
                    )
        self.implied_by_scope_roles = frozenset(implied_by_scope_roles)
        self.implied_by_workspace_roles = frozenset(implied_by_workspace_roles)
        self.implied_by_asset_usage_roles = frozenset(
            implied_by_asset_usage_roles
        )

    def q(self, user: PermissionUser) -> Q:
        """Return a Q expression to select asset usages with this role."""
        q = Q(
            roles__group__users=user,
            roles__role__in=self.implied_by_asset_usage_roles,
        )
        if self.implied_by_workspace_roles:
            q |= Q(
                workspace__in=Workspace.objects.filter(
                    Q(
                        roles__group__users=user,
                        roles__role__in=self.implied_by_workspace_roles,
                    )
                    | Q(
                        scope__in=Scope.objects.filter(
                            roles__group__users=user,
                            roles__role__in=self.implied_by_scope_roles,
                        )
                    )
                )
            )
        return q

    def implies(self, role: "AssetUsageRoles") -> bool:
        """Check if this role implies the given one."""
        return (
            self.implied_by_scope_roles <= role.implied_by_scope_roles
            and self.implied_by_workspace_roles
            <= role.implied_by_workspace_roles
            and self.implied_by_asset_usage_roles
            <= role.implied_by_asset_usage_roles
        )


class AssetUsageRoles(permissions.Roles, AssetUsageRoleBase, enum.ReprEnum):
    """Available roles for an AssetUsage."""

    SIGNER = Role("signer")
    REPOSITORY_SIGNER = Role(
        "repository_signer",
        label="Repository signer",
        implied_by=[SIGNER, Workspace.Roles.OWNER],
    )
    APT_AUTHENTICATOR = Role(
        "apt_authenticator",
        label="Can use APT authentication",
        implied_by=[Workspace.Roles.OWNER],
    )


AssetUsageRoles.setup()


class AssetQuerySet[A](models.QuerySet["Asset", A]):
    """Custom QuerySet for Asset."""

    def in_current_scope(self) -> "AssetQuerySet[A]":
        """Filter to assets in the current scope."""
        return self.filter(workspace__scope=context.require_scope())

    @permission_filter(workers=Allow.PASS, anonymous=Allow.PASS)
    def can_display(self, user: PermissionUser) -> "AssetQuerySet[A]":
        """Keep only Assets that can be displayed."""
        # Delegate to workspace can_display check
        return self.filter(
            workspace__in=Workspace.objects.can_display(user)
        ).exclude(category=AssetCategory.CLOUD_PROVIDER_ACCOUNT)

    @permission_filter()
    def can_manage_permissions(
        self, user: PermissionUser
    ) -> "AssetQuerySet[A]":
        """Filter to Assets that can be managed by user."""
        assert user is not None and user.is_authenticated
        return self.filter(
            roles__group__users=user, roles__role=AssetRoles.OWNER
        )


class AssetManager(models.Manager["Asset"]):
    """Manager for the Asset model."""

    def get_roles_model(self) -> type["AssetRole"]:
        """Get the model used for role assignment."""
        return AssetRole

    def get_queryset(self) -> AssetQuerySet[Any]:
        """Use the custom QuerySet."""
        return AssetQuerySet(self.model, using=self._db)

    def get_by_slug(
        self, category: str, slug: str, workspace: Workspace | None = None
    ) -> "Asset":
        """Return an asset with a matching slug."""
        match category:
            case AssetCategory.SIGNING_KEY:
                purpose, fingerprint = slug.split(":", 1)
                return self.get(
                    category=category,
                    data__purpose=purpose,
                    data__fingerprint=fingerprint,
                )
            case AssetCategory.APT_AUTHENTICATION:
                assert workspace is not None
                assets = self.filter(category=category)
                if ":" in slug:
                    workspace_name, name = slug.split(":", 1)
                    return assets.get(
                        workspace__scope=workspace.scope,
                        workspace__name=workspace_name,
                        data__name=name,
                    )
                else:
                    return assets.get(workspace=workspace, data__name=slug)
            case _:
                raise ValueError(f"No slug defined for category '{category}'")


class AssetUsageQuerySet[A](models.QuerySet["AssetUsage", A]):
    """Custom QuerySet for AssetUsage."""

    def with_role(self, user: PermissionUser, role: AssetUsageRoles) -> Self:
        """Keep only resources where the user has the given role."""
        if not user or not user.is_authenticated:
            return self.none()
        return self.filter(
            pk__in=self.model.objects.filter(role.q(user)).distinct()
        )

    @permission_filter(workers=Allow.PASS)
    def can_sign_with(self, user: PermissionUser) -> "AssetUsageQuerySet[A]":
        """Keep only AssetUsages that the user can sign with."""
        return self.with_role(user, AssetUsageRoles.SIGNER)

    @permission_filter(workers=Allow.PASS)
    def can_sign_repository_with(
        self, user: PermissionUser
    ) -> "AssetUsageQuerySet[A]":
        """Keep only AssetUsages that the user can sign a repository with."""
        return self.with_role(user, AssetUsageRoles.REPOSITORY_SIGNER)

    @permission_filter(workers=Allow.PASS)
    def can_use_apt_authentication_with(
        self, user: PermissionUser
    ) -> "AssetUsageQuerySet[A]":
        """Keep only AssetUsages that the user can use for APT auth."""
        return self.with_role(user, AssetUsageRoles.APT_AUTHENTICATOR)


class AssetUsageManager(models.Manager["AssetUsage"]):
    """Manager for the AssetUsage model."""

    def get_roles_model(self) -> type["AssetUsageRole"]:
        """Get the model used for role assignment."""
        return AssetUsageRole


[docs] class Asset(models.Model): """Asset model.""" category = models.CharField(max_length=255, choices=AssetCategory.choices) workspace = models.ForeignKey( Workspace, on_delete=models.PROTECT, blank=True, null=True ) data = models.JSONField(default=dict) created_at = models.DateTimeField(auto_now_add=True) created_by = models.ForeignKey( "User", blank=True, null=True, on_delete=models.PROTECT ) created_by_work_request = models.ForeignKey( "WorkRequest", blank=True, null=True, on_delete=models.SET_NULL ) Roles: TypeAlias = AssetRoles objects = AssetManager.from_queryset(AssetQuerySet)() class Meta(TypedModelMeta): base_manager_name = "objects" constraints = [ JsonDataUniqueConstraint( fields=["data->>'name'"], condition=models.Q( category=AssetCategory.CLOUD_PROVIDER_ACCOUNT ), nulls_distinct=False, name="%(app_label)s_%(class)s_unique_cloud_provider_acct_name", ), JsonDataUniqueConstraint( fields=["data->>'fingerprint'"], condition=models.Q(category=AssetCategory.SIGNING_KEY), nulls_distinct=False, name="%(app_label)s_%(class)s_unique_signing_key_fingerprints", ), CheckConstraint( check=( ~models.Q(category=AssetCategory.APT_AUTHENTICATION) | models.Q(data__name__regex=r"^[A-Za-z][A-Za-z0-9+._-]*$") ), name="%(app_label)s_%(class)s_apt_auth_name", ), JsonDataUniqueConstraint( fields=["workspace", "data->>'name'"], condition=models.Q(category=AssetCategory.APT_AUTHENTICATION), nulls_distinct=False, name="%(app_label)s_%(class)s_unique_apt_auth_workspace_name", ), # Some categories of asset can have null workspaces, but not # signing keys or APT authentication. CheckConstraint( check=~models.Q( category__in={ AssetCategory.SIGNING_KEY, AssetCategory.APT_AUTHENTICATION, } ) | models.Q(workspace__isnull=False), name="%(app_label)s_%(class)s_workspace_not_null", ), ] def __str__(self) -> str: """Return basic information of Asset.""" return ( f"Id: {self.id} " f"Category: {self.category} " f"Workspace: {self.workspace}" )
[docs] @copy_signature_from(models.Model.save) def save(self, **kwargs: Any) -> None: """Wrap save with permission checks.""" from debusine.db.context import context if context.permission_checks_disabled: pass elif self._state.adding: enforce(self.can_create) else: enforce(self.can_edit) return super().save(**kwargs)
[docs] def clean(self) -> None: """ Ensure that data is valid for this asset category. :raise ValidationError: for invalid data. """ self.data_model
@property def slug(self) -> str: """Return a string slug that uniquely identifies the asset.""" match self.category: case AssetCategory.SIGNING_KEY: data_model = self.data_model assert isinstance(data_model, SigningKeyData) return f"{data_model.purpose}:{data_model.fingerprint}" case AssetCategory.APT_AUTHENTICATION: assert self.workspace is not None data_model = self.data_model assert isinstance(data_model, BasicAPTAuthenticationData) return f"{self.workspace.name}:{data_model.name}" case _: raise NotImplementedError( f"No slug defined for category '{self.category}'" )
[docs] @permission_check("{user} cannot edit asset {resource}") def can_edit(self, user: PermissionUser) -> bool: """Can user edit this asset.""" assert user is not None and user.is_authenticated return self.roles.filter( group__users=user, role=AssetRoles.OWNER ).exists()
[docs] @permission_check( "{user} cannot create assets in {resource.workspace.scope}", workers=Allow.PASS, anonymous=Allow.PASS, ) def can_create(self, user: PermissionUser) -> bool: """Can user create this asset.""" from debusine.db.context import context # Allow signing workers to create Assets until we have delegated work # request permissions (#634) if context.worker_token and hasattr(context.worker_token, "worker"): if context.worker_token.worker.worker_type == WorkerType.SIGNING: return True if user is None or not user.is_authenticated: return False if not self.workspace: return False if self.category == AssetCategory.CLOUD_PROVIDER_ACCOUNT: # Not currently creatable through the API return False return any( r.implies(Workspace.Roles.OWNER) for r in Workspace.Roles.from_iterable( self.workspace.scope.get_group_roles(user) ) )
@property def data_model(self) -> BaseAssetDataModel: """Instantiate AssetData from data.""" if not isinstance(self.data, dict): raise ValidationError({"data": "data must be a dictionary"}) try: return asset_data_model(self.category, self.data) except ValueError as e: raise ValidationError( { "category": ( f"{self.category}: invalid asset category or data: {e}" ), }, ) from e
class AssetRole(models.Model): """Role assignment for assets.""" resource = models.ForeignKey( Asset, on_delete=models.CASCADE, related_name="roles", ) group = models.ForeignKey( "Group", on_delete=models.CASCADE, related_name="asset_roles", ) role = models.CharField(max_length=16, choices=AssetRoles.choices)
[docs] class AssetUsage(models.Model): """Usage of an Asset within a workspace.""" Roles: TypeAlias = AssetUsageRoles asset = models.ForeignKey( Asset, on_delete=models.CASCADE, related_name="usage", ) workspace = models.ForeignKey( "Workspace", on_delete=models.CASCADE, related_name="asset_usage", ) objects = AssetUsageManager.from_queryset(AssetUsageQuerySet)() class Meta(TypedModelMeta): base_manager_name = "objects" constraints = [ UniqueConstraint( fields=["asset", "workspace"], name="%(app_label)s_%(class)s_unique_asset_workspace", ) ]
[docs] def has_role(self, user: PermissionUser, role: AssetUsageRoles) -> bool: """Check if the user has the given role on this AssetUsage.""" return ( AssetUsage.objects.with_role(user, role).filter(pk=self.pk).exists() )
[docs] @permission_check("{user} cannot sign with {resource}", workers=Allow.PASS) def can_sign_with(self, user: PermissionUser) -> bool: """Check if the user can sign with this resource.""" assert user is not None and user.is_authenticated return self.has_role(user, AssetUsageRoles.SIGNER)
[docs] @permission_check( "{user} cannot sign a repository with {resource}", workers=Allow.PASS ) def can_sign_repository_with(self, user: PermissionUser) -> bool: """Check if the user can sign a repository with this resource.""" assert user is not None and user.is_authenticated return self.has_role(user, AssetUsageRoles.REPOSITORY_SIGNER)
[docs] @permission_check( "{user} cannot use {resource} for APT authentication", workers=Allow.PASS, ) def can_use_apt_authentication_with(self, user: PermissionUser) -> bool: """Check if the user can use APT authentication with this resource.""" assert user is not None and user.is_authenticated return self.has_role(user, AssetUsageRoles.APT_AUTHENTICATOR)
class AssetUsageRole(models.Model): """Role assignment for assets within a workspace.""" resource = models.ForeignKey( AssetUsage, on_delete=models.CASCADE, related_name="roles", ) group = models.ForeignKey( "Group", on_delete=models.CASCADE, related_name="asset_usage_roles", ) role = models.CharField(max_length=32, choices=AssetUsageRoles.choices) def get_public_keys(signing_keys: Iterable[Asset]) -> Generator[bytes]: """Yield public keys from each of some signing-key assets.""" for signing_key in signing_keys: signing_key_data = signing_key.data_model assert isinstance(signing_key_data, SigningKeyData) yield signing_key_data.public_key