美耐皿 技术支持 东莞网站建设,全国最新网站备案查询,wordpress 分类 seo,网站备案 新闻类前置审批文章目录 实现目标实现步骤封装日志类封装线程池封装线程封装锁封装线程池 TCP通信的接口和注意事项accept TCP封装任务客户端Client.hppClient.cc 服务端Server.hpp Server.cc实现效果 守护进程服务端守护进程化 实现目标
利用线程池多线程并发实现基于TCP通信的多个客户端与… 文章目录 实现目标实现步骤封装日志类封装线程池封装线程封装锁封装线程池 TCP通信的接口和注意事项accept TCP封装任务客户端Client.hppClient.cc 服务端Server.hpp Server.cc实现效果 守护进程服务端守护进程化 实现目标
利用线程池多线程并发实现基于TCP通信的多个客户端与服务端之间的交互客户端发送数据服务端接收后处理数据并返回。服务端为守护进程
实现步骤
封装一个记录日志的类将程序运行的信息保存到文件封装线程类、服务端处理任务类以及将锁进行封装为方便实现线程池实现服务端使服务端能接收客户端所发来的数据处理数据后返回。服务端采用多线程并发处理封装守护进程方法使服务端为守护进程实现客户端可以向服务端发送数据并接收到服务端发送回来的数据
封装日志类
将程序运行的信息保存到指定文件例如创建套接字成功或者失败等信息。以【状态】【时间】【信息】的格式保存。
状态可分为五种“DEBUG”,“NORMAL”,“WARNING”,“ERROR”,“FATAL”
日志类保存的信息需带有可变参数
#pragma once#include iostream
#include string
#include cstdarg
#include ctime
#include unistd.husing namespace std;#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char *to_levelstr(int level)
{switch (level){case DEBUG:return DEBUG;case NORMAL:return NORMAL;case WARNING:return WARNING;case ERROR:return ERROR;case FATAL:return FATAL;default:return nullptr;}
}void LogMessage(int level, const char *format, ...)
{
#define NUM 1024char logpre[NUM];snprintf(logpre, sizeof(logpre), [%s][%ld][%d], to_levelstr(level), (long int)time(nullptr), getpid());char line[NUM];// 可变参数va_list arg;va_start(arg, format);vsnprintf(line, sizeof(line), format, arg);// 保存至文件FILE* log fopen(log.txt, a);FILE* err fopen(log.error, a);if(log err){FILE *curr nullptr;if(level DEBUG || level NORMAL || level WARNING) curr log;if(level ERROR || level FATAL) curr err;if(curr) fprintf(curr, %s%s\n, logpre, line);fclose(log);fclose(err);}
}封装线程池
封装线程
将线程的创建等待封装成类的成员函数。不再需要单个的条用线程库接口以对象的方式创建。
需要注意在类里面的线程回调方法必须设为static类型而静态的方法是不能访问类内成员的因此传给回调函数的参数需要将整个对象传过去通过对象来获取类内成员
#pragma once#include iostream
#include string
#include cstring
#include cassert
#include functional
#include pthread.htypedef std::functionvoid *(void *) func_t;class Thread
{
private:// 在类内创建线程想让线程执行对应的方法需要将方法设置成为staticstatic void *start_routine(void *args) // 类内成员有缺省参数{Thread *_this static_castThread *(args);return _this-callback();}public:// 构造函数里直接生成线程名利用静态变量从1开始Thread(){char namebuffer[1024];snprintf(namebuffer, sizeof namebuffer, thread-NO.%d, threadnum);_name namebuffer;}// 线程启动void start(func_t func, void *args nullptr){_func func;_args args;// 由于静态的方法是不能访问类内成员的// 因此传给回调函数的参数需要将整个对象传过去通过对象来获取类内成员// 也就是this指针int n pthread_create(_tid, nullptr, start_routine, this);assert(n 0);(void)n;}// 线程等待void join(){int n pthread_join(_tid, nullptr);assert(n 0);(void)n;}~Thread(){}void *callback(){return _func(_args);}private:std::string _name; // 类名func_t _func; // 线程回调函数void *_args; // 线程回调函数的参数pthread_t _tid; // 线程idstatic int threadnum; // 线程的编号为生成线程名
};
// static的成员需在类外初始化
int Thread::threadnum 1;封装锁
同样的为了不再需要一直调用系统接口可以将整个方法封装成类通过类的对象实现加锁过程
#pragma once#include iostream
#include pthread.h// 加锁 解锁
class Mutex
{
public:Mutex(pthread_mutex_t *lock_p nullptr) : _lock_p(lock_p){}// 加锁void lock(){if (_lock_p)pthread_mutex_lock(_lock_p);}// 解锁void unlock(){if (_lock_p)pthread_mutex_unlock(_lock_p);}~Mutex(){}private:pthread_mutex_t *_lock_p;
};// 锁的类
class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex) : _mutex(mutex){_mutex.lock(); // 在构造函数中进行加锁}~LockGuard(){_mutex.unlock(); // 在析构函数中进行解锁}private:Mutex _mutex;
};封装线程池
在类里面的线程回调方法必须设为static类型而静态的方法是不能访问类内成员的因此传给回调函数的参数需要将整个对象传过去通过对象来获取类内成员。
线程池需要实现为单例模式 第一步就是把构造函数私有再把拷贝构造和赋值运算符重载delete在设置获取单例对象的函数的时候注意要设置成静态成员函数因为在获取对象前根本没有对象无法调用非静态成员函数可能会出现多个线程同时申请资源的场景所以还需要一把锁来保护这块资源而这把锁也得设置成静态因为单例模式的函数是静态的 #pragma once#include Thread.hpp
#include log.hpp
#include Lock.hpp
#include vector
#include queue
#include mutex
#include pthread.h
#include unistd.husing namespace std;// 线程池类定义位于下面因此属性类想要获取到
// 就必须在前面声明
template class T
class ThreadPool;template class T
class ThreadData
{
public:ThreadPoolT *threadpool; // 线程所在的线程池获取到线程的this指针std::string _name; // 线程的名字public:ThreadData(ThreadPoolT *tp, const std::string name) : threadpool(tp), _name(name){}
};template class T
class ThreadPool
{
private:// 线程最终实现的方法static void *handlerTask(void *args){ThreadDataT *td (ThreadDataT *)args;while (true){T t;{LockGuard lockguard(td-threadpool-mutex());while (td-threadpool-isQueueEmpty()){td-threadpool-threadWait();}t td-threadpool-pop();}t();}delete td;return nullptr;}ThreadPool(const int num 10) : _num(num){pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_cond, nullptr);for (int i 0; i _num; i){_threads.push_back(new Thread());}}void operator(const ThreadPool ) delete;ThreadPool(const ThreadPool ) delete;public:// 将加锁 解锁 判断任务队列是否为空 和条件变量等待全部封装成类内方法// 方便在线程的回调方法中通过对象直接调用void lockQueue() { pthread_mutex_lock(_mutex); }void unlockQueue() { pthread_mutex_unlock(_mutex); }bool isQueueEmpty() { return _task_queue.empty(); }void threadWait() { pthread_cond_wait(_cond, _mutex); }// 任务队列删除队头并返回队头的任务T pop(){T t _task_queue.front();_task_queue.pop();return t;}pthread_mutex_t *mutex(){return _mutex;}public:// 让每个线程对象调用其启动函数并将线程辅助类和最终执行的任务方法传入函数中// 线程的辅助类对象里包含了线程当前线程池对象也就是可以// 通过辅助类对象可以调用到线程池对象里的成员void run(){for (const auto t : _threads){ThreadDataT *td new ThreadDataT(this, t-threadname());t-start(handlerTask, td);// 创建成功后打印日志LogMessage(DEBUG, %s start ..., t-threadname().c_str());}}// 往任务队列里插入一个任务void push(const T in){LockGuard lockguard(_mutex);_task_queue.push(in);pthread_cond_signal(_cond);}~ThreadPool(){pthread_mutex_destroy(_mutex);pthread_cond_destroy(_cond);for (const auto t : _threads)delete t;}// 实现单例模式static ThreadPoolT *getInstance(){if (nullptr tp){_singlock.lock();if (nullptr tp){tp new ThreadPoolT();}_singlock.unlock();}return tp;}private:int _num;//线程的数量std::vectorThread * _threads;//线程组std::queueT _task_queue;//任务队列pthread_mutex_t _mutex;//锁pthread_cond_t _cond;//条件变量static ThreadPoolT *tp;static std::mutex _singlock;
};template class T
ThreadPoolT *ThreadPoolT::tp nullptr;template class T
std::mutex ThreadPoolT::_singlock;TCP通信的接口和注意事项
为了实现TCP版的通信首先来了解一下相关接口和注意事项 TCP需要在通信前先创建链接因此在TCP没有链接之前其创建的套接字并不是用来通信的而是用来监听的。一旦创建链接成功后才会返回一个用来通信的套接字TCP时面向字节流的因此其通信就是往文件上IO因此不用指定的调用某接口去完成直接用文件接口读写就可以完成 accept
#include sys/types.h /* See NOTES */
#include sys/socket.hint accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);这就是用来创建链接的接口
参数一为负责监听的套接字
参数二就是socket的结构体
参数三为结构体的大小
返回值成功创建链接之后会返回一个值这个值就是负责通信的套接字也就是后面利用文件通信的文件描述符
TCP
封装任务
因为上述说到TCP是可以直接使用文件操作来完成通信的那么也就是说其通信根本就用不到其他的成员了只需要知道一个套接字即可。那么这个方法就可以不放在类内因为这就是线程最后的执行目的因此可以将这个任务单独放到一个头文件中。因为线程池是一个模板类则可以封装一个任务类。
#pragma once#include iostream
#include string
#include cstdio
#include functional
#include log.hpp// TCP的通信
// 线程的最终执行方法
void ServerIO(int sock)
{char buffer[1024];while (true){ssize_t n read(sock, buffer, sizeof(buffer) - 1);if (n 0){// readbuffer[n] 0;std::cout recv message: buffer std::endl;// writestd::string outbuffer buffer;outbuffer server[echo];write(sock, outbuffer.c_str(), outbuffer.size());}else if (n 0){// 代表client退出LogMessage(NORMAL, client quit, me too!);break;}}close(sock);
}// 任务类
// 为了最终执行的方法而服务
class Task
{using func_t std::functionvoid(int);public:Task(){}Task(int sock, func_t func): _sock(sock), _callback(func){}void operator()(){_callback(_sock);}private:int _sock; // 通信套接字func_t _callback;
};客户端
客户端不需要显示的绑定端口号而是由操作系统随机去绑定。TCP的客户端也不需要监听因为并没有去主动链接客户端所以不需要accept。TCP的客户端只需要向服务端发起链接请求
Client.hpp
#pragma once#include iostream
#include string
#include cstring
#include sys/socket.h
#include sys/types.h
#include netinet/in.h
#include arpa/inet.h
#include unistd.h
#include log.hppusing namespace std;class Client
{
public:Client(const string serverip, const uint16_t port): _serverip(serverip), _port(port), _sock(-1){}void Init(){// 创建套接字_sock socket(AF_INET, SOCK_STREAM, 0);if (_sock 0){LogMessage(FATAL, create socket error);exit(1);}// TCP的客户端也不需要显示绑定端口让操作系统随机绑定// TCP的客户端也不需要监听因为并没有去主动链接客户端所以不需要accept// TCP的客户端只需要向服务端发起链接请求}void start(){// 向服务端发起链接请求struct sockaddr_in local;memset(local, 0, sizeof(local));local.sin_family AF_INET;local.sin_port htons(_port);local.sin_addr.s_addr inet_addr(_serverip.c_str());if (connect(_sock, (struct sockaddr *)local, sizeof(local)) 0)LogMessage(ERROR, connect socket error);// 和服务端通信else{string line;while (1){cout Please cin: endl;getline(cin, line);// 向服务端写write(_sock, line.c_str(), line.size());// 读服务端返回来的数据char buff[1024];int n read(_sock, buff, sizeof(buff) - 1);if (n 0){buff[n] 0;cout 接收到的消息为 buff endl;}elsebreak;}}}~Client(){if(_sock 0)close(_sock);}private:int _sock;string _serverip;uint16_t _port;
};Client.cc
#include Client.hpp
#include memory// 输出命令错误函数
void Usage(string proc)
{cout Usage:\n\t proc local_ip local_port\n\n;
}int main(int argc, char* argv[])
{// 再运行客户端时输入的指令需要包括主机ip和端口号if(argc ! 3){Usage(argv[0]);exit(1);}string serverip argv[1];uint16_t port atoi(argv[2]);unique_ptrClient client(new Client(serverip, port));client-Init();client-start();return 0;
}服务端
那么对于服务端而言必须要显式的去绑定端口号。则创建的套接字并不是负责通信的。创建好套接字和绑定完网络信息后需要设置创建的套接字为监听状态。和UDP一样服务端是不能指定IP的.
还需要注意的是因为封装的线程池是单例模式所以不需要创建对象直接调用静态对象去调用类方法即可 步骤可分为 创建监听套接字绑定网络信息设置套接字为监听状态获取链接得到通信的套接字通信关闭不需要的套接字 Server.hpp
#pragma once#include Task.hpp
#include ThreadPool.hpp
#include sys/types.h
#include sys/socket.h
#include cstring
#include netinet/in.h
#include arpa/inet.hclass Server
{
public:Server(const uint16_t port 8000): _port(port){}void Init(){// 创建负责监听的套接字 面向字节流_listenSock socket(AF_INET, SOCK_STREAM, 0);if (_listenSock 0){LogMessage(FATAL, create socket error!);exit(1);}LogMessage(NORMAL, create socket %d success!, _listenSock);// 绑定网络信息struct sockaddr_in local;memset(local, 0, sizeof(local));local.sin_family AF_INET;local.sin_port htons(_port);local.sin_addr.s_addr INADDR_ANY;if (bind(_listenSock, (struct sockaddr *)local, sizeof(local)) 0){LogMessage(FATAL, bind socket error!);exit(3);}LogMessage(NORMAL, bind socket success!);// 设置socket为监听状态if (listen(_listenSock, 5) 0){LogMessage(FATAL, listen socket error!);exit(4);}LogMessage(NORMAL, listen socket success!);}void start(){while (1){// 因为线程池时单例模式所以直接调用初始化ThreadPoolTask::getInstance()-run();LogMessage(NORMAL, Thread init success);// server获取建立新连接struct sockaddr_in peer;memset(peer, 0, sizeof(peer));socklen_t len sizeof(peer);// 创建通信的套接字// accept的返回值才是真正用于通信的套接字_sock accept(_listenSock, (struct sockaddr *)peer, len);if (_sock 0){// 获取通信的套接字失败并不影响未来的操作只是当前的链接失败而已LogMessage(ERROR, accept socket error, next);continue;}LogMessage(NORMAL, accept socket %d success, _sock);cout sock: _sock endl;// 往线程池的任务队列里插入任务ThreadPoolTask::getInstance()-push(Task(_sock, ServerIO));}}private:int _listenSock; // 负责监听的套接字int _sock; // 通信的套接字uint16_t _port; // 端口号
};Server.cc
#include Server.hpp
#include daemon.hpp
#include memory// 输出命令错误函数
void Usage(string proc)
{cout Usage:\n\t proc local_ip local_port\n\n;
}int main(int argc, char* argv[])
{// 启动服务端不需要指定IPif(argc ! 2){Usage(argv[0]);exit(1);}uint16_t port atoi(argv[1]);unique_ptrServer server(new Server(port));server-Init();server-start();return 0;
}实现效果 可以看到多个客户端同时访问也没有问题并且所对应的套接字也就是文件描述符也不一样。
守护进程
守护进程是一种特殊的孤儿进程其运行于后台生存期较长并且独立与终端周期性的执行任务或者等待处理任务
进程分为前台运行和后台运行每一个进程都会属于一个会话组里。每一个会话组都有且只有能一个前台进程。像上述的服务端当运行服务端时操作系统会将其分到含有bash的会话组内并且将服务端置为前台任务进程因此服务端运行时bash把放置后台这也就是为什么用户不能再bash继续输入命令的原因。
每一个会话组都会有一个组长一般而言在bash中输入命令执行的进程都会分到bash的会话组内这个会话组的组长即为bash。可以通过查看进程的SID确认进程的会话组 可以看到上述图片中运行了三个进程并置于后台他们的SID也就是会话组都是一样的。那么如果将他们置于前台运行会发生什么呢 可以看到置于前台运行后命令行输入什么都没有反应了。也就是说此时的bash被自动的放到了后台运行证实了一个会话组只能有一个前台进程 输入ctr Z 之后前台的进程就会把切回后台但是切回后台后进程是阻塞状态的因此输入bg 作业号就可让进程启动。
服务端守护进程化
那么很显然在业务逻辑上服务端肯定是需要守护进程化的。因为服务端没有特殊情况是不会关闭的需要一直运行。如果服务端是前台进程的话那服务端运行时bash都不能用了显然不符合。
这里要介绍一个接口
#include unistd.h
pid_t setsid(void);这个接口的作用是使调用的进程独立成为一个会话组并且为该组的组长。但是调用这个接口是有前置条件的调用这个接口的进程不能为某个会话组的组长 守护进程化的步骤 让调用进程忽略掉异常信号因为其不受终端控制的让调用进程不为组长关闭或者重定向之前默认打开的文件如0 1 2文件描述符 #pragma once#include unistd.h
#include signal.h
#include cstdlib
#include cassert
#include sys/types.h
#include sys/stat.h
#include fcntl.h#define DEV /dev/nullvoid daemonSelf(const char *currPath nullptr)
{// 1. 让调用进程忽略掉异常的信号signal(SIGPIPE, SIG_IGN);// 2. 让自己不是组长setsidif (fork() 0)exit(0);// 子进程 -- 守护进程精灵进程本质就是孤儿进程的一种pid_t n setsid();assert(n ! -1);// 3. 守护进程是脱离终端的关闭或者重定向以前进程默认打开的文件int fd open(DEV, O_RDWR);if(fd 0){dup2(fd, 0);dup2(fd, 1);dup2(fd, 2);close(fd);}else{close(0);close(1);close(2);}
}接着只需要服务端在初始化完成后调用这个函数将自己设为守护进程化即可 一起来看看效果 可以看到服务端启动后并不会影响bash仍然可以在bash上输入指令去执行。客户端也能够很好的接收到数据这就符合现实中服务端的逻辑。