Files
Refactored-App/backend/apps/api/views.py
2026-04-01 03:20:54 +02:00

1082 lines
47 KiB
Python

import json
import secrets
from functools import wraps
from decimal import Decimal
from datetime import timedelta
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.hashers import make_password
from django.db import models
from django.db.models import Count, Q, Sum
from django.http import HttpRequest, JsonResponse
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt, ensure_csrf_cookie
from django.views.decorators.http import require_GET, require_http_methods
from apps.accounts.models import AllowedDevice, DeviceRegistrationToken, DomainPermission, Role, User, UserBusinessAccess
from apps.core.models import Business, Category, Product, ProductCategory, Vendor, VendorBusiness, VendorCategory
from apps.notifications.models import Notification
from apps.operations.models import Event, InventoryBalance, Invoice, ShiftAssignment, ShiftRole, ShiftTemplate
from apps.operations.services import build_invoice_payload, create_invoice_from_payload, update_invoice_from_payload
from apps.reporting.models import DailyRevenueSummary
def _json_body(request: HttpRequest) -> dict:
    """Parse the request body as UTF-8 encoded JSON.

    An empty body is read as an empty JSON object (``{}``). Decoding and
    parsing errors are not caught here and propagate to the caller.
    """
    text = request.body.decode("utf-8")
    if not text:
        text = "{}"
    return json.loads(text)
def _money(value) -> float:
return float(value or Decimal("0"))
def _bad_request(message: str) -> JsonResponse:
    """Build an HTTP 400 JSON response carrying *message* under ``detail``."""
    body = {"detail": message}
    return JsonResponse(body, status=400)
def _forbidden(message: str = "Permission denied", missing_permissions: list[str] | None = None) -> JsonResponse:
    """Build an HTTP 403 JSON response.

    The body always contains ``detail``; ``missing_permissions`` is added
    only when the argument is not ``None`` (an empty list is still included).
    """
    if missing_permissions is None:
        return JsonResponse({"detail": message}, status=403)
    return JsonResponse(
        {"detail": message, "missing_permissions": missing_permissions},
        status=403,
    )
def api_login_required(view_func):
    """Decorator answering with a JSON 401 when the user is not authenticated."""

    @wraps(view_func)
    def _wrapped(request: HttpRequest, *args, **kwargs):
        if request.user.is_authenticated:
            return view_func(request, *args, **kwargs)
        return JsonResponse({"detail": "Authentication required"}, status=401)

    return _wrapped
def require_permissions(*required_permissions: str):
    """Decorator factory enforcing that the user holds every listed permission.

    Unauthenticated requests get a JSON 401. Superusers bypass the check.
    Otherwise, any required key absent from ``user.permission_keys()`` yields
    a JSON 403 that lists the missing keys in the order they were required.
    """

    def decorator(view_func):
        @wraps(view_func)
        def _wrapped(request: HttpRequest, *args, **kwargs):
            user = request.user
            if not user.is_authenticated:
                return JsonResponse({"detail": "Authentication required"}, status=401)
            if not user.is_superuser:
                held = set(user.permission_keys())
                lacking = [key for key in required_permissions if key not in held]
                if lacking:
                    return JsonResponse(
                        {"detail": "Permission denied", "missing_permissions": lacking},
                        status=403,
                    )
            return view_func(request, *args, **kwargs)

        return _wrapped

    return decorator
def _user_payload(user) -> dict:
    """Serialize *user* for the session endpoints (login and ``me``).

    ``allowed_business_ids`` lists only the businesses explicitly linked to
    the user through ``business_links``.
    """
    role_name = user.role.name if user.role_id else None
    linked_business_ids = list(user.business_links.values_list("business_id", flat=True))
    return {
        "id": user.id,
        "username": user.username,
        "display_name": user.display_name,
        "role": role_name,
        "is_superuser": user.is_superuser,
        "permission_keys": user.permission_keys(),
        "allowed_business_ids": linked_business_ids,
    }
def _allowed_business_ids(user: User) -> set[int]:
    """Return the ids of the businesses *user* may work with.

    Superusers get every active business; everyone else gets the businesses
    linked to them through ``business_links``.
    """
    if not user.is_superuser:
        return set(user.business_links.values_list("business_id", flat=True))
    active_businesses = Business.objects.filter(is_active=True)
    return set(active_businesses.values_list("id", flat=True))
def _ensure_business_access(request: HttpRequest, business_id: int | None) -> JsonResponse | None:
    """Return a 403 response if the user may not access *business_id*, else ``None``.

    A ``None`` business id and superuser requests are always allowed.
    """
    if business_id is None:
        return None
    if request.user.is_superuser:
        return None
    if business_id in _allowed_business_ids(request.user):
        return None
    return _forbidden("Business access denied")
def _scoped_vendors_queryset(user: User, queryset=None):
    """Restrict a vendor queryset to vendors linked to the user's businesses.

    Starts from *queryset*, or all vendors when it is ``None``. Superusers get
    the queryset back unfiltered.
    """
    vendors = Vendor.objects.all() if queryset is None else queryset
    if not user.is_superuser:
        permitted = _allowed_business_ids(user)
        vendors = vendors.filter(business_links__business_id__in=permitted).distinct()
    return vendors
def _scoped_categories_queryset(user: User, queryset=None):
    """Restrict a category queryset to categories reachable from the user's businesses.

    A category is reachable when one of its vendors is linked to an allowed
    business. Superusers get the queryset back unfiltered.
    """
    categories = Category.objects.all() if queryset is None else queryset
    if not user.is_superuser:
        permitted = _allowed_business_ids(user)
        categories = categories.filter(
            vendor_links__vendor__business_links__business_id__in=permitted
        ).distinct()
    return categories
def _scoped_products_queryset(user: User, queryset=None):
    """Restrict a product queryset to products reachable from the user's businesses.

    The filter follows product -> category -> vendor -> business links.
    Superusers get the queryset back unfiltered.
    """
    products = Product.objects.all() if queryset is None else queryset
    if not user.is_superuser:
        permitted = _allowed_business_ids(user)
        products = products.filter(
            category_links__category__vendor_links__vendor__business_links__business_id__in=permitted
        ).distinct()
    return products
def _scoped_invoices_queryset(user: User, queryset=None):
    """Restrict an invoice queryset to invoices of the user's allowed businesses.

    Superusers get the queryset back unfiltered.
    """
    invoices = Invoice.objects.all() if queryset is None else queryset
    if not user.is_superuser:
        invoices = invoices.filter(business_id__in=_allowed_business_ids(user))
    return invoices
def _vendor_is_accessible(user: User, vendor: Vendor) -> bool:
    """Tell whether *user* may see *vendor*.

    Superusers always may; others need the vendor to be linked to at least
    one of their allowed businesses.
    """
    if not user.is_superuser:
        permitted = _allowed_business_ids(user)
        return vendor.business_links.filter(business_id__in=permitted).exists()
    return True
