import logging as lg
import os

import requests
import stripe
from django.conf import settings
from django.contrib.auth.models import Group
from django.db import models
from wagtail.admin.panels import FieldPanel

GITEA_ORG_NAME = "Studio77"

logger = lg.getLogger(__name__)


class CoursePurchase(models.Model):
    """A user's purchase of a course.

    Saving a purchase synchronises the user's access: the Django group
    ``course_<course id>_access`` and membership of the Gitea team
    ``course-<course id>`` are granted while the purchase is active and
    revoked once ``refunded`` is set. Gitea calls are best-effort: failures
    are logged and never abort the save.
    """

    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    course = models.ForeignKey("home.CoursePage", on_delete=models.CASCADE)
    purchased_at = models.DateTimeField(auto_now_add=True)
    refunded = models.BooleanField(default=False)

    def mock_refund(self):
        """Mark the purchase as refunded and save it (which revokes access)."""
        self.refunded = True
        self.save()

    @staticmethod
    def _gitea_headers():
        """Return the authorization header for Gitea API calls.

        The token is read from the ``GITEA_API_TOKEN`` environment variable on
        every call.
        """
        return {"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"}

    def _get_gitea_team_id(self, team_name):
        """Return the id of the Gitea team called *team_name*, or ``None``.

        ``None`` is returned when ``GITEA_URL`` is not configured, when no team
        with that name is listed for ``GITEA_ORG_NAME``, or when the request
        fails for any reason (the failure is logged).
        """
        api_url = getattr(settings, "GITEA_URL", None)
        if not api_url:
            logger.debug("GITEA_URL is not set, skipping Gitea team assignment")
            return None

        # Bound before the try so the error handler can safely inspect it even
        # when the request itself raised (timeout, connection error, ...).
        response = None
        try:
            # NOTE(review): Gitea list endpoints appear to be paginated; a team
            # beyond the first page would not be found here - confirm.
            response = requests.get(
                f"{api_url}/orgs/{GITEA_ORG_NAME}/teams",
                timeout=5,
                headers=self._gitea_headers(),
            )
            response.raise_for_status()
            teams = response.json()
            team = next((t for t in teams if t["name"] == team_name), None)
            return team["id"] if team else None
        except Exception as e:
            # Deliberately broad: team lookup must never break the caller.
            logger.exception(
                "Failed to check existing Gitea teams: %s\n%s",
                e,
                getattr(response, "text", ""),
            )
            return None

    def _change_gitea_membership(
        self, send, *, skipped_msg, success_msg, failure_msg
    ):
        """Add or remove this purchase's user on the course's Gitea team.

        Args:
            send: HTTP callable to use (``requests.put`` to add the member,
                ``requests.delete`` to remove them).
            skipped_msg: Debug message logged when ``GITEA_URL`` is not set.
            success_msg: ``%``-style template taking (user email, team name).
            failure_msg: ``%``-style template taking (user email, team name,
                exception, response text).
        """
        api_url = getattr(settings, "GITEA_URL", None)
        if not api_url:
            logger.debug(skipped_msg)
            return

        course = self.course
        user = self.user
        team_name = f"course-{course.id}"

        team_id = self._get_gitea_team_id(team_name)
        if team_id is None:
            logger.warning(
                "Gitea team %s not found for course %s", team_name, course.title
            )
            return

        # Bound before the try so the error handler never hits an unbound name.
        response = None
        try:
            response = send(
                f"{api_url}/teams/{team_id}/members/studio77-{user.id}",
                timeout=5,
                headers=self._gitea_headers(),
            )
            if response.status_code == 204:
                logger.info(success_msg, user.email, team_name)
            else:
                # Raises for 4xx/5xx; other non-204 statuses pass silently.
                response.raise_for_status()
        except Exception as e:
            # Deliberately broad: Gitea sync is best-effort.
            logger.error(
                failure_msg,
                user.email,
                team_name,
                e,
                getattr(response, "text", ""),
            )

    def add_to_gitea_team(self):
        """Add the purchasing user to the course's Gitea team (best-effort)."""
        self._change_gitea_membership(
            requests.put,
            skipped_msg="GITEA_URL is not set, skipping Gitea team assignment",
            success_msg="Successfully added user %s to Gitea team %s",
            failure_msg="Failed to add user %s to Gitea team %s: %s\n%s",
        )

    def remove_from_gitea_team(self):
        """Remove the purchasing user from the course's Gitea team (best-effort)."""
        self._change_gitea_membership(
            requests.delete,
            skipped_msg="GITEA_URL is not set, skipping Gitea team removal",
            success_msg="Successfully removed user %s from Gitea team %s",
            failure_msg="Failed to remove user %s from Gitea team %s: %s\n%s",
        )

    def save(self, *args, **kwargs):
        """Persist the purchase, then grant or revoke course access.

        The access group is created on demand. A refunded purchase removes the
        user from the group and the Gitea team; an active purchase adds them
        to both.
        """
        super().save(*args, **kwargs)

        group_name = f"course_{self.course.id}_access"
        group, _ = Group.objects.get_or_create(name=group_name)
        logger.debug(
            "Saving CoursePurchase for user %s and course %s, refunded=%s",
            self.user,
            self.course.title,
            self.refunded,
        )

        if self.refunded:
            logger.debug(
                "Removing user %s from group %s due to refund",
                self.user,
                group_name,
            )
            self.remove_from_gitea_team()
            self.user.groups.remove(group)
        else:
            logger.debug(
                "Adding user %s to group %s for course %s",
                self.user,
                group_name,
                self.course.title,
            )
            # Mirror of the refund branch; adding an existing member is a no-op.
            self.user.groups.add(group)
            self.add_to_gitea_team()


