本文接上篇,没有阅读上篇的朋友,请先阅读本系列的上篇。
手把手教您使用Python+Flutter开发在线教育系统(上),纯干货!
七、讨论区系统实现
1. 完善的评论模型设计(discussion/models.py)
python
复制
from django.db import models
from django.contrib.auth import get_user_model
from courses.models import Course
User = get_user_model()
class CourseComment(models.Model):
"""
课程评论模型
- 支持多级评论(通过parent字段自关联实现)
- 支持点赞功能(通过likes多对多关系实现)
- 自动记录创建时间
"""
course = models.ForeignKey(
Course,
on_delete=models.CASCADE,
related_name='comments',
verbose_name='所属课程'
)
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='comments',
verbose_name='评论用户'
)
content = models.TextField(
verbose_name='评论内容',
max_length=2000,
help_text='评论内容最多2000个字符'
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name='创建时间'
)
parent = models.ForeignKey(
'self',
null=True,
blank=True,
on_delete=models.CASCADE,
related_name='replies',
verbose_name='父级评论'
)
likes = models.ManyToManyField(
User,
related_name='liked_comments',
verbose_name='点赞用户',
blank=True
)
class Meta:
verbose_name = '课程评论'
verbose_name_plural = '课程评论'
ordering = ['-created_at'] # 按创建时间降序排列
indexes = [
models.Index(fields=['created_at'], name='comment_created_idx'),
models.Index(fields=['course'], name='comment_course_idx'),
]
def __str__(self):
return f"{self.user.username}的评论:{self.content[:20]}..."
@property
def like_count(self):
"""获取点赞数量"""
return self.likes.count()
def get_replies(self):
"""获取所有子回复(按时间正序排列)"""
return self.replies.all().order_by('created_at')
def has_parent(self):
"""判断是否有父级评论"""
return self.parent is not None
2. 序列化器(discussion/serializers.py)
python
复制
from rest_framework import serializers
from .models import CourseComment
from django.contrib.auth import get_user_model
User = get_user_model()
class UserSerializer(serializers.ModelSerializer):
"""用户信息序列化器"""
class Meta:
model = User
fields = ['id', 'username', 'avatar']
class CourseCommentSerializer(serializers.ModelSerializer):
"""评论序列化器(包含嵌套回复)"""
user = UserSerializer(read_only=True) # 嵌套用户信息
replies = serializers.SerializerMethodField() # 自定义回复字段
like_count = serializers.SerializerMethodField() # 点赞数
is_liked = serializers.SerializerMethodField() # 当前用户是否点赞
class Meta:
model = CourseComment
fields = [
'id', 'user', 'content', 'created_at',
'parent', 'replies', 'like_count', 'is_liked'
]
read_only_fields = ['user', 'created_at']
def get_replies(self, obj):
"""获取子回复(递归序列化)"""
if obj.replies.exists():
return CourseCommentSerializer(
obj.get_replies(),
many=True,
context=self.context
).data
return None
def get_like_count(self, obj):
"""获取点赞数量"""
return obj.like_count
def get_is_liked(self, obj):
"""检查当前用户是否点赞"""
request = self.context.get('request')
if request and request.user.is_authenticated:
return obj.likes.filter(id=request.user.id).exists()
return False
def create(self, validated_data):
"""创建评论时自动关联用户"""
validated_data['user'] = self.context['request'].user
return super().create(validated_data)
3. 视图(discussion/views.py)
python
复制
from rest_framework import viewsets, permissions, status
from rest_framework.decorators import action
from rest_framework.response import Response
from .models import CourseComment
from .serializers import CourseCommentSerializer
class CourseCommentViewSet(viewsets.ModelViewSet):
"""
评论视图集:
- 支持评论的CRUD操作
- 支持点赞功能
- 按课程过滤评论
"""
serializer_class = CourseCommentSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
def get_queryset(self):
"""根据课程ID过滤顶级评论"""
queryset = CourseComment.objects.filter(parent__isnull=True)
course_id = self.request.query_params.get('course_id')
if course_id:
queryset = queryset.filter(course_id=course_id)
return queryset.prefetch_related('replies')
@action(detail=True, methods=['post'])
def like(self, request, pk=None):
"""点赞/取消点赞"""
comment = self.get_object()
user = request.user
if comment.likes.filter(id=user.id).exists():
comment.likes.remove(user)
status_code = status.HTTP_200_OK
message = '取消点赞成功'
else:
comment.likes.add(user)
status_code = status.HTTP_201_CREATED
message = '点赞成功'
return Response({'message': message}, status=status_code)
def perform_create(self, serializer):
"""创建评论时自动关联用户和课程"""
course_id = self.request.data.get('course')
serializer.save(user=self.request.user, course_id=course_id)
4. 路由配置(discussion/urls.py)
python
复制
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import CourseCommentViewSet
router = DefaultRouter()
router.register(r'comments', CourseCommentViewSet, basename='comment')
urlpatterns = [
path('', include(router.urls)),
]
5. 评论列表组件(lib/components/comment_list.dart)
dart
复制
import 'package:flutter/material.dart';
import 'package:http/http.dart' as http;
import 'package:provider/provider.dart';
import 'dart:convert';
class CommentList extends StatefulWidget {
final String courseId;
const CommentList({Key? key, required this.courseId}) : super(key: key);
@override
_CommentListState createState() => _CommentListState();
}
class _CommentListState extends State {
List _comments = [];
bool _isLoading = true;
@override
void initState() {
super.initState();
_fetchComments();
}
Future _fetchComments() async {
final response = await http.get(
Uri.parse('http://your-api.com/api/comments/?course_id=${widget.courseId}'),
headers: {'Authorization': 'Bearer ${_getToken()}'},
);
if (response.statusCode == 200) {
setState(() {
_comments = jsonDecode(response.body);
_isLoading = false;
});
}
}
String _getToken() {
// 从安全存储获取token
return 'user_auth_token';
}
@override
Widget build(BuildContext context) {
return _isLoading
? Center(child: CircularProgressIndicator())
: ListView.builder(
itemCount: _comments.length,
itemBuilder: (context, index) => CommentItem(
comment: _comments[index],
onLike: _handleLike,
),
);
}
void _handleLike(int commentId) async {
final response = await http.post(
Uri.parse('http://your-api.com/api/comments/$commentId/like/'),
headers: {'Authorization': 'Bearer ${_getToken()}'},
);
if (response.statusCode == 200) {
_fetchComments(); // 刷新评论列表
}
}
}
class CommentItem extends StatelessWidget {
final Map comment;
final Function(int) onLike;
const CommentItem({
Key? key,
required this.comment,
required this.onLike,
}) : super(key: key);
@override
Widget build(BuildContext context) {
return Card(
margin: EdgeInsets.symmetric(vertical: 8),
child: Padding(
padding: EdgeInsets.all(12),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
_buildHeader(),
SizedBox(height: 8),
Text(comment['content']),
_buildLikeButton(),
if (comment['replies'] != null)
...comment['replies'].map((reply) => Padding(
padding: EdgeInsets.only(left: 24, top: 8),
child: CommentItem(
comment: reply,
onLike: onLike,
),
)),
],
),
),
);
}
Widget _buildHeader() {
return Row(
children: [
CircleAvatar(
backgroundImage: NetworkImage(comment['user']['avatar'] ?? ''),
SizedBox(width: 8),
Text(comment['user']['username']),
Spacer(),
Text(
_formatDate(comment['created_at']),
style: TextStyle(color: Colors.grey),
),
],
);
}
Widget _buildLikeButton() {
return Row(
mainAxisAlignment: MainAxisAlignment.end,
children: [
IconButton(
icon: Icon(
comment['is_liked'] ? Icons.favorite : Icons.favorite_border,
color: comment['is_liked'] ? Colors.red : null,
),
onPressed: () => onLike(comment['id']),
),
Text(comment['like_count'].toString()),
],
);
}
String _formatDate(String dateStr) {
final date = DateTime.parse(dateStr);
return '${date.year}-${date.month}-${date.day}';
}
}
6. 评论发布组件(lib/components/comment_input.dart)
dart
复制
import 'package:flutter/material.dart';
import 'package:http/http.dart' as http;
class CommentInput extends StatefulWidget {
final String courseId;
final VoidCallback onCommentAdded;
const CommentInput({
Key? key,
required this.courseId,
required this.onCommentAdded,
}) : super(key: key);
@override
_CommentInputState createState() => _CommentInputState();
}
class _CommentInputState extends State {
final _controller = TextEditingController();
bool _isSubmitting = false;
@override
Widget build(BuildContext context) {
return Padding(
padding: EdgeInsets.all(16),
child: Row(
children: [
Expanded(
child: TextField(
controller: _controller,
decoration: InputDecoration(
hintText: '写下你的评论...',
border: OutlineInputBorder(),
),
maxLines: 3,
),
),
SizedBox(width: 8),
_isSubmitting
? CircularProgressIndicator()
: IconButton(
icon: Icon(Icons.send),
onPressed: _submitComment,
),
],
),
);
}
Future _submitComment() async {
if (_controller.text.isEmpty) return;
setState(() => _isSubmitting = true);
final response = await http.post(
Uri.parse('http://your-api.com/api/comments/'),
headers: {
'Authorization': 'Bearer ${_getToken()}',
'Content-Type': 'application/json',
},
body: jsonEncode({
'course': widget.courseId,
'content': _controller.text,
}),
);
setState(() => _isSubmitting = false);
if (response.statusCode == 201) {
_controller.clear();
widget.onCommentAdded();
}
}
String _getToken() {
// 从安全存储获取token
return 'user_auth_token';
}
}
7、系统功能验证
测试用例
(1)发布评论
(2)点赞功能
(3)查看回复
八、推荐系统深度实现
1. 推荐系统架构
bash
复制
推荐流程:
用户行为收集 -> 特征工程 -> 推荐算法 -> 结果缓存 -> API接口暴露
2. 用户行为日志设计
python
复制
# analytics/models.py
from django.db import models
from django.contrib.auth import get_user_model
from courses.models import Course
User = get_user_model()
class UserBehavior(models.Model):
EVENT_TYPES = [
('view', '浏览'),
('enroll', '报名'),
('play', '播放'),
('search', '搜索'),
('share', '分享')
]
user = models.ForeignKey(User, on_delete=models.CASCADE)
course = models.ForeignKey(Course, null=True, on_delete=models.SET_NULL)
event_type = models.CharField(max_length=20, choices=EVENT_TYPES)
event_data = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [
models.Index(fields=['user', 'event_type']),
models.Index(fields=['course', 'event_type'])
]
3. 混合推荐算法实现
python
复制
# recommendations/algorithms.py
import numpy as np
from collections import defaultdict
from django.db.models import Count
from courses.models import Course, Enrollment
class HybridRecommender:
def __init__(self, user):
self.user = user
self.content_weight = 0.6
self.collaborative_weight = 0.4
def get_recommendations(self, num=10):
# 内容推荐
content_scores = self._content_based_filtering()
# 协同过滤
collab_scores = self._collaborative_filtering()
# 混合推荐
combined = defaultdict(float)
for course_id in set(content_scores.keys()) | set(collab_scores.keys()):
combined[course_id] = (
content_scores.get(course_id, 0) * self.content_weight +
collab_scores.get(course_id, 0) * self.collaborative_weight
)
# 获取推荐课程
sorted_ids = sorted(combined.items(), key=lambda x: x[1], reverse=True)[:num]
return Course.objects.filter(id__in=[cid for cid, _ in sorted_ids])
def _content_based_filtering(self):
# 基于用户已学课程的特征推荐
user_courses = Course.objects.filter(enrollment__student=self.user)
if not user_courses.exists():
return self._popular_courses()
# 计算课程特征向量
features = defaultdict(float)
for course in user_courses:
features[course.category_id] += 1
features[f"level_{course.level}"] += 1
# 寻找相似课程
scores = {}
for course in Course.objects.exclude(enrollment__student=self.user):
score = 0
if course.category_id in features:
score += features[course.category_id] * 0.7
if f"level_{course.level}" in features:
score += features[f"level_{course.level}"] * 0.3
scores[course.id] = score
return scores
def _collaborative_filtering(self):
# 基于用户的协同过滤
user_enrollments = set(Enrollment.objects.filter(student=self.user)
.values_list('course_id', flat=True))
# 找到相似用户
similar_users = (Enrollment.objects.exclude(student=self.user)
.filter(course_id__in=user_enrollments)
.values('student')
.annotate(common=Count('course_id'))
.order_by('-common')[:5])
# 收集相似用户的课程
scores = defaultdict(int)
for user in similar_users:
for course in Enrollment.objects.filter(student=user['student']):
if course.course_id not in user_enrollments:
scores[course.course_id] += user['common']
return scores
def _popular_courses(self):
# 热门课程回退策略
return dict(Course.objects.annotate(popularity=Count('enrollment'))
.order_by('-popularity')[:10]
.values_list('id', 'popularity'))
4. 推荐API实现
python
复制
# recommendations/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from .algorithms import HybridRecommender
from courses.serializers import CourseSerializer
class RecommendationAPI(APIView):
def get(self, request):
recommender = HybridRecommender(request.user)
courses = recommender.get_recommendations(10)
serializer = CourseSerializer(courses, many=True)
return Response({
'recommendations': serializer.data,
'algorithm': 'hybrid'
})
九、用户个人中心完整实现
1. 用户模型扩展
python
复制
# users/models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils.translation import gettext_lazy as _
class User(AbstractUser):
USER_TYPES = [
('student', '学生'),
('teacher', '教师'),
('admin', '管理员')
]
avatar = models.ImageField(upload_to='avatars/', null=True)
bio = models.TextField(_('个人简介'), blank=True)
user_type = models.CharField(max_length=10, choices=USER_TYPES, default='student')
phone = models.CharField(max_length=20, blank=True)
location = models.CharField(max_length=100, blank=True)
class Meta:
verbose_name = _('用户')
verbose_name_plural = _('用户')
class UserAchievement(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
title = models.CharField(max_length=100)
description = models.TextField()
achieved_at = models.DateTimeField(auto_now_add=True)
icon = models.CharField(max_length=50)
2. 个人中心API
python
复制
# users/views.py
from rest_framework.viewsets import ModelViewSet
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from .models import User, UserAchievement
from .serializers import UserSerializer, AchievementSerializer
class UserProfileViewSet(ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
permission_classes = [IsAuthenticated]
@action(detail=False, methods=['get', 'put'])
def me(self, request):
user = request.user
if request.method == 'GET':
serializer = self.get_serializer(user)
return Response(serializer.data)
elif request.method == 'PUT':
serializer = self.get_serializer(user, data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data)
@action(detail=True, methods=['get'])
def achievements(self, request, pk=None):
user = self.get_object()
achievements = UserAchievement.objects.filter(user=user)
serializer = AchievementSerializer(achievements, many=True)
return Response(serializer.data)
3. Flutter个人中心界面
dart
复制
// lib/screens/profile_screen.dart
import 'package:flutter/material.dart';
import 'package:provider/provider.dart';
import '../providers/auth_provider.dart';
import '../widgets/achievement_badge.dart';
class ProfileScreen extends StatelessWidget {
const ProfileScreen({super.key});
@override
Widget build(BuildContext context) {
final authProvider = Provider.of(context);
final user = authProvider.user;
return Scaffold(
appBar: AppBar(title: const Text('个人中心')),
body: SingleChildScrollView(
child: Column(
children: [
_buildProfileHeader(user),
_buildStatisticsSection(),
_buildAchievementsSection(),
_buildSettingsSection(),
],
),
),
);
}
Widget _buildProfileHeader(User user) {
return Container(
padding: const EdgeInsets.all(20),
decoration: BoxDecoration(
color: Colors.blue[50],
borderRadius: BorderRadius.circular(12),
),
child: Row(
children: [
CircleAvatar(
radius: 40,
backgroundImage: user.avatarUrl != null
? NetworkImage(user.avatarUrl!)
: const AssetImage('assets/default_avatar.png') as ImageProvider,
),
const SizedBox(width: 20),
Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
Text(
user.username,
style: const TextStyle(
fontSize: 24,
fontWeight: FontWeight.bold,
),
),
Text(user.bio ?? '暂无个人简介'),
Chip(
label: Text(user.userType),
backgroundColor: _getUserTypeColor(user.userType),
),
],
),
],
),
);
}
Widget _buildStatisticsSection() {
return Card(
child: Padding(
padding: const EdgeInsets.all(16),
child: Row(
mainAxisAlignment: MainAxisAlignment.spaceAround,
children: [
_buildStatItem('已学课程', '128'),
_buildStatItem('学习时长', '256h'),
_buildStatItem('成就点数', '980'),
],
),
),
);
}
Widget _buildAchievementsSection() {
return Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
const Padding(
padding: EdgeInsets.all(16),
child: Text(
'学习成就',
style: TextStyle(fontSize: 18, fontWeight: FontWeight.bold),
),
),
SizedBox(
height: 120,
child: ListView.builder(
scrollDirection: Axis.horizontal,
itemCount: 5,
itemBuilder: (context, index) => const AchievementBadge(),
),
),
],
);
}
Widget _buildSettingsSection() {
return Column(
children: [
ListTile(
leading: const Icon(Icons.edit),
title: const Text('编辑资料'),
onTap: () => _navigateToEditProfile(context),
),
ListTile(
leading: const Icon(Icons.security),
title: const Text('账户安全'),
onTap: () {},
),
ListTile(
leading: const Icon(Icons.notifications),
title: const Text('通知设置'),
onTap: () {},
),
],
);
}
Color _getUserTypeColor(String type) {
switch (type) {
case 'teacher':
return Colors.amber[200]!;
case 'admin':
return Colors.red[200]!;
default:
return Colors.green[200]!;
}
}
Widget _buildStatItem(String label, String value) {
return Column(
children: [
Text(
value,
style: const TextStyle(
fontSize: 20,
fontWeight: FontWeight.bold,
),
),
Text(
label,
style: TextStyle(
color: Colors.grey[600],
fontSize: 14,
),
),
],
);
}
}
十、教师管理后台实现
1. 教师权限管理
python
复制
# courses/permissions.py
from rest_framework.permissions import BasePermission
class IsCourseTeacher(BasePermission):
def has_object_permission(self, request, view, obj):
return obj.teacher == request.user
class IsTeacherUser(BasePermission):
def has_permission(self, request, view):
return request.user.user_type == 'teacher'
2. 教学数据统计API
python
复制
# analytics/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from django.db.models import Count, Avg
from courses.models import Course, Enrollment
class TeachingAnalyticsAPI(APIView):
permission_classes = [IsTeacherUser]
def get(self, request):
teacher = request.user
courses = Course.objects.filter(teacher=teacher)
# 基础统计
total_students = Enrollment.objects.filter(course__teacher=teacher)
.values('student').distinct().count()
total_income = sum(course.price * course.enrollment_set.count()
for course in courses)
# 课程评分数据
rating_stats = courses.annotate(
avg_rating=Avg('reviews__rating'),
review_count=Count('reviews')
).values('title', 'avg_rating', 'review_count')
# 学习进度统计
progress_data = []
for course in courses:
completed = course.enrollment_set.filter(completed=True).count()
total = course.enrollment_set.count()
progress_data.append({
'course': course.title,
'completion_rate': completed / total * 100 if total > 0 else 0
})
return Response({
'total_courses': courses.count(),
'total_students': total_students,
'total_income': total_income,
'rating_stats': rating_stats,
'progress_data': progress_data
})
3. Flutter教师仪表盘
dart
复制
// lib/screens/teacher_dashboard.dart
import 'package:flutter/material.dart';
import 'package:charts_flutter/flutter.dart' as charts;
class TeacherDashboard extends StatelessWidget {
final Map analyticsData;
const TeacherDashboard({super.key, required this.analyticsData});
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: const Text('教学仪表盘')),
body: ListView(
padding: const EdgeInsets.all(16),
children: [
_buildStatsGrid(),
const SizedBox(height: 24),
_buildRatingChart(),
const SizedBox(height: 24),
_buildProgressChart(),
],
),
);
}
Widget _buildStatsGrid() {
return GridView.count(
shrinkWrap: true,
crossAxisCount: 2,
childAspectRatio: 2,
crossAxisSpacing: 16,
mainAxisSpacing: 16,
children: [
_buildStatCard('总课程数', analyticsData['total_courses'].toString()),
_buildStatCard('总学生数', analyticsData['total_students'].toString()),
_buildStatCard('总收入', 'yen${analyticsData['total_income'].toStringAsFixed(2)}'),
_buildStatCard('平均评分', analyticsData['rating_stats'].isNotEmpty
? analyticsData['rating_stats'][0]['avg_rating'].toStringAsFixed(1)
: '0.0'),
],
);
}
Widget _buildRatingChart() {
return Card(
child: Padding(
padding: const EdgeInsets.all(16),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: [
const Text(
'课程评分分布',
style: TextStyle(fontWeight: FontWeight.bold),
),
SizedBox(
height: 200,
child: charts.BarChart(
_createRatingSeries(),
animate: true,
domainAxis: const charts.OrdinalAxisSpec(
renderSpec: charts.SmallTickRendererSpec(
labelRotation: 45,
),
),
),
),
],
),
),
);
}
List<charts.Series<Map, String>> _createRatingSeries() {
return [
charts.Series(
id: 'Ratings',
data: analyticsData['rating_stats'],
domainFn: (item, _) => item['title'],
measureFn: (item, _) => item['avg_rating'] ?? 0,
colorFn: (_, __) => charts.MaterialPalette.blue.shadeDefault,
)
];
}
Widget _buildStatCard(String title, String value) {
return Card(
child: Padding(
padding: const EdgeInsets.all(16),
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Text(
value,
style: const TextStyle(
fontSize: 24,
fontWeight: FontWeight.bold,
),
),
Text(
title,
style: TextStyle(
color: Colors.grey[600],
fontSize: 14,
),
),
],
),
),
);
}
}
十一、系统安全加固措施
1. 安全中间件配置
python
复制
# edu_platform/settings.py
SECURE_CONTENT_TYPE_NOSNIFF = True
SECURE_BROWSER_XSS_FILTER = True
X_FRAME_OPTIONS = 'DENY'
CSRF_COOKIE_HTTPONLY = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_HSTS_SECONDS = 31536000 # 1 year
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
CORS_ORIGIN_WHITELIST = ['https://yourdomain.com']
2. 请求频率限制
python
复制
# utils/throttles.py
from rest_framework.throttling import UserRateThrottle
class BurstRateThrottle(UserRateThrottle):
scope = 'burst'
class SustainedRateThrottle(UserRateThrottle):
scope = 'sustained'
# settings.py配置
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'utils.throttles.BurstRateThrottle',
'utils.throttles.SustainedRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'burst': '100/minute',
'sustained': '1000/day'
}
}
3. 敏感操作审计日志(代码简略)
复制
# security/signals.py
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.contrib.admin.models import LogEntry, ADDITION, CHANGE, DELETION
from django.contrib.contenttypes.models import ContentType
十二、持续集成与自动化部署
1. GitLab CI/CD 配置
yaml
复制
# .gitlab-ci.yml
variables:
DOCKER_IMAGE: registry.example.com/edu-platform
PRODUCTION_SERVER: edu-prod.example.com
stages:
- test
- build
- deploy
unit-test:
stage: test
image: python:3.9
services:
- postgres:13
variables:
DATABASE_URL: "postgres://postgres@postgres:5432/test_db"
before_script:
- apt-get update && apt-get install -y build-essential python3-dev
- pip install -r requirements.txt
script:
- pytest --cov=edu_platform --cov-report=xml
build-docker:
stage: build
image: docker:20.10
services:
- docker:20.10-dind
script:
- docker build -t $DOCKER_IMAGE:$CI_COMMIT_SHA .
- docker push $DOCKER_IMAGE:$CI_COMMIT_SHA
deploy-prod:
stage: deploy
image: alpine/ssh
only:
- main
script:
- ssh -o StrictHostKeyChecking=no deploy@$PRODUCTION_SERVER "
docker pull $DOCKER_IMAGE:$CI_COMMIT_SHA &&
docker service update --image $DOCKER_IMAGE:$CI_COMMIT_SHA edu_platform
"
2. Docker 生产环境配置
Dockerfile
复制
# Dockerfile.prod
FROM python:3.9-slim
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
POETRY_VERSION=1.4.2
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
RUN pip install "poetry==$POETRY_VERSION"
COPY pyproject.toml poetry.lock ./
RUN poetry config virtualenvs.create false && \
poetry install --no-interaction --no-ansi --only main
COPY . /app
WORKDIR /app
RUN python manage.py collectstatic --noinput
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "edu_platform.wsgi"]
3. 监控告警配置
yaml
复制
# docker-compose.monitoring.yml
version: '3.8'
services:
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
command:
- '--config.file=/etc/prometheus/prometheus.yml'
grafana:
image: grafana/grafana
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- prometheus
node-exporter:
image: prom/node-exporter
ports:
- "9100:9100"
volumes:
grafana-data:
十三、消息通知系统
1. 通知模型设计
python
复制
# notifications/models.py
from django.db import models
from django.contrib.auth import get_user_model
User = get_user_model()
class Notification(models.Model):
NOTIFICATION_TYPES = [
('system', '系统通知'),
('course', '课程通知'),
('payment', '支付通知'),
('live', '直播提醒')
]
user = models.ForeignKey(User, on_delete=models.CASCADE)
title = models.CharField(max_length=200)
content = models.TextField()
notification_type = models.CharField(max_length=20, choices=NOTIFICATION_TYPES)
is_read = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
related_object_id = models.CharField(max_length=100, null=True)
class Meta:
indexes = [
models.Index(fields=['user', 'is_read']),
models.Index(fields=['created_at'])
]
ordering = ['-created_at']
2. WebSocket 通知推送
python
复制
# notifications/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
class NotificationConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user = self.scope["user"]
if self.user.is_authenticated:
self.group_name = f"notifications_{self.user.id}"
await self.channel_layer.group_add(
self.group_name,
self.channel_name
)
await self.accept()
else:
await self.close()
async def disconnect(self, close_code):
if hasattr(self, 'group_name'):
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)
async def receive(self, text_data):
data = json.loads(text_data)
if data.get('action') == 'mark_read':
await self.mark_notification_read(data['notification_id'])
@database_sync_to_async
def mark_notification_read(self, notification_id):
Notification.objects.filter(
id=notification_id,
user=self.user
).update(is_read=True)
async def send_notification(self, event):
await self.send(text_data=json.dumps({
"type": "notification",
"title": event["title"],
"content": event["content"],
"timestamp": event["timestamp"],
"id": event["id"]
}))
3. Flutter 消息中心实现
dart
复制
// lib/screens/notification_screen.dart
import 'package:flutter/material.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class NotificationScreen extends StatefulWidget {
const NotificationScreen({super.key});
@override
_NotificationScreenState createState() => _NotificationScreenState();
}
class _NotificationScreenState extends State {
late final WebSocketChannel _channel;
final List _notifications = [];
@override
void initState() {
super.initState();
_channel = WebSocketChannel.connect(
Uri.parse('wss://yourdomain.com/ws/notifications/'),
headers: {'Authorization': 'Bearer ${userToken}'}
);
_channel.stream.listen(_handleMessage);
}
void _handleMessage(dynamic message) {
final data = jsonDecode(message);
setState(() {
_notifications.insert(0, Notification(
id: data['id'],
title: data['title'],
content: data['content'],
timestamp: DateTime.parse(data['timestamp']),
isRead: false
));
});
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('消息中心'),
actions: [
IconButton(
icon: const Icon(Icons.mark_as_unread),
onPressed: _markAllAsRead,
)
],
),
body: ListView.builder(
itemCount: _notifications.length,
itemBuilder: (context, index) => _buildNotificationItem(_notifications[index]),
),
);
}
Widget _buildNotificationItem(Notification notification) {
return Dismissible(
key: Key(notification.id),
background: Container(color: Colors.red),
onDismissed: (_) => _deleteNotification(notification.id),
child: ListTile(
leading: _getNotificationIcon(notification.type),
title: Text(notification.title),
subtitle: Text(notification.content),
trailing: Text(
DateFormat('HH:mm').format(notification.timestamp),
style: TextStyle(
color: notification.isRead ? Colors.grey : Colors.blue,
fontWeight: notification.isRead ? FontWeight.normal : FontWeight.bold
),
),
onTap: () => _showNotificationDetail(notification),
),
);
}
}
十四、课程证书系统
1. 证书生成模型
python
复制
# certificates/models.py
from django.db import models
from django.contrib.auth import get_user_model
from courses.models import Course
User = get_user_model()
class CourseCertificate(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
course = models.ForeignKey(Course, on_delete=models.CASCADE)
issued_at = models.DateTimeField(auto_now_add=True)
certificate_id = models.CharField(max_length=64, unique=True)
verification_url = models.URLField()
pdf_file = models.FileField(upload_to='certificates/')
metadata = models.JSONField(default=dict)
class Meta:
unique_together = ('user', 'course')
indexes = [
models.Index(fields=['certificate_id']),
models.Index(fields=['user', 'course'])
]
class CertificateTemplate(models.Model):
name = models.CharField(max_length=100)
template_file = models.FileField(upload_to='certificate_templates/')
config = models.JSONField()
is_active = models.BooleanField(default=True)
2. PDF 证书生成服务
python
复制
# certificates/services.py
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.styles import ParagraphStyle
from reportlab.platypus import Paragraph
from io import BytesIO
from django.core.files import File
class CertificateGenerator:
def __init__(self, template):
self.template = template
self.config = template.config
def generate_pdf(self, user, course):
buffer = BytesIO()
c = canvas.Canvas(buffer, pagesize=A4)
# 绘制背景模板
c.drawImage(self.template.template_file.path, 0, 0, width=A4[0], height=A4[1])
# 添加动态内容
self._draw_text(c, self.config['title_position'], course.title)
self._draw_text(c, self.config['student_name_position'], user.get_full_name())
self._draw_qr_code(c, self.config['qr_code_position'], verification_url)
c.save()
buffer.seek(0)
return File(buffer, name=f"certificate_{course.id}.pdf")
def _draw_text(self, canvas, position, text):
style = ParagraphStyle(
name='cert',
fontName='Helvetica-Bold',
fontSize=24,
leading=30,
alignment=1
)
p = Paragraph(text, style)
p.wrapOn(canvas, 400, 100)
p.drawOn(canvas, position['x'], position['y'])
def _draw_qr_code(self, canvas, position, url):
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=6,
border=4,
)
qr.add_data(url)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img_buffer = BytesIO()
img.save(img_buffer, format='PNG')
canvas.drawImage(img_buffer, position['x'], position['y'], width=100, height=100)
3. Flutter 证书展示
dart
复制
// lib/widgets/certificate_viewer.dart
import 'package:flutter_pdfview/flutter_pdfview.dart';
class CertificateViewer extends StatefulWidget {
final String certificateUrl;
const CertificateViewer({super.key, required this.certificateUrl});
@override
_CertificateViewerState createState() => _CertificateViewerState();
}
class _CertificateViewerState extends State {
final Completer _controller = Completer();
int? pages = 0;
bool isReady = false;
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('课程证书'),
actions: [
IconButton(
icon: const Icon(Icons.share),
onPressed: _shareCertificate,
),
],
),
body: Stack(
children: [
PDFView(
filePath: widget.certificateUrl,
enableSwipe: true,
swipeHorizontal: true,
autoSpacing: false,
pageFling: false,
onRender: (pages) => setState(() {
this.pages = pages;
isReady = true;
}),
onViewCreated: (controller) {
_controller.complete(controller);
},
),
if (!isReady)
const Center(child: CircularProgressIndicator()),
],
),
floatingActionButton: FutureBuilder(
future: _controller.future,
builder: (context, snapshot) {
if (snapshot.hasData) {
return Row(
mainAxisAlignment: MainAxisAlignment.end,
children: [
IconButton(
icon: const Icon(Icons.zoom_in),
onPressed: () => snapshot.data!.setZoom(1.5),
),
IconButton(
icon: const Icon(Icons.zoom_out),
onPressed: () => snapshot.data!.setZoom(1.0),
),
],
);
}
return Container();
},
),
);
}
void _shareCertificate() async {
final file = File(widget.certificateUrl);
await Share.shareFiles(
[file.path],
text: '这是我的课程证书:${widget.certificateUrl}',
);
}
}
十五、系统扩展建议
- AI 助教功能实现
- 使用 Transformer 模型构建智能问答系统
- 集成语音识别实现语音问答
- 自动生成课程摘要和知识点脑图
- 微服务架构改造
- 将用户服务、课程服务拆分为独立微服务
- 使用 gRPC 进行服务间通信
- 引入服务网格进行流量管理
- 区块链证书存证
- 将证书哈希值存储至以太坊网络
- 开发智能合约进行证书验证
- 实现去中心化的学分互认体系
- 虚拟现实教学
- 使用 Unity 开发 VR 课程内容
- 集成 3D 模型展示功能
- 开发虚拟实验室交互系统
以上实现方案涵盖了在线教育系统的完整技术栈,实际开发中需要根据具体需求进行调整。
建议优先实现核心教学功能,再逐步扩展高级功能。
每个模块都应配套完善的单元测试和端到端测试,推荐使用:
- Django: pytest-django, factory-boy
- Flutter: mockito, integration_test
- API测试: Postman, Newman
- 压力测试: Locust, JMeter
系统上线后应建立完善的监控体系,推荐组合:
- 基础设施监控: Prometheus + Grafana
- 应用性能监控: Elastic APM
- 日志分析: ELK Stack
- 错误追踪: Sentry
因为平台对单篇文章的字数限制,部分内容只得省略,敬请谅解!