def _validate_invoice_payload_access(request: HttpRequest, data: dict) -> JsonResponse | None:
    """Check that the invoice payload only references objects the user may use.

    Returns ``None`` when the payload is acceptable, otherwise the error
    response to send back:

    * 400 when ``vendor_id`` is missing or unknown, or when any id in the
      payload cannot be converted to an integer;
    * 403 when the business, vendor, a category or a product is outside the
      user's scope.

    Category and product checks are skipped for superusers.
    """
    user = request.user

    raw_business_id = data.get("business_id")
    if raw_business_id not in (None, ""):
        # Ids come straight from the client; a bad value is a client error,
        # not a server crash.
        try:
            business_id = int(raw_business_id)
        except (TypeError, ValueError):
            return _bad_request("business_id must be an integer")
        if denied := _ensure_business_access(request, business_id):
            return denied

    raw_vendor_id = data.get("vendor_id")
    if raw_vendor_id in (None, ""):
        return _bad_request("vendor_id is required")
    try:
        vendor_pk = int(raw_vendor_id)
    except (TypeError, ValueError):
        return _bad_request("vendor_id must be an integer")
    try:
        # No prefetch here: _vendor_is_accessible filters the relation, which
        # always issues its own query and would ignore a prefetch cache.
        vendor = Vendor.objects.get(pk=vendor_pk)
    except Vendor.DoesNotExist:
        return _bad_request("Unknown vendor_id")
    if not _vendor_is_accessible(user, vendor):
        return _forbidden("Vendor access denied")

    if user.is_superuser:
        return None

    try:
        category_ids = {int(category_id) for category_id in (data.get("category_ids") or [])}
    except (TypeError, ValueError):
        return _bad_request("category_ids must be a list of integers")
    visible_category_ids = set(
        _scoped_categories_queryset(user).filter(id__in=category_ids).values_list("id", flat=True)
    )
    # The visible ids are a subset of the requested ones, so any leftover id
    # is either unknown or out of scope.
    if category_ids - visible_category_ids:
        return _forbidden("Category access denied")

    try:
        product_ids = {
            int(item["product_id"])
            for item in (data.get("line_items") or [])
            if item.get("product_id")
        }
    except (AttributeError, TypeError, ValueError):
        return _bad_request("line_items must be objects with an integer product_id")
    visible_product_ids = set(
        _scoped_products_queryset(user).filter(id__in=product_ids).values_list("id", flat=True)
    )
    if product_ids - visible_product_ids:
        return _forbidden("Product access denied")
    return None
def _vendor_payload(vendor: Vendor) -> dict:
    """Serialize *vendor*, including the ids of its linked businesses and categories.

    The link ids are read with ``.all()`` so that a ``prefetch_related`` on
    ``business_links`` / ``category_links`` done by the caller is reused.
    Calling ``values_list`` on the related manager would bypass that cache
    and run two extra queries per vendor.
    """
    business_ids = [link.business_id for link in vendor.business_links.all()]
    category_ids = [link.category_id for link in vendor.category_links.all()]
    return {
        "id": vendor.id,
        "legacy_id": vendor.legacy_id,
        "name": vendor.name,
        "vat_number": vendor.vat_number,
        "registration_id": vendor.registration_id,
        "contact_email": vendor.contact_email,
        "contact_phone": vendor.contact_phone,
        "address": vendor.address,
        "notes": vendor.notes,
        "is_active": vendor.is_active,
        "business_ids": business_ids,
        "category_ids": category_ids,
    }
def _business_summary_payload(business: Business) -> dict:
    """Build the detail summary for one business.

    The payload contains the business header fields, aggregate statistics
    (non-void invoices, revenue, VAT, inventory rows reachable through the
    business's vendors), the 14 most recent revenue rows in chronological
    order, and the 5 most recently created non-void invoices.
    """
    invoices = Invoice.objects.filter(business=business).exclude(payment_status="void")
    daily_revenue = DailyRevenueSummary.objects.filter(business=business)
    stocked_items = InventoryBalance.objects.filter(
        product__category_links__category__vendor_links__vendor__business_links__business=business
    ).distinct()

    latest_invoices = []
    for entry in invoices.select_related("vendor").order_by("-created_at")[:5]:
        latest_invoices.append(
            {
                "id": entry.id,
                "vendor_name": entry.vendor.name,
                "invoice_number": entry.invoice_number,
                "gross_total": _money(entry.gross_total),
                "payment_status": entry.payment_status,
                "due_date": entry.due_date.isoformat() if entry.due_date else None,
            }
        )

    business_header = {
        "id": business.id,
        "legacy_id": business.legacy_id,
        "name": business.name,
        "short_code": business.short_code,
        "currency": business.currency,
    }
    statistics = {
        "invoice_count": invoices.count(),
        "outstanding_invoices": invoices.filter(payment_status="unpaid").count(),
        "total_expenses": _money(invoices.aggregate(total=Sum("gross_total"))["total"]),
        "total_revenue": _money(daily_revenue.aggregate(total=Sum("sales_revenue"))["total"]),
        "total_vat": _money(daily_revenue.aggregate(total=Sum("vat_total"))["total"]),
        "inventory_items": stocked_items.count(),
    }

    # Fetch newest-first so the slice keeps the latest 14 days, then emit
    # them oldest-first.
    newest_first = list(daily_revenue.order_by("-business_date")[:14])
    revenue_series = [
        {
            "business_date": day.business_date.isoformat(),
            "sales_revenue": _money(day.sales_revenue),
            "food_revenue": _money(day.food_revenue),
            "alcohol_revenue": _money(day.alcohol_revenue),
            "tips_payable": _money(day.tips_payable),
            "vat_total": _money(day.vat_total),
        }
        for day in reversed(newest_first)
    ]

    return {
        "business": business_header,
        "stats": statistics,
        "recent_revenue": revenue_series,
        "recent_invoices": latest_invoices,
    }
def _event_payload(event: Event) -> dict:
    """Serialize *event* with ISO-formatted datetimes.

    ``created_by`` is the creator's display name, falling back to the
    username, or ``None`` when the event has no creator.
    """
    creator = event.created_by
    creator_label = (creator.display_name or creator.username) if creator else None
    recurrence_end = event.recurrence_end_date.isoformat() if event.recurrence_end_date else None
    return {
        "id": event.id,
        "legacy_id": event.legacy_id,
        "business_id": event.business_id,
        "business_name": event.business.name,
        "title": event.title,
        "description": event.description,
        "event_type": event.event_type,
        "start_datetime": event.start_datetime.isoformat(),
        "end_datetime": event.end_datetime.isoformat(),
        "all_day": event.all_day,
        "location": event.location,
        "color": event.color,
        "recurrence_type": event.recurrence_type,
        "recurrence_end_date": recurrence_end,
        "created_by": creator_label,
        "is_active": event.is_active,
    }
def _user_admin_payload(user: User) -> dict:
    """Serialize *user* for the administration screens.

    ``business_ids`` is read with ``.all()`` so that a caller's
    ``prefetch_related("business_links")`` is reused. Calling ``values_list``
    on the related manager would bypass that cache and run one extra query
    per user.
    """
    business_ids = [link.business_id for link in user.business_links.all()]
    return {
        "id": user.id,
        "username": user.username,
        "display_name": user.display_name,
        "email": user.email,
        "is_active": user.is_active,
        "is_superuser": user.is_superuser,
        "role_id": user.role_id,
        "role_name": user.role.name if user.role_id else None,
        "last_login": user.last_login.isoformat() if user.last_login else None,
        "last_login_ip": user.last_login_ip,
        "business_ids": business_ids,
    }
