本教程所用项目框架为eggjs 官方文档, 数据库使用 mysql 和 redis 缓存数据库
本教程参照文档: 小程序直播 | 微信开放文档
【获取直播房间列表】接口规则:该接口仅供商家后台调用,调用限额 500 次/天,建议开发者自己做缓存(此接口与下面【获取回放视频】接口共用 500 次/天限制,请合理分配调用频次)
根据以上接口规则, 我们接下来需要做的是:
- 通过调用该接口将直播间列表数据全部保存到数据库
- 前端小程序调用自己的后台接口拉取数据库中的数据
整体项目分为5个部分:
1. 添加插件配置以及数据模型定义
2. 定时获取 access_token 并存储
3. 定义路由
4. 定义service服务层
5. 定义controller控制层
以下是具体实现流程:
一. 添加插件配置以及定义数据模型
- 安装并配置 egg-redis 插件
- 安装
npm i egg-redis --save
- 在
config/plugin.js
中引入 egg-redis 插件
exports.redis = {
enable: true,
package: 'egg-redis',
};
- 在 config/config.default.js 中编写 redis 配置
config.redis = {
client: {
port: 6379,
host: 'xx.xx.xx.xx',
password: 'pwdxxxxxx',
db: 0,
},
};
2. 安装并配置 egg-sequelize 插件
- 安装
npm install --save egg-sequelize mysql2
- 在 config/plugin.js 中引入 egg-sequelize 插件
exports.sequelize = {
enable: true,
package: 'egg-sequelize',
};
- 在
config/config.default.js
中编写 sequelize 配置
config.sequelize = {
dialect: 'mysql',
database: 'live_data',
host: 'xx.xx.xx.xx',
port: '3306',
username: 'user',
password: 'pwdxxxxxxx',
timezone: '+08:00', // 由于orm用的UTC时间,这里必须加上东八区,否则取出来的时间相差8小时
define: { // model的全局配置
timestamps: true, // 添加create,update,delete时间戳
paranoid: false, // 添加软删除
freezeTableName: true, // 防止修改表名为复数
underscored: false, // 防止驼峰式字段被默认转为下划线
},
dialectOptions: {
charset: 'utf8mb4',
typeCast(field, next) {
// for reading from database
if (field.type === 'DATETIME') {
return field.string();
}
return next();
},
},
};
3. 定义直播数据模型
'use strict';
module.exports = app => {
const { INTEGER, STRING, TEXT } = app.Sequelize;
const Wxlive = app.model.define('wxlive', {
id: { type: INTEGER, primaryKey: true, autoIncrement: true },
roomid: { type: INTEGER(11), comment: '直播间id' },
name: { type: STRING(512), comment: '标题' },
cover_img: { type: STRING(512), comment: '封面' },
live_status: { type: INTEGER(11), comment: '直播状态' },
start_time: { type: INTEGER(11), comment: '开始时间' },
end_time: { type: INTEGER(11), comment: '结束时间' },
anchor_name: { type: STRING(255), comment: '主播' },
anchor_img: { type: STRING(512), comment: '主播头像' },
goods: { type: TEXT, comment: '商品' },
live_replay: { type: TEXT, comment: '回放内容' },
is_top: { type: INTEGER(10), defaultValue: 0, comment: '置顶' },
});
return Wxlive;
};
二. 定时获取 access_token 并存储
- 在 service 文件夹下, 新建 wx.js 文件, 定义读取和加载 access_token 的方法
/**
* 读出redis中的accesstoken
*/
async getAccessToken() {
const { app } = this;
let accessToken = await app.redis.get('_accesstoken');
if (!accessToken) {
accessToken = await this.fetchAccessToken();
}
return accessToken;
}
/**
* 网络获取accesstoken
*/
async fetchAccessToken() {
const { ctx, app } = this;
const appId = 'xxxxxx';
const appSecret = 'xxxxxx';
const url = `https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=${appId}&secret=${appSecret}`;
const res = await ctx.curl(url, { dataType: 'json' });
if (res.status === 200 && res.data.access_token) {
await app.redis.set('_accesstoken', res.data.access_token);
return res.data.access_token;
}
return '';
}
2. 在 schedule 文件夹下,新建 wx_task.js 文件,用于定时每7000秒获取一次access_token
'use strict';
const Subscription = require('egg').Subscription;
class GetAccessToken extends Subscription {
static get schedule() {
return {
immediate: true,
interval: 7000 * 1000,
type: 'all', // 指定所有的 worker 都需要执行
};
}
async subscribe() {
const { ctx, service, app } = this;
ctx.logger.info('【项目运行环境】:' + app.config.env);
const env = app.config.env;
if (env !== 'local') {
await service.wx.fetchAccessToken();
}
}
}
module.exports = GetAccessToken;
三. 定义路由
'use strict';
module.exports = app => {
const { router, controller } = app;
router.get('/live/syncRoomList', controller.live.syncRoomList); // 同步直播间列表
router.get('/live/syncLiveReplay', controller.live.syncLiveReplay); // 同步直播回放
router.get('/live/getRoomList', controller.live.getRoomList); // 前端获取直播间列表
};
四. 定义service服务层
- 定义一个用来记录当天请求次数的方法
/**
* 记录当天的请求次数, type: 1:直播列表 2:回放视频
*/
async syncReqNum(type) {
const { ctx, app } = this;
const date = ctx.helper.getYMD();
const numKey = `${type === 1 ? '_inc_live_roominfo_reqnum_' : '_inc_live_replay_reqnum_'}${date}`;
const roomReqNum = await app.redis.get(numKey);
const num = roomReqNum ? parseInt(roomReqNum) + 1 : 1;
await app.redis.set(numKey, num + '');
}
2. 查询本地数据库,获取直播房间列表
/**
* 获取直播房间列表
*/
async getRoomList(limit, offset) {
const { app } = this;
const options = {
offset,
limit,
order: [[ 'start_time', 'desc' ], [ 'is_top', 'desc' ]],
where: {},
};
return app.model.Wxlive.findAndCountAll(options);
}
3. 传递房间ID,获取对应的回放源数据
/**
* 同步回放源视频
*/
async syncLiveReplay(roomId) {
const { ctx, app, service } = this;
const accessToken = await service.wx.getAccessToken();
if (!accessToken) {
return '';
}
const url = `http://api.weixin.qq.com/wxa/business/getliveinfo?access_token=${accessToken}`;
const params = { start: 0, limit: 100, room_id: roomId, action: 'get_replay' };
const res = await ctx.curl(url, { method: 'POST', contentType: 'json', data: params, dataType: 'json' });
ctx.logger.info('【获取回放源视频】 ==> ' + JSON.stringify(res.data));
await this.syncReqNum(2); // 记录当天的请求次数
if (res.data.errcode === 0 && res.data.errmsg === 'ok') {
const liveReplay = JSON.stringify(res.data.live_replay); // 一场直播可能会产生多个视频片段,此处是列表的字符串
const model = await app.model.Wxlive.findOne({ where: { roomid: roomId } });
if (model) {
await model.update({ live_replay: liveReplay });
}
return liveReplay;
}
return ''; // 代表未创建直播房间
}
4. 循环加载直播房间列表
跟小程序直播后台直播记录保持一致, 有则更新, 无则删除
/**
* 同步直播房间列表
*/
async syncRoomList() {
const { ctx, app, service } = this;
const accessToken = await service.wx.getAccessToken();
if (!accessToken) {
return { code: 0, msg: '同步直播间列表失败: errcode=40001, errmsg=直播间列表为空' };
}
let page = 1;
const pageSize = 50;
const roomIds = []; // 直播间ID列表
const url = `http://api.weixin.qq.com/wxa/business/getliveinfo?access_token=${accessToken}`;
while (true) {
const params = { start: (page - 1) * pageSize, limit: pageSize };
const res = await ctx.curl(url, { method: 'POST', contentType: 'json', data: params, dataType: 'json' });
await this.syncReqNum(1); // 记录当天的请求次数
const errcode = res.data.errcode;
if (errcode !== 0) {
if (errcode === 1) {
return { code: 0, msg: `同步直播间列表失败: errcode=${errcode}, errmsg=直播间列表为空` };
} else if (errcode === 48001) {
return { code: 0, msg: `同步直播间列表失败: errcode=${errcode}, errmsg=小程序没有直播权限` };
}
return { code: 0, msg: `同步直播间列表失败: errcode=${errcode}, errmsg=${res.data.errmsg}` };
}
const roomList = res.data.room_info;
for (const room of roomList) {
const roomId = room.roomid;
roomIds.push(roomId); // 添加到直播间ID列表
const wxlive = await app.model.Wxlive.findOne({ where: { roomid: roomId } });
const updateData = { name: room.name, cover_img: room.cover_img, live_status: room.live_status, start_time: room.start_time, end_time: room.end_time, anchor_name: room.anchor_name, anchor_img: room.anchor_img, goods: JSON.stringify(room.goods) };
if (!wxlive) {
// 不存在,创建一个
const insertData = updateData;
insertData.roomid = roomId;
await app.model.Wxlive.create(insertData);
if (room.live_status === 103) {
// 直播已结束, 需要获取回放源视频保存到数据库
await this.syncLiveReplay(roomId);
}
continue;
}
// 数据库中存在, 判断已结束的直播是否有回放地址
if (wxlive.live_status === 103 && !wxlive.live_replay) {
// 不存在回放地址, 则需要获取回放源视频保存到数据库
await this.syncLiveReplay(roomId);
}
await wxlive.update(updateData); // 更新数据库中的直播数据
}
if (res.data.total < page * pageSize) {
// 总数小于 页码*页长, 跳出循环
break;
}
page++; // 查询下一页
}
// 当所有直播列表都遍历完后, 删除已经不存在的直播数据
const arr = await app.model.Wxlive.findAll({ where: { roomid: { [app.Sequelize.Op.notIn]: roomIds } } });
if (arr && arr.length > 0) {
for (const item of arr) {
await item.destroy();
}
}
return { code: 1, msg: '同步直播间列表成功' };
}
五. 定义controller控制层
'use strict';
const Controller = require('egg').Controller;
class LiveController extends Controller {
/**
* 同步直播房间列表
*/
async syncRoomList() {
const { ctx, service } = this;
ctx.body = await service.live.syncRoomList();
}
/**
* 同步回放源视频
*/
async syncLiveReplay() {
const { ctx, service } = this;
const roomId = parseInt(ctx.query.roomId || '0');
const liveReplay = await service.live.syncLiveReplay(roomId);
ctx.body = { code: 1, msg: 'success', data: liveReplay };
}
/**
* 获取直播房间列表
*/
async getRoomList() {
const { ctx, service } = this;
const limit = ctx.helper.toInt(ctx.query.pageSize || 10);
const pageNum = ctx.helper.toInt(ctx.query.pageNum || 1);
const offset = (pageNum - 1) * limit;
const res = await service.live.getRoomList(limit, offset);
const liveList = res.rows;
if (res.count > 0) {
for (const item of liveList) {
if (item.goods && ctx.helper.isJSON(item.goods)) {
item.goods = JSON.parse(item.goods);
} else {
item.goods = [];
}
if (item.live_replay && ctx.helper.isJSON(item.live_replay)) {
item.live_replay = JSON.parse(item.live_replay);
} else {
item.live_replay = [];
}
if (!item.anchor_img) {
item.anchor_img = 'https://www.xxx.com/app-logo.png';
}
}
}
ctx.body = { total_count: res.count, page_size: limit, cur_page: pageNum, data: liveList };
}
}
module.exports = LiveController;