Design E-Learning Platform

Designing an e-learning platform at scale (like Coursera, Udemy, or Khan Academy) requires handling millions of concurrent learners, petabytes of video content, real-time progress tracking, secure payment processing, and personalized recommendations. This document outlines a production-grade architecture for such a system.


Step 1: Requirements Clarification

Functional Requirements

Course Creation & Management

  • Instructors can create, update, and publish courses with rich content (videos, PDFs, code exercises)
  • Course versioning and draft management
  • Multi-language support with subtitles
  • Course categorization and tagging
  • Bulk content upload and processing

Video Lessons

  • Adaptive bitrate video streaming (ABR) supporting 240p to 4K resolution
  • Video playback with speed controls (0.5x to 2x)
  • Video seeking and resume from last position
  • Picture-in-picture mode
  • Offline download for mobile apps
  • DRM protection for premium content

Quizzes & Assessments

  • Multiple question types (MCQ, coding, essay, file upload)
  • Timed assessments with auto-submission
  • Auto-grading for objective questions
  • Peer review for subjective assessments
  • Plagiarism detection for code submissions
  • Instant feedback and explanations
  • Proctored exam support

Progress Tracking

  • Real-time progress updates across devices
  • Course completion percentage
  • Learning streak tracking
  • Time spent per lesson
  • Quiz scores and analytics
  • Bookmarks and notes

Certificates

  • Auto-generate certificates upon course completion
  • Verifiable certificates with unique IDs
  • Digital signature and blockchain verification
  • PDF download and LinkedIn integration
  • Certificate templates customization

Payment & Subscriptions

  • One-time course purchases
  • Subscription plans (monthly/annual)
  • Revenue sharing with instructors
  • Multiple payment gateways (Stripe, PayPal, regional)
  • Coupons and promotional codes
  • Refund processing
  • Invoice generation

Discussion Forums

  • Threaded discussions per lesson
  • Q&A with upvoting/downvoting
  • Instructor and TA badges
  • Mark as resolved
  • Notifications for replies
  • Search and filtering

Additional Features

  • Course recommendations based on history and preferences
  • Live classes with WebRTC
  • Assignment submissions
  • Instructor dashboard with analytics
  • Student dashboard with learning path
  • Course reviews and ratings
  • Email notifications for course updates

Non-Functional Requirements

Scale

  • 100M+ registered users
  • 10M+ daily active users
  • 1M+ concurrent video streams
  • 500K+ courses
  • 10M+ video assets
  • 1M+ daily quiz submissions

Performance

  • Video start time < 2 seconds
  • API latency p99 < 200ms
  • Search latency < 500ms
  • Real-time progress sync < 1 second
  • Quiz submission processing < 3 seconds

Availability

  • 99.99% uptime (52 minutes downtime/year)
  • Zero data loss for payment transactions
  • Graceful degradation during outages
  • Multi-region failover

Security

  • End-to-end encryption for payments
  • DRM for video content
  • User authentication with OAuth 2.0
  • Rate limiting and DDoS protection
  • GDPR and CCPA compliance
  • PCI DSS compliance for payments

Other

  • Support 50+ languages
  • Mobile-first design
  • Accessibility (WCAG 2.1 Level AA)
  • SEO optimization
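
The scale and performance targets above translate into rough capacity numbers. A quick back-of-envelope check (the 3 Mbps average bitrate and 2 GB per transcoded video are assumptions for illustration, not stated figures):

```python
# Back-of-envelope capacity estimates derived from the scale targets above.
# The average bitrate and per-video storage footprint are assumed values.

CONCURRENT_STREAMS = 1_000_000
AVG_BITRATE_MBPS = 3              # assumed average across the ABR ladder
VIDEO_ASSETS = 10_000_000
AVG_SIZE_GB_PER_VIDEO = 2         # assumed total size of all renditions

# Peak egress bandwidth, almost all of which the CDN must absorb
peak_egress_tbps = CONCURRENT_STREAMS * AVG_BITRATE_MBPS / 1_000_000

# Total video storage across all assets
storage_pb = VIDEO_ASSETS * AVG_SIZE_GB_PER_VIDEO / 1_000_000

print(f"Peak egress: {peak_egress_tbps:.1f} Tbps")   # 3.0 Tbps
print(f"Video storage: {storage_pb:.1f} PB")         # 20.0 PB
```

Numbers at this magnitude are why origin servers are shielded behind a CDN: even a 90% cache hit rate still leaves hundreds of Gbps at the origin.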

Step 2: High-Level Architecture

System Components

┌─────────────────────────────────────────────────────────────────┐
│                         Client Layer                            │
│  (Web, iOS, Android, TV Apps, Third-party Integrations)        │
└────────────┬────────────────────────────────────────────────────┘
             │
             ▼
┌─────────────────────────────────────────────────────────────────┐
│                     CDN (CloudFront/Akamai)                     │
│           (Static Assets, Video Streaming, Images)              │
└────────────┬────────────────────────────────────────────────────┘
             │
             ▼
┌─────────────────────────────────────────────────────────────────┐
│              API Gateway + Load Balancer (ALB)                  │
│     (Rate Limiting, Auth, Routing, SSL Termination)            │
└────────────┬────────────────────────────────────────────────────┘
             │
             ▼
┌────────────────────────────────────────────────────────────────────────┐
│                        Microservices Layer                             │
├────────────┬─────────────┬──────────────┬─────────────┬──────────────┤
│  Course    │   Video     │  Assessment  │  Progress   │   Payment    │
│  Service   │  Service    │   Service    │   Service   │   Service    │
├────────────┼─────────────┼──────────────┼─────────────┼──────────────┤
│ Discussion │    User     │  Certificate │ Notification│Recommendation│
│  Service   │  Service    │   Service    │   Service   │   Service    │
└────────────┴─────────────┴──────────────┴─────────────┴──────────────┘
                                    │
                                    ▼
┌────────────────────────────────────────────────────────────────────────┐
│                          Data Layer                                    │
├─────────────────┬──────────────────┬───────────────┬──────────────────┤
│  PostgreSQL     │   MongoDB        │  Redis        │  Elasticsearch   │
│  (Courses,      │   (Discussions,  │  (Cache,      │  (Course Search, │
│   Users,        │    Logs,         │   Sessions,   │   Full-text)     │
│   Payments)     │    Analytics)    │   Progress)   │                  │
└─────────────────┴──────────────────┴───────────────┴──────────────────┘
                                    │
                                    ▼
┌────────────────────────────────────────────────────────────────────────┐
│                     Storage & Processing Layer                         │
├─────────────────┬──────────────────┬───────────────┬──────────────────┤
│   S3/Blob       │   Video          │   Kafka       │   Airflow        │
│   (Raw Video,   │   Transcoding    │   (Events,    │   (ETL, ML       │
│    Documents)   │   (FFmpeg)       │    Streams)   │    Pipelines)    │
└─────────────────┴──────────────────┴───────────────┴──────────────────┘

Core Services

1. Course Service

  • Manages course metadata, curriculum, and content
  • Handles course creation, updates, and publishing
  • Course versioning and draft management
  • Technologies: Java/Kotlin with Spring Boot, PostgreSQL

2. Video Service

  • Handles video upload, transcoding, and streaming
  • Generates adaptive bitrate streams (HLS/DASH)
  • Manages DRM and content protection
  • Technologies: Node.js, FFmpeg, AWS MediaConvert, Wowza

3. Assessment Service

  • Manages quizzes, assignments, and exams
  • Auto-grading engine for objective questions
  • Code execution sandbox for programming assessments
  • Technologies: Python/Django, Docker, PostgreSQL

4. Progress Service

  • Tracks student progress in real-time
  • Calculates completion percentages
  • Manages bookmarks and resume points
  • Technologies: Go, Redis, Kafka, PostgreSQL

5. Payment Service

  • Processes payments and subscriptions
  • Manages invoices and refunds
  • Revenue sharing calculations
  • Technologies: Java, PostgreSQL, Stripe SDK, PayPal SDK

6. Discussion Service

  • Manages course forums and Q&A
  • Threaded conversations
  • Real-time notifications
  • Technologies: Node.js, MongoDB, WebSocket, Redis Pub/Sub

7. Certificate Service

  • Generates and validates certificates
  • PDF rendering with custom templates
  • Blockchain-based verification
  • Technologies: Python, Celery, PostgreSQL, Ethereum/Hyperledger

8. Recommendation Service

  • ML-based course recommendations
  • Collaborative filtering and content-based filtering
  • A/B testing framework
  • Technologies: Python, TensorFlow, Feature Store, S3

9. Notification Service

  • Email, push, and in-app notifications
  • Template management
  • Delivery tracking
  • Technologies: Node.js, SendGrid, Firebase, SQS

Step 3: Deep Dives

3.1 Video Streaming with DRM

Video Upload Pipeline

Instructor Upload → S3 Bucket → Lambda Trigger → MediaConvert
                                                      ↓
                                                 Transcoding

                    ┌──────────────────────────────────┴────────────┐
                    ▼                                                ▼
            HLS Segments (.m3u8)                              DASH Segments
            (240p, 360p, 480p, 720p, 1080p, 4K)              (.mpd format)
                    ↓                                                ↓
                S3 + CloudFront                              DRM Encryption
                                                        (Widevine, FairPlay)

Adaptive Bitrate Streaming (ABR)

  • Use HLS (Apple) and DASH (universal) protocols
  • Generate multiple quality variants during transcoding
  • Client automatically switches quality based on bandwidth
  • Segment duration: 6-10 seconds for optimal seeking
  • Pre-generate thumbnails for video scrubbing (1 per 5 seconds)
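
The client-side quality switch can be sketched minimally as follows, assuming an illustrative bitrate ladder (the platform's real encoding settings would differ):

```python
# Illustrative ABR ladder: (height, video bitrate in kbps). Real ladders are
# tuned per title; these numbers are assumptions for the sketch.
LADDER = [(240, 400), (360, 800), (480, 1_400), (720, 2_800),
          (1080, 5_000), (2160, 16_000)]

def pick_variant(measured_kbps: float, safety: float = 0.8) -> tuple[int, int]:
    """Choose the highest rendition whose bitrate fits within a safety
    margin of the measured bandwidth, falling back to the lowest one."""
    budget = measured_kbps * safety
    best = LADDER[0]
    for variant in LADDER:
        if variant[1] <= budget:
            best = variant
    return best

print(pick_variant(4_000))   # (720, 2800): highest rung under 3200 kbps budget
print(pick_variant(300))     # (240, 400): nothing fits, lowest rung wins
```

Production players (Shaka, hls.js, ExoPlayer) add buffer-occupancy heuristics on top of this bandwidth estimate to avoid oscillating between renditions.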

DRM Implementation

// DRM Configuration
const drmConfig = {
  widevine: {
    licenseUrl: 'https://drm.platform.com/widevine',
    certificateUrl: 'https://drm.platform.com/cert'
  },
  fairplay: {
    licenseUrl: 'https://drm.platform.com/fairplay',
    certificateUrl: 'https://drm.platform.com/fairplay-cert'
  },
  playready: {
    licenseUrl: 'https://drm.platform.com/playready'
  }
};

// Token-based access control (assumes the `jsonwebtoken` npm package)
const jwt = require('jsonwebtoken');

function generateVideoToken(userId, courseId, videoId) {
  return jwt.sign({
    userId, courseId, videoId,
    exp: Math.floor(Date.now() / 1000) + (60 * 60) // 1 hour
  }, DRM_SECRET);
}

Video Playback Optimization

  • Pre-fetch next lesson in background
  • Use Service Workers for offline caching
  • Implement backoff retry for failed segments
  • CDN with 200+ edge locations for low latency
  • Hot content in memory cache at CDN edge
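
The backoff-retry bullet can be sketched as a small helper with exponential delays plus jitter; the retry count and delay values here are illustrative defaults, not tuned figures:

```python
import random
import time

def fetch_with_backoff(fetch_segment, url, max_retries=4, base_delay=0.5):
    """Retry a failing segment fetch with exponential backoff plus jitter.
    `fetch_segment` is any callable that raises on failure."""
    for attempt in range(max_retries + 1):
        try:
            return fetch_segment(url)
        except Exception:
            if attempt == max_retries:
                raise
            # 0.5s, 1s, 2s, 4s... plus up to 100ms of jitter so that many
            # players recovering from the same edge failure don't retry in
            # lockstep and create a synchronized thundering herd
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)
```

In a real player the retry would also fail over to a lower rendition or an alternate CDN host after a couple of attempts.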

Watermarking for Piracy Prevention

# Dynamic watermarking with user info
import time

def add_forensic_watermark(video_url, user_id, course_id):
    """Add invisible watermark with user identification"""
    watermark_data = {
        'user_id': user_id,
        'timestamp': int(time.time()),
        'course_id': course_id
    }
    # Encode watermark in frequency domain (invisible)
    return watermark_service.embed(video_url, watermark_data)

3.2 Course Content Management

Course Schema Design (PostgreSQL)

-- Courses table with versioning
CREATE TABLE courses (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    instructor_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(255) NOT NULL,
    description TEXT,
    category_id INTEGER REFERENCES categories(id),
    difficulty VARCHAR(20) CHECK (difficulty IN ('beginner', 'intermediate', 'advanced')),
    price DECIMAL(10, 2),
    language VARCHAR(10),
    status VARCHAR(20) DEFAULT 'draft', -- draft, published, archived
    version INTEGER DEFAULT 1,
    published_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
    -- id is the primary key, so this table holds only the current version;
    -- historical versions would live in a separate course_versions table
);

CREATE INDEX idx_courses_instructor ON courses(instructor_id);
CREATE INDEX idx_courses_category ON courses(category_id);
CREATE INDEX idx_courses_status ON courses(status);

-- Course sections (modules)
CREATE TABLE sections (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    course_id UUID REFERENCES courses(id) ON DELETE CASCADE,
    title VARCHAR(255) NOT NULL,
    order_index INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Course lessons (videos, readings, quizzes)
CREATE TABLE lessons (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    section_id UUID REFERENCES sections(id) ON DELETE CASCADE,
    title VARCHAR(255) NOT NULL,
    type VARCHAR(20) CHECK (type IN ('video', 'reading', 'quiz', 'assignment', 'live')),
    content_url TEXT,
    duration_seconds INTEGER,
    order_index INTEGER NOT NULL,
    is_preview BOOLEAN DEFAULT FALSE, -- Allow preview without enrollment
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_lessons_section ON lessons(section_id);

Content Delivery Strategy

  • Static assets (PDFs, images) → S3 + CloudFront
  • Video content → Separate S3 bucket with lifecycle policies
  • Code exercises → Git repositories with isolated sandboxes
  • Use multipart upload for large files (>100MB)
  • Generate signed URLs with 1-hour expiration for security
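
The signed-URL idea can be illustrated with a generic HMAC scheme. Note this is a sketch of the concept with a hypothetical signing key; CloudFront and S3 each have their own pre-signed URL formats that should be used in practice:

```python
import hashlib
import hmac
import time

SECRET = b"url-signing-key"  # hypothetical key, held server-side only

def sign_url(path, ttl_seconds=3600, now=None):
    """Append an expiry timestamp and an HMAC signature to a content path."""
    expires = (now if now is not None else int(time.time())) + ttl_seconds
    sig = hmac.new(SECRET, f"{path}:{expires}".encode(), hashlib.sha256).hexdigest()
    return f"{path}?expires={expires}&sig={sig}"

def verify_url(path, expires, sig, now=None):
    """Reject expired links and any tampering with the path or expiry."""
    if (now if now is not None else int(time.time())) > expires:
        return False
    expected = hmac.new(SECRET, f"{path}:{expires}".encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig)
```

Because the signature covers both the path and the expiry, a leaked URL stops working after an hour and cannot be rewritten to point at other content.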

Course Publishing Workflow

class CoursePublisher:
    def publish_course(self, course_id, instructor_id):
        # Validation checks
        self._validate_course_content(course_id)
        self._validate_pricing(course_id)
        self._validate_prerequisites(course_id)

        # Generate preview assets
        self._generate_thumbnail(course_id)
        self._generate_preview_video(course_id)

        # Update search index
        elasticsearch.index_course(course_id)

        # Invalidate cache
        redis.delete(f"course:{course_id}")

        # Publish to CDN
        cdn.invalidate_cache(f"/courses/{course_id}/*")

        # Notify followers
        notification_service.notify_followers(instructor_id, course_id)

        # Update analytics
        kafka.produce('course.published', {
            'course_id': course_id,
            'instructor_id': instructor_id,
            'timestamp': time.time()
        })

3.3 Quiz and Assessment Engine with Auto-Grading

Assessment Types Architecture

┌─────────────────────────────────────────────────┐
│            Assessment Service                   │
├─────────────┬─────────────┬────────────────────┤
│    MCQ      │   Coding    │   Essay/File       │
│   Engine    │   Engine    │   Upload Engine    │
│ (Instant)   │ (Sandbox)   │ (Peer Review)      │
└─────────────┴─────────────┴────────────────────┘

Quiz Schema

CREATE TABLE quizzes (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    lesson_id UUID REFERENCES lessons(id),
    title VARCHAR(255),
    time_limit_minutes INTEGER,
    passing_score DECIMAL(5, 2),
    max_attempts INTEGER DEFAULT 3,
    shuffle_questions BOOLEAN DEFAULT TRUE,
    show_answers_after_submit BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE questions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    quiz_id UUID REFERENCES quizzes(id),
    type VARCHAR(20) CHECK (type IN ('mcq', 'multiple_select', 'true_false', 'fill_blank', 'coding', 'essay')),
    question_text TEXT NOT NULL,
    points DECIMAL(5, 2) DEFAULT 1.0,
    order_index INTEGER,
    metadata JSONB, -- Store options, correct answers, test cases
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE quiz_attempts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id),
    quiz_id UUID REFERENCES quizzes(id),
    attempt_number INTEGER,
    started_at TIMESTAMP DEFAULT NOW(),
    submitted_at TIMESTAMP,
    score DECIMAL(5, 2),
    time_spent_seconds INTEGER,
    answers JSONB,
    CONSTRAINT unique_attempt UNIQUE(user_id, quiz_id, attempt_number)
);

CREATE INDEX idx_attempts_user ON quiz_attempts(user_id);

Auto-Grading Engine

class AutoGrader:
    def grade_attempt(self, attempt_id):
        attempt = self.db.get_attempt(attempt_id)
        questions = self.db.get_questions(attempt.quiz_id)

        total_points = 0
        max_points = 0
        results = []

        for question in questions:
            max_points += question.points

            if question.type == 'mcq' or question.type == 'multiple_select':
                points = self._grade_mcq(question, attempt.answers[question.id])
            elif question.type == 'coding':
                points = self._grade_coding(question, attempt.answers[question.id])
            elif question.type == 'fill_blank':
                points = self._grade_fill_blank(question, attempt.answers[question.id])
            else:
                points = 0  # Manual grading required

            total_points += points
            results.append({
                'question_id': question.id,
                'points_earned': points,
                'points_possible': question.points,
                'correct': points == question.points
            })

        score = (total_points / max_points) * 100 if max_points > 0 else 0

        # Update attempt
        self.db.update_attempt(attempt_id, {
            'score': score,
            'results': results,
            'graded_at': datetime.now()
        })

        # Send notification
        self.notification_service.send_quiz_result(attempt.user_id, score)

        return score

    def _grade_coding(self, question, submission):
        """Run code against test cases in isolated sandbox"""
        test_cases = question.metadata['test_cases']
        language = submission['language']
        code = submission['code']

        # Execute in Docker container with resource limits
        result = self.code_executor.run({
            'language': language,
            'code': code,
            'test_cases': test_cases,
            'timeout': 30,  # seconds
            'memory_limit': '256MB'
        })

        passed = sum(1 for tc in result['results'] if tc['passed'])
        total = len(test_cases)

        # Partial credit based on test cases passed (guard against empty suites)
        return question.points * (passed / total) if total else 0

Code Execution Sandbox

# Docker-based code execution with security
class CodeExecutor:
    def run(self, execution_request):
        # Create isolated container
        container = docker.create_container(
            image=f"code-executor:{execution_request['language']}",
            command=self._build_command(execution_request),
            mem_limit=execution_request['memory_limit'],
            network_disabled=True,  # No internet access
            read_only=True,  # No write access to filesystem
            pids_limit=50,  # Limit process creation
            ulimits=[
                docker.types.Ulimit(name='cpu', soft=30, hard=30)
            ]
        )

        try:
            docker.start(container)
            result = docker.wait(container, timeout=execution_request['timeout'])
            logs = docker.logs(container)

            return self._parse_results(logs, execution_request['test_cases'])
        except TimeoutError:
            return {'error': 'Time limit exceeded'}
        finally:
            docker.remove_container(container, force=True)

3.4 Progress Tracking and Completion

Real-Time Progress Architecture

Student watches video → Progress Event → Kafka → Progress Service
                                                        ↓
                                                  Redis (Cache)
                                                        ↓
                                             PostgreSQL (Persistent)

Progress Schema

CREATE TABLE enrollments (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id),
    course_id UUID REFERENCES courses(id),
    enrolled_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP,
    progress_percentage DECIMAL(5, 2) DEFAULT 0,
    last_accessed_at TIMESTAMP,
    CONSTRAINT unique_enrollment UNIQUE(user_id, course_id)
);

CREATE TABLE lesson_progress (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    enrollment_id UUID REFERENCES enrollments(id),
    lesson_id UUID REFERENCES lessons(id),
    status VARCHAR(20) CHECK (status IN ('not_started', 'in_progress', 'completed')),
    progress_seconds INTEGER DEFAULT 0,
    completed_at TIMESTAMP,
    last_position_seconds INTEGER DEFAULT 0,
    CONSTRAINT unique_lesson_progress UNIQUE(enrollment_id, lesson_id)
);

CREATE INDEX idx_lesson_progress_enrollment ON lesson_progress(enrollment_id);

Real-Time Progress Updates

class ProgressTracker:
    def update_video_progress(self, user_id, course_id, lesson_id, current_seconds, duration_seconds):
        # Update Redis cache for fast access
        cache_key = f"progress:{user_id}:{lesson_id}"
        self.redis.hset(cache_key, mapping={
            'current_seconds': current_seconds,
            'duration_seconds': duration_seconds,
            'timestamp': time.time()
        })
        self.redis.expire(cache_key, 86400)  # 24 hours

        # Mark as completed if watched 90%+
        progress_percentage = (current_seconds / duration_seconds) * 100
        if progress_percentage >= 90:
            self._mark_lesson_completed(user_id, course_id, lesson_id)

        # Publish event to Kafka for async processing
        kafka.produce('lesson.progress', {
            'user_id': user_id,
            'course_id': course_id,
            'lesson_id': lesson_id,
            'progress_seconds': current_seconds,
            'timestamp': time.time()
        })

    def _mark_lesson_completed(self, user_id, course_id, lesson_id):
        # Update lesson progress
        self.db.execute("""
            UPDATE lesson_progress
            SET status = 'completed', completed_at = NOW()
            WHERE enrollment_id = (
                SELECT id FROM enrollments
                WHERE user_id = %s AND course_id = %s
            ) AND lesson_id = %s
        """, (user_id, course_id, lesson_id))

        # Recalculate course progress
        progress = self._calculate_course_progress(user_id, course_id)

        self.db.execute("""
            UPDATE enrollments
            SET progress_percentage = %s, last_accessed_at = NOW()
            WHERE user_id = %s AND course_id = %s
        """, (progress, user_id, course_id))

        # Check if course is completed
        if progress >= 100:
            self._complete_course(user_id, course_id)

    def _calculate_course_progress(self, user_id, course_id):
        # Keep the user filter inside the JOIN condition; putting it in the
        # WHERE clause would turn the LEFT JOIN into an inner join and drop
        # not-yet-started lessons from the total count
        result = self.db.query("""
            SELECT
                COUNT(*) AS total_lessons,
                SUM(CASE WHEN lp.status = 'completed' THEN 1 ELSE 0 END) AS completed_lessons
            FROM lessons l
            JOIN sections s ON l.section_id = s.id
            LEFT JOIN lesson_progress lp
                ON l.id = lp.lesson_id
                AND lp.enrollment_id = (
                    SELECT id FROM enrollments
                    WHERE user_id = %s AND course_id = %s
                )
            WHERE s.course_id = %s
        """, (user_id, course_id, course_id))

        if result['total_lessons'] == 0:
            return 0

        return (result['completed_lessons'] / result['total_lessons']) * 100

    def _complete_course(self, user_id, course_id):
        # Mark enrollment as completed
        self.db.execute("""
            UPDATE enrollments
            SET completed_at = NOW()
            WHERE user_id = %s AND course_id = %s
        """, (user_id, course_id))

        # Trigger certificate generation
        certificate_service.generate(user_id, course_id)

        # Send congratulations email
        notification_service.send_completion_email(user_id, course_id)

        # Update recommendations
        kafka.produce('course.completed', {
            'user_id': user_id,
            'course_id': course_id,
            'timestamp': time.time()
        })

Progress Sync Across Devices

// Client-side progress sync with debouncing
class ProgressSync {
  constructor(userId, courseId, lessonId) {
    this.userId = userId;
    this.courseId = courseId;
    this.lessonId = lessonId;
    this.lastSyncTime = 0;
    this.syncInterval = 10000; // 10 seconds
  }

  onVideoProgress(currentSeconds, duration) {
    const now = Date.now();

    // Sync every 10 seconds or at key milestones
    if (now - this.lastSyncTime > this.syncInterval ||
        this.isKeyMilestone(currentSeconds, duration)) {
      this.syncProgress(currentSeconds, duration);
      this.lastSyncTime = now;
    }
  }

  isKeyMilestone(current, duration) {
    const progress = (current / duration) * 100;
    // Sync at 25%, 50%, 75%, 90%, 100%
    return [25, 50, 75, 90, 100].some(
      milestone => Math.abs(progress - milestone) < 1
    );
  }

  async syncProgress(currentSeconds, duration) {
    try {
      await fetch('/api/v1/progress/update', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({
          user_id: this.userId,
          course_id: this.courseId,
          lesson_id: this.lessonId,
          current_seconds: currentSeconds,
          duration_seconds: duration
        })
      });
    } catch (error) {
      // Queue for retry
      this.queueOfflineSync(currentSeconds, duration);
    }
  }
}

3.5 Certificate Generation

Certificate Service Architecture

import hashlib
from datetime import datetime

from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from web3 import Web3

class CertificateGenerator:
    def generate(self, user_id, course_id):
        # Get data
        user = self.db.get_user(user_id)
        course = self.db.get_course(course_id)
        enrollment = self.db.get_enrollment(user_id, course_id)

        # Generate unique certificate ID
        cert_id = self._generate_certificate_id(user_id, course_id)

        # Create certificate record
        certificate = self.db.create_certificate({
            'id': cert_id,
            'user_id': user_id,
            'course_id': course_id,
            'issued_at': datetime.now(),
            'verification_hash': self._generate_verification_hash(cert_id, user_id, course_id)
        })

        # Generate PDF
        pdf_path = self._create_pdf(user, course, certificate)

        # Upload to S3
        s3_url = self._upload_to_s3(pdf_path, cert_id)

        # Record on blockchain for verification
        blockchain_tx = self._record_on_blockchain(certificate)

        # Update certificate with URLs
        self.db.update_certificate(cert_id, {
            'pdf_url': s3_url,
            'blockchain_tx': blockchain_tx
        })

        # Send email with certificate
        self.notification_service.send_certificate_email(user_id, s3_url)

        return certificate

    def _create_pdf(self, user, course, certificate):
        filename = f"/tmp/cert_{certificate['id']}.pdf"
        c = canvas.Canvas(filename, pagesize=letter)
        width, height = letter

        # Add background template
        c.drawImage('certificate_template.png', 0, 0, width, height)

        # Add user name
        c.setFont("Helvetica-Bold", 36)
        c.drawCentredString(width/2, height - 300, user['full_name'])

        # Add course name
        c.setFont("Helvetica", 24)
        c.drawCentredString(width/2, height - 380, course['title'])

        # Add completion date
        c.setFont("Helvetica", 14)
        completion_date = certificate['issued_at'].strftime("%B %d, %Y")
        c.drawCentredString(width/2, height - 450, f"Completed on {completion_date}")

        # Add certificate ID and QR code for verification
        c.setFont("Courier", 10)
        c.drawString(50, 50, f"Certificate ID: {certificate['id']}")
        c.drawString(50, 30, f"Verify at: https://platform.com/verify/{certificate['id']}")

        # Add QR code
        qr_img = self._generate_qr_code(certificate['id'])
        c.drawImage(qr_img, width - 150, 30, 100, 100)

        # Add digital signature
        c.drawString(width/2 - 100, 150, "Digitally Signed")

        c.save()
        return filename

    def _record_on_blockchain(self, certificate):
        """Record certificate hash on Ethereum for verification"""
        w3 = Web3(Web3.HTTPProvider(ETHEREUM_NODE_URL))

        # Create hash of certificate data
        cert_hash = hashlib.sha256(
            f"{certificate['id']}{certificate['user_id']}{certificate['course_id']}".encode()
        ).hexdigest()

        # Call smart contract to record hash
        contract = w3.eth.contract(address=CERT_CONTRACT_ADDRESS, abi=CERT_CONTRACT_ABI)
        tx_hash = contract.functions.recordCertificate(
            certificate['id'],
            cert_hash
        ).transact({'from': PLATFORM_WALLET_ADDRESS})

        receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
        return receipt['transactionHash'].hex()

3.6 Discussion Forums with Threading

Discussion Schema (MongoDB)

// discussions collection
{
  _id: ObjectId,
  course_id: UUID,
  lesson_id: UUID,  // null for course-level discussions
  user_id: UUID,
  title: String,
  content: String,
  type: 'question' | 'discussion',
  tags: [String],
  is_resolved: Boolean,
  upvotes: Number,
  view_count: Number,
  created_at: ISODate,
  updated_at: ISODate,

  // Denormalized for performance
  author: {
    name: String,
    avatar_url: String,
    badges: ['instructor', 'ta', 'top_contributor']
  },

  reply_count: Number,
  last_reply_at: ISODate
}

// replies collection (nested comments)
{
  _id: ObjectId,
  discussion_id: ObjectId,
  parent_reply_id: ObjectId,  // null for top-level replies
  user_id: UUID,
  content: String,
  upvotes: Number,
  is_answer: Boolean,  // marked by instructor
  created_at: ISODate,

  author: {
    name: String,
    avatar_url: String,
    badges: []
  },

  // For nested threading
  path: String,  // e.g., "001.002.001" for tree traversal
  depth: Number
}
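
The `path` field is a materialized path: sorting the zero-padded paths lexicographically returns replies in depth-first (threaded) display order, which is exactly what `sort('path', 1)` relies on. A quick demonstration with hypothetical paths:

```python
# Materialized paths: a lexicographic sort of zero-padded path strings
# yields depth-first thread order, so one indexed sort replaces recursion.
replies = [
    {"path": "001",         "text": "top-level A"},
    {"path": "002",         "text": "top-level B"},
    {"path": "001.002",     "text": "second child of A"},
    {"path": "001.001",     "text": "first child of A"},
    {"path": "001.002.001", "text": "grandchild under A's second child"},
]

threaded = sorted(replies, key=lambda r: r["path"])
print([r["path"] for r in threaded])
# ['001', '001.001', '001.002', '001.002.001', '002']
```

The zero-padding matters: without it, `"10"` would sort before `"2"` and siblings would interleave out of order.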

Discussion Service API

class DiscussionService:
    def create_discussion(self, user_id, course_id, lesson_id, title, content):
        # Check enrollment
        if not self._is_enrolled(user_id, course_id):
            raise PermissionError("User not enrolled in course")

        discussion = {
            'course_id': course_id,
            'lesson_id': lesson_id,
            'user_id': user_id,
            'title': title,
            'content': content,
            'type': 'question',
            'is_resolved': False,
            'upvotes': 0,
            'view_count': 0,
            'reply_count': 0,
            'created_at': datetime.utcnow()
        }

        # Get author info
        user = self.user_service.get_user(user_id)
        discussion['author'] = {
            'name': user['full_name'],
            'avatar_url': user['avatar_url'],
            'badges': self._get_user_badges(user_id, course_id)
        }

        result = self.mongo.discussions.insert_one(discussion)
        discussion_id = result.inserted_id

        # Index in Elasticsearch for search
        self.es.index(index='discussions', id=str(discussion_id), body=discussion)

        # Notify instructor and TAs
        self.notification_service.notify_new_discussion(course_id, discussion_id)

        return discussion_id

    def add_reply(self, discussion_id, user_id, content, parent_reply_id=None):
        # Calculate path for nested threading
        if parent_reply_id:
            parent = self.mongo.replies.find_one({'_id': ObjectId(parent_reply_id)})
            path = f"{parent['path']}.{self._get_next_sibling_index(parent['path'])}"
            depth = parent['depth'] + 1
        else:
            path = f"{self._get_next_sibling_index(discussion_id)}"
            depth = 0

        reply = {
            'discussion_id': ObjectId(discussion_id),
            'parent_reply_id': ObjectId(parent_reply_id) if parent_reply_id else None,
            'user_id': user_id,
            'content': content,
            'upvotes': 0,
            'is_answer': False,
            'created_at': datetime.utcnow(),
            'path': path,
            'depth': depth
        }

        result = self.mongo.replies.insert_one(reply)

        # Update discussion reply count
        self.mongo.discussions.update_one(
            {'_id': ObjectId(discussion_id)},
            {
                '$inc': {'reply_count': 1},
                '$set': {'last_reply_at': datetime.utcnow()}
            }
        )

        # Send notification to discussion participants
        self._notify_participants(discussion_id, user_id)

        return result.inserted_id

    def get_discussion_with_replies(self, discussion_id):
        # Get discussion
        discussion = self.mongo.discussions.find_one({'_id': ObjectId(discussion_id)})

        # Increment view count
        self.mongo.discussions.update_one(
            {'_id': ObjectId(discussion_id)},
            {'$inc': {'view_count': 1}}
        )

        # Get all replies sorted by path for tree structure
        replies = list(self.mongo.replies.find(
            {'discussion_id': ObjectId(discussion_id)}
        ).sort('path', 1))

        # Build threaded structure
        discussion['replies'] = self._build_reply_tree(replies)

        return discussion

    def _build_reply_tree(self, replies):
        """Convert flat list of replies into nested tree structure"""
        reply_map = {str(r['_id']): r for r in replies}
        tree = []

        for reply in replies:
            reply['children'] = []
            if reply['parent_reply_id']:
                parent_id = str(reply['parent_reply_id'])
                if parent_id in reply_map:
                    reply_map[parent_id]['children'].append(reply)
            else:
                tree.append(reply)

        return tree
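The path-based approach works because sorting materialized paths lexicographically yields depth-first thread order in a single pass. A minimal sketch, assuming zero-padded sibling indices as the path convention (without padding, "10" would sort before "2"):

```python
# Materialized-path threading: sorting zero-padded path strings
# lexicographically produces depth-first thread order.
def make_path(parent_path, sibling_index, width=4):
    # Zero-pad so string sort order matches numeric order ("0002" < "0010")
    segment = str(sibling_index).zfill(width)
    return f"{parent_path}.{segment}" if parent_path else segment

paths = [
    make_path(None, 2),                 # second top-level reply
    make_path(None, 1),                 # first top-level reply
    make_path(make_path(None, 1), 10),  # 10th child of the first reply
    make_path(make_path(None, 1), 2),   # 2nd child of the first reply
]

print(sorted(paths))
# ['0001', '0001.0002', '0001.0010', '0002']
```

Each reply's subtree is a contiguous run in the sorted order, so the whole thread can be rendered with one indexed range scan on `path`.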

3.7 Recommendation Engine for Courses

Recommendation Architecture

User Activity → Kafka → Feature Store → ML Model → Redis Cache → API

                        Offline Training
                        (Airflow + Spark)

Feature Engineering

class FeatureStore:
    def compute_user_features(self, user_id):
        # Enrollment history
        enrolled_courses = self.db.get_user_enrollments(user_id)
        completed_courses = [c for c in enrolled_courses if c['completed_at']]

        # Categories of interest
        categories = self._extract_categories(enrolled_courses)

        # Difficulty preference
        avg_difficulty = self._calculate_avg_difficulty(completed_courses)

        # Learning pace
        avg_completion_time = self._calculate_avg_completion_time(completed_courses)

        # Engagement metrics
        avg_quiz_score = self._calculate_avg_quiz_score(user_id)
        discussion_participation = self._count_discussions(user_id)

        return {
            'user_id': user_id,
            'enrolled_count': len(enrolled_courses),
            'completed_count': len(completed_courses),
            'completion_rate': len(completed_courses) / len(enrolled_courses) if enrolled_courses else 0,
            'preferred_categories': categories,
            'preferred_difficulty': avg_difficulty,
            'avg_completion_days': avg_completion_time,
            'avg_quiz_score': avg_quiz_score,
            'discussion_count': discussion_participation
        }

    def compute_course_features(self, course_id):
        course = self.db.get_course(course_id)

        # Popularity metrics
        enrollment_count = self.db.count_enrollments(course_id)
        completion_count = self.db.count_completions(course_id)

        # Quality metrics
        avg_rating = self.db.get_avg_rating(course_id)
        review_count = self.db.count_reviews(course_id)

        # Content metadata
        total_duration = self.db.get_total_duration(course_id)
        lesson_count = self.db.count_lessons(course_id)

        return {
            'course_id': course_id,
            'category': course['category_id'],
            'difficulty': course['difficulty'],
            'price': course['price'],
            'enrollment_count': enrollment_count,
            'completion_rate': completion_count / enrollment_count if enrollment_count else 0,
            'avg_rating': avg_rating,
            'review_count': review_count,
            'total_hours': total_duration / 3600,
            'lesson_count': lesson_count
        }

ML Recommendation Models

import tensorflow as tf
from sklearn.metrics.pairwise import cosine_similarity

class CourseRecommender:
    def __init__(self, db, feature_store):
        # Data access and the feature store are injected; both are used by
        # the filtering methods below
        self.db = db
        self.feature_store = feature_store
        self.collaborative_model = self._load_collaborative_model()
        self.content_model = self._load_content_model()
        self.hybrid_weights = {'collaborative': 0.6, 'content': 0.4}

    def get_recommendations(self, user_id, top_k=10):
        # Collaborative filtering recommendations
        collab_scores = self._collaborative_filtering(user_id)

        # Content-based recommendations
        content_scores = self._content_based_filtering(user_id)

        # Hybrid approach
        hybrid_scores = {}
        all_courses = set(collab_scores.keys()) | set(content_scores.keys())

        for course_id in all_courses:
            hybrid_scores[course_id] = (
                self.hybrid_weights['collaborative'] * collab_scores.get(course_id, 0) +
                self.hybrid_weights['content'] * content_scores.get(course_id, 0)
            )

        # Filter out already enrolled courses
        enrolled = self._get_enrolled_courses(user_id)
        hybrid_scores = {k: v for k, v in hybrid_scores.items() if k not in enrolled}

        # Sort and return top K
        recommendations = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]

        # Add explanation for each recommendation
        return [self._add_explanation(user_id, course_id, score)
                for course_id, score in recommendations]

    def _collaborative_filtering(self, user_id):
        """Matrix factorization using neural collaborative filtering"""
        user_embedding = self.collaborative_model.get_user_embedding(user_id)
        all_course_embeddings = self.collaborative_model.get_all_course_embeddings()

        # Compute similarity scores
        scores = cosine_similarity([user_embedding], all_course_embeddings)[0]

        return dict(zip(all_course_embeddings.index, scores))

    def _content_based_filtering(self, user_id):
        """Based on user's historical preferences"""
        user_features = self.feature_store.compute_user_features(user_id)

        # Get all courses in preferred categories
        candidate_courses = self.db.get_courses_by_categories(
            user_features['preferred_categories']
        )

        scores = {}
        for course in candidate_courses:
            course_features = self.feature_store.compute_course_features(course['id'])
            scores[course['id']] = self._compute_content_similarity(
                user_features, course_features
            )

        return scores

    def _add_explanation(self, user_id, course_id, score):
        """Generate human-readable explanation for recommendation"""
        course = self.db.get_course(course_id)
        user_history = self.db.get_user_enrollments(user_id)

        reasons = []

        # Check category match
        if any(c['category_id'] == course['category_id'] for c in user_history):
            reasons.append(f"Based on your interest in {course['category_name']}")

        # Check popularity
        if course['enrollment_count'] > 10000:
            reasons.append(f"Popular choice with {course['enrollment_count']} students")

        # Check rating
        if course['avg_rating'] >= 4.5:
            reasons.append(f"Highly rated ({course['avg_rating']}/5)")

        return {
            'course_id': course_id,
            'course': course,
            'score': score,
            'reasons': reasons
        }
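`_compute_content_similarity` is referenced above but not defined. One possible shape, shown here as a hypothetical weighted scorer (the feature names follow the FeatureStore output above; the weights are illustrative, not tuned values):

```python
def compute_content_similarity(user_features, course_features):
    """Hypothetical content-based score in [0, 1] combining category match,
    difficulty proximity, rating, and completion rate. Weights are
    illustrative assumptions, not production values."""
    score = 0.0

    # Category match carries the most weight
    if course_features['category'] in user_features['preferred_categories']:
        score += 0.4

    # Prefer courses near the user's usual difficulty (assumed 1-5 scale)
    gap = abs(course_features['difficulty'] - user_features['preferred_difficulty'])
    score += 0.3 * max(0.0, 1.0 - gap / 4.0)

    # Reward well-rated courses and ones learners actually finish
    score += 0.2 * (course_features['avg_rating'] / 5.0)
    score += 0.1 * min(1.0, course_features['completion_rate'] * 2)

    return score

user = {'preferred_categories': {'data-science'}, 'preferred_difficulty': 2}
course = {'category': 'data-science', 'difficulty': 2,
          'avg_rating': 4.5, 'completion_rate': 0.4}
print(compute_content_similarity(user, course))  # 0.96 (within float error)
```

In practice these weights would be learned or A/B-tested rather than hand-set.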

3.8 Payment and Subscription Management

Payment Schema

CREATE TABLE payments (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id),
    order_id UUID UNIQUE NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    currency VARCHAR(3) DEFAULT 'USD',
    payment_method VARCHAR(50), -- 'stripe', 'paypal', 'apple_pay'
    status VARCHAR(20) CHECK (status IN ('pending', 'completed', 'failed', 'refunded')),
    payment_gateway_id VARCHAR(255), -- Stripe payment intent ID
    metadata JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP
);

CREATE TABLE subscriptions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id),
    plan_type VARCHAR(20) CHECK (plan_type IN ('monthly', 'annual')),
    status VARCHAR(20) CHECK (status IN ('active', 'cancelled', 'expired', 'past_due')),
    price DECIMAL(10, 2),
    currency VARCHAR(3),
    current_period_start TIMESTAMP,
    current_period_end TIMESTAMP,
    stripe_subscription_id VARCHAR(255),
    cancelled_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE revenue_shares (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    payment_id UUID REFERENCES payments(id),
    instructor_id UUID REFERENCES users(id),
    course_id UUID REFERENCES courses(id),
    instructor_amount DECIMAL(10, 2),
    platform_amount DECIMAL(10, 2),
    processed BOOLEAN DEFAULT FALSE,
    processed_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT NOW()
);

Payment Processing

import uuid
from datetime import datetime

import stripe

class PaymentService:
    def __init__(self):
        stripe.api_key = STRIPE_SECRET_KEY

    def process_course_purchase(self, user_id, course_id, payment_method_id):
        user = self.db.get_user(user_id)
        course = self.db.get_course(course_id)

        # Check if already enrolled
        if self.db.is_enrolled(user_id, course_id):
            raise ValueError("User already enrolled in course")

        # Create order
        order_id = str(uuid.uuid4())
        amount = course['price']

        try:
            # Create Stripe payment intent
            payment_intent = stripe.PaymentIntent.create(
                amount=int(round(amount * 100)),  # Convert to cents; round to avoid float truncation
                currency='usd',
                customer=user['stripe_customer_id'],
                payment_method=payment_method_id,
                confirm=True,
                metadata={
                    'order_id': order_id,
                    'user_id': user_id,
                    'course_id': course_id
                }
            )

            # Create payment record
            payment = self.db.create_payment({
                'user_id': user_id,
                'order_id': order_id,
                'amount': amount,
                'payment_method': 'stripe',
                'status': 'pending',
                'payment_gateway_id': payment_intent.id
            })

            # With confirm=True the intent is confirmed synchronously, but it
            # may still be 'requires_action' (e.g. 3D Secure); in that case
            # the webhook handler completes the payment later
            if payment_intent.status == 'succeeded':
                self._complete_payment(payment['id'], course_id)

            return payment

        except stripe.error.CardError as e:
            # Card declined
            self.db.create_payment({
                'user_id': user_id,
                'order_id': order_id,
                'amount': amount,
                'payment_method': 'stripe',
                'status': 'failed',
                'metadata': {'error': str(e)}
            })
            raise

    def _complete_payment(self, payment_id, course_id):
        # Update payment status
        self.db.update_payment(payment_id, {
            'status': 'completed',
            'completed_at': datetime.utcnow()
        })

        # Enroll user in course
        payment = self.db.get_payment(payment_id)
        self.db.create_enrollment({
            'user_id': payment['user_id'],
            'course_id': course_id,
            'enrolled_at': datetime.utcnow()
        })

        # Calculate revenue share (production code should use Decimal for
        # money; floats are kept here for brevity)
        course = self.db.get_course(course_id)
        instructor_share = round(payment['amount'] * 0.7, 2)   # 70% to instructor
        platform_share = round(payment['amount'] - instructor_share, 2)  # remainder, so shares sum exactly

        self.db.create_revenue_share({
            'payment_id': payment_id,
            'instructor_id': course['instructor_id'],
            'course_id': course_id,
            'instructor_amount': instructor_share,
            'platform_amount': platform_share
        })

        # Send receipt email
        self.notification_service.send_purchase_receipt(
            payment['user_id'],
            course_id,
            payment_id
        )

        # Publish event
        kafka.produce('course.purchased', {
            'user_id': payment['user_id'],
            'course_id': course_id,
            'amount': payment['amount']
        })

    def create_subscription(self, user_id, plan_type):
        user = self.db.get_user(user_id)

        # Define pricing
        prices = {'monthly': 29.99, 'annual': 299.99}
        if plan_type not in prices:
            raise ValueError(f"Unknown plan type: {plan_type}")
        price = prices[plan_type]

        # Create Stripe subscription
        subscription = stripe.Subscription.create(
            customer=user['stripe_customer_id'],
            items=[{'price': STRIPE_PRICE_IDS[plan_type]}],
            metadata={'user_id': user_id}
        )

        # Create subscription record
        self.db.create_subscription({
            'user_id': user_id,
            'plan_type': plan_type,
            'status': 'active',
            'price': price,
            'current_period_start': datetime.fromtimestamp(subscription.current_period_start),
            'current_period_end': datetime.fromtimestamp(subscription.current_period_end),
            'stripe_subscription_id': subscription.id
        })

        return subscription

    def handle_webhook(self, payload, signature):
        """Handle Stripe webhooks for subscription events"""
        try:
            event = stripe.Webhook.construct_event(
                payload, signature, STRIPE_WEBHOOK_SECRET
            )
        except (ValueError, stripe.error.SignatureVerificationError):
            # Malformed payload or bad signature: reject the webhook
            raise ValueError("Invalid Stripe webhook payload or signature")

        if event.type == 'invoice.payment_succeeded':
            self._handle_payment_succeeded(event.data.object)
        elif event.type == 'invoice.payment_failed':
            self._handle_payment_failed(event.data.object)
        elif event.type == 'customer.subscription.deleted':
            self._handle_subscription_cancelled(event.data.object)

Step 4: Wrap-up

Capacity Estimation

Storage

  • Video content: 500K courses × 20 videos × 5GB = 50 PB
  • User data: 100M users × 10KB = 1 TB
  • Quiz data: 10M quizzes × 100KB = 1 TB
  • Discussion data: 50M posts × 5KB = 250 GB
  • Total: ~50 PB (primarily video)

Bandwidth

  • Video streaming: 1M concurrent × 5 Mbps = 5 Tbps
  • API traffic: 10M DAU × 100 requests/day × 10KB = 10 TB/day
  • Peak: 5+ Tbps

Database

  • PostgreSQL: 10K writes/sec, 100K reads/sec
  • MongoDB: 50K writes/sec (discussions)
  • Redis: 500K ops/sec (cache + progress)
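The figures above can be sanity-checked with quick arithmetic:

```python
# Back-of-envelope check of the storage and bandwidth estimates
GB, PB = 10**9, 10**15

video_bytes = 500_000 * 20 * 5 * GB      # courses × videos/course × GB/video
assert video_bytes == 50 * PB            # 50 PB of video, dominating storage

streaming_bps = 1_000_000 * 5 * 10**6    # 1M concurrent streams × 5 Mbps
assert streaming_bps == 5 * 10**12       # 5 Tbps peak video bandwidth

api_bytes_per_day = 10_000_000 * 100 * 10 * 1024   # 10M DAU × 100 req × 10 KB
print(round(api_bytes_per_day / 10**12, 1))        # ≈ 10 TB/day
```

Video dwarfs everything else, which is why the architecture pushes delivery to CDNs rather than origin servers.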

Key Design Decisions

  1. Microservices Architecture: Enables independent scaling and deployment of course, video, assessment, and payment services
  2. Video CDN with DRM: Multi-CDN strategy with Widevine/FairPlay for content protection
  3. Real-time Progress Tracking: Redis cache with Kafka async persistence for sub-second updates
  4. Hybrid Recommendations: Combining collaborative filtering and content-based approaches for better accuracy
  5. Blockchain Certificates: Immutable verification using Ethereum smart contracts
  6. Auto-grading with Sandboxing: Docker-based code execution with resource limits for security
  7. Threaded Discussions: Path-based threading in MongoDB for efficient nested comment retrieval

Monitoring and Observability

Key Metrics

  • Video start time (p50, p95, p99)
  • Video buffering ratio
  • API latency by endpoint
  • Payment success rate
  • Course completion rate
  • Daily active users
  • Revenue per user

Tools

  • Datadog for infrastructure monitoring
  • Sentry for error tracking
  • ELK stack for log aggregation
  • Prometheus + Grafana for custom metrics
  • AWS CloudWatch for AWS resources

Security Considerations

  • WAF (Web Application Firewall) for DDoS protection
  • OAuth 2.0 + JWT for authentication
  • Encrypted video content with token-based access
  • PCI DSS compliance for payment processing
  • Rate limiting: 1000 req/min per user
  • SQL injection prevention with parameterized queries
  • XSS protection with content security policy
  • Regular security audits and penetration testing
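The 1000 req/min limit above is commonly enforced with a token bucket. A minimal in-process sketch (production deployments would keep bucket state in Redis so the limit holds across API servers):

```python
import time

class TokenBucket:
    """Per-user token bucket: capacity sets the burst size, refill_per_sec
    the sustained rate (1000/60 tokens/sec = 1000 requests/minute)."""
    def __init__(self, capacity=1000, refill_per_sec=1000 / 60):
        self.capacity = capacity
        self.tokens = float(capacity)
        self.refill_per_sec = refill_per_sec
        self.last = time.monotonic()

    def allow(self):
        # Refill based on elapsed time, capped at capacity
        now = time.monotonic()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_per_sec)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(capacity=3, refill_per_sec=0)  # tiny bucket for the demo
results = [bucket.allow() for _ in range(5)]
print(results)  # [True, True, True, False, False]
```

Requests that return False get an HTTP 429 with a Retry-After header rather than being silently dropped.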

Future Enhancements

  • Live streaming with real-time interaction
  • AR/VR course experiences
  • AI-powered course creation assistant
  • Automated subtitle generation using speech-to-text
  • Adaptive learning paths based on performance
  • Gamification with badges and leaderboards
  • Mobile offline mode with sync
  • White-label solutions for enterprises
  • Integration with LMS systems (Canvas, Moodle)
  • Multi-tenant architecture for B2B

This architecture provides a robust, scalable foundation for an e-learning platform serving millions of users with high-quality video content, engaging assessments, and personalized learning experiences.