def _device_payload(device: AllowedDevice) -> dict:
    """Serialize an allowed device.

    ``known_ips`` is stored as a comma-separated string and returned as a
    list of non-empty, whitespace-trimmed entries.
    """
    trimmed_ips = (chunk.strip() for chunk in device.known_ips.split(","))
    registered = device.registered_at.isoformat() if device.registered_at else None
    last_seen = device.last_seen_at.isoformat() if device.last_seen_at else None
    return {
        "id": device.id,
        "ip_address": device.ip_address,
        "label": device.label,
        "user_agent": device.user_agent,
        "registered_at": registered,
        "last_seen_at": last_seen,
        "is_active": device.is_active,
        "ipv6_prefix": device.ipv6_prefix,
        "device_token": device.device_token,
        "known_ips": [address for address in trimmed_ips if address],
    }
def _device_token_payload(token: DeviceRegistrationToken) -> dict:
    """Serialize a device registration token.

    ``created_by`` is the creator's display name, falling back to the
    username, or ``None`` when the token has no creator.
    """
    issuer = token.created_by
    issuer_label = (issuer.display_name or issuer.username) if issuer else None
    return {
        "id": token.id,
        "token": token.token,
        "label": token.label,
        "created_at": token.created_at.isoformat(),
        "expires_at": token.expires_at.isoformat(),
        "used_at": token.used_at.isoformat() if token.used_at else None,
        "used_by_ip": token.used_by_ip,
        "created_by": issuer_label,
    }
@ensure_csrf_cookie
@require_GET
def csrf_view(request: HttpRequest) -> JsonResponse:
    """GET endpoint whose only purpose is to make Django set the CSRF cookie."""
    body = {"detail": "CSRF cookie set"}
    return JsonResponse(body)
@csrf_exempt
@require_http_methods(["POST"])
def login_view(request: HttpRequest) -> JsonResponse:
    """Authenticate with the ``username``/``password`` from the JSON body.

    On success a session is started and the user payload returned. Unknown
    credentials and inactive accounts both get the same JSON 401.
    """
    credentials = _json_body(request)
    user = authenticate(
        request,
        username=credentials.get("username"),
        password=credentials.get("password"),
    )
    if user is not None and user.is_active:
        login(request, user)
        return JsonResponse({"user": _user_payload(user)})
    return JsonResponse({"detail": "Invalid credentials"}, status=401)
@require_http_methods(["POST"])
def logout_view(request: HttpRequest) -> JsonResponse:
    """End the current session and confirm with a JSON message."""
    logout(request)
    confirmation = {"detail": "Logged out"}
    return JsonResponse(confirmation)
@api_login_required
@require_GET
def me_view(request: HttpRequest) -> JsonResponse:
    """Return the payload describing the currently authenticated user."""
    current_user = _user_payload(request.user)
    return JsonResponse({"user": current_user})
@require_permissions("dashboard.view")
@require_GET
def businesses_view(request: HttpRequest) -> JsonResponse:
    """List the active businesses the user may access, ordered by name."""
    visible_ids = _allowed_business_ids(request.user)
    visible_businesses = Business.objects.filter(is_active=True, id__in=visible_ids).order_by("name")
    results = []
    for entry in visible_businesses:
        results.append(
            {
                "id": entry.id,
                "legacy_id": entry.legacy_id,
                "name": entry.name,
                "short_code": entry.short_code,
                "currency": entry.currency,
            }
        )
    return JsonResponse({"results": results})
@require_permissions("products.view")
@require_GET
def products_view(request: HttpRequest) -> JsonResponse:
    """List up to 100 active products visible to the user, ordered by name.

    The optional ``q`` query parameter filters on product name or GTIN
    (case-insensitive substring match).
    """
    search = request.GET.get("q", "").strip()
    products = _scoped_products_queryset(request.user, Product.objects.filter(is_active=True))
    if search:
        products = products.filter(Q(name__icontains=search) | Q(gtin__icontains=search))
    # Load all category links in one extra query instead of one per product.
    products = products.prefetch_related("category_links")
    results = [
        {
            "id": product.id,
            "legacy_id": product.legacy_id,
            "name": product.name,
            "gtin": product.gtin,
            "vat_rate": _money(product.vat_rate),
            "uom": product.uom,
            "currency_code": product.currency_code,
            # .all() reads from the prefetch cache; values_list() would not.
            "category_ids": [link.category_id for link in product.category_links.all()],
            "net_purchase_price": _money(product.net_purchase_price),
            "display_sales_price": _money(product.display_sales_price),
        }
        for product in products.order_by("name")[:100]
    ]
    return JsonResponse({"results": results})
@require_permissions("categories.manage")
@require_GET
def categories_view(request: HttpRequest) -> JsonResponse:
    """List the active categories visible to the user, ordered by name."""
    active_categories = Category.objects.filter(is_active=True)
    visible = _scoped_categories_queryset(request.user, active_categories).order_by("name")
    results = []
    for entry in visible:
        results.append({"id": entry.id, "legacy_id": entry.legacy_id, "name": entry.name})
    return JsonResponse({"results": results})
@require_permissions("dashboard.view")
@require_GET
def dashboard_overview_view(request: HttpRequest) -> JsonResponse:
    """Return headline totals across every business the user may access.

    Void invoices are excluded from the invoice figures.
    """
    user = request.user
    business_ids = _allowed_business_ids(user)
    live_invoices = _scoped_invoices_queryset(user, Invoice.objects.exclude(payment_status="void"))
    revenue_rows = DailyRevenueSummary.objects.filter(business_id__in=business_ids)
    active_businesses = Business.objects.filter(is_active=True, id__in=business_ids)
    active_vendors = _scoped_vendors_queryset(user, Vendor.objects.filter(is_active=True))
    # NOTE(review): this count is filtered only on the read/dismissed flags,
    # not by user or business; confirm that a global count is intended.
    pending_notifications = Notification.objects.filter(is_read=False, is_dismissed=False)

    totals = {
        "total_revenue": _money(revenue_rows.aggregate(total=Sum("sales_revenue"))["total"]),
        "total_expenses": _money(live_invoices.aggregate(total=Sum("gross_total"))["total"]),
        "outstanding_invoices": live_invoices.filter(payment_status="unpaid").count(),
        "business_count": active_businesses.count(),
        "vendor_count": active_vendors.count(),
        "invoice_count": live_invoices.count(),
        "total_vat": _money(revenue_rows.aggregate(total=Sum("vat_total"))["total"]),
        "unread_notifications": pending_notifications.count(),
    }
    return JsonResponse(totals)
@require_permissions("dashboard.view")
@require_GET
def dashboard_business_summary_view(request: HttpRequest) -> JsonResponse:
    """Return one row of revenue/expense totals per accessible active business.

    Rows are ordered by business name; void invoices are excluded.
    """

    def _summarize(entry) -> dict:
        # Per-business querysets; each figure below runs its own query.
        live_invoices = Invoice.objects.filter(business=entry).exclude(payment_status="void")
        revenue_rows = DailyRevenueSummary.objects.filter(business=entry)
        return {
            "business_id": entry.id,
            "business_name": entry.name,
            "short_code": entry.short_code,
            "currency": entry.currency,
            "total_revenue": _money(revenue_rows.aggregate(total=Sum("sales_revenue"))["total"]),
            "total_expenses": _money(live_invoices.aggregate(total=Sum("gross_total"))["total"]),
            "outstanding_invoices": live_invoices.filter(payment_status="unpaid").count(),
            "invoice_count": live_invoices.count(),
        }

    visible_ids = _allowed_business_ids(request.user)
    visible_businesses = Business.objects.filter(is_active=True, id__in=visible_ids).order_by("name")
    return JsonResponse({"results": [_summarize(entry) for entry in visible_businesses]})
