Files
kursy-mirror/purchase/models.py

394 lines
16 KiB
Python

import logging as lg
import os
import requests
import stripe
from django.conf import settings
from django.contrib.auth.models import Group
from django.db import models
from wagtail.admin.panels import FieldPanel
from modelcluster.fields import ParentalKey
from wagtail.models import Orderable
GITEA_ORG_NAME = "Studio77"
logger = lg.getLogger(__name__)
class CoursePurchase(models.Model):
class Status(models.TextChoices):
INITIATED = "initiated", "Initiated"
PENDING = "pending", "Pending"
PAID = "paid", "Paid"
REFUNDED = "refunded", "Refunded"
FAILED = "failed", "Failed"
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
course = models.ForeignKey("home.CoursePage", on_delete=models.CASCADE)
purchased_at = models.DateTimeField(auto_now_add=True)
refunded = models.BooleanField(default=False)
status = models.CharField(
max_length=20,
choices=Status.choices,
default=Status.INITIATED,
)
stripe_checkout_session_id = models.CharField(max_length=255, blank=True, null=True)
def mock_refund(self):
self.refunded = True
self.save()
def _get_gitea_team_id(self, team_name):
api_url = getattr(settings, "GITEA_URL", None)
if not api_url:
logger.debug("GITEA_URL is not set, skipping Gitea team assignment")
return None
try:
response = requests.get(
f"{api_url}/orgs/{GITEA_ORG_NAME}/teams",
timeout=5,
headers={"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"},
)
response.raise_for_status()
teams = response.json()
team = next((team for team in teams if team["name"] == team_name), None)
return team["id"] if team else None
except Exception as e:
logger.exception(
f"Failed to check existing Gitea teams: {e}\n{getattr(response, 'text', '')}",
e,
)
return None
def add_to_gitea_team(self):
course = self.course
user = self.user
team_name = f"course-{course.id}"
api_url = getattr(settings, "GITEA_URL", None)
if not api_url:
logger.debug("GITEA_URL is not set, skipping Gitea team assignment")
return
team_id = self._get_gitea_team_id(team_name)
if not team_id:
logger.warning(
f"Gitea team {team_name} not found for course {course.title}"
)
return
url = f"{api_url}/teams/{team_id}/members/studio77-{user.id}"
try:
response = requests.put(
url,
timeout=5,
headers={"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"},
)
if response.status_code == 204:
logger.info(
f"Successfully added user {user.email} to Gitea team {team_name}"
)
else:
response.raise_for_status()
except Exception as e:
logger.error(
f"Failed to add user {user.email} to Gitea team {team_name}: {e}\n{getattr(response, 'text', '')}"
)
def remove_from_gitea_team(self):
course = self.course
user = self.user
team_name = f"course-{course.id}"
api_url = getattr(settings, "GITEA_URL", None)
if not api_url:
logger.debug("GITEA_URL is not set, skipping Gitea team removal")
return
team_id = self._get_gitea_team_id(team_name)
if not team_id:
logger.warning(
f"Gitea team {team_name} not found for course {course.title}"
)
return
url = f"{api_url}/teams/{team_id}/members/studio77-{user.id}"
try:
response = requests.delete(
url,
timeout=5,
headers={"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"},
)
if response.status_code == 204:
logger.info(
f"Successfully removed user {user.email} from Gitea team {team_name}"
)
else:
response.raise_for_status()
except Exception as e:
logger.error(
f"Failed to remove user {user.email} from Gitea team {team_name}: {e}\n{getattr(response, 'text', '')}"
)
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
group_name = f"course_{self.course.id}_access"
group, _ = Group.objects.get_or_create(name=group_name)
print(
f"Saving CoursePurchase for user {self.user} and course {self.course.title}, refunded={self.refunded}"
)
if self.refunded:
print(f"Removing user {self.user} from group {group_name} due to refund")
self.remove_from_gitea_team()
self.user.groups.remove(group)
else:
logger.debug(
f"Adding user {self.user} to group {group_name} for course {self.course.title}"
)
self.add_to_gitea_team()
class PurchasableProduct(Orderable, models.Model):
"""A product that can be purchased. When created it will create a Stripe Product and Price.
On delete it will try to deactivate the Price and delete the Product in Stripe.
The code is defensive: if STRIPE_API_KEY is not configured the model will still work locally.
"""
name = models.CharField(max_length=255, blank=True)
description = models.TextField(blank=True)
price_cents = models.PositiveIntegerField(help_text="Price in cents")
currency = models.CharField(
max_length=10, default=getattr(settings, "STRIPE_DEFAULT_CURRENCY", "pln")
)
stripe_product_id = models.CharField(max_length=255, blank=True, null=True)
stripe_price_id = models.CharField(max_length=255, blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
course = ParentalKey(
"home.CoursePage",
on_delete=models.CASCADE,
related_name="purchasable_products",
null=True,
blank=True,
)
panels = [
FieldPanel("price_cents"),
FieldPanel("currency"),
FieldPanel("stripe_product_id", read_only=True),
FieldPanel("stripe_price_id", read_only=True),
FieldPanel("stripe_payment_url", read_only=True),
]
@property
def price(self):
return self.price_cents / 100
def __str__(self):
return f"{self.name} ({self.price_cents / 100:.2f} {self.currency.upper()})"
def save(self, *args, **kwargs):
"""Save locally first to ensure we have a PK, then create or update Stripe product/price if needed.
Behavior:
- On create: create Stripe Product and Price (if STRIPE_SECRET_KEY is set).
- On update:
- If name or description changed -> update Stripe Product.
- If price_cents or currency changed -> create a new Stripe Price and deactivate the old one, then update stripe_price_id.
- If STRIPE_SECRET_KEY is not configured the model will still work locally.
"""
# Capture whether this is a new object and the previous state (if any)
is_new = self.pk is None
previous = None
if not is_new:
try:
previous = self.__class__.objects.get(pk=self.pk)
except self.__class__.DoesNotExist:
previous = None
# Get name, description and image from the linked course if not set explicitly on the product
if self.course:
course_name = self.course.title
course_description = self.course.description or ""
if not self.name:
self.name = course_name
if not self.description:
self.description = course_description
# Persist local changes first so we have an id
super().save(*args, **kwargs)
stripe_api_key = getattr(
settings, "STRIPE_SECRET_KEY", os.getenv("STRIPE_SECRET_KEY")
)
if not stripe_api_key:
logger.debug(
"STRIPE_SECRET_KEY not set, skipping Stripe product/price creation/update"
)
return
stripe.api_key = stripe_api_key
try:
changed_fields = []
# Create Stripe Product if missing
if not self.stripe_product_id:
prod = stripe.Product.create(
name=self.name,
description=self.description or None,
metadata={"local_id": str(self.id)},
)
self.stripe_product_id = prod.id
changed_fields.append("stripe_product_id")
logger.info(
f"Created Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
)
# If we don't have a price id, create one
if not self.stripe_price_id:
price = stripe.Price.create(
product=self.stripe_product_id,
unit_amount=self.price_cents,
currency=self.currency.lower(),
)
self.stripe_price_id = price.id
changed_fields.append("stripe_price_id")
logger.info(
f"Created Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
)
# Create Stripe Payment Link if missing or if price changed
if not self.stripe_payment_url and self.stripe_price_id:
try:
payment_link = stripe.PaymentLink.create(
line_items=[{"price": self.stripe_price_id, "quantity": 1}],
after_completion={
"type": "redirect",
"redirect": {
"url": getattr(
settings,
"STRIPE_SUCCESS_URL",
"https://example.com/success",
)
},
},
)
self.stripe_payment_url = payment_link.url
changed_fields.append("stripe_payment_url")
logger.info(
f"Created Stripe payment link {self.stripe_payment_url} for PurchasableProduct {self.id}"
)
except Exception as e:
logger.exception(
f"Failed to create Stripe payment link for PurchasableProduct {self.id}: {e}"
)
# If this is an update (we had previous state) perform updates
if previous:
# Update product metadata/name/description if they changed
try:
if (previous.name != self.name) or (
previous.description != self.description
):
stripe.Product.modify(
self.stripe_product_id,
name=self.name,
description=self.description or None,
)
logger.info(
f"Updated Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
)
except Exception:
logger.exception(
f"Failed to update Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
)
# If price or currency changed, create a new Price and deactivate the old one
try:
prev_currency = (previous.currency or "").lower()
curr_currency = (self.currency or "").lower()
if (previous.price_cents != self.price_cents) or (
prev_currency != curr_currency
):
# Create new price for the same product
new_price = stripe.Price.create(
product=self.stripe_product_id,
unit_amount=self.price_cents,
currency=self.currency.lower(),
)
# Attempt to deactivate the old price, but don't fail the whole operation if it fails
try:
if self.stripe_price_id:
stripe.Price.modify(self.stripe_price_id, active=False)
logger.info(
f"Deactivated old Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
)
except Exception:
logger.exception(
f"Failed to deactivate old Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
)
# Switch to the new price id and persist it
self.stripe_price_id = new_price.id
if "stripe_price_id" not in changed_fields:
changed_fields.append("stripe_price_id")
logger.info(
f"Created new Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
)
except Exception:
logger.exception(
f"Failed to create or switch Stripe price for PurchasableProduct {self.id}"
)
# Persist any changed stripe ids without triggering further Stripe operations
if changed_fields:
super().save(update_fields=changed_fields)
except Exception as e:
logger.exception(
f"Failed to create/update Stripe product/price for PurchasableProduct {self.id}: {e}"
)
def delete(self, *args, **kwargs):
"""Try to clean up Stripe resources when the product is deleted locally."""
stripe_api_key = getattr(
settings, "STRIPE_SECRET_KEY", os.getenv("STRIPE_SECRET_KEY")
)
if not stripe_api_key:
logger.debug(
"STRIPE_SECRET_KEY not set, skipping Stripe product/price cleanup"
)
return super().delete(*args, **kwargs)
stripe.api_key = stripe_api_key
# Attempt to deactivate price and delete product. Be tolerant of failures.
if self.stripe_price_id:
try:
stripe.Price.modify(self.stripe_price_id, active=False)
logger.info(
f"Deactivated Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
)
except Exception:
logger.exception(
f"Failed to deactivate Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
)
if self.stripe_product_id:
try:
stripe.Product.modify(self.stripe_product_id, active=False)
logger.info(
f"Deactivated Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
)
except Exception:
logger.exception(
f"Failed to deactivate Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
)
return super().delete(*args, **kwargs)