# Source listing: kursy-mirror/home/models/pages.py
# (Python; 402 lines, 13 KiB as reported by the file viewer)

import logging
from datetime import datetime, timedelta

from django.conf import settings
from django.contrib.auth.models import Group, User
from django.db import models
from django.forms import CheckboxSelectMultiple
from django.utils import timezone
from modelcluster.contrib.taggit import ClusterTaggableManager
from modelcluster.fields import ParentalKey
from taggit.models import TaggedItemBase
from wagtail import blocks
from wagtail.admin.panels import FieldPanel
from wagtail.fields import RichTextField, StreamField
from wagtail.images.blocks import ImageBlock
from wagtail.models import Page
# NOTE(review): ParentalManyToManyField appears to be defined in
# modelcluster.fields; this import presumably works only because
# wagtail.models.copying imports it too -- confirm and consider importing
# it from modelcluster.fields directly.
from wagtail.models.copying import ParentalManyToManyField
from wagtail_color_panel.edit_handlers import NativeColorPanel
from wagtail_color_panel.fields import ColorField

from purchase.models import CoursePurchase
class EmptyPage(Page):
    """Page type that adds no fields or behaviour on top of ``Page``."""
class HomePage(Page):
    """Home page with a single optional rich-text body."""

    body = RichTextField(blank=True)

    # Editor layout: the stock page panels followed by the body field.
    content_panels = [*Page.content_panels, "body"]
class BlogIndexPage(Page):
    """Index page whose only permitted child page type is ``home.BlogPage``."""

    # Restricts which page types may be created beneath this page.
    subpage_types = ["home.BlogPage"]
class CourseIndexPage(Page):
    """Index page listing its live child courses, split by visitor access."""

    subpage_types = ["home.CoursePage"]

    def get_context(self, request):
        """Extend the template context with two title-sorted course lists.

        ``purchased_courses`` holds the live children for which the specific
        page's ``_user_has_access(request.user)`` is truthy; every other live
        child goes into ``other_courses``. Both lists are sorted by
        lower-cased title.
        """

        def title_key(page):
            return page.title.lower()

        context = super().get_context(request)

        accessible = set()
        locked = set()
        for child in self.get_children().live():
            # The access check lives on the specific page class, so each
            # child is resolved through ``.specific`` before asking.
            target = accessible if child.specific._user_has_access(request.user) else locked
            target.add(child)

        context["purchased_courses"] = sorted(accessible, key=title_key)
        context["other_courses"] = sorted(locked, key=title_key)
        return context
class EventIndexPage(Page):
    """Index page whose only permitted child page type is ``home.EventPage``."""

    # Restricts which page types may be created beneath this page.
    subpage_types = ["home.EventPage"]
class BlogPage(Page):
    """Blog post with an author, an optional image and a streamed body."""

    # May only be created beneath a blog index page.
    parent_page_types = ["home.BlogIndexPage"]

    author = models.CharField(max_length=255)
    # Optional image; the reference is cleared (SET_NULL) if the image is
    # deleted, and no reverse accessor is created (related_name="+").
    image = models.ForeignKey(
        "wagtailimages.Image",
        on_delete=models.SET_NULL,
        related_name="+",
        blank=True,
        null=True,
    )
    # Body made of three block types: heading, paragraph and image.
    body = StreamField(
        [
            ("heading", blocks.CharBlock(classname="title")),
            ("paragraph", blocks.RichTextBlock()),
            ("image", ImageBlock()),
        ]
    )

    content_panels = [
        *Page.content_panels,
        FieldPanel("author"),
        FieldPanel("image"),
        FieldPanel("body"),
    ]
