import logging as lg
import os

import requests
import stripe
from django.conf import settings
from django.contrib.auth.models import Group
from django.db import models
from modelcluster.fields import ParentalKey
from wagtail.admin.panels import FieldPanel
from wagtail.models import Orderable

GITEA_ORG_NAME = "Studio77"

logger = lg.getLogger(__name__)


class CoursePurchase(models.Model):
    """A user's purchase of a course, together with its Stripe identifiers.

    Saving a purchase synchronises access: a purchase that is PAID and not
    refunded puts the user in the course's access group and Gitea team; a
    purchase that previously granted access and no longer does removes them.
    """

    class Status(models.TextChoices):
        INITIATED = "initiated"
        PENDING = "pending"
        PAID = "paid"
        REFUNDED = "refunded"
        FAILED = "failed"

    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    course = models.ForeignKey("home.CoursePage", on_delete=models.CASCADE)
    purchased_at = models.DateTimeField(auto_now_add=True)
    refunded = models.BooleanField(default=False)
    status = models.CharField(
        max_length=20,
        choices=Status.choices,
        default=Status.INITIATED,
    )
    stripe_checkout_session_id = models.CharField(max_length=255, blank=True, null=True)
    # Stripe identifiers to help reconcile refunds coming from webhooks or admin actions
    stripe_payment_intent_id = models.CharField(max_length=255, blank=True, null=True)
    stripe_charge_id = models.CharField(max_length=255, blank=True, null=True)

    def mock_refund(self):
        """Legacy dev helper: refund via Stripe when possible, else only locally.

        Prefer calling `refund_via_stripe` directly when an actual Stripe
        refund is intended.

        Returns:
            True when `refund_via_stripe` was called and did not raise;
            None when the purchase was only marked refunded locally.
        """
        # If we have Stripe identifiers it's better to actually issue a refund via Stripe
        if self.stripe_charge_id or self.stripe_payment_intent_id:
            try:
                self.refund_via_stripe()
                return True
            except Exception:
                # Deliberate best-effort: fall back to a local-only refund, but
                # leave a trace instead of swallowing the failure silently.
                logger.warning(
                    f"Stripe refund failed for CoursePurchase {self.id}; "
                    "marking it refunded locally instead"
                )

        self.refunded = True
        self.status = CoursePurchase.Status.REFUNDED
        self.save()

    def refund_via_stripe(self, amount=None, reason=None):
        """Initiate a refund in Stripe for this purchase and mark it refunded locally.

        Args:
            amount: optional integer amount in cents for a partial refund.
                Note that the purchase is marked refunded locally even when
                only a partial amount is refunded.
            reason: optional string ("duplicate", "fraud",
                "requested_by_customer").

        Returns:
            The Stripe Refund object, or None when nothing was sent to Stripe
            (purchase already refunded, or no Stripe secret key configured).

        Raises:
            RuntimeError: no charge, payment intent, or checkout session
                identifier is available to refund against.
            Exception: any error raised by the Stripe client is logged and
                re-raised.

        This method is idempotent: calling it for an already-refunded purchase
        is a no-op.
        """
        # If already refunded, do nothing
        if self.refunded:
            return None

        stripe_api_key = getattr(
            settings, "STRIPE_SECRET_KEY", os.getenv("STRIPE_SECRET_KEY")
        )
        if not stripe_api_key:
            # Can't call Stripe; mark refunded locally (useful for local testing)
            self.refunded = True
            self.status = CoursePurchase.Status.REFUNDED
            self.save()
            return None

        import stripe as _stripe

        _stripe.api_key = stripe_api_key

        refund_kwargs = {}
        if amount is not None:
            refund_kwargs["amount"] = int(amount)
        if reason is not None:
            refund_kwargs["reason"] = reason

        try:
            # Prefer the charge id; otherwise use the payment intent, looking
            # it up from the Checkout Session as a last resort.
            if self.stripe_charge_id:
                refund = _stripe.Refund.create(
                    charge=self.stripe_charge_id, **refund_kwargs
                )
            else:
                payment_intent = self.stripe_payment_intent_id
                if not payment_intent and self.stripe_checkout_session_id:
                    session = _stripe.checkout.Session.retrieve(
                        self.stripe_checkout_session_id
                    )
                    payment_intent = session.get("payment_intent")
                if not payment_intent:
                    raise RuntimeError(
                        "No Stripe identifiers available to perform refund"
                    )
                refund = _stripe.Refund.create(
                    payment_intent=payment_intent, **refund_kwargs
                )

            self.refunded = True
            self.status = CoursePurchase.Status.REFUNDED

            # Persist charge/payment intent ids if Stripe returned them and we
            # did not have them yet.
            charge_id = getattr(refund, "charge", None)
            if charge_id and not self.stripe_charge_id:
                self.stripe_charge_id = charge_id
            refund_intent = getattr(refund, "payment_intent", None)
            if refund_intent and not self.stripe_payment_intent_id:
                self.stripe_payment_intent_id = refund_intent

            # save() also revokes group / Gitea team access when applicable.
            self.save()
            return refund
        except Exception as e:
            logger.exception(
                f"Failed to initiate Stripe refund for CoursePurchase {self.id}: {e}"
            )
            raise

    def _get_gitea_team_id(self, team_name):
        """Return the id of the Gitea team called `team_name`, or None.

        None is returned when GITEA_URL is not configured, when no team with
        that name exists in the organisation, or when the lookup request fails.
        """
        api_url = getattr(settings, "GITEA_URL", None)
        if not api_url:
            logger.debug("GITEA_URL is not set, skipping Gitea team assignment")
            return None

        # Bound before the try so the except handler can read it even when the
        # request itself raised (timeout, connection error).
        response = None
        try:
            response = requests.get(
                f"{api_url}/orgs/{GITEA_ORG_NAME}/teams",
                timeout=5,
                headers={"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"},
            )
            response.raise_for_status()
            teams = response.json()
            team = next((team for team in teams if team["name"] == team_name), None)
            return team["id"] if team else None
        except Exception as e:
            logger.exception(
                f"Failed to check existing Gitea teams: {e}\n{getattr(response, 'text', '')}"
            )
            return None

    def add_to_gitea_team(self):
        """Add the purchasing user to the Gitea team of the purchased course.

        Best-effort: failures are logged and never raised.
        """
        course = self.course
        user = self.user
        team_name = f"course-{course.id}"

        api_url = getattr(settings, "GITEA_URL", None)
        if not api_url:
            logger.debug("GITEA_URL is not set, skipping Gitea team assignment")
            return

        team_id = self._get_gitea_team_id(team_name)
        if not team_id:
            logger.warning(
                f"Gitea team {team_name} not found for course {course.title}"
            )
            return

        url = f"{api_url}/teams/{team_id}/members/studio77-{user.id}"
        # Bound before the try so the except handler can always read it.
        response = None
        try:
            response = requests.put(
                url,
                timeout=5,
                headers={"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"},
            )
            if response.status_code == 204:
                logger.info(
                    f"Successfully added user {user.email} to Gitea team {team_name}"
                )
            else:
                response.raise_for_status()
        except Exception as e:
            logger.error(
                f"Failed to add user {user.email} to Gitea team {team_name}: {e}\n{getattr(response, 'text', '')}"
            )

    def remove_from_gitea_team(self):
        """Remove the purchasing user from the Gitea team of the purchased course.

        Best-effort: failures are logged and never raised.
        """
        course = self.course
        user = self.user
        team_name = f"course-{course.id}"

        api_url = getattr(settings, "GITEA_URL", None)
        if not api_url:
            logger.debug("GITEA_URL is not set, skipping Gitea team removal")
            return

        team_id = self._get_gitea_team_id(team_name)
        if not team_id:
            logger.warning(
                f"Gitea team {team_name} not found for course {course.title}"
            )
            return

        url = f"{api_url}/teams/{team_id}/members/studio77-{user.id}"
        # Bound before the try so the except handler can always read it.
        response = None
        try:
            response = requests.delete(
                url,
                timeout=5,
                headers={"Authorization": f"token {os.getenv('GITEA_API_TOKEN')}"},
            )
            if response.status_code == 204:
                logger.info(
                    f"Successfully removed user {user.email} from Gitea team {team_name}"
                )
            else:
                response.raise_for_status()
        except Exception as e:
            logger.error(
                f"Failed to remove user {user.email} from Gitea team {team_name}: {e}\n{getattr(response, 'text', '')}"
            )

    def save(self, *args, **kwargs):
        """Persist the purchase, then grant or revoke course access.

        Access (Django group `course_<id>_access` plus the Gitea team) is
        granted whenever the saved row is PAID and not refunded. It is revoked
        only when the row stored before this save granted access and the new
        state does not.
        """
        # Read the stored state before overwriting it so a transition away
        # from "access granted" can be detected.
        previous_state = None
        if self.pk:
            previous_state = (
                self.__class__.objects.filter(pk=self.pk)
                .values("status", "refunded")
                .first()
            )

        super().save(*args, **kwargs)

        group_name = f"course_{self.course.id}_access"
        group, _ = Group.objects.get_or_create(name=group_name)
        logger.debug(
            f"Saving CoursePurchase for user {self.user} and course {self.course.title}, refunded={self.refunded}"
        )

        should_grant_access = (
            self.status == CoursePurchase.Status.PAID and not self.refunded
        )
        had_granted_access = (
            bool(previous_state)
            and previous_state["status"] == CoursePurchase.Status.PAID
            and not previous_state["refunded"]
        )

        if should_grant_access:
            logger.debug(
                f"Adding user {self.user} to group {group_name} for course {self.course.title}"
            )
            self.add_to_gitea_team()
            self.user.groups.add(group)
        elif had_granted_access:
            logger.debug(
                f"Removing user {self.user} from group {group_name} due to status {self.status}"
            )
            self.remove_from_gitea_team()
            self.user.groups.remove(group)


