Wan2.1/wan-pwa/apps/api/routes/generation.py
Claude 51c5837c43
feat: Implement Phase 3 - Backend Integration & Critical Polish
This commit implements Phase 3 of the Wan2.1 PWA, closing all critical
integration gaps between frontend, backend, database, and Replicate API.

## Backend Integration 

### Database Writes
- Create generation records BEFORE calling Replicate
- Store job_id for tracking Replicate predictions
- Track progress, status, and completion timestamps
- Save video URLs and error messages

### Credit System
- Atomic credit deduction using database function deduct_credits()
- Automatic refunds on generation failures via refund_credits()
- Complete audit trail in credit_transactions table
- Transaction logging for all credit operations
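
The atomic deduction and refund described above can be illustrated with a minimal sketch. It uses an in-memory SQLite database in place of the project's Postgres functions, so the schema, column names, and the `kind` values are assumptions for illustration only; the actual `deduct_credits()` and `refund_credits()` live in the migration and are not reproduced here.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with hypothetical tables and columns."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, credits INTEGER NOT NULL)")
    conn.execute(
        "CREATE TABLE credit_transactions ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT, "
        "amount INTEGER, kind TEXT, gen_id TEXT)"
    )
    return conn


def deduct_credits(conn: sqlite3.Connection, user_id: str, amount: int, gen_id: str) -> bool:
    """Deduct credits only if the balance covers the amount; log the change."""
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            # The balance check and the update happen in one statement,
            # so two concurrent requests cannot both spend the same credits.
            "UPDATE users SET credits = credits - ? WHERE id = ? AND credits >= ?",
            (amount, user_id, amount),
        )
        if cur.rowcount != 1:
            return False
        conn.execute(
            "INSERT INTO credit_transactions (user_id, amount, kind, gen_id) "
            "VALUES (?, ?, 'deduction', ?)",
            (user_id, -amount, gen_id),
        )
    return True


def refund_credits(conn: sqlite3.Connection, gen_id: str) -> bool:
    """Refund the deduction for a generation at most once."""
    with conn:
        deduction = conn.execute(
            "SELECT user_id, amount FROM credit_transactions "
            "WHERE gen_id = ? AND kind = 'deduction'",
            (gen_id,),
        ).fetchone()
        refunded = conn.execute(
            "SELECT 1 FROM credit_transactions WHERE gen_id = ? AND kind = 'refund'",
            (gen_id,),
        ).fetchone()
        if deduction is None or refunded is not None:
            return False
        user_id, amount = deduction  # amount is stored as a negative number
        conn.execute("UPDATE users SET credits = credits + ? WHERE id = ?", (-amount, user_id))
        conn.execute(
            "INSERT INTO credit_transactions (user_id, amount, kind, gen_id) "
            "VALUES (?, ?, 'refund', ?)",
            (user_id, -amount, gen_id),
        )
    return True
```

Making the refund a no-op when no deduction exists, or when one was already refunded, lets callers invoke it on any failure path without tracking whether credits were actually taken.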

### Webhook Handler
- Created /api/webhooks/replicate endpoint
- HMAC signature verification for security
- Automatic status updates from Replicate push notifications
- Maps Replicate statuses to application statuses
- Triggers refunds for failed generations
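
As a rough illustration of the signature check, the sketch below verifies a hex-encoded HMAC-SHA256 over the raw request body. The function names are hypothetical, and the exact scheme Replicate uses (which headers carry the signature, how the secret is encoded, and what content is signed) is not shown in this commit and should be taken from Replicate's webhook documentation.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 of the raw body under the shared secret."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, received_signature: str) -> bool:
    """Compare the expected and received signatures in constant time."""
    expected = sign_payload(secret, body)
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(
        expected.encode("utf-8"), received_signature.encode("utf-8")
    )
```

The check has to run against the raw bytes of the request body, before any JSON parsing, because re-serializing the payload can change whitespace or key order and invalidate the signature.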

### Updated Generation Flow
1. Check user credits before starting
2. Create generation record (status: queued)
3. Start Replicate job and get job_id
4. Update record with job_id (status: processing)
5. Deduct credits atomically
6. Webhook updates status when complete
7. Polling fallback if webhook fails
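
Steps 1 to 5 of this flow can be sketched with in-memory stand-ins. `FakeStore`, `start_generation`, and the `start_job` callback are hypothetical names used only for illustration; the real route talks to Supabase and Replicate, and steps 6 and 7 (webhook and polling) are outside this sketch.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class FakeStore:
    """In-memory stand-in for the credits and generations tables."""
    credits: Dict[str, int]
    generations: Dict[str, dict] = field(default_factory=dict)


def start_generation(
    store: FakeStore, user_id: str, cost: int, start_job: Callable[[], str]
) -> dict:
    # 1. Check user credits before starting.
    if store.credits.get(user_id, 0) < cost:
        raise PermissionError("insufficient credits")

    # 2. Create the generation record first, so a failure is still visible.
    gen_id = f"gen-{len(store.generations) + 1}"
    record = {
        "id": gen_id,
        "user_id": user_id,
        "status": "queued",
        "job_id": None,
        "credits_used": cost,
    }
    store.generations[gen_id] = record

    try:
        # 3. Start the external job and obtain its ID.
        job_id = start_job()
    except Exception as exc:
        # Nothing was deducted yet, so marking the record failed is enough.
        record["status"] = "failed"
        record["error_message"] = str(exc)
        return record

    # 4. Attach the job ID and move to processing.
    record.update(job_id=job_id, status="processing")

    # 5. Deduct credits only after the job has been accepted.
    store.credits[user_id] -= cost
    return record
```

Deducting after the job is accepted means a failed submission never charges the user, at the cost of a short window in which a job is running but not yet paid for.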

## Frontend Enhancements 

### Error Handling
- Added sonner for toast notifications
- Success/error/loading states with retry actions
- User-friendly error messages
- Providers component wraps app with Toaster

### Form Validation
- Zod schemas for T2V and I2V inputs
- Prompt length validation (10-500 chars)
- Model and resolution validation
- Credit cost calculator
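
The validation rules listed above live in Zod schemas on the frontend. Purely to illustrate the rules, the sketch below restates them in Python. The allowed model and resolution values are placeholders, since the real lists are not given in this commit, and `T2VInput` and `validate_t2v` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List

# Placeholder values: the real allowed models and resolutions are not
# listed in the source.
ALLOWED_MODELS = {"model-a", "model-b"}
ALLOWED_RESOLUTIONS = {"480p", "720p"}

PROMPT_MIN = 10   # from the commit message
PROMPT_MAX = 500  # from the commit message


@dataclass
class T2VInput:
    prompt: str
    model: str
    resolution: str


def validate_t2v(data: T2VInput) -> List[str]:
    """Return a list of human-readable validation errors (empty if valid)."""
    errors: List[str] = []
    if not PROMPT_MIN <= len(data.prompt) <= PROMPT_MAX:
        errors.append(f"prompt must be {PROMPT_MIN}-{PROMPT_MAX} characters")
    if data.model not in ALLOWED_MODELS:
        errors.append("unknown model")
    if data.resolution not in ALLOWED_RESOLUTIONS:
        errors.append("unknown resolution")
    return errors
```

Returning all errors at once, instead of raising on the first, matches how a form usually reports problems for several fields in a single pass.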

### Credit Management
- useCredits hook for real-time credit fetching
- Optimistic updates on generation start
- Credit refresh functionality
- Loading and error states

### Image Upload
- Drag-and-drop ImageUpload component
- Client-side validation (file type, size)
- Image preview functionality
- Max 10MB size limit with user feedback
- Ready for I2V integration

### Settings Page
- Basic settings page structure
- Placeholders for Profile, Billing, API Keys
- Ready for Phase 4 enhancements

## Database Changes 

### New Migration: 002_credit_system.sql
- credit_transactions table with audit trail
- deduct_credits() function for atomic operations
- add_credits() function for purchases/bonuses
- refund_credits() function for failed generations
- Added job_id, progress, error_message columns to generations

## Documentation 

### PHASE_3_IMPLEMENTATION.md
- Complete implementation guide
- Testing checklist (backend, frontend, E2E)
- Deployment steps with webhook registration
- Known issues and limitations
- Metrics to monitor
- Phase 4 roadmap

## Files Changed

### Backend (4 files)
- apps/api/main.py - Added webhooks router
- apps/api/routes/generation.py - Complete rewrite with DB integration
- apps/api/routes/webhooks.py - NEW webhook handler
- packages/db/migrations/002_credit_system.sql - NEW credit system

### Frontend (7 files)
- apps/web/package.json - Added sonner
- apps/web/src/app/layout.tsx - Added Providers wrapper
- apps/web/src/app/dashboard/settings/page.tsx - NEW settings page
- apps/web/src/components/providers.tsx - NEW toast provider
- apps/web/src/components/generation/image-upload.tsx - NEW upload component
- apps/web/src/lib/hooks/use-credits.ts - NEW credit management hook
- apps/web/src/lib/validation/generation.ts - NEW Zod schemas

### Documentation (1 file)
- PHASE_3_IMPLEMENTATION.md - NEW comprehensive guide

## Testing Required

### Backend
- [ ] Database writes on generation start
- [ ] Credit deduction accuracy
- [ ] Webhook updates from Replicate
- [ ] Refunds on failures

### Frontend
- [ ] Toast notifications
- [ ] Form validation
- [ ] Credit display and warnings
- [ ] Image upload

### Integration
- [ ] End-to-end generation flow
- [ ] Credit deduction → generation → completion
- [ ] Webhook vs polling updates

## Next Steps (Phase 4)

1. Payment integration with Stripe
2. Retry logic for failed generations
3. Cancel in-progress generations
4. In-app video player
5. Batch operations
6. Admin panel

## Environment Variables

### New Required Variables
- REPLICATE_WEBHOOK_SECRET - For webhook signature verification

See PHASE_3_IMPLEMENTATION.md for complete setup instructions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-24 14:36:07 +00:00

338 lines
11 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Header
from typing import Optional
from models.generation import (
    TextToVideoRequest,
    ImageToVideoRequest,
    GenerationResponse,
    GenerationStatus,
)
from services.replicate_service import ReplicateService
from services.credit_service import CreditService
from core.supabase import get_supabase
from datetime import datetime

router = APIRouter()


async def get_user_id(authorization: Optional[str] = Header(None)) -> str:
    """Extract the user ID from the Authorization header."""
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Strip only the leading scheme; str.replace() would also alter a token
    # that happens to contain "Bearer " later in the string.
    token = authorization[len("Bearer "):]
    supabase = get_supabase()

    try:
        user = supabase.auth.get_user(token)
        return user.user.id
    except Exception as e:
        raise HTTPException(status_code=401, detail="Invalid token") from e


@router.post("/text-to-video", response_model=GenerationResponse)
async def generate_text_to_video(
    request: TextToVideoRequest, user_id: str = Depends(get_user_id)
):
    """Generate video from text prompt"""
    supabase = get_supabase()

    # Calculate credit cost
    cost = CreditService.calculate_cost(request.model, request.resolution)

    # Check if user has sufficient credits
    credits_result = await CreditService.get_user_credits(user_id)
    if credits_result < cost:
        raise HTTPException(
            status_code=402,
            detail=f"Insufficient credits. You need {cost} credits but have {credits_result}.",
        )

    # Create generation record BEFORE calling Replicate
    generation_record = (
        supabase.table("generations")
        .insert(
            {
                "user_id": user_id,
                "type": "text-to-video",
                "prompt": request.prompt,
                "negative_prompt": request.negative_prompt,
                "model": request.model,
                "resolution": request.resolution,
                "status": "queued",
                "credits_used": cost,
                "progress": 0,
            }
        )
        .execute()
    )
    if not generation_record.data:
        raise HTTPException(status_code=500, detail="Failed to create generation record")

    generation_id = generation_record.data[0]["id"]

    try:
        # Start generation via Replicate
        job_id = await ReplicateService.generate_text_to_video(
            prompt=request.prompt,
            negative_prompt=request.negative_prompt,
            model=request.model,
            resolution=request.resolution,
            duration=request.duration,
            seed=request.seed,
        )

        # Update generation with job_id and status
        supabase.table("generations").update(
            {"job_id": job_id, "status": "processing", "progress": 10}
        ).eq("id", generation_id).execute()

        # Deduct credits using database function
        try:
            supabase.rpc(
                "deduct_credits",
                {"p_user_id": user_id, "p_amount": cost, "p_gen_id": generation_id},
            ).execute()
        except Exception as credit_error:
            # Rollback: delete generation record.
            # Note: the Replicate job started above is not cancelled here.
            supabase.table("generations").delete().eq("id", generation_id).execute()
            raise HTTPException(
                status_code=402, detail="Failed to deduct credits"
            ) from credit_error

        return GenerationResponse(
            id=generation_id,
            status="processing",
            created_at=datetime.utcnow(),
            credits_used=cost,
        )
    except HTTPException:
        raise
    except Exception as e:
        # Mark generation as failed
        supabase.table("generations").update(
            {"status": "failed", "error_message": str(e), "progress": 0}
        ).eq("id", generation_id).execute()

        # Refund credits if they were deducted (best effort)
        try:
            supabase.rpc("refund_credits", {"p_gen_id": generation_id}).execute()
        except Exception:
            # A failed refund must not mask the original error.
            pass

        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")


@router.post("/image-to-video", response_model=GenerationResponse)
async def generate_image_to_video(
    request: ImageToVideoRequest, user_id: str = Depends(get_user_id)
):
    """Generate video from image"""
    supabase = get_supabase()

    # Calculate credit cost
    cost = CreditService.calculate_cost(request.model, request.resolution)

    # Check if user has sufficient credits
    credits_result = await CreditService.get_user_credits(user_id)
    if credits_result < cost:
        raise HTTPException(
            status_code=402,
            detail=f"Insufficient credits. You need {cost} credits but have {credits_result}.",
        )

    # Create generation record BEFORE calling Replicate
    generation_record = (
        supabase.table("generations")
        .insert(
            {
                "user_id": user_id,
                "type": "image-to-video",
                "prompt": request.prompt,
                "negative_prompt": request.negative_prompt,
                "image_url": request.image_url,
                "model": request.model,
                "resolution": request.resolution,
                "status": "queued",
                "credits_used": cost,
                "progress": 0,
            }
        )
        .execute()
    )
    if not generation_record.data:
        raise HTTPException(status_code=500, detail="Failed to create generation record")

    generation_id = generation_record.data[0]["id"]

    try:
        # Start generation via Replicate
        job_id = await ReplicateService.generate_image_to_video(
            prompt=request.prompt,
            image_url=request.image_url,
            negative_prompt=request.negative_prompt,
            resolution=request.resolution,
            duration=request.duration,
            seed=request.seed,
        )

        # Update generation with job_id and status
        supabase.table("generations").update(
            {"job_id": job_id, "status": "processing", "progress": 10}
        ).eq("id", generation_id).execute()

        # Deduct credits using database function
        try:
            supabase.rpc(
                "deduct_credits",
                {"p_user_id": user_id, "p_amount": cost, "p_gen_id": generation_id},
            ).execute()
        except Exception as credit_error:
            # Rollback: delete generation record.
            # Note: the Replicate job started above is not cancelled here.
            supabase.table("generations").delete().eq("id", generation_id).execute()
            raise HTTPException(
                status_code=402, detail="Failed to deduct credits"
            ) from credit_error

        return GenerationResponse(
            id=generation_id,
            status="processing",
            created_at=datetime.utcnow(),
            credits_used=cost,
        )
    except HTTPException:
        raise
    except Exception as e:
        # Mark generation as failed
        supabase.table("generations").update(
            {"status": "failed", "error_message": str(e), "progress": 0}
        ).eq("id", generation_id).execute()

        # Refund credits if they were deducted (best effort)
        try:
            supabase.rpc("refund_credits", {"p_gen_id": generation_id}).execute()
        except Exception:
            # A failed refund must not mask the original error.
            pass

        raise HTTPException(status_code=500, detail=f"Generation failed: {str(e)}")


@router.get("/status/{generation_id}", response_model=GenerationStatus)
async def get_generation_status(generation_id: str, user_id: str = Depends(get_user_id)):
    """Get status of a generation"""
    # Verify ownership
    supabase = get_supabase()
    # Note: some client versions raise from .single() when no row matches,
    # in which case the 404 check below is not reached.
    generation = (
        supabase.table("generations")
        .select("*")
        .eq("id", generation_id)
        .eq("user_id", user_id)
        .single()
        .execute()
    )
    if not generation.data:
        raise HTTPException(status_code=404, detail="Generation not found")

    gen_data = generation.data

    # If the generation has reached a terminal state, return the stored data
    if gen_data.get("status") in ["completed", "failed"]:
        return GenerationStatus(
            id=generation_id,
            status=gen_data["status"],
            progress=gen_data.get("progress", 100 if gen_data["status"] == "completed" else 0),
            video_url=gen_data.get("video_url"),
            error=gen_data.get("error_message"),
            logs=None,
        )

    # Get live status from Replicate using job_id
    job_id = gen_data.get("job_id")
    if not job_id:
        # No job_id yet, return queued status
        return GenerationStatus(
            id=generation_id,
            status="queued",
            progress=0,
            video_url=None,
            error=None,
            logs=None,
        )

    try:
        replicate_status = await ReplicateService.get_prediction_status(job_id)

        # Update database with latest status
        update_data = {}

        # Map Replicate status to our status
        status_map = {
            "starting": "processing",
            "processing": "processing",
            "succeeded": "completed",
            "failed": "failed",
            "canceled": "failed",
        }
        new_status = status_map.get(replicate_status["status"], "processing")
        update_data["status"] = new_status

        # Update progress
        if new_status == "processing":
            update_data["progress"] = 50
        elif new_status == "completed":
            update_data["progress"] = 100
        elif new_status == "failed":
            update_data["progress"] = 0

        # Save video URL if completed
        if replicate_status.get("output"):
            video_url = replicate_status["output"]
            if isinstance(video_url, list):
                video_url = video_url[0]
            update_data["video_url"] = video_url
            update_data["completed_at"] = datetime.utcnow().isoformat()

        # Save error if failed
        if replicate_status.get("error"):
            update_data["error_message"] = replicate_status["error"]

        # Update database
        supabase.table("generations").update(update_data).eq("id", generation_id).execute()

        return GenerationStatus(
            id=generation_id,
            status=new_status,
            progress=update_data.get("progress", 0),
            video_url=update_data.get("video_url"),
            error=update_data.get("error_message"),
            logs=replicate_status.get("logs"),
        )
    except Exception:
        # If the Replicate call fails, fall back to the status stored in the database
        return GenerationStatus(
            id=generation_id,
            status=gen_data["status"],
            progress=gen_data.get("progress", 0),
            video_url=gen_data.get("video_url"),
            error=gen_data.get("error_message"),
            logs=None,
        )


@router.get("/history")
async def get_generation_history(user_id: str = Depends(get_user_id)):
    """Get user's generation history"""
    supabase = get_supabase()
    result = (
        supabase.table("generations")
        .select("*")
        .eq("user_id", user_id)
        .order("created_at", desc=True)
        .limit(50)
        .execute()
    )
    return {"generations": result.data}
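
The status handling inside `get_generation_status` can be factored into a pure function, which makes the mapping easy to test without Supabase or Replicate. The sketch below is one possible shape for that. `build_update` is a hypothetical helper that does not exist in the file, and it leaves out `completed_at` so that its output stays deterministic.

```python
from typing import Any, Dict

# Same mapping as in get_generation_status.
STATUS_MAP = {
    "starting": "processing",
    "processing": "processing",
    "succeeded": "completed",
    "failed": "failed",
    "canceled": "failed",
}

# Coarse progress value reported for each application status.
PROGRESS = {"processing": 50, "completed": 100, "failed": 0}


def build_update(prediction: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a Replicate prediction dict into a generations-row update."""
    # Unknown or missing statuses are treated as still processing.
    status = STATUS_MAP.get(prediction.get("status"), "processing")
    update: Dict[str, Any] = {"status": status, "progress": PROGRESS[status]}

    output = prediction.get("output")
    if output:
        # Output may be a single URL or a list of URLs; keep the first.
        update["video_url"] = output[0] if isinstance(output, list) else output

    if prediction.get("error"):
        update["error_message"] = prediction["error"]

    return update
```

For example, `build_update({"status": "succeeded", "output": ["https://example.com/v.mp4"]})` yields a completed status, a progress of 100, and the first URL as `video_url`.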