class PurchasableProduct(models.Model):
    """A product that can be purchased.

    When saved it creates (or updates) a Stripe Product and Price. On delete
    it tries to deactivate both the Price and the Product in Stripe.

    The code is defensive: if STRIPE_SECRET_KEY is not configured the model
    still works locally, and Stripe failures are logged rather than raised.
    """

    name = models.CharField(max_length=255)
    description = models.TextField(blank=True)
    price_cents = models.PositiveIntegerField(help_text="Price in cents")
    currency = models.CharField(
        max_length=10, default=getattr(settings, "STRIPE_DEFAULT_CURRENCY", "pln")
    )
    stripe_product_id = models.CharField(max_length=255, blank=True, null=True)
    stripe_price_id = models.CharField(max_length=255, blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)

    panels = [
        FieldPanel("price_cents"),
        FieldPanel("currency"),
    ]

    def __str__(self):
        return f"{self.name} ({self.price_cents / 100:.2f} {self.currency.upper()})"

    @staticmethod
    def _stripe_api_key():
        """Return the Stripe key from settings, defaulting to the environment.

        The environment variable is only consulted when the setting is absent.
        """
        return getattr(
            settings, "STRIPE_SECRET_KEY", os.getenv("STRIPE_SECRET_KEY")
        )

    def _create_stripe_price(self):
        """Create and return a Stripe Price for the current amount/currency."""
        return stripe.Price.create(
            product=self.stripe_product_id,
            unit_amount=self.price_cents,
            currency=self.currency.lower(),
        )

    def _ensure_stripe_product(self):
        """Create the Stripe Product if none is linked; return True if created."""
        if self.stripe_product_id:
            return False
        product = stripe.Product.create(
            name=self.name,
            description=self.description or None,
            metadata={"local_id": str(self.id)},
        )
        self.stripe_product_id = product.id
        logger.info(
            "Created Stripe product %s for PurchasableProduct %s",
            self.stripe_product_id,
            self.id,
        )
        return True

    def _ensure_stripe_price(self):
        """Create the Stripe Price if none is linked; return True if created."""
        if self.stripe_price_id:
            return False
        self.stripe_price_id = self._create_stripe_price().id
        logger.info(
            "Created Stripe price %s for PurchasableProduct %s",
            self.stripe_price_id,
            self.id,
        )
        return True

    def _sync_stripe_product_details(self, previous):
        """Push a changed name/description to Stripe; failures are only logged."""
        if previous.name == self.name and previous.description == self.description:
            return
        try:
            stripe.Product.modify(
                self.stripe_product_id,
                name=self.name,
                description=self.description or None,
            )
            logger.info(
                "Updated Stripe product %s for PurchasableProduct %s",
                self.stripe_product_id,
                self.id,
            )
        except Exception:
            logger.exception(
                "Failed to update Stripe product %s for PurchasableProduct %s",
                self.stripe_product_id,
                self.id,
            )

    def _rotate_stripe_price(self, previous):
        """Replace the Stripe Price when amount or currency changed.

        Creates a new Price, tries to deactivate the old one, and points
        ``stripe_price_id`` at the new Price. Returns True when the id changed;
        failures are logged and yield False.
        """
        prev_currency = (previous.currency or "").lower()
        curr_currency = (self.currency or "").lower()
        if previous.price_cents == self.price_cents and prev_currency == curr_currency:
            return False

        try:
            new_price = self._create_stripe_price()
        except Exception:
            logger.exception(
                "Failed to create or switch Stripe price for PurchasableProduct %s",
                self.id,
            )
            return False

        # Deactivating the old price is best-effort; the switch proceeds anyway.
        old_price_id = self.stripe_price_id
        if old_price_id:
            try:
                stripe.Price.modify(old_price_id, active=False)
                logger.info(
                    "Deactivated old Stripe price %s for PurchasableProduct %s",
                    old_price_id,
                    self.id,
                )
            except Exception:
                logger.exception(
                    "Failed to deactivate old Stripe price %s for PurchasableProduct %s",
                    old_price_id,
                    self.id,
                )

        self.stripe_price_id = new_price.id
        logger.info(
            "Created new Stripe price %s for PurchasableProduct %s",
            self.stripe_price_id,
            self.id,
        )
        return True

    def save(self, *args, **kwargs):
        """Save locally first to ensure we have a PK, then sync with Stripe.

        Behavior:
        - On create: create Stripe Product and Price (if STRIPE_SECRET_KEY is
          set).
        - On update:
            - If name or description changed -> update Stripe Product.
            - If price_cents or currency changed -> create a new Stripe Price,
              deactivate the old one, then update stripe_price_id.
        - If STRIPE_SECRET_KEY is not configured the model still works locally.

        Stripe errors are logged and never propagate to the caller.
        """
        # Snapshot the stored row (if any) before overwriting it, so changes
        # can be detected afterwards.
        previous = None
        if self.pk is not None:
            previous = type(self).objects.filter(pk=self.pk).first()

        # Persist local changes first so we have an id.
        super().save(*args, **kwargs)

        stripe_api_key = self._stripe_api_key()
        if not stripe_api_key:
            logger.debug(
                "STRIPE_SECRET_KEY not set, skipping Stripe product/price creation/update"
            )
            return

        stripe.api_key = stripe_api_key

        changed_fields = []
        try:
            try:
                product_created = self._ensure_stripe_product()
                if product_created:
                    changed_fields.append("stripe_product_id")

                price_created = self._ensure_stripe_price()
                if price_created:
                    changed_fields.append("stripe_price_id")

                if previous is not None:
                    # A freshly created product/price already carries the
                    # current values, so only pre-existing ones need syncing.
                    if not product_created:
                        self._sync_stripe_product_details(previous)
                    if not price_created and self._rotate_stripe_price(previous):
                        changed_fields.append("stripe_price_id")
            finally:
                # Persist ids obtained so far even if a later Stripe call
                # failed, so the next save does not create duplicates. Uses
                # the base save to avoid re-triggering Stripe operations.
                if changed_fields:
                    super().save(update_fields=changed_fields)
        except Exception as e:
            logger.exception(
                "Failed to create/update Stripe product/price for PurchasableProduct %s: %s",
                self.id,
                e,
            )

    def delete(self, *args, **kwargs):
        """Try to clean up Stripe resources when the product is deleted locally.

        The linked Price and Product are deactivated (not deleted) in Stripe.
        Stripe failures are logged and do not prevent the local delete.
        """
        stripe_api_key = self._stripe_api_key()
        if not stripe_api_key:
            logger.debug(
                "STRIPE_SECRET_KEY not set, skipping Stripe product/price cleanup"
            )
            return super().delete(*args, **kwargs)

        stripe.api_key = stripe_api_key

        # Attempt to deactivate price and product. Be tolerant of failures.
        if self.stripe_price_id:
            try:
                stripe.Price.modify(self.stripe_price_id, active=False)
                logger.info(
                    "Deactivated Stripe price %s for PurchasableProduct %s",
                    self.stripe_price_id,
                    self.id,
                )
            except Exception:
                logger.exception(
                    "Failed to deactivate Stripe price %s for PurchasableProduct %s",
                    self.stripe_price_id,
                    self.id,
                )

        if self.stripe_product_id:
            try:
                stripe.Product.modify(self.stripe_product_id, active=False)
                logger.info(
                    "Deactivated Stripe product %s for PurchasableProduct %s",
                    self.stripe_product_id,
                    self.id,
                )
            except Exception:
                logger.exception(
                    "Failed to deactivate Stripe product %s for PurchasableProduct %s",
                    self.stripe_product_id,
                    self.id,
                )

        return super().delete(*args, **kwargs)