class PurchasableProduct(Orderable, models.Model):
    """A product that can be purchased.

    On save it creates (or updates) a Stripe Product, Price and Payment Link.
    On delete it tries to deactivate the Price and the Product in Stripe.
    The code is defensive: if STRIPE_SECRET_KEY is not configured the model
    still works locally.
    """

    name = models.CharField(max_length=255, blank=True)
    description = models.TextField(blank=True)
    price_cents = models.PositiveIntegerField(help_text="Price in cents")
    currency = models.CharField(
        max_length=10, default=getattr(settings, "STRIPE_DEFAULT_CURRENCY", "pln")
    )
    stripe_product_id = models.CharField(max_length=255, blank=True, null=True)
    stripe_price_id = models.CharField(max_length=255, blank=True, null=True)
    # URL of the Stripe Payment Link for the current price. `panels` and
    # `save()` already referenced this attribute, but it was never declared.
    # NOTE(review): declaring it requires a schema migration.
    stripe_payment_url = models.URLField(max_length=255, blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)

    course = ParentalKey(
        "home.CoursePage",
        on_delete=models.CASCADE,
        related_name="purchasable_products",
        null=True,
        blank=True,
    )

    panels = [
        FieldPanel("price_cents"),
        FieldPanel("currency"),
        FieldPanel("stripe_product_id", read_only=True),
        FieldPanel("stripe_price_id", read_only=True),
        FieldPanel("stripe_payment_url", read_only=True),
    ]

    @property
    def price(self):
        """Price in major currency units (price_cents / 100)."""
        return self.price_cents / 100

    def __str__(self):
        return f"{self.name} ({self.price_cents / 100:.2f} {self.currency.upper()})"

    def _sync_stripe_product(self, previous):
        """Push a changed name/description to the Stripe Product (best-effort)."""
        try:
            if (previous.name != self.name) or (
                previous.description != self.description
            ):
                stripe.Product.modify(
                    self.stripe_product_id,
                    name=self.name,
                    description=self.description or None,
                )
                logger.info(
                    f"Updated Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
                )
        except Exception:
            logger.exception(
                f"Failed to update Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
            )

    def _rotate_stripe_price(self, previous):
        """Replace the Stripe Price if amount or currency changed; return True if replaced."""
        try:
            prev_currency = (previous.currency or "").lower()
            curr_currency = (self.currency or "").lower()
            if (previous.price_cents == self.price_cents) and (
                prev_currency == curr_currency
            ):
                return False

            # Create new price for the same product
            new_price = stripe.Price.create(
                product=self.stripe_product_id,
                unit_amount=self.price_cents,
                currency=self.currency.lower(),
            )

            # Attempt to deactivate the old price, but don't fail the whole
            # operation if it fails
            try:
                if self.stripe_price_id:
                    stripe.Price.modify(self.stripe_price_id, active=False)
                    logger.info(
                        f"Deactivated old Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
                    )
            except Exception:
                logger.exception(
                    f"Failed to deactivate old Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
                )

            self.stripe_price_id = new_price.id
            logger.info(
                f"Created new Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
            )
            return True
        except Exception:
            logger.exception(
                f"Failed to create or switch Stripe price for PurchasableProduct {self.id}"
            )
            return False

    def _create_stripe_payment_link(self):
        """Create a Stripe Payment Link for the current price; return True on success."""
        try:
            payment_link = stripe.PaymentLink.create(
                line_items=[{"price": self.stripe_price_id, "quantity": 1}],
                after_completion={
                    "type": "redirect",
                    "redirect": {
                        "url": getattr(
                            settings,
                            "STRIPE_SUCCESS_URL",
                            "https://example.com/success",
                        )
                    },
                },
            )
            self.stripe_payment_url = payment_link.url
            logger.info(
                f"Created Stripe payment link {self.stripe_payment_url} for PurchasableProduct {self.id}"
            )
            return True
        except Exception as e:
            logger.exception(
                f"Failed to create Stripe payment link for PurchasableProduct {self.id}: {e}"
            )
            return False

    def save(self, *args, **kwargs):
        """Save locally first to ensure we have a PK, then sync with Stripe.

        Behavior:
        - Missing Stripe Product / Price are created (if STRIPE_SECRET_KEY is set).
        - On update:
          - If name or description changed, the Stripe Product is updated.
          - If price_cents or currency changed, a new Stripe Price is created,
            the old one is deactivated, and stripe_price_id is updated.
        - A Payment Link is created when missing or when the price was replaced.
        - If STRIPE_SECRET_KEY is not configured the model still works locally.

        Stripe failures are logged and never raised, so the local save always
        succeeds.
        """
        # Capture the previously stored row (if any) to detect what changed.
        previous = None
        if self.pk is not None:
            previous = self.__class__.objects.filter(pk=self.pk).first()

        # Fall back to the linked course for name and description when they are
        # not set explicitly on the product.
        if self.course:
            if not self.name:
                self.name = self.course.title
            if not self.description:
                self.description = self.course.description or ""

        # Persist local changes first so we have an id
        super().save(*args, **kwargs)

        stripe_api_key = getattr(
            settings, "STRIPE_SECRET_KEY", os.getenv("STRIPE_SECRET_KEY")
        )
        if not stripe_api_key:
            logger.debug(
                "STRIPE_SECRET_KEY not set, skipping Stripe product/price creation/update"
            )
            return

        stripe.api_key = stripe_api_key

        try:
            changed_fields = []
            # True once stripe_price_id points at a Price created in this call.
            price_replaced = False

            # Create Stripe Product if missing
            if not self.stripe_product_id:
                prod = stripe.Product.create(
                    name=self.name,
                    description=self.description or None,
                    metadata={"local_id": str(self.id)},
                )
                self.stripe_product_id = prod.id
                changed_fields.append("stripe_product_id")
                logger.info(
                    f"Created Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
                )

            # If we don't have a price id, create one
            if not self.stripe_price_id:
                price = stripe.Price.create(
                    product=self.stripe_product_id,
                    unit_amount=self.price_cents,
                    currency=self.currency.lower(),
                )
                self.stripe_price_id = price.id
                changed_fields.append("stripe_price_id")
                price_replaced = True
                logger.info(
                    f"Created Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
                )

            if previous:
                self._sync_stripe_product(previous)
                # Skip rotation when the price was created just above: it
                # already carries the current amount and currency, and rotating
                # would create a duplicate Price.
                if not price_replaced and self._rotate_stripe_price(previous):
                    changed_fields.append("stripe_price_id")
                    price_replaced = True

            # Create the Payment Link if missing, or recreate it when the price
            # was replaced (the old link points at the old price).
            needs_link = price_replaced or not self.stripe_payment_url
            if needs_link and self.stripe_price_id:
                if self._create_stripe_payment_link():
                    changed_fields.append("stripe_payment_url")

            # Persist any changed stripe ids without triggering further Stripe operations
            if changed_fields:
                super().save(update_fields=changed_fields)
        except Exception as e:
            logger.exception(
                f"Failed to create/update Stripe product/price for PurchasableProduct {self.id}: {e}"
            )

    def delete(self, *args, **kwargs):
        """Deactivate the Stripe Price and Product (best-effort), then delete locally."""
        stripe_api_key = getattr(
            settings, "STRIPE_SECRET_KEY", os.getenv("STRIPE_SECRET_KEY")
        )
        if not stripe_api_key:
            logger.debug(
                "STRIPE_SECRET_KEY not set, skipping Stripe product/price cleanup"
            )
            return super().delete(*args, **kwargs)

        stripe.api_key = stripe_api_key

        # Attempt to deactivate price and product. Be tolerant of failures.
        if self.stripe_price_id:
            try:
                stripe.Price.modify(self.stripe_price_id, active=False)
                logger.info(
                    f"Deactivated Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
                )
            except Exception:
                logger.exception(
                    f"Failed to deactivate Stripe price {self.stripe_price_id} for PurchasableProduct {self.id}"
                )

        if self.stripe_product_id:
            try:
                stripe.Product.modify(self.stripe_product_id, active=False)
                logger.info(
                    f"Deactivated Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
                )
            except Exception:
                logger.exception(
                    f"Failed to deactivate Stripe product {self.stripe_product_id} for PurchasableProduct {self.id}"
                )

        return super().delete(*args, **kwargs)