开发成长之路(14)-- 小项目:视频点播器服务端(放码过来) 文章目录系列教程一览“看,未来”的个...
作者: CSDN博客 更新时间:2021-05-09 18:56:40 原文链接
文章目录
系列教程一览
开发成长之路(2)-- C语言从入门到开发(函数与定制输入输出控制函数)
开发成长之路(3)-- C语言从入门到开发(讲明白指针和引用,链表很难吗?)
开发成长之路(4)-- C语言从入门到开发(距离开发,还差这一篇)
开发成长之路(5)-- C语言从入门到开发(仿ATM机项目,我写的第一个项目)
开发成长之路(6)-- C++从入门到开发(C++入门不难)
开发成长之路(6)-- C++从入门到开发(C++知名库:STL入门·容器(一))
开发成长之路(7)-- C++从入门到开发(C++知名库:STL入门·容器(二))
开发成长之路(8)-- C++从入门到开发(C++知名库:STL入门·容器(三))
开发成长之路(9)-- C++从入门到开发(C++知名库:STL入门·空间配置器)
开发成长之路(10)-- C++从入门到开发(C++知名库:STL入门·算法)
开发成长之路(12)-- Linux网络服务端编程(通识篇之熟悉操作环境)
开发成长之路(13)-- Linux网络服务端编程(通识篇)“看,未来”的个人简介
朋友们大家好,我是“看,未来”,最近跟CSDN头部大佬们学了这一招,听说挺受用的,我也来试试。
本系列主要面向想要走开发路线,又不知从何学起的大学生。
我本人也是在大一的时候就去参加了培训,后来又自学了一段时间,在这期间,我觉得更重要的是跟行业内的前辈们请教,这比培训来的实在多了。
网页右侧有我的个人微信二维码,如果对学习有困惑的小伙伴可以扫我,知无不言,言无不尽,欢迎来聊。
项目需求分析
我先简单的放一张架构图吧