class CoursePage(Page):
    """Course page whose access is granted by group membership or purchase.

    A user has access when they belong to any group in ``allowed_groups`` or
    hold a non-refunded ``CoursePurchase`` for this course. Each course also
    has a dedicated group named ``course_<id>_access``.
    """

    course_image = models.ForeignKey(
        "wagtailimages.Image",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="+",
    )
    description = models.CharField(max_length=255, blank=True)
    body = RichTextField(blank=True)
    allowed_groups = ParentalManyToManyField(
        Group,
        related_name="course_pages",
        help_text="Additional groups that should have access to this course, e.g. Editors. NOTE: Users who purchase the course will be automatically added to a dedicated access group for this course, so you don't need to add that group here.",
    )
    repository_url = models.URLField(
        null=True,
        blank=True,
    )

    def _active_purchases(self, user):
        """Return the queryset of non-refunded purchases of this course by *user*."""
        return CoursePurchase.objects.filter(user=user, course=self, refunded=False)

    def _ensure_access_group(self):
        """Get or create this course's dedicated group and add it to ``allowed_groups``.

        Returns the ``Group`` named ``course_<id>_access``. Requires ``self.id``
        to be set, because the id is part of the group name.
        """
        group, _ = Group.objects.get_or_create(name=f"course_{self.id}_access")
        if not self.allowed_groups.filter(id=group.id).exists():
            self.allowed_groups.add(group)
        return group

    def _user_has_access(self, user):
        """Return True if *user* may access this course.

        Anonymous users never have access. Otherwise access is granted by
        membership in any of ``allowed_groups`` or by a non-refunded purchase.
        """
        if not user.is_authenticated:
            return False
        user_group_ids = user.groups.values_list("id", flat=True)
        if self.allowed_groups.filter(id__in=user_group_ids).exists():
            return True
        return self._active_purchases(user).exists()

    def _user_purchase_id(self, user):
        """Return the id of *user*'s non-refunded purchase of this course, or None.

        Returns None for anonymous users and for users without such a purchase.
        """
        if not user.is_authenticated:
            return None
        purchase = self._active_purchases(user).first()
        # Diagnostic output goes through logging rather than stdout, since
        # this runs on every page render via get_context().
        logging.getLogger(__name__).debug(
            "User %s purchase for course %s: %s", user, self, purchase
        )
        return purchase.id if purchase else None

    def mock_purchase(self, user):
        """Mock method to simulate purchasing this course for a user.

        Ensures a non-refunded ``CoursePurchase`` exists, adds the user to the
        course's dedicated access group and makes sure that group is listed in
        ``allowed_groups``.

        Returns False for anonymous users; otherwise True if a new purchase
        record was created and False if one already existed.
        """
        if not user.is_authenticated:
            return False
        _, created = CoursePurchase.objects.get_or_create(
            user=user, course=self, refunded=False
        )
        # Make sure the dedicated access group exists and is among the
        # course's allowed groups (other allowed groups are left untouched),
        # then put the buyer into it.
        # NOTE(review): whether the allowed_groups addition is persisted
        # without a following save() depends on the ParentalManyToManyField
        # manager -- confirm.
        group = self._ensure_access_group()
        user.groups.add(group)
        return created

    def save(self, *args, **kwargs):
        """Attach the dedicated access group, then save the page.

        The group name contains the page id, so a page that has no id yet
        (first save) is saved without the group; it is attached on a later
        save or by ``mock_purchase``.
        """
        if self.id is not None:
            self._ensure_access_group()
        super().save(*args, **kwargs)

    def get_context(self, request):
        """Add ``user_has_access`` and ``user_purchase_id`` for the requesting user."""
        context = super().get_context(request)
        context["user_has_access"] = self._user_has_access(request.user)
        context["user_purchase_id"] = self._user_purchase_id(request.user)
        return context

    content_panels = Page.content_panels + [
        FieldPanel("course_image"),
        FieldPanel("description"),
        FieldPanel("body"),
        FieldPanel("allowed_groups", widget=CheckboxSelectMultiple),
        FieldPanel(
            "repository_url",
            read_only=True,
            heading="Repository URL (auto-generated)",
        ),
    ]
    parent_page_types = ["home.CourseIndexPage"]
    subpage_types = ["home.CourseModulePage"]
class CourseModulePage(Page):
    """Module inside a course; its parent page is the course."""

    parent_page_types = ["home.CoursePage"]
    subpage_types = ["home.ModuleLessonPage"]

    body = RichTextField(blank=True)

    content_panels = [*Page.content_panels, "body"]

    @property
    def course(self):
        """Return the specific parent page, or None when it cannot be resolved."""
        if not hasattr(self, "get_parent"):
            return None
        parent_page = self.get_parent()
        if not parent_page or not hasattr(parent_page, "specific"):
            return None
        return parent_page.specific

    @property
    def full_title(self):
        """Return ``"<course title> - <own title>"``, or just the own title without a course."""
        parent_course = self.course
        return f"{parent_course.title} - {self.title}" if parent_course else self.title
