Design E-Learning Platform
Designing an e-learning platform at scale (like Coursera, Udemy, or Khan Academy) requires handling millions of concurrent learners, petabytes of video content, real-time progress tracking, secure payment processing, and personalized recommendations. This document outlines a production-grade architecture for such a system.
Step 1: Requirements Clarification
Functional Requirements
Course Creation & Management
- Instructors can create, update, and publish courses with rich content (videos, PDFs, code exercises)
- Course versioning and draft management
- Multi-language support with subtitles
- Course categorization and tagging
- Bulk content upload and processing
Video Lessons
- Adaptive bitrate video streaming (ABR) supporting 240p to 4K resolution
- Video playback with speed controls (0.5x to 2x)
- Video seeking and resume from last position
- Picture-in-picture mode
- Offline download for mobile apps
- DRM protection for premium content
Quizzes & Assessments
- Multiple question types (MCQ, coding, essay, file upload)
- Timed assessments with auto-submission
- Auto-grading for objective questions
- Peer review for subjective assessments
- Plagiarism detection for code submissions
- Instant feedback and explanations
- Proctored exam support
Progress Tracking
- Real-time progress updates across devices
- Course completion percentage
- Learning streak tracking
- Time spent per lesson
- Quiz scores and analytics
- Bookmarks and notes
Certificates
- Auto-generate certificates upon course completion
- Verifiable certificates with unique IDs
- Digital signature and blockchain verification
- PDF download and LinkedIn integration
- Certificate template customization
Payment & Subscriptions
- One-time course purchases
- Subscription plans (monthly/annual)
- Revenue sharing with instructors
- Multiple payment gateways (Stripe, PayPal, regional)
- Coupons and promotional codes
- Refund processing
- Invoice generation
Discussion Forums
- Threaded discussions per lesson
- Q&A with upvoting/downvoting
- Instructor and TA badges
- Mark as resolved
- Notifications for replies
- Search and filtering
Additional Features
- Course recommendations based on history and preferences
- Live classes with WebRTC
- Assignment submissions
- Instructor dashboard with analytics
- Student dashboard with learning path
- Course reviews and ratings
- Email notifications for course updates
Non-Functional Requirements
Scale
- 100M+ registered users
- 10M+ daily active users
- 1M+ concurrent video streams
- 500K+ courses
- 10M+ video assets
- 1M+ daily quiz submissions
Performance
- Video start time < 2 seconds
- API latency p99 < 200ms
- Search latency < 500ms
- Real-time progress sync < 1 second
- Quiz submission processing < 3 seconds
Availability
- 99.99% uptime (52 minutes downtime/year)
- Zero data loss for payment transactions
- Graceful degradation during outages
- Multi-region failover
Security
- End-to-end encryption for payments
- DRM for video content
- User authentication with OAuth 2.0
- Rate limiting and DDoS protection
- GDPR and CCPA compliance
- PCI DSS compliance for payments
Other
- Support 50+ languages
- Mobile-first design
- Accessibility (WCAG 2.1 Level AA)
- SEO optimization
Step 2: High-Level Architecture
System Components
┌─────────────────────────────────────────────────────────────────┐
│ Client Layer │
│ (Web, iOS, Android, TV Apps, Third-party Integrations) │
└────────────┬────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ CDN (CloudFront/Akamai) │
│ (Static Assets, Video Streaming, Images) │
└────────────┬────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ API Gateway + Load Balancer (ALB) │
│ (Rate Limiting, Auth, Routing, SSL Termination) │
└────────────┬────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────────┐
│ Microservices Layer │
├────────────┬─────────────┬──────────────┬─────────────┬──────────────┤
│ Course │ Video │ Assessment │ Progress │ Payment │
│ Service │ Service │ Service │ Service │ Service │
├────────────┼─────────────┼──────────────┼─────────────┼──────────────┤
│ Discussion │ User │ Certificate │ Notification│Recommendation│
│ Service │ Service │ Service │ Service │ Service │
└────────────┴─────────────┴──────────────┴─────────────┴──────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────────┐
│ Data Layer │
├─────────────────┬──────────────────┬───────────────┬──────────────────┤
│ PostgreSQL │ MongoDB │ Redis │ Elasticsearch │
│ (Courses, │ (Discussions, │ (Cache, │ (Course Search, │
│ Users, │ Logs, │ Sessions, │ Full-text) │
│ Payments) │ Analytics) │ Progress) │ │
└─────────────────┴──────────────────┴───────────────┴──────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────────────┐
│ Storage & Processing Layer │
├─────────────────┬──────────────────┬───────────────┬──────────────────┤
│ S3/Blob │ Video │ Kafka │ Airflow │
│ (Raw Video, │ Transcoding │ (Events, │ (ETL, ML │
│ Documents) │ (FFmpeg) │ Streams) │ Pipelines) │
└─────────────────┴──────────────────┴───────────────┴──────────────────┘
Core Services
1. Course Service
- Manages course metadata, curriculum, and content
- Handles course creation, updates, and publishing
- Course versioning and draft management
- Technologies: Java/Kotlin with Spring Boot, PostgreSQL
2. Video Service
- Handles video upload, transcoding, and streaming
- Generates adaptive bitrate streams (HLS/DASH)
- Manages DRM and content protection
- Technologies: Node.js, FFmpeg, AWS MediaConvert, Wowza
3. Assessment Service
- Manages quizzes, assignments, and exams
- Auto-grading engine for objective questions
- Code execution sandbox for programming assessments
- Technologies: Python/Django, Docker, PostgreSQL
4. Progress Service
- Tracks student progress in real-time
- Calculates completion percentages
- Manages bookmarks and resume points
- Technologies: Go, Redis, Kafka, PostgreSQL
5. Payment Service
- Processes payments and subscriptions
- Manages invoices and refunds
- Revenue sharing calculations
- Technologies: Java, PostgreSQL, Stripe SDK, PayPal SDK
6. Discussion Service
- Manages course forums and Q&A
- Threaded conversations
- Real-time notifications
- Technologies: Node.js, MongoDB, WebSocket, Redis Pub/Sub
7. Certificate Service
- Generates and validates certificates
- PDF rendering with custom templates
- Blockchain-based verification
- Technologies: Python, Celery, PostgreSQL, Ethereum/Hyperledger
8. Recommendation Service
- ML-based course recommendations
- Collaborative filtering and content-based filtering
- A/B testing framework
- Technologies: Python, TensorFlow, Feature Store, S3
9. Notification Service
- Email, push, and in-app notifications
- Template management
- Delivery tracking
- Technologies: Node.js, SendGrid, Firebase, SQS
Step 3: Deep Dives
3.1 Video Streaming with DRM
Video Upload Pipeline
Instructor Upload → S3 Bucket → Lambda Trigger → MediaConvert
↓
Transcoding
↓
┌──────────────────────────────────┴────────────┐
▼ ▼
HLS Segments (.m3u8) DASH Segments
(240p, 360p, 480p, 720p, 1080p, 4K) (.mpd format)
↓ ↓
S3 + CloudFront DRM Encryption
(Widevine, FairPlay)
Adaptive Bitrate Streaming (ABR)
- Use HLS (Apple) and DASH (universal) protocols
- Generate multiple quality variants during transcoding
- Client automatically switches quality based on bandwidth
- Segment duration: 4-6 seconds, balancing seek granularity against request overhead
- Pre-generate thumbnails for video scrubbing (1 per 5 seconds)
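The variant ladder above ends up expressed as an HLS master playlist that the client fetches once before picking a rendition. A minimal sketch of generating one (the rendition names, bandwidth figures, and URI pattern are illustrative assumptions, not the platform's actual values):

```python
# Sketch: building an HLS master playlist for the ABR variant ladder.
# Renditions, bandwidths, and the URI layout are assumed for illustration.

RENDITIONS = [
    # (name, width, height, peak bandwidth in bits/sec)
    ("240p", 426, 240, 400_000),
    ("360p", 640, 360, 800_000),
    ("480p", 854, 480, 1_400_000),
    ("720p", 1280, 720, 2_800_000),
    ("1080p", 1920, 1080, 5_000_000),
    ("2160p", 3840, 2160, 16_000_000),
]

def build_master_playlist(video_id: str) -> str:
    """Return an HLS master playlist referencing one media playlist per rendition."""
    lines = ["#EXTM3U", "#EXT-X-VERSION:4"]
    for name, width, height, bandwidth in RENDITIONS:
        lines.append(
            f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={width}x{height}"
        )
        lines.append(f"{video_id}/{name}/playlist.m3u8")
    return "\n".join(lines) + "\n"
```

The player downloads this once, then switches between the per-rendition media playlists as its bandwidth estimate changes.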
DRM Implementation
// DRM Configuration
const drmConfig = {
widevine: {
licenseUrl: 'https://drm.platform.com/widevine',
certificateUrl: 'https://drm.platform.com/cert'
},
fairplay: {
licenseUrl: 'https://drm.platform.com/fairplay',
certificateUrl: 'https://drm.platform.com/fairplay-cert'
},
playready: {
licenseUrl: 'https://drm.platform.com/playready'
}
};
// Token-based access control
function generateVideoToken(userId, courseId, videoId) {
return jwt.sign({
userId, courseId, videoId,
exp: Math.floor(Date.now() / 1000) + (60 * 60) // 1 hour
}, DRM_SECRET);
}
Video Playback Optimization
- Pre-fetch next lesson in background
- Use Service Workers for offline caching
- Implement backoff retry for failed segments
- CDN with 200+ edge locations for low latency
- Hot content in memory cache at CDN edge
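The "backoff retry for failed segments" bullet can be sketched as exponential backoff with jitter; `fetch_segment` is a placeholder for the real HTTP call:

```python
# Sketch: retry a failed segment fetch with exponential backoff plus jitter,
# so many clients recovering at once don't stampede the CDN in lockstep.
import random
import time

def fetch_with_backoff(fetch_segment, url, max_attempts=4, base_delay=0.5):
    """Call fetch_segment(url), retrying transient IOErrors with backoff."""
    for attempt in range(max_attempts):
        try:
            return fetch_segment(url)
        except IOError:
            if attempt == max_attempts - 1:
                raise  # out of attempts; surface the error to the player
            # 0.5s, 1s, 2s, ... plus up to 50% random jitter
            delay = base_delay * (2 ** attempt)
            time.sleep(delay + random.uniform(0, delay / 2))
```

In an ABR player this pairs with a quality downshift: after one or two failures, request the same segment at a lower bitrate instead of stalling.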
Watermarking for Piracy Prevention
# Dynamic watermarking with user info
def add_forensic_watermark(video_url, user_id, course_id):
"""Add invisible watermark with user identification"""
watermark_data = {
'user_id': user_id,
'timestamp': int(time.time()),
'course_id': course_id
}
# Encode watermark in frequency domain (invisible)
return watermark_service.embed(video_url, watermark_data)
3.2 Course Content Management
Course Schema Design (PostgreSQL)
-- Courses table with versioning
CREATE TABLE courses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
instructor_id UUID NOT NULL REFERENCES users(id),
title VARCHAR(255) NOT NULL,
description TEXT,
category_id INTEGER REFERENCES categories(id),
difficulty VARCHAR(20) CHECK (difficulty IN ('beginner', 'intermediate', 'advanced')),
price DECIMAL(10, 2),
language VARCHAR(10),
status VARCHAR(20) DEFAULT 'draft', -- draft, published, archived
version INTEGER DEFAULT 1,
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
CONSTRAINT unique_course_version UNIQUE(id, version)
);
CREATE INDEX idx_courses_instructor ON courses(instructor_id);
CREATE INDEX idx_courses_category ON courses(category_id);
CREATE INDEX idx_courses_status ON courses(status);
-- Course sections (modules)
CREATE TABLE sections (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
course_id UUID REFERENCES courses(id) ON DELETE CASCADE,
title VARCHAR(255) NOT NULL,
order_index INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- Course lessons (videos, readings, quizzes)
CREATE TABLE lessons (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
section_id UUID REFERENCES sections(id) ON DELETE CASCADE,
title VARCHAR(255) NOT NULL,
type VARCHAR(20) CHECK (type IN ('video', 'reading', 'quiz', 'assignment', 'live')),
content_url TEXT,
duration_seconds INTEGER,
order_index INTEGER NOT NULL,
is_preview BOOLEAN DEFAULT FALSE, -- Allow preview without enrollment
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_lessons_section ON lessons(section_id);
Content Delivery Strategy
- Static assets (PDFs, images) → S3 + CloudFront
- Video content → Separate S3 bucket with lifecycle policies
- Code exercises → Git repositories with isolated sandboxes
- Use multipart upload for large files (>100MB)
- Generate signed URLs with 1-hour expiration for security
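The signed-URL idea in the last bullet is an expiry plus an HMAC that the edge can validate without a database lookup. A stdlib sketch of the mechanism (real deployments would use S3 presigned URLs or CloudFront signed URLs; the key name is an assumption):

```python
# Sketch of expiring signed URLs: sign "path?expires=N" with a key shared
# with the edge, so any tampering with path or expiry invalidates the link.
import hashlib, hmac, time
from urllib.parse import urlencode, urlparse, parse_qs

SIGNING_KEY = b"edge-shared-secret"  # assumption: provisioned to the CDN edge

def sign_url(path, ttl_seconds=3600, now=None):
    expires = int(now if now is not None else time.time()) + ttl_seconds
    msg = f"{path}?expires={expires}".encode()
    sig = hmac.new(SIGNING_KEY, msg, hashlib.sha256).hexdigest()
    return f"{path}?{urlencode({'expires': expires, 'sig': sig})}"

def validate_url(url, now=None):
    parsed = urlparse(url)
    qs = parse_qs(parsed.query)
    expires = int(qs["expires"][0])
    msg = f"{parsed.path}?expires={expires}".encode()
    expected = hmac.new(SIGNING_KEY, msg, hashlib.sha256).hexdigest()
    current = now if now is not None else time.time()
    return hmac.compare_digest(qs["sig"][0], expected) and current < expires
```

Because validation is pure computation, it scales with the edge fleet rather than with any central auth service.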
Course Publishing Workflow
class CoursePublisher:
def publish_course(self, course_id, instructor_id):
# Validation checks
self._validate_course_content(course_id)
self._validate_pricing(course_id)
self._validate_prerequisites(course_id)
# Generate preview assets
self._generate_thumbnail(course_id)
self._generate_preview_video(course_id)
# Update search index
elasticsearch.index_course(course_id)
# Invalidate cache
redis.delete(f"course:{course_id}")
# Publish to CDN
cdn.invalidate_cache(f"/courses/{course_id}/*")
# Notify followers
notification_service.notify_followers(instructor_id, course_id)
# Update analytics
kafka.produce('course.published', {
'course_id': course_id,
'instructor_id': instructor_id,
'timestamp': time.time()
})
3.3 Quiz and Assessment Engine with Auto-Grading
Assessment Types Architecture
┌─────────────────────────────────────────────────┐
│ Assessment Service │
├─────────────┬─────────────┬────────────────────┤
│ MCQ │ Coding │ Essay/File │
│ Engine │ Engine │ Upload Engine │
│ (Instant) │ (Sandbox) │ (Peer Review) │
└─────────────┴─────────────┴────────────────────┘
Quiz Schema
CREATE TABLE quizzes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lesson_id UUID REFERENCES lessons(id),
title VARCHAR(255),
time_limit_minutes INTEGER,
passing_score DECIMAL(5, 2),
max_attempts INTEGER DEFAULT 3,
shuffle_questions BOOLEAN DEFAULT TRUE,
show_answers_after_submit BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE questions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
quiz_id UUID REFERENCES quizzes(id),
type VARCHAR(20) CHECK (type IN ('mcq', 'multiple_select', 'true_false', 'fill_blank', 'coding', 'essay')),
question_text TEXT NOT NULL,
points DECIMAL(5, 2) DEFAULT 1.0,
order_index INTEGER,
metadata JSONB, -- Store options, correct answers, test cases
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE quiz_attempts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
quiz_id UUID REFERENCES quizzes(id),
attempt_number INTEGER,
started_at TIMESTAMP DEFAULT NOW(),
submitted_at TIMESTAMP,
score DECIMAL(5, 2),
time_spent_seconds INTEGER,
answers JSONB,
CONSTRAINT unique_attempt UNIQUE(user_id, quiz_id, attempt_number)
);
CREATE INDEX idx_attempts_user ON quiz_attempts(user_id);
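The `questions.metadata` JSONB column carries the per-type payload (options, answer keys, test cases). Illustrative shapes for two question types; the exact keys are assumptions, not a fixed schema:

```python
# Example metadata payloads for the questions.metadata JSONB column.
# Key names are illustrative assumptions.
mcq_metadata = {
    "options": ["O(n)", "O(log n)", "O(n log n)", "O(1)"],
    "correct_indices": [1],   # one index for mcq, several for multiple_select
    "explanation": "Binary search halves the search range each step.",
}

coding_metadata = {
    "starter_code": "def add(a, b):\n    pass\n",
    "test_cases": [
        {"input": [1, 2], "expected": 3},
        {"input": [-1, 1], "expected": 0},
    ],
    "languages": ["python", "javascript"],
}
```

Keeping this in JSONB means new question types need no migration, at the cost of validating the shape in application code.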
Auto-Grading Engine
class AutoGrader:
def grade_attempt(self, attempt_id):
attempt = self.db.get_attempt(attempt_id)
questions = self.db.get_questions(attempt.quiz_id)
total_points = 0
max_points = 0
results = []
for question in questions:
max_points += question.points
            answer = attempt.answers.get(question.id)  # may be None if skipped
            if question.type in ('mcq', 'multiple_select'):
                points = self._grade_mcq(question, answer)
            elif question.type == 'coding':
                points = self._grade_coding(question, answer)
            elif question.type == 'fill_blank':
                points = self._grade_fill_blank(question, answer)
            else:
                points = 0  # Essay/file upload: manual grading required
total_points += points
results.append({
'question_id': question.id,
'points_earned': points,
'points_possible': question.points,
'correct': points == question.points
})
score = (total_points / max_points) * 100 if max_points > 0 else 0
# Update attempt
self.db.update_attempt(attempt_id, {
'score': score,
'results': results,
'graded_at': datetime.now()
})
# Send notification
self.notification_service.send_quiz_result(attempt.user_id, score)
return score
def _grade_coding(self, question, submission):
"""Run code against test cases in isolated sandbox"""
test_cases = question.metadata['test_cases']
language = submission['language']
code = submission['code']
# Execute in Docker container with resource limits
result = self.code_executor.run({
'language': language,
'code': code,
'test_cases': test_cases,
'timeout': 30, # seconds
'memory_limit': '256MB'
})
        passed = sum(1 for tc in result['results'] if tc['passed'])
        total = len(test_cases)
        # Partial credit based on test cases passed (guard against empty suites)
        return question.points * (passed / total) if total else 0
Code Execution Sandbox
# Docker-based code execution with security
class CodeExecutor:
def run(self, execution_request):
# Create isolated container
container = docker.create_container(
image=f"code-executor:{execution_request['language']}",
command=self._build_command(execution_request),
mem_limit=execution_request['memory_limit'],
network_disabled=True, # No internet access
read_only=True, # No write access to filesystem
pids_limit=50, # Limit process creation
ulimits=[
docker.types.Ulimit(name='cpu', soft=30, hard=30)
]
)
try:
docker.start(container)
result = docker.wait(container, timeout=execution_request['timeout'])
logs = docker.logs(container)
return self._parse_results(logs, execution_request['test_cases'])
except TimeoutError:
return {'error': 'Time limit exceeded'}
finally:
docker.remove_container(container, force=True)
3.4 Progress Tracking and Completion
Real-Time Progress Architecture
Student watches video → Progress Event → Kafka → Progress Service
↓
Redis (Cache)
↓
PostgreSQL (Persistent)
Progress Schema
CREATE TABLE enrollments (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
course_id UUID REFERENCES courses(id),
enrolled_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP,
progress_percentage DECIMAL(5, 2) DEFAULT 0,
last_accessed_at TIMESTAMP,
CONSTRAINT unique_enrollment UNIQUE(user_id, course_id)
);
CREATE TABLE lesson_progress (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
enrollment_id UUID REFERENCES enrollments(id),
lesson_id UUID REFERENCES lessons(id),
status VARCHAR(20) CHECK (status IN ('not_started', 'in_progress', 'completed')),
progress_seconds INTEGER DEFAULT 0,
completed_at TIMESTAMP,
last_position_seconds INTEGER DEFAULT 0,
CONSTRAINT unique_lesson_progress UNIQUE(enrollment_id, lesson_id)
);
CREATE INDEX idx_lesson_progress_enrollment ON lesson_progress(enrollment_id);
Real-Time Progress Updates
class ProgressTracker:
def update_video_progress(self, user_id, course_id, lesson_id, current_seconds, duration_seconds):
# Update Redis cache for fast access
cache_key = f"progress:{user_id}:{lesson_id}"
        self.redis.hset(cache_key, mapping={
            'current_seconds': current_seconds,
            'duration_seconds': duration_seconds,
            'timestamp': time.time()
        })
        self.redis.expire(cache_key, 86400)  # 24 hours
        # Mark as completed if watched 90%+ (guard against zero-length videos)
        progress_percentage = (current_seconds / duration_seconds) * 100 if duration_seconds else 0
if progress_percentage >= 90:
self._mark_lesson_completed(user_id, course_id, lesson_id)
# Publish event to Kafka for async processing
kafka.produce('lesson.progress', {
'user_id': user_id,
'course_id': course_id,
'lesson_id': lesson_id,
'progress_seconds': current_seconds,
'timestamp': time.time()
})
def _mark_lesson_completed(self, user_id, course_id, lesson_id):
# Update lesson progress
self.db.execute("""
UPDATE lesson_progress
SET status = 'completed', completed_at = NOW()
WHERE enrollment_id = (
SELECT id FROM enrollments
WHERE user_id = %s AND course_id = %s
) AND lesson_id = %s
""", (user_id, course_id, lesson_id))
# Recalculate course progress
progress = self._calculate_course_progress(user_id, course_id)
self.db.execute("""
UPDATE enrollments
SET progress_percentage = %s, last_accessed_at = NOW()
WHERE user_id = %s AND course_id = %s
""", (progress, user_id, course_id))
# Check if course is completed
if progress >= 100:
self._complete_course(user_id, course_id)
def _calculate_course_progress(self, user_id, course_id):
        # Join progress via this user's enrollment only. Filtering e.user_id in
        # the WHERE clause would turn the LEFT JOIN into an inner join and skip
        # lessons the user hasn't started, inflating the percentage.
        result = self.db.query("""
            SELECT
                COUNT(*) AS total_lessons,
                SUM(CASE WHEN lp.status = 'completed' THEN 1 ELSE 0 END) AS completed_lessons
            FROM lessons l
            JOIN sections s ON l.section_id = s.id
            LEFT JOIN lesson_progress lp
                ON lp.lesson_id = l.id
               AND lp.enrollment_id = (
                   SELECT id FROM enrollments
                   WHERE user_id = %s AND course_id = %s
               )
            WHERE s.course_id = %s
        """, (user_id, course_id, course_id))
if result['total_lessons'] == 0:
return 0
return (result['completed_lessons'] / result['total_lessons']) * 100
def _complete_course(self, user_id, course_id):
# Mark enrollment as completed
self.db.execute("""
UPDATE enrollments
SET completed_at = NOW()
WHERE user_id = %s AND course_id = %s
""", (user_id, course_id))
# Trigger certificate generation
certificate_service.generate(user_id, course_id)
# Send congratulations email
notification_service.send_completion_email(user_id, course_id)
# Update recommendations
kafka.produce('course.completed', {
'user_id': user_id,
'course_id': course_id,
'timestamp': time.time()
})
Progress Sync Across Devices
// Client-side progress sync with debouncing
class ProgressSync {
constructor(userId, courseId, lessonId) {
this.userId = userId;
this.courseId = courseId;
this.lessonId = lessonId;
this.lastSyncTime = 0;
this.syncInterval = 10000; // 10 seconds
}
onVideoProgress(currentSeconds, duration) {
const now = Date.now();
// Sync every 10 seconds or at key milestones
if (now - this.lastSyncTime > this.syncInterval ||
this.isKeyMilestone(currentSeconds, duration)) {
this.syncProgress(currentSeconds, duration);
this.lastSyncTime = now;
}
}
isKeyMilestone(current, duration) {
const progress = (current / duration) * 100;
// Sync at 25%, 50%, 75%, 90%, 100%
return [25, 50, 75, 90, 100].some(
milestone => Math.abs(progress - milestone) < 1
);
}
async syncProgress(currentSeconds, duration) {
try {
await fetch('/api/v1/progress/update', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
user_id: this.userId,
course_id: this.courseId,
lesson_id: this.lessonId,
current_seconds: currentSeconds,
duration_seconds: duration
})
});
} catch (error) {
// Queue for retry
this.queueOfflineSync(currentSeconds, duration);
}
}
}
3.5 Certificate Generation
Certificate Service Architecture
from datetime import datetime
import hashlib

from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from web3 import Web3
class CertificateGenerator:
def generate(self, user_id, course_id):
# Get data
user = self.db.get_user(user_id)
course = self.db.get_course(course_id)
enrollment = self.db.get_enrollment(user_id, course_id)
# Generate unique certificate ID
cert_id = self._generate_certificate_id(user_id, course_id)
# Create certificate record
certificate = self.db.create_certificate({
'id': cert_id,
'user_id': user_id,
'course_id': course_id,
'issued_at': datetime.now(),
'verification_hash': self._generate_verification_hash(cert_id, user_id, course_id)
})
# Generate PDF
pdf_path = self._create_pdf(user, course, certificate)
# Upload to S3
s3_url = self._upload_to_s3(pdf_path, cert_id)
# Record on blockchain for verification
blockchain_tx = self._record_on_blockchain(certificate)
# Update certificate with URLs
self.db.update_certificate(cert_id, {
'pdf_url': s3_url,
'blockchain_tx': blockchain_tx
})
# Send email with certificate
self.notification_service.send_certificate_email(user_id, s3_url)
return certificate
def _create_pdf(self, user, course, certificate):
filename = f"/tmp/cert_{certificate['id']}.pdf"
c = canvas.Canvas(filename, pagesize=letter)
width, height = letter
# Add background template
c.drawImage('certificate_template.png', 0, 0, width, height)
# Add user name
c.setFont("Helvetica-Bold", 36)
c.drawCentredString(width/2, height - 300, user['full_name'])
# Add course name
c.setFont("Helvetica", 24)
c.drawCentredString(width/2, height - 380, course['title'])
# Add completion date
c.setFont("Helvetica", 14)
completion_date = certificate['issued_at'].strftime("%B %d, %Y")
c.drawCentredString(width/2, height - 450, f"Completed on {completion_date}")
# Add certificate ID and QR code for verification
c.setFont("Courier", 10)
c.drawString(50, 50, f"Certificate ID: {certificate['id']}")
c.drawString(50, 30, f"Verify at: https://platform.com/verify/{certificate['id']}")
# Add QR code
qr_img = self._generate_qr_code(certificate['id'])
c.drawImage(qr_img, width - 150, 30, 100, 100)
# Add digital signature
c.drawString(width/2 - 100, 150, "Digitally Signed")
c.save()
return filename
def _record_on_blockchain(self, certificate):
"""Record certificate hash on Ethereum for verification"""
w3 = Web3(Web3.HTTPProvider(ETHEREUM_NODE_URL))
# Create hash of certificate data
cert_hash = hashlib.sha256(
f"{certificate['id']}{certificate['user_id']}{certificate['course_id']}".encode()
).hexdigest()
# Call smart contract to record hash
contract = w3.eth.contract(address=CERT_CONTRACT_ADDRESS, abi=CERT_CONTRACT_ABI)
tx_hash = contract.functions.recordCertificate(
certificate['id'],
cert_hash
).transact({'from': PLATFORM_WALLET_ADDRESS})
receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
return receipt['transactionHash'].hex()
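The public /verify endpoint is the mirror image of `_record_on_blockchain`: recompute the hash the same way and compare it with what the chain returned. A sketch with the chain lookup stubbed out (`lookup_onchain_hash` is a placeholder for the smart-contract read):

```python
# Sketch: verifying a certificate by recomputing its hash and comparing it
# to the value recorded on chain. lookup_onchain_hash is a placeholder for
# the contract's read call.
import hashlib

def compute_cert_hash(cert_id, user_id, course_id):
    """Must match the hashing scheme used when the certificate was recorded."""
    return hashlib.sha256(f"{cert_id}{user_id}{course_id}".encode()).hexdigest()

def verify_certificate(certificate, lookup_onchain_hash):
    """certificate: dict with id/user_id/course_id.
    lookup_onchain_hash: callable cert_id -> recorded hash (or None)."""
    expected = compute_cert_hash(
        certificate["id"], certificate["user_id"], certificate["course_id"]
    )
    return lookup_onchain_hash(certificate["id"]) == expected
```

Any edit to the certificate's claimed owner or course changes the recomputed hash and fails the comparison.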
3.6 Discussion Forums with Threading
Discussion Schema (MongoDB)
// discussions collection
{
_id: ObjectId,
course_id: UUID,
lesson_id: UUID, // null for course-level discussions
user_id: UUID,
title: String,
content: String,
type: 'question' | 'discussion',
tags: [String],
is_resolved: Boolean,
upvotes: Number,
view_count: Number,
created_at: ISODate,
updated_at: ISODate,
// Denormalized for performance
author: {
name: String,
avatar_url: String,
badges: ['instructor', 'ta', 'top_contributor']
},
reply_count: Number,
last_reply_at: ISODate
}
// replies collection (nested comments)
{
_id: ObjectId,
discussion_id: ObjectId,
parent_reply_id: ObjectId, // null for top-level replies
user_id: UUID,
content: String,
upvotes: Number,
is_answer: Boolean, // marked by instructor
created_at: ISODate,
author: {
name: String,
avatar_url: String,
badges: []
},
// For nested threading
path: String, // e.g., "001.002.001" for tree traversal
depth: Number
}
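A subtlety of the materialized path above: "001.002.001" only sorts into correct thread order as a string because every segment is zero-padded to a fixed width (otherwise "10" would sort before "2"). A sketch of the helpers, using the 3-digit width from the schema example:

```python
# Sketch: materialized-path helpers for threaded replies. Zero-padding each
# segment makes plain lexicographic sort equal depth-first thread order.
SEGMENT_WIDTH = 3  # matches the "001.002.001" example; caps siblings at 999

def child_path(parent_path, sibling_index):
    """Path for a reply's Nth child (1-based). Empty parent = top level."""
    segment = str(sibling_index).zfill(SEGMENT_WIDTH)
    return f"{parent_path}.{segment}" if parent_path else segment

def sort_replies(replies):
    """Lexicographic sort on path yields the nested display order."""
    return sorted(replies, key=lambda r: r["path"])
```

This is why the service can fetch a whole thread with a single `sort('path', 1)` query instead of walking parent pointers.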
Discussion Service API
class DiscussionService:
def create_discussion(self, user_id, course_id, lesson_id, title, content):
# Check enrollment
if not self._is_enrolled(user_id, course_id):
raise PermissionError("User not enrolled in course")
discussion = {
'course_id': course_id,
'lesson_id': lesson_id,
'user_id': user_id,
'title': title,
'content': content,
'type': 'question',
'is_resolved': False,
'upvotes': 0,
'view_count': 0,
'reply_count': 0,
'created_at': datetime.utcnow()
}
# Get author info
user = self.user_service.get_user(user_id)
discussion['author'] = {
'name': user['full_name'],
'avatar_url': user['avatar_url'],
'badges': self._get_user_badges(user_id, course_id)
}
result = self.mongo.discussions.insert_one(discussion)
discussion_id = result.inserted_id
# Index in Elasticsearch for search
self.es.index(index='discussions', id=str(discussion_id), body=discussion)
# Notify instructor and TAs
self.notification_service.notify_new_discussion(course_id, discussion_id)
return discussion_id
def add_reply(self, discussion_id, user_id, content, parent_reply_id=None):
# Calculate path for nested threading
if parent_reply_id:
parent = self.mongo.replies.find_one({'_id': ObjectId(parent_reply_id)})
path = f"{parent['path']}.{self._get_next_sibling_index(parent['path'])}"
depth = parent['depth'] + 1
else:
path = f"{self._get_next_sibling_index(discussion_id)}"
depth = 0
reply = {
'discussion_id': ObjectId(discussion_id),
'parent_reply_id': ObjectId(parent_reply_id) if parent_reply_id else None,
'user_id': user_id,
'content': content,
'upvotes': 0,
'is_answer': False,
'created_at': datetime.utcnow(),
'path': path,
'depth': depth
}
result = self.mongo.replies.insert_one(reply)
# Update discussion reply count
self.mongo.discussions.update_one(
{'_id': ObjectId(discussion_id)},
{
'$inc': {'reply_count': 1},
'$set': {'last_reply_at': datetime.utcnow()}
}
)
# Send notification to discussion participants
self._notify_participants(discussion_id, user_id)
return result.inserted_id
def get_discussion_with_replies(self, discussion_id):
# Get discussion
discussion = self.mongo.discussions.find_one({'_id': ObjectId(discussion_id)})
# Increment view count
self.mongo.discussions.update_one(
{'_id': ObjectId(discussion_id)},
{'$inc': {'view_count': 1}}
)
# Get all replies sorted by path for tree structure
replies = list(self.mongo.replies.find(
{'discussion_id': ObjectId(discussion_id)}
).sort('path', 1))
# Build threaded structure
discussion['replies'] = self._build_reply_tree(replies)
return discussion
def _build_reply_tree(self, replies):
"""Convert flat list of replies into nested tree structure"""
reply_map = {str(r['_id']): r for r in replies}
tree = []
for reply in replies:
reply['children'] = []
if reply['parent_reply_id']:
parent_id = str(reply['parent_reply_id'])
if parent_id in reply_map:
reply_map[parent_id]['children'].append(reply)
else:
tree.append(reply)
return tree
3.7 Recommendation Engine for Courses
Recommendation Architecture
User Activity → Kafka → Feature Store → ML Model → Redis Cache → API
↓
Offline Training
(Airflow + Spark)
Feature Engineering
class FeatureStore:
def compute_user_features(self, user_id):
# Enrollment history
enrolled_courses = self.db.get_user_enrollments(user_id)
completed_courses = [c for c in enrolled_courses if c['completed_at']]
# Categories of interest
categories = self._extract_categories(enrolled_courses)
# Difficulty preference
avg_difficulty = self._calculate_avg_difficulty(completed_courses)
# Learning pace
avg_completion_time = self._calculate_avg_completion_time(completed_courses)
# Engagement metrics
avg_quiz_score = self._calculate_avg_quiz_score(user_id)
discussion_participation = self._count_discussions(user_id)
return {
'user_id': user_id,
'enrolled_count': len(enrolled_courses),
'completed_count': len(completed_courses),
'completion_rate': len(completed_courses) / len(enrolled_courses) if enrolled_courses else 0,
'preferred_categories': categories,
'preferred_difficulty': avg_difficulty,
'avg_completion_days': avg_completion_time,
'avg_quiz_score': avg_quiz_score,
'discussion_count': discussion_participation
}
def compute_course_features(self, course_id):
course = self.db.get_course(course_id)
# Popularity metrics
enrollment_count = self.db.count_enrollments(course_id)
completion_count = self.db.count_completions(course_id)
# Quality metrics
avg_rating = self.db.get_avg_rating(course_id)
review_count = self.db.count_reviews(course_id)
# Content metadata
total_duration = self.db.get_total_duration(course_id)
lesson_count = self.db.count_lessons(course_id)
return {
'course_id': course_id,
'category': course['category_id'],
'difficulty': course['difficulty'],
'price': course['price'],
'enrollment_count': enrollment_count,
'completion_rate': completion_count / enrollment_count if enrollment_count else 0,
'avg_rating': avg_rating,
'review_count': review_count,
'total_hours': total_duration / 3600,
'lesson_count': lesson_count
}
ML Recommendation Models
import tensorflow as tf
from sklearn.metrics.pairwise import cosine_similarity
class CourseRecommender:
def __init__(self):
self.collaborative_model = self._load_collaborative_model()
self.content_model = self._load_content_model()
self.hybrid_weights = {'collaborative': 0.6, 'content': 0.4}
def get_recommendations(self, user_id, top_k=10):
# Collaborative filtering recommendations
collab_scores = self._collaborative_filtering(user_id)
# Content-based recommendations
content_scores = self._content_based_filtering(user_id)
# Hybrid approach
hybrid_scores = {}
all_courses = set(collab_scores.keys()) | set(content_scores.keys())
for course_id in all_courses:
hybrid_scores[course_id] = (
self.hybrid_weights['collaborative'] * collab_scores.get(course_id, 0) +
self.hybrid_weights['content'] * content_scores.get(course_id, 0)
)
# Filter out already enrolled courses
enrolled = self._get_enrolled_courses(user_id)
hybrid_scores = {k: v for k, v in hybrid_scores.items() if k not in enrolled}
# Sort and return top K
recommendations = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
# Add explanation for each recommendation
return [self._add_explanation(user_id, course_id, score)
for course_id, score in recommendations]
def _collaborative_filtering(self, user_id):
"""Matrix factorization using neural collaborative filtering"""
user_embedding = self.collaborative_model.get_user_embedding(user_id)
all_course_embeddings = self.collaborative_model.get_all_course_embeddings()
# Compute similarity scores
scores = cosine_similarity([user_embedding], all_course_embeddings)[0]
return dict(zip(all_course_embeddings.index, scores))
def _content_based_filtering(self, user_id):
"""Based on user's historical preferences"""
user_features = self.feature_store.compute_user_features(user_id)
# Get all courses in preferred categories
candidate_courses = self.db.get_courses_by_categories(
user_features['preferred_categories']
)
scores = {}
for course in candidate_courses:
course_features = self.feature_store.compute_course_features(course['id'])
scores[course['id']] = self._compute_content_similarity(
user_features, course_features
)
return scores
def _add_explanation(self, user_id, course_id, score):
"""Generate human-readable explanation for recommendation"""
course = self.db.get_course(course_id)
user_history = self.db.get_user_enrollments(user_id)
reasons = []
# Check category match
if any(c['category_id'] == course['category_id'] for c in user_history):
reasons.append(f"Based on your interest in {course['category_name']}")
# Check popularity
if course['enrollment_count'] > 10000:
reasons.append(f"Popular choice with {course['enrollment_count']} students")
# Check rating
if course['avg_rating'] >= 4.5:
reasons.append(f"Highly rated ({course['avg_rating']}/5)")
return {
'course_id': course_id,
'course': course,
'score': score,
'reasons': reasons
}
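The content-based scorer above calls a `_compute_content_similarity` helper that is not shown. A minimal sketch, assuming user and course features are represented as tag/category weight dictionaries (the exact feature layout is an assumption, not part of the original design):

```python
import math

def compute_content_similarity(user_features, course_features):
    """Cosine similarity between a user's tag-weight profile and a
    course's tag vector. Both inputs are dicts of {tag: weight}."""
    tags = set(user_features) | set(course_features)
    dot = sum(user_features.get(t, 0.0) * course_features.get(t, 0.0)
              for t in tags)
    user_norm = math.sqrt(sum(w * w for w in user_features.values()))
    course_norm = math.sqrt(sum(w * w for w in course_features.values()))
    if user_norm == 0 or course_norm == 0:
        return 0.0  # no signal for either side: neutral score
    return dot / (user_norm * course_norm)
```

In the full service, the weights would come from the feature store (e.g. time spent per category, quiz performance per tag) rather than raw counts.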
3.8 Payment and Subscription Management
Payment Schema
CREATE TABLE payments (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
order_id UUID UNIQUE NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
currency VARCHAR(3) DEFAULT 'USD',
payment_method VARCHAR(50), -- 'stripe', 'paypal', 'apple_pay'
status VARCHAR(20) CHECK (status IN ('pending', 'completed', 'failed', 'refunded')),
payment_gateway_id VARCHAR(255), -- Stripe payment intent ID
metadata JSONB,
created_at TIMESTAMP DEFAULT NOW(),
completed_at TIMESTAMP
);
CREATE TABLE subscriptions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
plan_type VARCHAR(20) CHECK (plan_type IN ('monthly', 'annual')),
status VARCHAR(20) CHECK (status IN ('active', 'cancelled', 'expired', 'past_due')),
price DECIMAL(10, 2),
currency VARCHAR(3),
current_period_start TIMESTAMP,
current_period_end TIMESTAMP,
stripe_subscription_id VARCHAR(255),
cancelled_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE revenue_shares (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
payment_id UUID REFERENCES payments(id),
instructor_id UUID REFERENCES users(id),
course_id UUID REFERENCES courses(id),
instructor_amount DECIMAL(10, 2),
platform_amount DECIMAL(10, 2),
processed BOOLEAN DEFAULT FALSE,
processed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
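The `revenue_shares` table stores monetary amounts as `DECIMAL(10, 2)`; application code should mirror that with Python's `Decimal` rather than floats, so the instructor and platform amounts always sum exactly to the payment amount. A hedged sketch (the 70/30 split matches the service code later in this section; pass amounts as strings to keep them exact):

```python
from decimal import Decimal, ROUND_HALF_UP

INSTRUCTOR_RATE = Decimal('0.70')  # 70% to instructor, 30% to platform

def split_revenue(amount):
    """Split a payment into (instructor_amount, platform_amount).

    Rounds the instructor share to cents, then assigns the remainder to
    the platform so the two parts always sum exactly to `amount`.
    """
    amount = Decimal(amount)
    instructor = (amount * INSTRUCTOR_RATE).quantize(
        Decimal('0.01'), rounding=ROUND_HALF_UP)
    platform = amount - instructor
    return instructor, platform
```

Allocating the rounding remainder to one side avoids the off-by-a-cent mismatches that naive `amount * 0.7` / `amount * 0.3` float math can produce.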
Payment Processing
import uuid
from datetime import datetime

import stripe

class PaymentService:
    def __init__(self):
        stripe.api_key = STRIPE_SECRET_KEY
def process_course_purchase(self, user_id, course_id, payment_method_id):
user = self.db.get_user(user_id)
course = self.db.get_course(course_id)
# Check if already enrolled
if self.db.is_enrolled(user_id, course_id):
raise ValueError("User already enrolled in course")
# Create order
order_id = str(uuid.uuid4())
amount = course['price']
try:
# Create Stripe payment intent
payment_intent = stripe.PaymentIntent.create(
amount=int(amount * 100), # Convert to cents
currency='usd',
customer=user['stripe_customer_id'],
payment_method=payment_method_id,
confirm=True,
metadata={
'order_id': order_id,
'user_id': user_id,
'course_id': course_id
}
)
# Create payment record
payment = self.db.create_payment({
'user_id': user_id,
'order_id': order_id,
'amount': amount,
'payment_method': 'stripe',
'status': 'pending',
'payment_gateway_id': payment_intent.id
})
            # confirm=True attempts the charge synchronously; check the result
            # (3D Secure flows return 'requires_action' and complete via webhook)
            if payment_intent.status == 'succeeded':
                self._complete_payment(payment['id'], course_id)
            return payment
except stripe.error.CardError as e:
# Card declined
self.db.create_payment({
'user_id': user_id,
'order_id': order_id,
'amount': amount,
'payment_method': 'stripe',
'status': 'failed',
'metadata': {'error': str(e)}
})
raise
def _complete_payment(self, payment_id, course_id):
# Update payment status
self.db.update_payment(payment_id, {
'status': 'completed',
'completed_at': datetime.now()
})
# Enroll user in course
payment = self.db.get_payment(payment_id)
self.db.create_enrollment({
'user_id': payment['user_id'],
'course_id': course_id,
'enrolled_at': datetime.now()
})
# Calculate revenue share
course = self.db.get_course(course_id)
        # Revenue split (use Decimal in production; float math can mis-round cents)
        instructor_share = payment['amount'] * 0.7  # 70% to instructor
        platform_share = payment['amount'] * 0.3  # 30% platform fee
self.db.create_revenue_share({
'payment_id': payment_id,
'instructor_id': course['instructor_id'],
'course_id': course_id,
'instructor_amount': instructor_share,
'platform_amount': platform_share
})
# Send receipt email
self.notification_service.send_purchase_receipt(
payment['user_id'],
course_id,
payment_id
)
# Publish event
kafka.produce('course.purchased', {
'user_id': payment['user_id'],
'course_id': course_id,
'amount': payment['amount']
})
def create_subscription(self, user_id, plan_type):
user = self.db.get_user(user_id)
        # Define pricing
        prices = {'monthly': 29.99, 'annual': 299.99}
        if plan_type not in prices:
            raise ValueError(f"Unknown plan type: {plan_type}")
        price = prices[plan_type]
# Create Stripe subscription
subscription = stripe.Subscription.create(
customer=user['stripe_customer_id'],
items=[{'price': STRIPE_PRICE_IDS[plan_type]}],
metadata={'user_id': user_id}
)
# Create subscription record
self.db.create_subscription({
'user_id': user_id,
'plan_type': plan_type,
'status': 'active',
'price': price,
'current_period_start': datetime.fromtimestamp(subscription.current_period_start),
'current_period_end': datetime.fromtimestamp(subscription.current_period_end),
'stripe_subscription_id': subscription.id
})
return subscription
    def handle_webhook(self, payload, signature):
        """Handle Stripe webhooks for subscription events"""
        try:
            event = stripe.Webhook.construct_event(
                payload, signature, STRIPE_WEBHOOK_SECRET
            )
        except stripe.error.SignatureVerificationError:
            # Reject payloads that don't carry a valid Stripe signature
            raise ValueError("Invalid webhook signature")
        if event.type == 'invoice.payment_succeeded':
            self._handle_payment_succeeded(event.data.object)
        elif event.type == 'invoice.payment_failed':
            self._handle_payment_failed(event.data.object)
        elif event.type == 'customer.subscription.deleted':
            self._handle_subscription_cancelled(event.data.object)
Step 4: Wrap-up
Capacity Estimation
Storage
- Video content: 500K courses × 20 videos × 5GB = 50 PB
- User data: 100M users × 10KB = 1 TB
- Quiz data: 10M quizzes × 100KB = 1 TB
- Discussion data: 50M posts × 5KB = 250 GB
- Total: ~50 PB (primarily video)
Bandwidth
- Video streaming: 1M concurrent × 5 Mbps = 5 Tbps
- API traffic: 10M DAU × 100 requests/day × 10KB = 10 TB/day
- Peak: 5+ Tbps
Database
- PostgreSQL: 10K writes/sec, 100K reads/sec
- MongoDB: 50K writes/sec (discussions)
- Redis: 500K ops/sec (cache + progress)
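The figures above follow from straightforward multiplication; a quick sanity check (decimal units assumed, 1 GB = 10^9 bytes):

```python
GB = 10**9
TB = 10**12
PB = 10**15

# Storage: 500K courses x 20 videos x 5 GB per video
video_storage = 500_000 * 20 * 5 * GB
assert video_storage == 50 * PB

# Bandwidth: 1M concurrent viewers at 5 Mbps average bitrate
streaming_bps = 1_000_000 * 5 * 10**6
assert streaming_bps == 5 * 10**12  # 5 Tbps

# API traffic: 10M DAU x 100 requests/day x 10 KB per response
api_bytes_per_day = 10_000_000 * 100 * 10 * 10**3
assert api_bytes_per_day == 10 * TB
```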
Key Design Decisions
- Microservices Architecture: Enables independent scaling and deployment of course, video, assessment, and payment services
- Video CDN with DRM: Multi-CDN strategy with Widevine/FairPlay for content protection
- Real-time Progress Tracking: Redis cache with Kafka async persistence for sub-second updates
- Hybrid Recommendations: Combining collaborative filtering and content-based approaches to improve relevance and mitigate cold-start for new users and courses
- Blockchain Certificates: Immutable verification using Ethereum smart contracts
- Auto-grading with Sandboxing: Docker-based code execution with resource limits for security
- Threaded Discussions: Path-based threading in MongoDB for efficient nested comment retrieval
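The real-time progress-tracking decision can be sketched as a write-through pattern: the hot path writes to a cache for sub-second reads, while a queued event is persisted asynchronously. In this illustrative sketch, a dict and a list stand in for Redis and a Kafka producer:

```python
import json

class ProgressTracker:
    """Write progress to a fast cache synchronously and queue an event
    for asynchronous persistence. Cache and queue are injected; in
    production these would be Redis and a Kafka producer."""

    def __init__(self, cache, queue):
        self.cache = cache   # dict-like: key -> value
        self.queue = queue   # list-like: .append(message)

    def update_progress(self, user_id, lesson_id, position_seconds):
        key = f"progress:{user_id}:{lesson_id}"
        self.cache[key] = position_seconds      # sub-second read path
        self.queue.append(json.dumps({          # durable path, consumed async
            'user_id': user_id,
            'lesson_id': lesson_id,
            'position': position_seconds,
        }))

    def resume_position(self, user_id, lesson_id):
        """Return the last watched position, or 0 for an unseen lesson."""
        return self.cache.get(f"progress:{user_id}:{lesson_id}", 0)
```

A consumer draining the queue would batch-write positions to PostgreSQL, so a cache eviction only loses a few seconds of playback position at worst.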
Monitoring and Observability
Key Metrics
- Video start time (p50, p95, p99)
- Video buffering ratio
- API latency by endpoint
- Payment success rate
- Course completion rate
- Daily active users
- Revenue per user
Tools
- Datadog for infrastructure monitoring
- Sentry for error tracking
- ELK stack for log aggregation
- Prometheus + Grafana for custom metrics
- AWS CloudWatch for AWS resources
Security Considerations
- WAF (Web Application Firewall) for DDoS protection
- OAuth 2.0 + JWT for authentication
- Encrypted video content with token-based access
- PCI DSS compliance for payment processing
- Rate limiting: 1000 req/min per user
- SQL injection prevention with parameterized queries
- XSS protection with content security policy
- Regular security audits and penetration testing
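The 1000 req/min per-user limit above can be enforced with a fixed-window counter. A minimal in-memory sketch (production would typically use Redis `INCR` plus `EXPIRE` so all API nodes share one counter):

```python
import time

class FixedWindowRateLimiter:
    """Allow at most `limit` requests per `window_seconds` per key."""

    def __init__(self, limit=1000, window_seconds=60, clock=time.time):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock       # injectable for testing
        self.counters = {}       # (key, window_index) -> request count

    def allow(self, key):
        window_index = int(self.clock() // self.window)
        bucket = (key, window_index)
        count = self.counters.get(bucket, 0)
        if count >= self.limit:
            return False         # over limit: respond 429 Too Many Requests
        self.counters[bucket] = count + 1
        return True
```

Fixed windows permit brief bursts at window boundaries; a sliding-window or token-bucket variant smooths that out at the cost of a little more state.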
Future Enhancements
- Live streaming with real-time interaction
- AR/VR course experiences
- AI-powered course creation assistant
- Automated subtitle generation using speech-to-text
- Adaptive learning paths based on performance
- Gamification with badges and leaderboards
- Mobile offline mode with sync
- White-label solutions for enterprises
- Integration with LMS systems (Canvas, Moodle)
- Multi-tenant architecture for B2B
This architecture provides a robust, scalable foundation for an e-learning platform serving millions of users with high-quality video content, engaging assessments, and personalized learning experiences.