@require_permissions("dashboard.view")
@require_GET
def business_summary_view(request: HttpRequest, business_id: int) -> JsonResponse:
    """Return the detailed summary of one active business.

    Answers 404 when no active business has that id and 403 when the user
    may not access it.
    """
    target = Business.objects.filter(pk=business_id, is_active=True).first()
    if target is None:
        return JsonResponse({"detail": "Business not found"}, status=404)
    denied = _ensure_business_access(request, target.id)
    if denied:
        return denied
    return JsonResponse(_business_summary_payload(target))
@require_http_methods(["GET", "POST"])
def vendors_view(request: HttpRequest) -> JsonResponse:
    """List vendors (GET) or create a vendor (POST).

    GET requires ``vendors.view`` and supports the ``q``, ``business_id`` and
    ``category_id`` query parameters. POST requires ``vendors.create`` and
    reads the vendor fields, plus optional ``business_ids`` and
    ``category_ids``, from the JSON body.

    Authentication is checked before the body is parsed. Every id is
    converted and every business access check is run before anything is
    written, so a rejected request never leaves a vendor row behind.
    Non-numeric ids yield a 400.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)

    if request.method == "GET":
        if not user.is_superuser and "vendors.view" not in set(user.permission_keys()):
            return _forbidden(missing_permissions=["vendors.view"])
        query = request.GET.get("q", "").strip()
        business_id = request.GET.get("business_id")
        category_id = request.GET.get("category_id")
        vendors = _scoped_vendors_queryset(
            user,
            Vendor.objects.all().prefetch_related("business_links", "category_links"),
        )
        if query:
            vendors = vendors.filter(Q(name__icontains=query) | Q(contact_email__icontains=query))
        if business_id:
            try:
                business_id = int(business_id)
            except ValueError:
                return _bad_request("business_id must be an integer")
            if denied := _ensure_business_access(request, business_id):
                return denied
            vendors = vendors.filter(business_links__business_id=business_id)
        if category_id:
            try:
                category_id = int(category_id)
            except ValueError:
                return _bad_request("category_id must be an integer")
            vendors = vendors.filter(category_links__category_id=category_id)
        vendors = vendors.distinct().order_by("name")
        return JsonResponse({"results": [_vendor_payload(vendor) for vendor in vendors]})

    # POST: create a vendor.
    if not user.is_superuser and "vendors.create" not in set(user.permission_keys()):
        return _forbidden(missing_permissions=["vendors.create"])
    data = _json_body(request)
    if not data.get("name"):
        return _bad_request("Vendor name is required")

    # Validate everything up front so nothing is written for a bad request.
    try:
        business_ids = [int(value) for value in (data.get("business_ids") or [])]
        category_ids = [int(value) for value in (data.get("category_ids") or [])]
    except (TypeError, ValueError):
        return _bad_request("business_ids and category_ids must be lists of integers")
    for business_id in business_ids:
        if denied := _ensure_business_access(request, business_id):
            return denied

    vendor = Vendor.objects.create(
        name=data["name"],
        vat_number=data.get("vat_number", ""),
        registration_id=data.get("registration_id", ""),
        contact_email=data.get("contact_email", ""),
        contact_phone=data.get("contact_phone", ""),
        address=data.get("address", ""),
        notes=data.get("notes", ""),
        is_active=data.get("is_active", True),
    )
    if business_ids:
        VendorBusiness.objects.bulk_create(
            [VendorBusiness(vendor=vendor, business_id=business_id) for business_id in business_ids],
            ignore_conflicts=True,
        )
    if category_ids:
        VendorCategory.objects.bulk_create(
            [VendorCategory(vendor=vendor, category_id=category_id) for category_id in category_ids],
            ignore_conflicts=True,
        )
    # Re-read with the links prefetched for serialization.
    vendor = Vendor.objects.prefetch_related("business_links", "category_links").get(pk=vendor.pk)
    return JsonResponse(_vendor_payload(vendor), status=201)
@require_http_methods(["GET", "PUT"])
def vendor_detail_view(request: HttpRequest, vendor_id: int) -> JsonResponse:
    """Return (GET) or update (PUT) a single vendor.

    GET requires ``vendors.view`` and PUT requires ``vendors.edit``; both
    also require the vendor to be linked to one of the user's businesses.
    On PUT, ``business_ids`` / ``category_ids`` replace the existing links
    when present in the body.

    All ids in the body are converted and access-checked before the vendor
    is modified, so a request that ends in 400/403 leaves the vendor
    untouched.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)
    try:
        vendor = Vendor.objects.prefetch_related("business_links", "category_links").get(pk=vendor_id)
    except Vendor.DoesNotExist:
        return JsonResponse({"detail": "Vendor not found"}, status=404)

    required_permission = "vendors.view" if request.method == "GET" else "vendors.edit"
    if not user.is_superuser and required_permission not in set(user.permission_keys()):
        return _forbidden(missing_permissions=[required_permission])
    if not _vendor_is_accessible(user, vendor):
        return _forbidden("Vendor access denied")
    if request.method == "GET":
        return JsonResponse(_vendor_payload(vendor))

    data = _json_body(request)

    # None means "key absent: leave the links alone"; an empty list clears them.
    business_ids = None
    category_ids = None
    try:
        if "business_ids" in data:
            business_ids = [int(value) for value in data["business_ids"]]
        if "category_ids" in data:
            category_ids = [int(value) for value in data["category_ids"]]
    except (TypeError, ValueError):
        return _bad_request("business_ids and category_ids must be lists of integers")
    if business_ids is not None:
        for business_id in business_ids:
            if denied := _ensure_business_access(request, business_id):
                return denied

    for field in (
        "name",
        "vat_number",
        "registration_id",
        "contact_email",
        "contact_phone",
        "address",
        "notes",
    ):
        if field in data:
            setattr(vendor, field, data[field])
    if "is_active" in data:
        vendor.is_active = bool(data["is_active"])
    vendor.save()

    if business_ids is not None:
        vendor.business_links.all().delete()
        VendorBusiness.objects.bulk_create(
            [VendorBusiness(vendor=vendor, business_id=business_id) for business_id in business_ids],
            ignore_conflicts=True,
        )
    if category_ids is not None:
        vendor.category_links.all().delete()
        VendorCategory.objects.bulk_create(
            [VendorCategory(vendor=vendor, category_id=category_id) for category_id in category_ids],
            ignore_conflicts=True,
        )
    # Re-read so the serialized links reflect the rows just written.
    vendor = Vendor.objects.prefetch_related("business_links", "category_links").get(pk=vendor.pk)
    return JsonResponse(_vendor_payload(vendor))
