import copy import json import logging import threading import typing import uuid import requests from admin_auto_filters.filters import AutocompleteFilterFactory from django.conf import settings from django.contrib import admin from django.contrib.auth.models import Permission from django.db import models from django.utils.html import format_html from nested_admin import NestedModelAdmin, NestedStackedInline, NestedTabularInline from rangefilter.filters import DateRangeFilter from shared.admin import FieldsetInlineOrder, MarkdownxGuardedModelAdmin from .forms import ( ContractAdminForm, ContractFileAdminForm, SigneeAdminForm, SingnatureRepresentativeFormSet, ) from .models import ( Contract, ContractApproval, Contractee, ContracteeSignature, ContracteeSignatureRepresentative, ContractFile, ContractFilingArea, ContractIntent, ContractIssue, ContractType, Signee, SigneeSignature, SigneeSignatureRepresentative, ) logger = logging.getLogger() class IndexHiddenModelAdmin(MarkdownxGuardedModelAdmin): def has_module_permission(self, request): return False def permissions_mixin_factory( change_permission: str, delete_permission: str, obj_conditional: typing.Callable = lambda request, obj: True, ) -> object: class Mixin: def has_change_permission(self, request, obj=None) -> bool: if ( obj is not None and obj_conditional(request, obj) and not request.user.has_perm(change_permission) ): return False return super().has_change_permission(request, obj) def has_delete_permission(self, request, obj=None) -> bool: if ( obj is not None and obj_conditional(request, obj) and not request.user.has_perm(change_permission) ): return False return super().has_change_permission(request, obj) return Mixin get_obj_contract = lambda request, obj: obj.contract class OwnPermissionsMixin( permissions_mixin_factory( "contracts.edit_others", "contracts.delete_others", obj_conditional=lambda request, obj: obj.created_by != request.user, ) ): def save_model(self, request, obj, form, change): if obj.created_by is None: obj.created_by = request.user return super().save_model(request, obj, form, change) ParentContractApprovedPermissionsMixin = permissions_mixin_factory( "contracts.edit_when_approved", "contracts.delete_when_approved", obj_conditional=lambda request, obj: not get_obj_contract( obj ).is_editable_without_approve_permission, ) ParentContractOwnPermissionsMixin = permissions_mixin_factory( "contracts.edit_others", "contracts.delete_others", obj_conditional=lambda request, obj: get_obj_contract(obj).created_by != request.user, ) # BEGIN Contracts class ContractFileAdmin( IndexHiddenModelAdmin, ParentContractApprovedPermissionsMixin, ParentContractOwnPermissionsMixin, ): form = ContractFileAdminForm class ParentContractInlineMixin: def get_parent_object_is_editable(self, obj): return obj.is_editable_without_approve_permission def has_add_permission(self, request, obj=None): if ( obj is not None and not request.user.has_perm("contracts.edit_when_approved") and not self.get_parent_object_is_editable(obj) ): return False return super().has_add_permission(request, obj) def has_change_permission(self, request, obj=None): return self.has_add_permission(request, obj) def has_delete_permission(self, request, obj=None): if ( obj is not None and not request.user.has_perm("contracts.delete_when_approved") and not self.get_parent_object_is_editable(obj) ): return False return super().has_change_permission(request, obj) class ContracteeSignatureRepresentativeInline( ParentContractInlineMixin, NestedStackedInline ): model = ContracteeSignatureRepresentative extra = 1 formset = SingnatureRepresentativeFormSet def get_parent_object_is_editable(self, obj): if hasattr(obj, "contract"): return obj.contract.is_editable_without_approve_permission else: return obj.is_editable_without_approve_permission def get_formset(self, request, obj=None, **kwargs): formset = super().get_formset(request, obj, **kwargs) formset.request = request return formset class ContracteeSignatureInline(ParentContractInlineMixin, NestedStackedInline): model = ContracteeSignature autocomplete_fields = ("contractee",) inlines = (ContracteeSignatureRepresentativeInline,) extra = 0 class SigneeSignatureRepresentativeInline( ParentContractInlineMixin, NestedStackedInline ): def get_parent_object_is_editable(self, obj): if hasattr(obj, "contract"): return obj.contract.is_editable_without_approve_permission else: return obj.is_editable_without_approve_permission model = SigneeSignatureRepresentative extra = 0 class SigneeSignatureInline(ParentContractInlineMixin, NestedStackedInline): model = SigneeSignature autocomplete_fields = ("signee",) inlines = (SigneeSignatureRepresentativeInline,) extra = 0 class ContractFileInline(ParentContractInlineMixin, NestedTabularInline): model = ContractFile form = ContractFileAdminForm extra = 0 class ContractApprovalInline(ParentContractInlineMixin, NestedTabularInline): model = ContractApproval extra = 0 class ContractIntentInline(ParentContractInlineMixin, NestedTabularInline): model = ContractIntent extra = 0 class ContractAdmin( OwnPermissionsMixin, MarkdownxGuardedModelAdmin, NestedModelAdmin, ): form = ContractAdminForm search_fields = ("name",) readonly_fields = ( "created_by", "created_on", "updated_on", ) autocomplete_fields = ( "primary_contract", "types", "filing_area", "issues", ) inlines = ( ContractFileInline, ContracteeSignatureInline, SigneeSignatureInline, ContractApprovalInline, ContractIntentInline, ) def get_form(self, request, *args, **kwargs) -> ContractAdminForm: form = super().get_form(request, *args, **kwargs) form.current_user = request.user return form def get_fieldsets(self, request, obj=None) -> list: fieldsets = [ ( "Základní informace", { "fields": [ "name", "id_number", "types", "summary", "is_public", "is_valid", "status", "primary_contract", ] }, ), ( "Data", { "fields": [ "valid_start_date", "valid_end_date", ] }, ), ( "Náklady", { "fields": [ "cost_amount", "cost_unit", "cost_unit_other", ] }, ), ( "Odkazy", { "fields": [ "tender_url", ] }, ), ( "Fyzický dokument", { "fields": [ "paper_form_state", "paper_form_person_responsible", "filing_area", ] }, ), ( "Doplňující informace", { "fields": [ "issues", "notes", "created_by", "created_on", "updated_on", ] }, ), ] if ( obj is None # Creating confidential data, creator will be request.user or obj.created_by == request.user or request.user.has_perm("contracts.view_confidential") ): fieldsets[0][1]["fields"].insert( fieldsets[0][1]["fields"].index("is_public") + 1, "publishing_rejection_comment", ) return fieldsets fieldsets_and_inlines_order = [ FieldsetInlineOrder.FIELDSET, FieldsetInlineOrder.FIELDSET, FieldsetInlineOrder.FIELDSET, FieldsetInlineOrder.INLINE, FieldsetInlineOrder.INLINE, FieldsetInlineOrder.INLINE, FieldsetInlineOrder.FIELDSET, FieldsetInlineOrder.INLINE, FieldsetInlineOrder.INLINE, FieldsetInlineOrder.INLINE, FieldsetInlineOrder.FIELDSET, ] external_requests_timeout = 20 def get_queryset(self, request): queryset = super().get_queryset(request) if not request.user.has_perm("contracts.view_confidential"): # Allow user to view their own objects, even if not public queryset = queryset.filter( models.Q(is_public=True) | models.Q(created_by=request.user) ) if not request.user.has_perm("contracts.approve"): queryset = queryset.filter( models.Q(status=Contract.StatusTypes.APPROVED) | models.Q(created_by=request.user) ) return queryset def save_model(self, request, obj, form, change): is_new = obj.created_by is None # Need to generate primary keys first parent_save_response = super().save_model(request, obj, form, change) if obj.valid_start_date is None: last_signature_date = None for signature_set in ( obj.contractee_signatures.all(), obj.signee_signatures.all(), ): for signature in signature_set: if ( last_signature_date is None or last_signature_date < signature.date ): last_signature_date = signature.date obj.valid_start_date = last_signature_date from users.models import User headers = { "Authorization": f"Token {Contract.settings.nastenka_api_key}", "Content-Type": "application/json", } if not is_new and obj.status == obj.StatusTypes.WORK_IN_PROGRESS: try: user = User.objects.filter(id=obj.created_by.id).first() notice = requests.post( settings.NASTENKA_API_URL, data=json.dumps( { "name": f"Smlouva vrácena k úpravě - {obj.name}", "description": ( obj.summary if obj.summary not in (None, "") else "Bez popisu." ), "contract_id": obj.id, "sso_ids": [user.sso_id], } ), headers=headers, timeout=self.external_requests_timeout, ) notice.raise_for_status() notice = notice.json() obj.work_in_progress_nastenka_notice_id = uuid.UUID(notice["id"]) obj.save() if obj.after_approval_nastenka_notice_id is not None: requests.post( f"{settings.NASTENKA_API_URL}/{obj.after_approval_nastenka_notice_id}/all-hidden", headers=headers, timeout=self.external_requests_timeout, ).raise_for_status() obj.after_approval_nastenka_notice_id = None obj.save() if obj.to_be_approved_nastenka_notice_id is not None: requests.post( f"{settings.NASTENKA_API_URL}/{obj.to_be_approved_nastenka_notice_id}/all-hidden", headers=headers, timeout=self.external_requests_timeout, ).raise_for_status() obj.to_be_approved_nastenka_notice_id = None obj.save() except Exception as exception: logger.error( 'Failed to send out "contract returned for editing" Nástěnka notification: %s', str(exception), ) elif is_new or obj.status != form.initial["status"]: if obj.status == obj.StatusTypes.TO_BE_APPROVED: try: sso_ids = [] for user in User.objects.filter( models.Q(is_staff=True) & ~models.Q(sso_id="") ).all(): if user.has_perm("contracts.approve"): sso_ids.append(user.sso_id) notice = requests.post( settings.NASTENKA_API_URL, data=json.dumps( { "name": f"Smlouva ke schválení - {obj.name}", "description": ( obj.summary if obj.summary not in (None, "") else "Bez popisu." ), "contract_id": obj.id, "sso_ids": sso_ids, } ), headers=headers, timeout=self.external_requests_timeout, ) notice.raise_for_status() notice = notice.json() obj.to_be_approved_nastenka_notice_id = uuid.UUID(notice["id"]) obj.save() if obj.work_in_progress_nastenka_notice_id is not None: requests.post( f"{settings.NASTENKA_API_URL}/{obj.work_in_progress_nastenka_notice_id}/all-hidden", headers=headers, timeout=self.external_requests_timeout, ).raise_for_status() obj.work_in_progress_nastenka_notice_id = None obj.save() except Exception as exception: logger.error( 'Failed to send out notice "to be approved" Nástěnka notification: %s', str(exception), ) elif obj.status == obj.StatusTypes.APPROVED: try: user = User.objects.filter(id=obj.created_by.id).first() notice = requests.post( settings.NASTENKA_API_URL, data=json.dumps( { "name": f"Smlouva schválena - {obj.name}", "description": ( obj.summary if obj.summary not in (None, "") else "Bez popisu." ), "contract_id": obj.id, "sso_ids": [user.sso_id], } ), headers=headers, timeout=self.external_requests_timeout, ) notice.raise_for_status() notice = notice.json() obj.after_approval_nastenka_notice_id = uuid.UUID(notice["id"]) obj.save() if obj.to_be_approved_nastenka_notice_id is not None: requests.post( f"{settings.NASTENKA_API_URL}/{obj.to_be_approved_nastenka_notice_id}/all-hidden", headers=headers, timeout=self.external_requests_timeout, ).raise_for_status() obj.to_be_approved_nastenka_notice_id = None obj.save() except Exception as exception: logger.error( 'Failed to send out "contract approved" Nástěnka notification: %s', str(exception), ) elif obj.status == obj.StatusTypes.REJECTED: try: user = User.objects.filter(id=obj.created_by.id).first() notice = requests.post( settings.NASTENKA_API_URL, data=json.dumps( { "name": f"Smlouva zamítnuta - {obj.name}", "description": ( obj.summary if obj.summary not in (None, "") else "Bez popisu." ), "contract_id": obj.id, "sso_ids": [user.sso_id], } ), headers=headers, timeout=self.external_requests_timeout, ) notice.raise_for_status() notice = notice.json() obj.after_approval_nastenka_notice_id = uuid.UUID(notice["id"]) obj.save() if obj.to_be_approved_nastenka_notice_id is not None: requests.post( f"{settings.NASTENKA_API_URL}/{obj.to_be_approved_nastenka_notice_id}/all-hidden", headers=headers, timeout=self.external_requests_timeout, ).raise_for_status() obj.to_be_approved_nastenka_notice_id = None obj.save() except Exception as exception: logger.error( 'Failed to send out "contract rejected" Nástěnka notification: %s', str(exception), ) return parent_save_response def get_readonly_fields(self, request, obj=None): readonly_fields = [] fields = [] for fieldset in self.get_fieldsets(request): fields += fieldset[1]["fields"] if ( obj is not None and obj.created_by == request.user and obj.status == obj.StatusTypes.TO_BE_APPROVED and not request.user.has_perm("contracts.edit_when_approved") ): fields.remove("status") return fields return list(self.readonly_fields) def has_change_permission(self, request, obj=None): if ( obj is not None and not request.user.has_perm("contracts.edit_when_approved") and obj.status == obj.StatusTypes.APPROVED ): return False return super().has_change_permission(request, obj) def has_delete_permission(self, request, obj=None): if ( obj is not None and not request.user.has_perm("contracts.delete_when_approved") and not obj.is_editable_without_approve_permission ): return False return super().has_delete_permission(request, obj) list_filter = ( AutocompleteFilterFactory("Typ", "types"), AutocompleteFilterFactory("Spisovna", "filing_area"), AutocompleteFilterFactory("Problém", "issues"), AutocompleteFilterFactory( "Naše smluvní strana", "contractee_signatures__contractee" ), AutocompleteFilterFactory("Jiná smluvní strana", "signee_signatures__signee"), "status", "is_valid", "is_public", "paper_form_state", ("all_parties_sign_date", DateRangeFilter), ("valid_start_date", DateRangeFilter), ("valid_end_date", DateRangeFilter), ) list_display = ( "name", "status", "is_public", "created_on", ) class ContractTypeAdmin(MarkdownxGuardedModelAdmin): model = ContractType search_fields = ("name",) class ContractIssueAdmin(MarkdownxGuardedModelAdmin): model = ContractIssue search_fields = ("name",) class ContractFilingAreaAdmin(MarkdownxGuardedModelAdmin): model = ContractFilingArea search_fields = ( "name", "person_responsible", ) # END Contracts # BEGIN Signing parties class ContracteeAdmin(OwnPermissionsMixin, MarkdownxGuardedModelAdmin): model = Contractee fields = ( "name", "address_street_with_number", "address_district", "address_zip", "address_country", "ico_number", "department", ) search_fields = ( "name", "department", ) class SigneeAdmin(OwnPermissionsMixin, MarkdownxGuardedModelAdmin): model = Signee fields = ( "name", "entity_type", "address_street_with_number", "address_district", "address_zip", "address_country", "ico_number", "date_of_birth", "department", ) search_fields = ( "name", "department", ) form = SigneeAdminForm readonly_fields = ("load_ares_data_button",) list_filter = ("entity_type",) list_display = ("name", "entity_type") def get_fields(self, request, obj=None): fields = [ "name", "entity_type", "ico_number", "department", ] if ( obj is None # Creating or obj.entity_has_public_address or request.user.has_perm("contracts.view_confidential") ): entity_type_index = fields.index("entity_type") + 1 fields[entity_type_index:entity_type_index] = [ "address_street_with_number", "address_district", "address_zip", "address_country", ] fields.insert( fields.index("department"), "date_of_birth", ) if obj is None or request.user.has_perm("contracts.edit_signee"): fields.insert(fields.index("ico_number"), "load_ares_data_button") return fields def load_ares_data_button(self, obj): return format_html( '<button type="button" id="load_ares_data">Načíst data</button>' ) load_ares_data_button.allow_tags = True load_ares_data_button.short_description = "ARES" get_obj_signee_contract = lambda request, obj: obj.signee.contract get_obj_contractee_contract = lambda request, obj: obj.contractee.contract class SigneeSignatureRepresentativeAdmin( IndexHiddenModelAdmin, permissions_mixin_factory( "contracts.edit_when_approved", "contracts.delete_when_approved", obj_conditional=lambda request, obj: not get_obj_signee_contract( obj ).is_editable_without_approve_permission, ), permissions_mixin_factory( "contracts.edit_others", "contracts.delete_others", obj_conditional=lambda request, obj: get_obj_contractee_contract(obj).created_by != request.user, ), ): pass class ContracteeSignatureRepresentativeAdmin( IndexHiddenModelAdmin, permissions_mixin_factory( "contracts.edit_when_approved", "contracts.delete_when_approved", obj_conditional=lambda request, obj: not get_obj_contractee_contract( obj ).is_editable_without_approve_permission, ), permissions_mixin_factory( "contracts.edit_others", "contracts.delete_others", obj_conditional=lambda request, obj: get_obj_contractee_contract(obj).created_by != request.user, ), ): pass # END Signing parties class ContractSubmodelAdmin( IndexHiddenModelAdmin, ParentContractApprovedPermissionsMixin, ParentContractOwnPermissionsMixin, ): pass admin.site.register(Permission) for model in ( SigneeSignature, ContracteeSignature, ContractApproval, ContractIntent, ): admin.site.register(model, ContractSubmodelAdmin) admin.site.register(SigneeSignatureRepresentative, SigneeSignatureRepresentativeAdmin) admin.site.register( ContracteeSignatureRepresentative, ContracteeSignatureRepresentativeAdmin ) admin.site.register(ContractType, ContractTypeAdmin) admin.site.register(ContractIssue, ContractIssueAdmin) admin.site.register(ContractFile, ContractFileAdmin) admin.site.register(ContractFilingArea, ContractFilingAreaAdmin) admin.site.register(Contractee, ContracteeAdmin) admin.site.register(Signee, SigneeAdmin) admin.site.register(Contract, ContractAdmin)