The last article implements a reactive Echo server using the Epoll mechanism provided by Linux core 2.6, using the maximum benefit of the reactive server is the number of threads within the thread pool can be configured in the number of threads instead of the client's concatenation. . I used the PTHREAD library to write a thread pool for the first time, using a thread pool of a work queue. I feel that the thread pool of the queue method can be used as a design pattern. On many platforms, the thread pool can be implemented in this way. From Win32, UNIX to JVM is applicable.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define maxline 10
#DEfine Open_MAX 100
#define Listenq 20
#define serv_port 5555
#define inFTim 1000
// Thread pool task queue structure
Struct Task {
INT fd; / / need to read and write file descriptors
Struct Task * next; // Next task
}
// Delivery to read two aspects of two aspects
Struct User_data {
Int fd;
Unsigned int n_size;
Char line [maxline];
}
// Thread task function
Void * Readtask; Void * ARGS;
Void * WriteTask (Void * args);
// Declare the variable of the EPOLL_EVENT structure, EV is used to register events, an array is used to return the event to be processed
Struct Epoll_Event EV, Events [20];
Int EPFD;
PTHREAD_MUTEX_T MUTEX;
Pthread_cond_t cond1;
Struct Task * Readhead = null, * readtail = null, * Writehead = NULL;
Void setnonblocking (int Sock)
{
Int OPTS;
OPTS = FCNTL (SOCK, F_GETFL);
IF (OPTS <0)
{
"" "FCNTL (SOCK, GETFL)");
Exit (1);
}
OPTS = OPTS | O_NONBLOCK;
IF (FCNTL (Sock, F_Setfl, OPTS) <0)
{
"" FCNTL (Sock, SETFL, OPTS ");
Exit (1);
}
}
int main ()
{
INT I, MAXI, Listenfd, Connfd, SockFd, NFDS
PTHREAD_T TID1, TID2;
Struct Task * new_task = NULL;
Struct user_data * rdata = null;
Socklen_t clilen;
Pthread_mutex_init (& Mutex, Null);
Pthread_cond_init (& COND1, NULL);
// Initialize threads used to read thread pools
Pthread_create (& TID1, NULL, READTASK, NULL);
Pthread_create (& TID2, NULL, READTASK, NULL);
/ / Generate a file descriptor for handling an EPOLL of Accept
EPFD = EPOLL_CREATE (256);
Struct SockAddr_in ClientAddr; Struct SockAddr_in ServerAddr;
Listenfd = Socket (AF_INET, SOCK_STREAM, 0);
// Set the socket to non-blocking mode
SetnonBlocking (Listenfd);
/ / Set the file descriptor related to the event to be processed
ev.Data.fd = listenfd;
/ / Set the type of event to be processed
Ev.Events = EPOLLIN | EPOLLET;
// Register your Epoll event
EPOLL_CTL (EPFD, EPOLL_CTL_ADD, LISTENFD, & EV);
Bzero (& ServerAddr, SizeOf (ServerAddr));
ServerAddr.sin_Family = AF_INET;
CHAR * local_addr = "200.200.200.222";
inet_aton (local_addr, & (serveraddr.sin_addr); // htons (serv_port);
ServerAddr.sin_Port = HTONS (serv_port);
Bind (Listenfd, SockAddr *) & ServerAddr, SIZEOF (ServerAddr));
Listen (Listenfd, Listenq);
Maxi = 0;
For (;;) {
// Waiting for the occurrence of EPOLL events
NFDS = EPOLL_WAIT (EPFD, Events, 20,500);
// Treat all events happened
For (i = 0; i { IF (Events [i] .data.fd == listenfd) { Connfd = Accept (Listenfd, (SockAddr *) & ClientAddr, & Clilen; IF (connfd <0) { PERROR ("Connfd <0"); Exit (1); } SetnonBlocking (connfd); Char * str = inet_ntoa (clientaddr.sin_addr); Std :: cout << "Connec_ from >>" << str << std :: endl; // Set the file descriptor for read operations Ev.data.fd = connfd; // Set read operating events for the injection Ev.Events = EPOLLIN | EPOLLET; // Register EV EPOLL_CTL (EPFD, EPOLL_CTL_ADD, CONNFD, & EV); } Else IF (Events [i]. Events & EPOLLIN) { Printf ("Reading! / N"); IF ((sockfd = events [i] .data.fd) <0) Continue; New_task = new task (); NEW_TASK-> fd = sockfd; NEW_TASK-> next = NULL; // Add a new reading task PTHREAD_MUTEX_LOCK (& MUTEX); IF (readhead == null) { Readhead = New_TASK; Readtail = New_TASK; } Else { Readtail-> next = new_task; Readtail = New_TASK; } / / Wake all threads waiting for COND1 condition pthread_cond_broadcast (& cond1); Pthread_mutex_unlock (& mutex); ELSE IF (Events [i]. Events & Epollout) { RDATA = (struct user_data *) Events [i] .data.ptr; SOCKFD = RDATA-> FD; Write (sockfd, rdata-> line, rdata-> n_size); Delete rdata; // Set the file descriptor for read operations ev.Data.fd = SOCKFD; // Set read operating events for the injection Ev.Events = EPOLLIN | EPOLLET; / / Modify the event to be processed on SOCKFD to ePolin EPOLL_CTL (EPFD, EPOLL_CTL_MOD, SOCKFD, & EV); } } } } Void * readtask (void * args) { INT fd = -1; Unsigned int N; // is used to pass the read data out Struct User_Data * Data = NULL; While (1) { PTHREAD_MUTEX_LOCK (& MUTEX); // Waiting to the task queue is not empty While (readhead == null) Pthread_cond_wait (& cond1, & mutex); FD = readhead-> fd; // Take a reading task from the task queue Struct Task * TMP = Readhead; Readhead = readhead-> next; DELETE TMP; Pthread_Mutex_Unlock (& Mutex); Data = new user_data (); Data-> fd = fd; IF ((N = Read (FD, Data-> line, maxline) <0) { IF (errno == ECONNRESET) { Close (FD); Else Std :: cout << "Readline Error" << std :: endl; IF (Data! = NULL) delete data; } else if (n == 0) { Close (FD); Printf ("Client Close Connect! / N); IF (Data! = NULL) delete data; } else { Data-> n_size = n; / / Set the data that needs to be passed Ev.data.ptr = data; // Set a write operation event for the injection ev. Events = Epollout | EPOLLET; / / Modify the event to be processed on SOCKFD for ePollout EPOLL_CTL (EPFD, EPOLL_CTL_MOD, FD, & EV); } } }