class ModuleLessonPage(Page):
    """Lesson inside a course module, with optional Gitea repository fields."""

    parent_page_types = ["home.CourseModulePage"]

    body = RichTextField(blank=True)
    create_gitea_repo = models.BooleanField(
        help_text="If enabled, a Gitea repository will be automatically created for this module when the module is published.",
        default=False,
    )
    gitea_repo_url = models.URLField(
        help_text="URL of the Gitea repository for this lesson (auto-generated if 'create_gitea_repo' is enabled)",
        blank=True,
        null=True,
    )

    content_panels = [
        *Page.content_panels,
        FieldPanel("body"),
        FieldPanel("create_gitea_repo"),
        # Shown to editors but not editable.
        FieldPanel(
            "gitea_repo_url",
            heading="Gitea Repository URL",
            read_only=True,
        ),
    ]

    @property
    def module(self):
        """Return the specific parent page, or None when it cannot be resolved."""
        if not hasattr(self, "get_parent"):
            return None
        parent_page = self.get_parent()
        if not parent_page or not hasattr(parent_page, "specific"):
            return None
        return parent_page.specific

    @property
    def full_title(self):
        """Return the module's ``full_title`` joined to the own title with ``" - "``.

        Falls back to the own title when no parent module is found.
        """
        parent_module = self.module
        return f"{parent_module.full_title} - {self.title}" if parent_module else self.title
class EventPageTag(TaggedItemBase):
    """Through model that attaches tags to ``home.EventPage``."""

    # Tagged items are reachable from the page as ``tagged_items`` and are
    # removed together with it (CASCADE).
    content_object = ParentalKey(
        "home.EventPage",
        on_delete=models.CASCADE,
        related_name="tagged_items",
    )