这次这个小项目分为 前置服务器、后置服务器、进程间通信、数据库通信、性能检测、日志六个模块。
前置服务器:
使用epoll+socket模式,实现大量并发连接的快速处理。并接收通讯数据包,判断是否该传入后置服务器(心跳包就地抛弃)
进程间通信:
shm共享内存
后置服务器:
线程池处理业务、解压包模块内置、数据库连接查询、数据包协议
数据库通信:
sqlite3、MySQL选一个
性能检测:
单用户单业务、单用户多业务、多用户单业务、多用户多业务。
日志:
记录运行过程的日常日志、记录故障的故障日志。
业务分类:
连接、登录、注册、查询视频、查询历史记录、请求播放、退出连接
时间安排:20天周期。三天需求分析、三天demo测试、八天代码编写、四天联调、两天优化代码与文档撰写。
放码过来
很长的啊,我稍微放几个比较重要的出来。
有需要参照的小伙伴可以扫右侧我的微信找我拿,没思路也可以找我聊。
线程池.cpp
#include "e_pthread_pool.h" E_PThread_Pool::E_PThread_Pool(unsigned int max,unsigned int min,unsigned int wait) { //配置基本参数 count = 0; waitcount = 0; mincount = min; maxcount = max; waitsec = wait; Stop = false; //上锁,创建一定数量的线程作为初始线程池 cond.lock(); for (unsigned i = 0; i < mincount; i++) { createThread(); } cond.unlock(); } E_PThread_Pool::~E_PThread_Pool() { destroyThread(); } void E_PThread_Pool::addTask(Task *task) { if (Stop) //线程池是否停止工作 { return; } //向任务队列中添加新任务 taskCond.lock(); taskList.push_back(task); taskCond.unlock(); cond.lock(); if(waitcount)//需要注意这里的waitcount配置 { if(count < maxcount) { createThread(); } cond.signal(); } else if(count < maxcount) { createThread(); } cond.unlock(); } void E_PThread_Pool::createThread() { pthread_t tid; int ret = pthread_create(&tid, NULL, taskThread, (void *)this); if (ret < 0) { perror("pthread create error"); } else { count++; } } //线程池创建一定数量的线程 void E_PThread_Pool::start(unsigned NumCreate) { cond.lock(); for (unsigned i = 0; i < NumCreate && i < maxcount-2; i++) //这里为什么是减2 { createThread(); } cond.unlock(); } // 工作线程 void * E_PThread_Pool::taskThread(void *arg) { pthread_detach(pthread_self()); //设置自分离 E_PThread_Pool *pool=(E_PThread_Pool *)arg; while(1) { pool->cond.lock(); //如果没有工作线程在等待 if (pool->taskList.empty()) { if(pool->Stop) { pool->count--; pool->cond.unlock(); pthread_exit(NULL); } pool->waitcount++; bool bSignal = pool->cond.timewait(pool->waitsec); //新任务等待被唤醒 pool->waitcount--; // 删除无用线程 if (!bSignal && pool->count > pool->mincount) { pool->count--; pool->cond.unlock(); pthread_exit(NULL); } } pool->cond.unlock(); //如果有工作线程在等待 if (!pool->taskList.empty()) { //获取一个工作线程并执行 pool->taskCond.lock(); Task *t = pool->taskList.front(); pool->taskList.pop_front(); //移除工作线程 pool->taskCond.unlock(); t->run(); //任务开始 delete t; } } pthread_exit(NULL); } void E_PThread_Pool::destroyThread() { printf("destroy?\n"); #if 0 //等待所有线程执行完毕 list_task::iterator it = taskList.begin(); for (; it! = taskList.end(); it++) { Task *t = *it; delete t; t = NULL; } taskList.clear(); #endif Stop = true; while (count > 0) { cond.lock(); cond.broadcast(); cond.unlock(); // 等待所有线程执行完毕 sleep(1); } }
shm共享内存.cpp
#include "f_shm.h" #include <sys/types.h> #include <unistd.h> #include <sys/sem.h> #include <sys/ipc.h> #include <sys/shm.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <semaphore.h> typedef struct shmhead_st { int shmid; // 共享内存ID unsigned int blksize; // 块大小 unsigned int blocks; // 总块数 unsigned int rd_index; // 读索引 unsigned int wr_index; // 写索引 //必须放在共享内存内部才行 sem_t sem_mutex; // 用来互斥用的信号量 sem_t sem_full; // 用来控制共享内存是否满的信号量 sem_t sem_empty; // 用来控制共享内存是否空的信号量 }shmhead_t; F_Shm::F_Shm(key_t key, int blksize, int blocks) { this->open_shm(key, blksize, blocks); } F_Shm::F_Shm() { shmhead = NULL; payload = NULL; open = false; } F_Shm::~F_Shm() { this->close_shm(); } //返回头地址 bool F_Shm::creat_shm(key_t key, int blksize, int blocks) { int shmid = 0; //1. 查看是否已经存在共享内存,如果有则删除旧的 shmid = shmget(key, 0, 0); if (shmid != -1) { shmctl(shmid, IPC_RMID, NULL); // 删除已经存在的共享内存 } //2. 创建共享内存 shmid = shmget(key, sizeof(shmhead_t) + blksize*blocks, 0666 | IPC_CREAT | IPC_EXCL); if(shmid == -1) { ERR_EXIT("shmget"); } printf("Create shmid=%d size=%u \n", shmid, sizeof(shmhead_t) + blksize*blocks); //3.连接共享内存 shmhead = shmat(shmid, (void*)0, 0); //连接共享内存 if(shmhead == (void*)-1) { ERR_EXIT("shmat"); } memset(shmhead, 0, sizeof(shmhead_t) + blksize*blocks); //初始化 //4. 初始化共享内存信息 shmhead_t * pHead = (shmhead_t *)(shmhead); pHead->shmid = shmid; //共享内存shmid pHead->blksize = blksize; //共享信息写入 pHead->blocks = blocks; //写入每块大小 pHead->rd_index = 0; //一开始位置都是第一块 pHead->wr_index = 0; // sem_init(&pHead->sem_mutex, 1, 1); // 第一个1表示可以跨进程共享,第二个1表示初始值 sem_init(&pHead->sem_empty, 1, 0); // 第一个1表示可以跨进程共享,第二个0表示初始值 sem_init(&pHead->sem_full, 1, blocks);// 第一个1表示可以跨进程共享,第二个blocks表示初始值 //5. 填充控制共享内存的信息 payload = (char *)(pHead + 1); //实际负载起始位置 open = true; return true; } void F_Shm::dsy_shm() { shmhead_t *pHead = (shmhead_t *)shmhead; int shmid = pHead->shmid; //删除信号量 sem_destroy (&pHead->sem_full); sem_destroy (&pHead->sem_empty); sem_destroy (&pHead->sem_mutex); shmdt(shmhead); //共享内存脱离 //销毁共享内存 if(shmctl(shmid, IPC_RMID, 0) == -1) //删除共享内存 { printf("Delete shmid=%d \n", shmid); ERR_EXIT("shmctl rm"); } shmhead = NULL; payload = NULL; open = false; } void F_Shm::Destroy(key_t key) { int shmid = 0; //1. 查看是否已经存在共享内存,如果有则删除旧的 shmid = shmget(key, 0, 0); if (shmid != -1) { printf("Delete shmid=%d \n", shmid); shmctl(shmid, IPC_RMID, NULL); // 删除已经存在的共享内存 } } //返回头地址 bool F_Shm::open_shm(key_t key, int blksize, int blocks) { int shmid; this->close_shm(); //1. 查看是否已经存在共享内存,如果有则删除旧的 shmid = shmget(key, 0, 0); if (shmid == -1) { return this->creat_shm(key, blksize, blocks); } //2.连接共享内存 shmhead = shmat(shmid, (void*)0, 0); //连接共享内存 if(shmhead == (void*)-1) { ERR_EXIT("shmat"); } printf("Open shmid=%d size=%u \n", shmid, sizeof(shmhead_t) + blksize*blocks); //3. 填充控制共享内存的信息 payload = (char *)((shmhead_t *)shmhead + 1); //实际负载起始位置 open = true; return true; } //关闭共享内存 void F_Shm::close_shm(void) { if(open) { shmdt(shmhead); //共享内存脱离 shmhead = NULL; payload = NULL; open = false; } } void F_Shm::write_into_shm(const void *buf) { shmhead_t *pHead = (shmhead_t *)shmhead; sem_wait(&pHead->sem_full); //是否有资源写? 可用写资源-1 sem_wait(&pHead->sem_mutex); //是否有人正在写? printf("write to shm[%d] index %d \n", pHead->shmid, pHead->rd_index); memcpy(payload + (pHead->wr_index) * (pHead->blksize), buf, pHead->blksize); pHead->wr_index = (pHead->wr_index+1) % (pHead->blocks); //写位置偏移 sem_post(&pHead->sem_mutex); //解除互斥 sem_post(&pHead->sem_empty); //可用读资源+1 } void F_Shm::read_from_shm(void *buf) { shmhead_t *pHead = (shmhead_t *)shmhead; sem_wait(&pHead->sem_empty); //检测写资源是否可用 printf("read from shm[%d] index %d \n", pHead->shmid, pHead->rd_index); memcpy(buf, payload + (pHead->rd_index) * (pHead->blksize), pHead->blksize); //读位置偏移 pHead->rd_index = (pHead->rd_index+1) % (pHead->blocks); sem_post(&pHead->sem_full); //增加可写资源 }
协议包包头.h
#ifndef I_PACKET_PUBLIC_H #define I_PACKET_PUBLIC_H #include <string> #include <iostream> using namespace std; #define MAX_LEN 256 /************接入层数据包头************/ typedef struct packet_header_st { int fd;//用于前后端通信即目标客户端fd(服务器用到) int funcId; // 功能号 //登录包0x01,注册包0x02,找回密码0x03,心跳包0x04 //客户端上传视频点播请求功能0x10 客户端上传视频点播时长功能0x11 //播放历史请求包0x20 //请求视频列表0x30 int optid; // 操作码:请求0x00 和 应答0x01 int usrlenth;// 包体的长度 int packet_seq; //包序号 int packet_sum; //包总数 char srcAddr[6]; //预留 char dstAddr[6]; //预留 int syn; // 判断包头是否正确 0x04 }packet_header_t; /************接入层数据包尾************/ typedef struct packet_tali_st//包尾,用来验证数据包的完整性 { int pack_tail;//设置为0x05 }packet_tali_t; /************数据包报文整体************/ typedef struct packet_all_st { packet_header_t head; char body[1024-sizeof(packet_header_t)-sizeof(packet_tali_st)]; packet_tali_st tail; //unsigned len; //要发送的数据有效长度 }packet_all_st; /************业务层数据包体************/ typedef struct Pant//心跳结构体 功能号 0000 { bool flag;//心跳状态,true-在线,flase-离线 }Pant_t; //客户端登录请求包 typedef struct login { int id; char psw[10]; //密码 }Login_t; //登录应答包 typedef struct res_login_st { int login_ret; //登录结果: 1-登录成功,0-登录失败 }res_login_t; //客户端注册请求包 typedef struct Register { int id; //账号 char psw[10]; //密码 }Register_t; //注册应答包 typedef struct res_register_st { int register_ret; //注册结果: 0-注册成功,1-注册失败 }res_register_t; //客户端修改密码请求包 typedef struct Change_PWD { int id; //账号 char psw[10]; //密码 }c_pwd_t; //注册应答包 typedef struct res_changepwd_st { int change_ret; //注册结果: 0-注册成功,1-注册失败 }res_cpwd_t; //找回密码请求包 typedef struct findpwd { int id; }F_Pwd_t; //忘记密码应答包 typedef struct res_findpwd_st { int find_ret; //找回结果: 0-找回成功,1-找回失败 char pwd[20]; //密码 }res_fpwd_t; /************请求业务交易流水请求包************/ //上传视频点播请求包 typedef struct VideoPlay { int id; char video_id[20];//视频ID }VideoPlay_t; //点播视频应答包 typedef struct res_VideoPlay_st { char video_id[20]; //视频ID char video_name[20]; //视频名称 }res_videopaly_t; //上传停止播放请求包 typedef struct VideoStop { int user_id; //用户ID char video_id[20];//视频ID char video_seek[20];//播放时长 }VideoStop_t; //视频列表请求包 typedef struct VideoList { int funcId; /* 电视剧0x01,动漫0x02,电影0x03,综艺0x04,纪实0x05,少儿0x06,新闻0x07 */ }VideoList_t; //视频列表应答包 typedef struct Res_VideoList { int funcId; //功能号,即频道 char V[800]; }res_vl; //播放历史请求包 typedef struct Histort { int user_id; }his_t; //播放历史应答包 typedef struct History_get { char V[200];//视频id数组 // int percent[5];//播放时间数组 }hostory_t; //历史记录上传 typedef struct Up_Record { int user_id; int percent; char v_name[10]; char v_time[20]; }up_record_t; #endif // PACKET_BASE_H
解压包代码.cpp
#include "i_packet_base.h" #include <iostream> #include <unistd.h> #include <string.h> using namespace std; I_Packet_base::I_Packet_base(int size) :m_size(size) { m_data = new char[m_size]; } I_Packet_base::I_Packet_base(const I_Packet_base &packet) { m_size = packet.m_size; m_data = new char[packet.m_size]; memcpy(m_data, packet.m_data, packet.m_size); } I_Packet_base::~I_Packet_base() { delete []m_data; } char *I_Packet_base::getData() { return m_data; } unsigned int I_Packet_base::getSize() { return m_size; } I_Packet_base & I_Packet_base::operator =(const I_Packet_base &packet) { if(m_size != packet.m_size) { delete []m_data; m_size = packet.m_size; m_data = new char[packet.m_size]; } memcpy(m_data, packet.m_data, packet.m_size); return *this; }
嗯,实在太多,全放进来的话这篇文章就会因为太长而无法发布,有兴趣的小伙伴可以私信我。