393 lines
13 KiB
Python
393 lines
13 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import Group, User
|
|
from django.db import models
|
|
from django.forms import CheckboxSelectMultiple
|
|
from django.utils import timezone
|
|
from modelcluster.contrib.taggit import ClusterTaggableManager
|
|
from modelcluster.fields import ParentalKey
|
|
from taggit.models import TaggedItemBase
|
|
from wagtail import blocks
|
|
from wagtail.admin.panels import FieldPanel
|
|
from wagtail.fields import RichTextField, StreamField
|
|
from wagtail.images.blocks import ImageBlock
|
|
from wagtail.models import Page
|
|
from wagtail.models.copying import ParentalManyToManyField
|
|
from wagtail_color_panel.edit_handlers import NativeColorPanel
|
|
from wagtail_color_panel.fields import ColorField
|
|
|
|
from purchase.models import CoursePurchase
|
|
|
|
|
|
class EmptyPage(Page):
|
|
pass
|
|
|
|
|
|
class HomePage(Page):
|
|
body = RichTextField(blank=True)
|
|
|
|
content_panels = Page.content_panels + ["body"]
|
|
|
|
|
|
class BlogIndexPage(Page):
|
|
subpage_types = ["home.BlogPage"]
|
|
|
|
|
|
class CourseIndexPage(Page):
|
|
subpage_types = ["home.CoursePage"]
|
|
|
|
def get_context(self, request):
|
|
context = super().get_context(request)
|
|
all_courses = self.get_children().live()
|
|
purchased_courses = set()
|
|
other_courses = set()
|
|
|
|
for course in all_courses:
|
|
if course.specific._user_has_access(request.user):
|
|
purchased_courses.add(course)
|
|
else:
|
|
other_courses.add(course)
|
|
|
|
context["purchased_courses"] = sorted(
|
|
purchased_courses, key=lambda c: c.title.lower()
|
|
)
|
|
context["other_courses"] = sorted(other_courses, key=lambda c: c.title.lower())
|
|
return context
|
|
|
|
|
|
class BlogPage(Page):
|
|
author = models.CharField(max_length=255)
|
|
image = models.ForeignKey(
|
|
"wagtailimages.Image",
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
related_name="+",
|
|
)
|
|
body = StreamField(
|
|
[
|
|
("heading", blocks.CharBlock(classname="title")),
|
|
("paragraph", blocks.RichTextBlock()),
|
|
("image", ImageBlock()),
|
|
]
|
|
)
|
|
|
|
content_panels = Page.content_panels + [
|
|
FieldPanel("author"),
|
|
FieldPanel("image"),
|
|
FieldPanel("body"),
|
|
]
|
|
parent_page_types = ["home.BlogIndexPage"]
|
|
|
|
|
|
class CoursePage(Page):
|
|
course_image = models.ForeignKey(
|
|
"wagtailimages.Image",
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
related_name="+",
|
|
)
|
|
description = models.CharField(max_length=255, blank=True)
|
|
body = RichTextField(blank=True)
|
|
allowed_groups = ParentalManyToManyField(
|
|
Group,
|
|
related_name="course_pages",
|
|
help_text="Additional groups that should have access to this course, e.g. Editors. NOTE: Users who purchase the course will be automatically added to a dedicated access group for this course, so you don't need to add that group here.",
|
|
)
|
|
|
|
repository_url = models.URLField(
|
|
null=True,
|
|
blank=True,
|
|
)
|
|
|
|
def _user_has_access(self, user):
|
|
if not user.is_authenticated:
|
|
return False
|
|
|
|
user_group_ids = user.groups.values_list("id", flat=True)
|
|
if self.allowed_groups.filter(id__in=user_group_ids).exists():
|
|
return True
|
|
|
|
return CoursePurchase.objects.filter(
|
|
user=user, course=self, refunded=False
|
|
).exists()
|
|
|
|
def _user_purchase_id(self, user):
|
|
if not user.is_authenticated:
|
|
return None
|
|
|
|
purchase = CoursePurchase.objects.filter(
|
|
user=user, course=self, refunded=False
|
|
).first()
|
|
print(f"User {user} purchase for course {self}: {purchase}")
|
|
return purchase.id if purchase else None
|
|
|
|
def mock_purchase(self, user):
|
|
"""Mock method to simulate purchasing this course for a user."""
|
|
if not user.is_authenticated:
|
|
return False
|
|
obj, created = CoursePurchase.objects.get_or_create(
|
|
user=user, course=self, refunded=False
|
|
)
|
|
# Add user to dedicated access group for this course
|
|
group_name = f"course_{self.id}_access"
|
|
group, _ = Group.objects.get_or_create(name=group_name)
|
|
user.groups.add(group)
|
|
# Ensure allowed_groups only includes this access group
|
|
if not self.allowed_groups.filter(id=group.id).exists():
|
|
self.allowed_groups.add(group)
|
|
return created
|
|
|
|
def save(self, *args, **kwargs):
|
|
if self.id is not None:
|
|
group_name = f"course_{self.id}_access"
|
|
group, created = Group.objects.get_or_create(name=group_name)
|
|
if not self.allowed_groups.filter(id=group.id).exists():
|
|
self.allowed_groups.add(group)
|
|
|
|
super().save(*args, **kwargs)
|
|
|
|
def get_context(self, request):
|
|
context = super().get_context(request)
|
|
context["user_has_access"] = self._user_has_access(request.user)
|
|
context["user_purchase_id"] = self._user_purchase_id(request.user)
|
|
return context
|
|
|
|
content_panels = Page.content_panels + [
|
|
FieldPanel("course_image"),
|
|
FieldPanel("description"),
|
|
FieldPanel("body"),
|
|
FieldPanel("allowed_groups", widget=CheckboxSelectMultiple),
|
|
FieldPanel(
|
|
"repository_url",
|
|
read_only=True,
|
|
heading="Repository URL (auto-generated)",
|
|
),
|
|
]
|
|
parent_page_types = ["home.CourseIndexPage"]
|
|
subpage_types = ["home.CourseModulePage"]
|
|
|
|
|
|
class CourseModulePage(Page):
|
|
body = RichTextField(blank=True)
|
|
|
|
@property
|
|
def course(self):
|
|
if hasattr(self, "get_parent"):
|
|
parent = self.get_parent()
|
|
if parent and hasattr(parent, "specific"):
|
|
return parent.specific
|
|
return None
|
|
|
|
@property
|
|
def full_title(self):
|
|
course = self.course
|
|
if course:
|
|
return f"{course.title} - {self.title}"
|
|
return self.title
|
|
|
|
content_panels = Page.content_panels + ["body"]
|
|
subpage_types = ["home.ModuleLessonPage"]
|
|
parent_page_types = ["home.CoursePage"]
|
|
|
|
|
|
class ModuleLessonPage(Page):
|
|
body = RichTextField(blank=True)
|
|
create_gitea_repo = models.BooleanField(
|
|
default=False,
|
|
help_text="If enabled, a Gitea repository will be automatically created for this module when the module is published.",
|
|
)
|
|
gitea_repo_url = models.URLField(
|
|
null=True,
|
|
blank=True,
|
|
help_text="URL of the Gitea repository for this lesson (auto-generated if 'create_gitea_repo' is enabled)",
|
|
)
|
|
|
|
@property
|
|
def module(self):
|
|
if hasattr(self, "get_parent"):
|
|
parent = self.get_parent()
|
|
if parent and hasattr(parent, "specific"):
|
|
return parent.specific
|
|
return None
|
|
|
|
@property
|
|
def full_title(self):
|
|
module = self.module
|
|
if module:
|
|
return f"{module.full_title} - {self.title}"
|
|
return self.title
|
|
|
|
content_panels = Page.content_panels + [
|
|
FieldPanel("body"),
|
|
FieldPanel("create_gitea_repo"),
|
|
FieldPanel(
|
|
"gitea_repo_url",
|
|
read_only=True,
|
|
heading="Gitea Repository URL",
|
|
),
|
|
]
|
|
parent_page_types = ["home.CourseModulePage"]
|
|
|
|
|
|
class EventPageTag(TaggedItemBase):
|
|
content_object = ParentalKey(
|
|
"home.EventPage",
|
|
related_name="tagged_items",
|
|
on_delete=models.CASCADE,
|
|
)
|
|
|
|
|
|
class EventPage(Page):
|
|
tags = ClusterTaggableManager(through=EventPageTag, blank=True)
|
|
color = ColorField(default="#1c398e")
|
|
image = models.ForeignKey(
|
|
"wagtailimages.Image",
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
related_name="+",
|
|
)
|
|
|
|
start = models.DateTimeField(
|
|
blank=True,
|
|
null=True,
|
|
help_text="Start date and time of the singular event (ignored if recurrence is enabled)",
|
|
)
|
|
end = models.DateTimeField(
|
|
blank=True,
|
|
null=True,
|
|
help_text="End date and time of the singular event (ignored if recurrence is enabled)",
|
|
)
|
|
|
|
# Recurrence fields
|
|
recurrence_enabled = models.BooleanField(
|
|
default=False, help_text="Enable automatic recurrence for this event"
|
|
)
|
|
RECURRENCE_DAYS = [
|
|
(0, "Monday"),
|
|
(1, "Tuesday"),
|
|
(2, "Wednesday"),
|
|
(3, "Thursday"),
|
|
(4, "Friday"),
|
|
(5, "Saturday"),
|
|
(6, "Sunday"),
|
|
]
|
|
recurrence_days_of_week = models.JSONField(
|
|
default=list,
|
|
blank=True,
|
|
help_text="Days of the week for recurrence (e.g., [0,3] for Mon & Thu)",
|
|
)
|
|
recurrence_start_time = models.TimeField(
|
|
null=True, blank=True, help_text="Start time for each occurrence"
|
|
)
|
|
recurrence_end_time = models.TimeField(
|
|
null=True, blank=True, help_text="End time for each occurrence"
|
|
)
|
|
recurrence_repeat_until = models.DateField(
|
|
null=True, blank=True, help_text="Repeat until this date (inclusive)"
|
|
)
|
|
recurrence_endless = models.BooleanField(
|
|
default=False,
|
|
help_text="If enabled, recurrence will not end (ignore 'repeat until' date)",
|
|
)
|
|
|
|
# TODO: use google maps here
|
|
location = models.CharField(max_length=255, blank=True)
|
|
max_attendees = models.PositiveIntegerField(
|
|
null=True,
|
|
blank=True,
|
|
help_text="Maximum number of attendees. Leave blank for unlimited.",
|
|
)
|
|
|
|
description = RichTextField(blank=True)
|
|
hosts = ParentalManyToManyField(
|
|
User,
|
|
related_name="hosted_events",
|
|
help_text="Select users who will be listed as hosts of this event.",
|
|
)
|
|
|
|
def get_context(self, request):
|
|
context = super().get_context(request)
|
|
# Occurrence-specific context should be handled in views/templates
|
|
return context
|
|
|
|
def generate_occurrences(self, days_ahead=30):
|
|
"""
|
|
Generate EventOccurrence objects for this event based on recurrence settings.
|
|
For endless recurrence, generate up to days_ahead into the future.
|
|
"""
|
|
from .event_occurrence import EventOccurrence
|
|
|
|
now = timezone.now()
|
|
if not self.recurrence_enabled:
|
|
# if recurrence is not enabled, ensure there's at least one occurrence for the specified start/end
|
|
if self.occurrences.exists():
|
|
occurrence = self.occurrences.first()
|
|
if occurrence.start != self.start or occurrence.end != self.end:
|
|
occurrence.start = self.start
|
|
occurrence.end = self.end
|
|
occurrence.save(update_fields=["start", "end"])
|
|
else:
|
|
EventOccurrence.objects.create(
|
|
event=self, start=self.start, end=self.end
|
|
)
|
|
return
|
|
|
|
# Determine the date range
|
|
start_date = now.date()
|
|
if self.recurrence_endless:
|
|
end_date = start_date + timedelta(days=days_ahead)
|
|
elif self.recurrence_repeat_until:
|
|
end_date = self.recurrence_repeat_until
|
|
else:
|
|
end_date = start_date + timedelta(days=days_ahead)
|
|
|
|
days_of_week = self.recurrence_days_of_week or []
|
|
start_time = self.recurrence_start_time or datetime.min.time()
|
|
end_time = self.recurrence_end_time or datetime.min.time()
|
|
|
|
for single_date in (
|
|
start_date + timedelta(n) for n in range((end_date - start_date).days + 1)
|
|
):
|
|
if days_of_week and single_date.weekday() not in days_of_week:
|
|
continue
|
|
start_dt = datetime.combine(single_date, start_time)
|
|
end_dt = datetime.combine(single_date, end_time)
|
|
start_dt = timezone.make_aware(start_dt)
|
|
end_dt = timezone.make_aware(end_dt)
|
|
# If an occurrence exists, update its start/end if needed, but keep signed_up_users
|
|
occurrence = self.occurrences.filter(start=start_dt, end=end_dt).first()
|
|
if occurrence:
|
|
if occurrence.start != start_dt or occurrence.end != end_dt:
|
|
occurrence.start = start_dt
|
|
occurrence.end = end_dt
|
|
occurrence.save(update_fields=["start", "end"])
|
|
else:
|
|
EventOccurrence.objects.create(event=self, start=start_dt, end=end_dt)
|
|
|
|
def save(self, *args, **kwargs):
|
|
super().save(*args, **kwargs)
|
|
if self.live:
|
|
self.generate_occurrences()
|
|
|
|
content_panels = Page.content_panels + [
|
|
FieldPanel("tags"),
|
|
NativeColorPanel("color"),
|
|
FieldPanel("image"),
|
|
FieldPanel("location"),
|
|
FieldPanel("max_attendees"),
|
|
FieldPanel("hosts", widget=CheckboxSelectMultiple),
|
|
FieldPanel("description"),
|
|
FieldPanel("start"),
|
|
FieldPanel("end"),
|
|
FieldPanel("recurrence_enabled"),
|
|
FieldPanel("recurrence_days_of_week"),
|
|
FieldPanel("recurrence_start_time"),
|
|
FieldPanel("recurrence_end_time"),
|
|
FieldPanel("recurrence_repeat_until"),
|
|
FieldPanel("recurrence_endless"),
|
|
]
|