class EventPage(Page):
    """Event page that is either a single event or a weekly recurring one.

    Concrete dates are stored as ``EventOccurrence`` rows reachable through
    ``self.occurrences``; they are (re)generated by ``generate_occurrences``
    every time a live page is saved.
    """

    tags = ClusterTaggableManager(through=EventPageTag, blank=True)
    color = ColorField(default="#1c398e")
    image = models.ForeignKey(
        "wagtailimages.Image",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="+",
    )
    start = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Start date and time of the singular event (ignored if recurrence is enabled)",
    )
    end = models.DateTimeField(
        blank=True,
        null=True,
        help_text="End date and time of the singular event (ignored if recurrence is enabled)",
    )
    # Recurrence fields
    recurrence_enabled = models.BooleanField(
        default=False, help_text="Enable automatic recurrence for this event"
    )
    # Weekday numbers follow ``date.weekday()`` (0 = Monday ... 6 = Sunday).
    RECURRENCE_DAYS = [
        (0, "Monday"),
        (1, "Tuesday"),
        (2, "Wednesday"),
        (3, "Thursday"),
        (4, "Friday"),
        (5, "Saturday"),
        (6, "Sunday"),
    ]
    recurrence_days_of_week = models.JSONField(
        default=list,
        blank=True,
        help_text="Days of the week for recurrence (e.g., [0,3] for Mon & Thu)",
    )
    recurrence_start_time = models.TimeField(
        null=True, blank=True, help_text="Start time for each occurrence"
    )
    recurrence_end_time = models.TimeField(
        null=True, blank=True, help_text="End time for each occurrence"
    )
    recurrence_repeat_until = models.DateField(
        null=True, blank=True, help_text="Repeat until this date (inclusive)"
    )
    recurrence_endless = models.BooleanField(
        default=False,
        help_text="If enabled, recurrence will not end (ignore 'repeat until' date)",
    )
    # TODO: use google maps here
    location = models.CharField(max_length=255, blank=True)
    max_attendees = models.PositiveIntegerField(
        null=True,
        blank=True,
        help_text="Maximum number of attendees. Leave blank for unlimited.",
    )
    description = RichTextField(blank=True)
    hosts = ParentalManyToManyField(
        User,
        related_name="hosted_events",
        help_text="Select users who will be listed as hosts of this event.",
    )
    parent_page_types = ["home.EventIndexPage"]

    def get_context(self, request):
        """Return the default page context unchanged."""
        context = super().get_context(request)
        # Occurrence-specific context should be handled in views/templates
        return context

    @staticmethod
    def _local_day(value):
        """Return the calendar date of datetime *value* in the current time zone.

        Naive datetimes are returned as their own date, unconverted.
        """
        if timezone.is_aware(value):
            return timezone.localtime(value).date()
        return value.date()

    def _sync_single_occurrence(self, occurrence_model):
        """Keep exactly one occurrence mirroring ``self.start`` / ``self.end``.

        The first existing occurrence is reused (and retimed if needed) and
        all others are deleted; when none exists, one is created.
        """
        occurrence = self.occurrences.first()
        if occurrence is None:
            occurrence_model.objects.create(
                event=self, start=self.start, end=self.end
            )
            return
        if occurrence.start != self.start or occurrence.end != self.end:
            occurrence.start = self.start
            occurrence.end = self.end
            occurrence.save(update_fields=["start", "end"])
        self.occurrences.exclude(id=occurrence.id).delete()

    def _sync_recurring_occurrences(self, occurrence_model, days_ahead):
        """Create or retime one occurrence per matching day in the window.

        The window runs from today to ``recurrence_repeat_until`` (inclusive),
        or to ``days_ahead`` days from today when the recurrence is endless or
        has no end date. Existing occurrences are matched by calendar day, so
        a changed start/end time updates the stored row instead of creating a
        second one; rows are only created or updated here, never deleted.
        """
        # Take "today" in the current time zone: the dates below are combined
        # with times and localised by make_aware(), so a UTC date would shift
        # the window by a day around midnight.
        first_day = self._local_day(timezone.now())
        if self.recurrence_repeat_until and not self.recurrence_endless:
            last_day = self.recurrence_repeat_until
        else:
            last_day = first_day + timedelta(days=days_ahead)
        if last_day < first_day:
            return

        weekdays = self.recurrence_days_of_week or []
        midnight = datetime.min.time()
        start_time = self.recurrence_start_time or midnight
        end_time = self.recurrence_end_time or midnight

        # Load every occurrence starting inside the window once and index it
        # by local calendar day, instead of querying per day.
        window_start = timezone.make_aware(datetime.combine(first_day, midnight))
        window_end = timezone.make_aware(
            datetime.combine(last_day + timedelta(days=1), midnight)
        )
        existing_by_day = {}
        in_window = self.occurrences.filter(
            start__gte=window_start, start__lt=window_end
        ).order_by("start")
        for occurrence in in_window:
            existing_by_day.setdefault(
                self._local_day(occurrence.start), []
            ).append(occurrence)

        for offset in range((last_day - first_day).days + 1):
            day = first_day + timedelta(days=offset)
            # An empty weekday list means every day qualifies.
            if weekdays and day.weekday() not in weekdays:
                continue
            start_dt = timezone.make_aware(datetime.combine(day, start_time))
            end_dt = timezone.make_aware(datetime.combine(day, end_time))

            candidates = existing_by_day.get(day, [])
            if not candidates:
                occurrence_model.objects.create(
                    event=self, start=start_dt, end=end_dt
                )
                continue
            if any(c.start == start_dt and c.end == end_dt for c in candidates):
                # Already up to date for this day.
                continue
            # Retime the existing row in place so data attached to it
            # (e.g. signed-up users) is kept.
            occurrence = candidates[0]
            occurrence.start = start_dt
            occurrence.end = end_dt
            occurrence.save(update_fields=["start", "end"])

    def generate_occurrences(self, days_ahead=30):
        """
        Generate EventOccurrence objects for this event based on recurrence settings.
        For endless recurrence, generate up to days_ahead into the future.

        Without recurrence, a single occurrence mirroring ``start`` / ``end``
        is kept and any others are deleted.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- confirm).
        from .event_occurrence import EventOccurrence

        if not self.recurrence_enabled:
            self._sync_single_occurrence(EventOccurrence)
            return
        self._sync_recurring_occurrences(EventOccurrence, days_ahead)

    def save(self, *args, **kwargs):
        """Save the page, then regenerate occurrences if the page is live."""
        super().save(*args, **kwargs)
        if self.live:
            self.generate_occurrences()

    content_panels = Page.content_panels + [
        FieldPanel("tags"),
        NativeColorPanel("color"),
        FieldPanel("image"),
        FieldPanel("location"),
        FieldPanel("max_attendees"),
        FieldPanel("hosts", widget=CheckboxSelectMultiple),
        FieldPanel("description"),
        FieldPanel("start"),
        FieldPanel("end"),
        FieldPanel("recurrence_enabled"),
        FieldPanel("recurrence_days_of_week"),
        FieldPanel("recurrence_start_time"),
        FieldPanel("recurrence_end_time"),
        FieldPanel("recurrence_repeat_until"),
        FieldPanel("recurrence_endless"),
    ]