@require_http_methods(["GET", "POST"])
def invoices_view(request: HttpRequest) -> JsonResponse:
    """List invoices (GET) or create an invoice (POST).

    GET requires ``invoices.view``, returns at most the 100 most recently
    created invoices in the user's scope, and supports the ``business_id``,
    ``vendor_id``, ``payment_status`` and ``q`` query parameters. Non-numeric
    ids yield a 400.

    POST requires ``invoices.create``. The payload is access-checked before
    it is handed to ``create_invoice_from_payload``; ``KeyError``,
    ``TypeError`` and ``ValueError`` raised by that service become a 400.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)

    detailed_invoices = Invoice.objects.select_related("business", "vendor").prefetch_related(
        "line_items__product", "category_links__category"
    )

    if request.method == "GET":
        if not user.is_superuser and "invoices.view" not in set(user.permission_keys()):
            return _forbidden(missing_permissions=["invoices.view"])
        invoices = _scoped_invoices_queryset(user, detailed_invoices)
        if business_id := request.GET.get("business_id"):
            try:
                business_id = int(business_id)
            except ValueError:
                return _bad_request("business_id must be an integer")
            if denied := _ensure_business_access(request, business_id):
                return denied
            invoices = invoices.filter(business_id=business_id)
        if vendor_id := request.GET.get("vendor_id"):
            try:
                vendor_id = int(vendor_id)
            except ValueError:
                return _bad_request("vendor_id must be an integer")
            invoices = invoices.filter(vendor_id=vendor_id)
        if payment_status := request.GET.get("payment_status"):
            invoices = invoices.filter(payment_status=payment_status)
        if query := request.GET.get("q", "").strip():
            invoices = invoices.filter(
                Q(invoice_number__icontains=query)
                | Q(vendor__name__icontains=query)
                | Q(notes__icontains=query)
            )
        results = [build_invoice_payload(invoice) for invoice in invoices.order_by("-created_at")[:100]]
        return JsonResponse({"results": results})

    # POST: create an invoice.
    if not user.is_superuser and "invoices.create" not in set(user.permission_keys()):
        return _forbidden(missing_permissions=["invoices.create"])
    data = _json_body(request)
    if denied := _validate_invoice_payload_access(request, data):
        return denied
    try:
        invoice = create_invoice_from_payload(data)
    except (KeyError, TypeError, ValueError) as exc:
        return _bad_request(str(exc))
    # Re-read with relations loaded for serialization.
    invoice = detailed_invoices.get(pk=invoice.pk)
    return JsonResponse(build_invoice_payload(invoice), status=201)
@require_http_methods(["GET", "PUT", "DELETE"])
def invoice_detail_view(request: HttpRequest, invoice_id: int) -> JsonResponse:
    """Return (GET), update (PUT) or delete (DELETE) one invoice.

    The invoice is looked up inside the user's business scope, so an
    out-of-scope invoice answers 404. Each method then requires its own
    permission: ``invoices.view``, ``invoices.edit`` or ``invoices.delete``.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)

    detailed_invoices = Invoice.objects.select_related("business", "vendor").prefetch_related(
        "line_items__product", "category_links__category"
    )
    try:
        invoice = _scoped_invoices_queryset(user, detailed_invoices).get(pk=invoice_id)
    except Invoice.DoesNotExist:
        return JsonResponse({"detail": "Invoice not found"}, status=404)

    def _lacks(permission: str) -> bool:
        # Superusers hold every permission implicitly.
        return not user.is_superuser and permission not in set(user.permission_keys())

    method = request.method
    if method == "GET":
        if _lacks("invoices.view"):
            return _forbidden(missing_permissions=["invoices.view"])
        return JsonResponse(build_invoice_payload(invoice))

    if method == "DELETE":
        if _lacks("invoices.delete"):
            return _forbidden(missing_permissions=["invoices.delete"])
        invoice.delete()
        return JsonResponse({"detail": "Invoice deleted"})

    # Remaining method is PUT.
    if _lacks("invoices.edit"):
        return _forbidden(missing_permissions=["invoices.edit"])
    payload = _json_body(request)
    denied = _validate_invoice_payload_access(request, payload)
    if denied:
        return denied
    try:
        invoice = update_invoice_from_payload(invoice, payload)
    except (KeyError, TypeError, ValueError) as exc:
        return _bad_request(str(exc))
    # Re-read with relations loaded for serialization.
    refreshed = detailed_invoices.get(pk=invoice.pk)
    return JsonResponse(build_invoice_payload(refreshed))
@require_permissions("inventory.view")
@require_GET
def inventory_view(request: HttpRequest) -> JsonResponse:
    """List inventory balances for the active products visible to the user.

    Supports ``q`` (product name or GTIN substring) and ``category_id``
    query parameters; a non-numeric ``category_id`` yields a 400. Rows are
    ordered by product name.
    """
    search = request.GET.get("q", "").strip()
    category_id = request.GET.get("category_id")
    visible_products = _scoped_products_queryset(request.user, Product.objects.filter(is_active=True))
    rows = (
        InventoryBalance.objects.select_related("product")
        .annotate(category_count=Count("product__category_links"))
        .prefetch_related("product__category_links__category")
        .filter(product__in=visible_products)
    )
    if search:
        rows = rows.filter(Q(product__name__icontains=search) | Q(product__gtin__icontains=search))
    if category_id:
        try:
            category_id = int(category_id)
        except ValueError:
            return _bad_request("category_id must be an integer")
        rows = rows.filter(product__category_links__category_id=category_id)
    rows = rows.distinct().order_by("product__name")

    results = []
    for row in rows:
        product = row.product
        # Read ids and names from the same prefetched links: no per-row
        # query, and the two lists stay aligned.
        links = list(product.category_links.all())
        results.append(
            {
                "product_id": row.product_id,
                "legacy_product_id": product.legacy_id,
                "product_name": product.name,
                "gtin": product.gtin,
                "quantity_on_hand": float(row.quantity_on_hand),
                "uom": row.uom,
                "vat_rate": _money(product.vat_rate),
                "net_purchase_price": _money(product.net_purchase_price),
                "display_sales_price": _money(product.display_sales_price),
                "category_count": row.category_count,
                "category_ids": [link.category_id for link in links],
                "category_names": [link.category.name for link in links],
            }
        )
    return JsonResponse({"results": results})
@require_http_methods(["GET", "POST"])
def events_view(request: HttpRequest) -> JsonResponse:
    """List active events (GET) or create an event (POST).

    GET requires ``events.view`` and supports the ``business_id`` and
    ``event_type`` query parameters; without ``business_id`` non-superusers
    only see events of their allowed businesses. POST requires
    ``events.create`` and needs ``business_id``, ``title``,
    ``start_datetime`` and ``end_datetime`` in the JSON body.

    A non-numeric ``business_id`` yields a 400 in both cases.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)

    if request.method == "GET":
        if not user.is_superuser and "events.view" not in set(user.permission_keys()):
            return _forbidden(missing_permissions=["events.view"])
        business_id = request.GET.get("business_id")
        events = Event.objects.select_related("business", "created_by").filter(is_active=True)
        if business_id:
            try:
                business_id = int(business_id)
            except ValueError:
                return _bad_request("business_id must be an integer")
            if denied := _ensure_business_access(request, business_id):
                return denied
            events = events.filter(business_id=business_id)
        elif not user.is_superuser:
            events = events.filter(business_id__in=_allowed_business_ids(user))
        if event_type := request.GET.get("event_type"):
            events = events.filter(event_type=event_type)
        results = [_event_payload(event) for event in events.order_by("start_datetime")]
        return JsonResponse({"results": results})

    # POST: create an event.
    if not user.is_superuser and "events.create" not in set(user.permission_keys()):
        return _forbidden(missing_permissions=["events.create"])
    data = _json_body(request)
    if not data.get("business_id"):
        return _bad_request("business_id is required")
    try:
        business_id = int(data["business_id"])
    except (TypeError, ValueError):
        return _bad_request("business_id must be an integer")
    if denied := _ensure_business_access(request, business_id):
        return denied
    if not data.get("title"):
        return _bad_request("title is required")
    if not data.get("start_datetime") or not data.get("end_datetime"):
        return _bad_request("start_datetime and end_datetime are required")
    event = Event.objects.create(
        business_id=business_id,
        title=data["title"],
        description=data.get("description", ""),
        event_type=data.get("event_type", "other"),
        start_datetime=data["start_datetime"],
        end_datetime=data["end_datetime"],
        all_day=bool(data.get("all_day", False)),
        location=data.get("location", ""),
        color=data.get("color", ""),
        recurrence_type=data.get("recurrence_type", "none"),
        recurrence_end_date=data.get("recurrence_end_date") or None,
        created_by=user,
        is_active=True,
    )
    # Re-read so the date fields hold database values rather than the raw
    # request strings; _event_payload calls isoformat() on them.
    event = Event.objects.select_related("business", "created_by").get(pk=event.pk)
    return JsonResponse(_event_payload(event), status=201)
@require_http_methods(["GET", "PUT", "DELETE"])
def event_detail_view(request: HttpRequest, event_id: int) -> JsonResponse:
    """Return (GET), update (PUT) or delete (DELETE) one event.

    The user must have access to the event's business and hold
    ``events.view``, ``events.edit`` or ``events.delete`` respectively.
    On PUT only the fields present in the JSON body are changed; moving the
    event to another business also requires access to that business, and a
    non-integer ``business_id`` yields a 400.
    """
    user = request.user
    if not user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)
    try:
        event = Event.objects.select_related("business", "created_by").get(pk=event_id)
    except Event.DoesNotExist:
        return JsonResponse({"detail": "Event not found"}, status=404)
    if denied := _ensure_business_access(request, event.business_id):
        return denied

    if request.method == "GET":
        if not user.is_superuser and "events.view" not in set(user.permission_keys()):
            return _forbidden(missing_permissions=["events.view"])
        return JsonResponse(_event_payload(event))

    if request.method == "DELETE":
        if not user.is_superuser and "events.delete" not in set(user.permission_keys()):
            return _forbidden(missing_permissions=["events.delete"])
        event.delete()
        return JsonResponse({"detail": "Event deleted"})

    # PUT: partial update.
    if not user.is_superuser and "events.edit" not in set(user.permission_keys()):
        return _forbidden(missing_permissions=["events.edit"])
    data = _json_body(request)

    # Validate the target business before touching the event.
    new_business_id = None
    if "business_id" in data:
        try:
            new_business_id = int(data["business_id"])
        except (TypeError, ValueError):
            return _bad_request("business_id must be an integer")
        if denied := _ensure_business_access(request, new_business_id):
            return denied

    for field in ("title", "description", "event_type", "location", "color", "recurrence_type"):
        if field in data:
            setattr(event, field, data[field])
    if new_business_id is not None:
        event.business_id = new_business_id
    if "start_datetime" in data:
        event.start_datetime = data["start_datetime"]
    if "end_datetime" in data:
        event.end_datetime = data["end_datetime"]
    if "all_day" in data:
        event.all_day = bool(data["all_day"])
    if "recurrence_end_date" in data:
        event.recurrence_end_date = data["recurrence_end_date"] or None
    if "is_active" in data:
        event.is_active = bool(data["is_active"])
    event.save()
    # Re-read so the date fields hold database values rather than the raw
    # request strings, and the business relation matches the new id.
    event = Event.objects.select_related("business", "created_by").get(pk=event.pk)
    return JsonResponse(_event_payload(event))
@require_permissions("shifts.view")
@require_GET
def schedule_overview_view(request: HttpRequest) -> JsonResponse:
    """Return shift roles, shift templates and shift assignments.

    With a ``business_id`` query parameter the data is limited to that
    business (403 if the user may not access it, 400 if the value is not an
    integer). Without it, non-superusers see only their allowed businesses.
    The response holds all matching active roles, the first 30 templates by
    start time and the first 50 assignments by occurrence date.
    """
    business_id = request.GET.get("business_id")
    templates = ShiftTemplate.objects.select_related("business", "shift_role", "created_by").prefetch_related(
        "assignments__user"
    )
    roles = ShiftRole.objects.select_related("business").filter(is_active=True)
    assignments = ShiftAssignment.objects.select_related("shift_template", "user").all()

    if business_id:
        try:
            business_id = int(business_id)
        except ValueError:
            return _bad_request("business_id must be an integer")
        if denied := _ensure_business_access(request, business_id):
            return denied
        templates = templates.filter(business_id=business_id)
        roles = roles.filter(business_id=business_id)
        assignments = assignments.filter(shift_template__business_id=business_id)
    elif not request.user.is_superuser:
        allowed_business_ids = _allowed_business_ids(request.user)
        templates = templates.filter(business_id__in=allowed_business_ids)
        roles = roles.filter(business_id__in=allowed_business_ids)
        assignments = assignments.filter(shift_template__business_id__in=allowed_business_ids)

    upcoming_templates = templates.order_by("start_datetime")[:30]
    upcoming_assignments = assignments.order_by("occurrence_date", "shift_template__start_datetime")[:50]

    results = {
        "roles": [
            {
                "id": role.id,
                "legacy_id": role.legacy_id,
                "business_id": role.business_id,
                "business_name": role.business.name,
                "name": role.name,
                "color": role.color,
                "sort_order": role.sort_order,
            }
            for role in roles.order_by("business__name", "sort_order", "name")
        ],
        "templates": [
            {
                "id": template.id,
                "legacy_id": template.legacy_id,
                "business_id": template.business_id,
                "business_name": template.business.name,
                "name": template.name,
                "start_datetime": template.start_datetime.isoformat(),
                "end_datetime": template.end_datetime.isoformat(),
                "min_staff": template.min_staff,
                "max_staff": template.max_staff,
                "color": template.color,
                "recurrence_type": template.recurrence_type,
                "recurrence_end_date": (
                    template.recurrence_end_date.isoformat() if template.recurrence_end_date else None
                ),
                "shift_role_name": template.shift_role.name if template.shift_role_id else None,
                "assignment_count": template.assignments.count(),
            }
            for template in upcoming_templates
        ],
        "assignments": [
            {
                "id": assignment.id,
                "legacy_id": assignment.legacy_id,
                "shift_template_id": assignment.shift_template_id,
                "shift_name": assignment.shift_template.name,
                "user_name": assignment.user.display_name or assignment.user.username,
                "occurrence_date": assignment.occurrence_date.isoformat(),
                "status": assignment.status,
                "start_override": assignment.start_override.isoformat() if assignment.start_override else None,
                "end_override": assignment.end_override.isoformat() if assignment.end_override else None,
                "notes": assignment.notes,
            }
            for assignment in upcoming_assignments
        ],
    }
    return JsonResponse(results)
@require_permissions("users.manage")
@require_GET
def settings_overview_view(request: HttpRequest) -> JsonResponse:
    """Return the data backing the admin settings screen.

    The response is a JSON object with three lists:

    * ``roles`` - every role (ordered by name) with its permission keys and
      the number of active users assigned to it.
    * ``users`` - every user (ordered by username), serialized by
      ``_user_admin_payload``.
    * ``permissions`` - every ``DomainPermission`` ordered by group, then key.

    Access is gated by the ``require_permissions("users.manage")`` decorator.
    """
    roles = Role.objects.prefetch_related("permission_links__permission").order_by("name")
    users = User.objects.select_related("role").prefetch_related("business_links").order_by("username")
    permissions = DomainPermission.objects.order_by("group", "key")

    role_rows = [
        {
            "id": role.id,
            "name": role.name,
            "description": role.description,
            "is_system": role.is_system,
            # Read from the prefetched links. Calling .values_list() on the
            # related manager would ignore the prefetch cache and issue one
            # extra query per role.
            "permission_keys": [link.permission.key for link in role.permission_links.all()],
            # NOTE(review): still one COUNT query per role; could become an
            # annotation once the reverse query name is confirmed.
            "user_count": role.users.filter(is_active=True).count(),
        }
        for role in roles
    ]
    permission_rows = [
        {
            "id": permission.id,
            "key": permission.key,
            "label": permission.label,
            "group": permission.group,
        }
        for permission in permissions
    ]
    return JsonResponse(
        {
            "roles": role_rows,
            "users": [_user_admin_payload(user) for user in users],
            "permissions": permission_rows,
        }
    )
@require_permissions("users.manage")
@require_http_methods(["POST"])
def users_view(request: HttpRequest) -> JsonResponse:
    """Create a user account and optionally grant it access to businesses.

    Expected JSON body keys: ``username`` and ``password`` (required),
    ``display_name``, ``email``, ``is_active``, ``role_id`` and
    ``business_ids`` (optional).

    Returns:
        201 with the ``_user_admin_payload`` of the new user on success.
        400 when the body is not a JSON object, a required field is missing,
        the username is taken, or ``business_ids`` is not a list of integers.
        Whatever ``_ensure_business_access`` returns when the caller may not
        grant access to one of the requested businesses.
    """
    try:
        data = _json_body(request)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return _bad_request("Invalid JSON body")
    if not isinstance(data, dict):
        return _bad_request("JSON object expected")

    # Strip like the update endpoint does, so " bob " and "bob" cannot coexist.
    username = str(data.get("username") or "").strip()
    if not username:
        return _bad_request("username is required")
    if not data.get("password"):
        return _bad_request("password is required")
    if User.objects.filter(username=username).exists():
        return _bad_request("Username already exists")

    # Validate every requested business BEFORE creating the user. Previously a
    # malformed id raised after the row was inserted and left an orphan account.
    raw_business_ids = data.get("business_ids") or []
    if not isinstance(raw_business_ids, (list, tuple)):
        return _bad_request("business_ids must be a list of integers")
    try:
        business_ids = [int(business_id) for business_id in raw_business_ids]
    except (TypeError, ValueError):
        return _bad_request("business_ids must be a list of integers")
    for business_id in business_ids:
        if denied := _ensure_business_access(request, business_id):
            return denied

    user = User.objects.create(
        username=username,
        password=make_password(data["password"]),
        display_name=data.get("display_name", ""),
        email=data.get("email", ""),
        is_active=bool(data.get("is_active", True)),
        role_id=data.get("role_id") or None,
    )
    if business_ids:
        UserBusinessAccess.objects.bulk_create(
            [UserBusinessAccess(user=user, business_id=business_id) for business_id in business_ids],
            ignore_conflicts=True,
        )
    # Reload with the relations the payload builder reads.
    user = User.objects.select_related("role").prefetch_related("business_links").get(pk=user.pk)
    return JsonResponse(_user_admin_payload(user), status=201)
@require_permissions("users.manage")
@require_http_methods(["GET", "PUT", "DELETE"])
def user_detail_view(request: HttpRequest, user_id: int) -> JsonResponse:
    """Read, update or delete a single user account.

    * ``GET`` returns the ``_user_admin_payload`` of the user.
    * ``DELETE`` removes the user; callers cannot delete their own account.
    * ``PUT`` applies a partial update from the JSON body. Recognised keys are
      ``username``, ``display_name``, ``email``, ``password``, ``is_active``,
      ``is_superuser``, ``role_id`` and ``business_ids`` (replaces all links).

    Returns:
        200 with the payload (GET/PUT) or a confirmation message (DELETE).
        400 for an invalid body, blank/duplicate username, malformed
        ``business_ids`` or a self-delete attempt.
        403 when a non-superuser tries to change ``is_superuser``, or whatever
        ``_ensure_business_access`` returns for a business the caller may not
        grant.
        404 when the user does not exist.
    """
    # Function-scope import: the module only imports `models` from django.db.
    from django.db import transaction

    try:
        user = User.objects.select_related("role").prefetch_related("business_links").get(pk=user_id)
    except User.DoesNotExist:
        return JsonResponse({"detail": "User not found"}, status=404)

    if request.method == "GET":
        return JsonResponse(_user_admin_payload(user))

    if request.method == "DELETE":
        if request.user.pk == user.pk:
            return _bad_request("You cannot delete your own account")
        user.delete()
        return JsonResponse({"detail": "User deleted"})

    try:
        data = _json_body(request)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return _bad_request("Invalid JSON body")
    if not isinstance(data, dict):
        return _bad_request("JSON object expected")

    # --- Validate everything first; nothing is persisted until all checks pass.
    if "username" in data:
        username = str(data["username"]).strip()
        if not username:
            return _bad_request("username is required")
        if User.objects.exclude(pk=user.pk).filter(username=username).exists():
            return _bad_request("Username already exists")
        user.username = username

    if "is_superuser" in data:
        wants_superuser = bool(data["is_superuser"])
        # Holding "users.manage" must not be enough to mint or demote
        # superusers. Unchanged values are accepted so clients that echo the
        # whole object back keep working.
        if wants_superuser != user.is_superuser and not request.user.is_superuser:
            return _forbidden("Only superusers can change superuser status")
        user.is_superuser = wants_superuser

    business_ids = None
    if "business_ids" in data:
        raw_business_ids = data["business_ids"]
        if not isinstance(raw_business_ids, (list, tuple)):
            return _bad_request("business_ids must be a list of integers")
        try:
            business_ids = [int(business_id) for business_id in raw_business_ids]
        except (TypeError, ValueError):
            return _bad_request("business_ids must be a list of integers")
        # Previously this check ran after user.save(), so a denied request
        # still persisted the other field changes.
        for business_id in business_ids:
            if denied := _ensure_business_access(request, business_id):
                return denied

    # --- Apply the remaining simple fields.
    for field in ["display_name", "email"]:
        if field in data:
            setattr(user, field, data[field])
    if "password" in data and data["password"]:
        # NOTE(review): a non-superuser manager can still reset a superuser's
        # password or reassign roles here; confirm whether that is intended.
        user.password = make_password(data["password"])
    if "is_active" in data:
        user.is_active = bool(data["is_active"])
    if "role_id" in data:
        user.role_id = data["role_id"] or None

    # Save and swap the business links together so a failure cannot leave the
    # user with its links deleted but not recreated.
    with transaction.atomic():
        user.save()
        if business_ids is not None:
            user.business_links.all().delete()
            UserBusinessAccess.objects.bulk_create(
                [UserBusinessAccess(user=user, business_id=business_id) for business_id in business_ids],
                ignore_conflicts=True,
            )

    # Reload so the prefetched business links reflect the update.
    user = User.objects.select_related("role").prefetch_related("business_links").get(pk=user.pk)
    return JsonResponse(_user_admin_payload(user))
@require_http_methods(["GET", "POST"])
def devices_view(request: HttpRequest) -> JsonResponse:
    """List allowed devices (GET) or register a new one (POST).

    Both methods require an authenticated caller who is a superuser or holds
    the ``users.manage`` permission.

    POST body keys: ``ip_address`` (required), ``label``, ``user_agent``,
    ``is_active``, ``ipv6_prefix``, ``device_token`` (generated when omitted)
    and ``known_ips`` (a list of strings, or a comma-separated string).

    Returns:
        200 with ``{"results": [...]}`` for GET, 201 with the device payload
        for POST, 400 for an invalid body, 401/403 when the caller is not
        allowed.
    """
    # The same gate applies to both methods, so evaluate it once.
    if not request.user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)
    if not request.user.is_superuser and "users.manage" not in set(request.user.permission_keys()):
        return _forbidden(missing_permissions=["users.manage"])

    if request.method == "GET":
        devices = AllowedDevice.objects.order_by("-registered_at")
        return JsonResponse({"results": [_device_payload(device) for device in devices]})

    try:
        data = _json_body(request)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return _bad_request("Invalid JSON body")
    if not isinstance(data, dict):
        return _bad_request("JSON object expected")
    if not data.get("ip_address"):
        return _bad_request("ip_address is required")

    # ",".join() on a plain string would interleave commas between characters,
    # so accept a comma-separated string explicitly and reject other shapes.
    raw_known_ips = data.get("known_ips") or []
    if isinstance(raw_known_ips, str):
        raw_known_ips = raw_known_ips.split(",")
    if not isinstance(raw_known_ips, (list, tuple)):
        return _bad_request("known_ips must be a list of IP addresses")
    known_ips = ",".join(ip for ip in (str(item).strip() for item in raw_known_ips) if ip)

    device = AllowedDevice.objects.create(
        ip_address=data["ip_address"],
        label=data.get("label", ""),
        user_agent=data.get("user_agent", ""),
        is_active=bool(data.get("is_active", True)),
        ipv6_prefix=data.get("ipv6_prefix", ""),
        device_token=data.get("device_token", secrets.token_urlsafe(24)),
        known_ips=known_ips,
    )
    return JsonResponse(_device_payload(device), status=201)
@require_permissions("users.manage")
@require_http_methods(["PUT", "DELETE"])
def device_detail_view(request: HttpRequest, device_id: int) -> JsonResponse:
    """Update (PUT) or delete (DELETE) a single allowed device.

    PUT applies a partial update; recognised keys are ``ip_address``,
    ``label``, ``user_agent``, ``ipv6_prefix``, ``device_token``,
    ``is_active`` and ``known_ips`` (a list of strings, or a comma-separated
    string).

    Returns:
        200 with the device payload (PUT) or a confirmation message (DELETE),
        400 for an invalid body, 404 when the device does not exist.
    """
    try:
        device = AllowedDevice.objects.get(pk=device_id)
    except AllowedDevice.DoesNotExist:
        return JsonResponse({"detail": "Device not found"}, status=404)

    if request.method == "DELETE":
        device.delete()
        return JsonResponse({"detail": "Device deleted"})

    try:
        data = _json_body(request)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return _bad_request("Invalid JSON body")
    if not isinstance(data, dict):
        return _bad_request("JSON object expected")

    # Creation requires an ip_address, so an update must not be able to blank it.
    if "ip_address" in data and not data["ip_address"]:
        return _bad_request("ip_address is required")

    known_ips = None
    if "known_ips" in data:
        # ",".join() on a plain string would interleave commas between
        # characters, so accept a comma-separated string explicitly.
        raw_known_ips = data["known_ips"] or []
        if isinstance(raw_known_ips, str):
            raw_known_ips = raw_known_ips.split(",")
        if not isinstance(raw_known_ips, (list, tuple)):
            return _bad_request("known_ips must be a list of IP addresses")
        known_ips = ",".join(ip for ip in (str(item).strip() for item in raw_known_ips) if ip)

    for field in ["ip_address", "label", "user_agent", "ipv6_prefix", "device_token"]:
        if field in data:
            setattr(device, field, data[field])
    if "is_active" in data:
        device.is_active = bool(data["is_active"])
    if known_ips is not None:
        device.known_ips = known_ips
    device.save()
    return JsonResponse(_device_payload(device))
@require_http_methods(["GET", "POST"])
def device_tokens_view(request: HttpRequest) -> JsonResponse:
    """List device registration tokens (GET) or issue a new one (POST).

    Both methods require an authenticated caller who is a superuser or holds
    the ``users.manage`` permission.

    POST body keys: ``label`` (optional) and ``expires_in_days`` (optional
    positive integer, default 7). The token string itself is generated with
    ``secrets.token_urlsafe``.

    Returns:
        200 with ``{"results": [...]}`` for GET, 201 with the token payload
        for POST, 400 for an invalid body or expiry, 401/403 when the caller
        is not allowed.
    """
    # The same gate applies to both methods, so evaluate it once.
    if not request.user.is_authenticated:
        return JsonResponse({"detail": "Authentication required"}, status=401)
    if not request.user.is_superuser and "users.manage" not in set(request.user.permission_keys()):
        return _forbidden(missing_permissions=["users.manage"])

    if request.method == "GET":
        tokens = DeviceRegistrationToken.objects.select_related("created_by").order_by("-created_at")
        return JsonResponse({"results": [_device_token_payload(token) for token in tokens]})

    try:
        data = _json_body(request)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return _bad_request("Invalid JSON body")
    if not isinstance(data, dict):
        return _bad_request("JSON object expected")

    try:
        expires_in_days = int(data.get("expires_in_days", 7))
    except (TypeError, ValueError):
        return _bad_request("expires_in_days must be an integer")
    if expires_in_days < 1:
        # Zero or negative would mint a token that is already expired.
        return _bad_request("expires_in_days must be at least 1")
    try:
        expires_at = timezone.now() + timedelta(days=expires_in_days)
    except OverflowError:
        # Absurdly large values overflow timedelta / the datetime range.
        return _bad_request("expires_in_days is too large")

    token = DeviceRegistrationToken.objects.create(
        token=secrets.token_urlsafe(24),
        label=data.get("label", ""),
        expires_at=expires_at,
        created_by=request.user,
    )
    # Reload with created_by joined, matching the queryset used for listing.
    token = DeviceRegistrationToken.objects.select_related("created_by").get(pk=token.pk)
    return JsonResponse(_device_token_payload(token), status=201)
@require_permissions("users.manage")
@require_http_methods(["DELETE"])
def device_token_detail_view(request: HttpRequest, token_id: int) -> JsonResponse:
    """Delete the device registration token identified by ``token_id``.

    Returns a 404 JSON response when no such token exists, otherwise a
    confirmation message after the row has been removed.
    """
    registration_token = DeviceRegistrationToken.objects.filter(pk=token_id).first()
    if registration_token is None:
        return JsonResponse({"detail": "Registration token not found"}, status=404)

    registration_token.delete()
    return JsonResponse({"detail": "Registration token deleted"})
@api_login_required
@require_GET
def notifications_view(request: HttpRequest) -> JsonResponse:
    """Return the 50 most recent notifications that still need attention.

    A notification is included while it is either not dismissed or not read.
    Rows are ordered newest first and returned under the ``results`` key.
    """
    # NOTE(review): the query is not scoped to request.user - confirm that
    # notifications are meant to be shared by every logged-in user.
    needs_attention = Q(is_dismissed=False) | Q(is_read=False)
    recent = Notification.objects.filter(needs_attention).order_by("-created_at")[:50]

    serialized = []
    for notification in recent:
        serialized.append(
            {
                "id": notification.id,
                "type": notification.type,
                "title": notification.title,
                "message": notification.message,
                "is_read": notification.is_read,
                "created_at": notification.created_at.isoformat(),
            }
        )
    return JsonResponse({"results": serialized})