Linux 2.6 kernel epoll usage example (continued) - adding a read thread pool to the echo server

xiaoxiao2021-03-06  56

The previous article implemented a reactor-style echo server using the epoll mechanism provided by the Linux 2.6 kernel. The biggest benefit of a reactor-style server is that the number of threads in the thread pool can be configured independently, instead of growing with the number of client connections. This was my first time writing a thread pool with the pthread library, and it is built around a work queue. I feel that the work-queue approach to thread pools can be treated as a design pattern: a thread pool can be implemented this way on many platforms, from Win32 and UNIX to the JVM.

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <iostream>

// Compile-time settings for the echo server.
#define maxline 10      // capacity in bytes of one user_data::line buffer
#define Open_MAX 100    // not referenced in the visible code
#define Listenq 20      // backlog passed to listen()
#define serv_port 5555  // TCP port the server binds to
#define inFTim 1000     // not referenced in the visible code

// One node of the thread pool's singly linked task queue.
struct task {
    int fd;             // file descriptor that needs to be read or written
    struct task *next;  // next queued task; NULL for the last node
};

// Delivery to read two aspects of two aspects

Struct User_data {

Int fd;

Unsigned int n_size;

Char line [maxline];

}

// Thread entry points. Both have the signature pthread_create() expects.
// readtask is defined later in this file; no definition of writetask is
// visible in this part of the file.
void *readtask(void *args);

void *writetask(void *args);

// epoll state: ev is used to register events, and events receives the ready
// events returned by epoll_wait().
struct epoll_event ev, events[20];

// The epoll instance descriptor, created in main().
int epfd;

// mutex guards the read task queue below; cond1 is signalled when a task is
// appended to it.
pthread_mutex_t mutex;

pthread_cond_t cond1;

// Head and tail of the read task queue. writehead is declared but not
// referenced in the visible code.
struct task *readhead = NULL, *readtail = NULL, *writehead = NULL;

// Put the descriptor `sock` into non-blocking mode by adding O_NONBLOCK to
// its file status flags.
//
// If either fcntl() call fails, the error is reported with perror() and the
// whole process exits with status 1.
void setnonblocking(int sock)
{
    int opts = fcntl(sock, F_GETFL);
    if (opts < 0) {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }

    if (fcntl(sock, F_SETFL, opts | O_NONBLOCK) < 0) {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

int main ()

{

INT I, MAXI, Listenfd, Connfd, SockFd, NFDS

PTHREAD_T TID1, TID2;

Struct Task * new_task = NULL;

Struct user_data * rdata = null;

Socklen_t clilen;

Pthread_mutex_init (& Mutex, Null);

Pthread_cond_init (& COND1, NULL);

// Initialize threads used to read thread pools

Pthread_create (& TID1, NULL, READTASK, NULL);

Pthread_create (& TID2, NULL, READTASK, NULL);

/ / Generate a file descriptor for handling an EPOLL of Accept

EPFD = EPOLL_CREATE (256);

Struct SockAddr_in ClientAddr; Struct SockAddr_in ServerAddr;

Listenfd = Socket (AF_INET, SOCK_STREAM, 0);

// Set the socket to non-blocking mode

SetnonBlocking (Listenfd);

/ / Set the file descriptor related to the event to be processed

ev.Data.fd = listenfd;

/ / Set the type of event to be processed

Ev.Events = EPOLLIN | EPOLLET;

// Register your Epoll event

EPOLL_CTL (EPFD, EPOLL_CTL_ADD, LISTENFD, & EV);

Bzero (& ServerAddr, SizeOf (ServerAddr));

ServerAddr.sin_Family = AF_INET;

CHAR * local_addr = "200.200.200.222";

inet_aton (local_addr, & (serveraddr.sin_addr); // htons (serv_port);

ServerAddr.sin_Port = HTONS (serv_port);

Bind (Listenfd, SockAddr *) & ServerAddr, SIZEOF (ServerAddr));

Listen (Listenfd, Listenq);

Maxi = 0;

For (;;) {

// Waiting for the occurrence of EPOLL events

NFDS = EPOLL_WAIT (EPFD, Events, 20,500);

// Treat all events happened

For (i = 0; i

{

IF (Events [i] .data.fd == listenfd)

{

Connfd = Accept (Listenfd, (SockAddr *) & ClientAddr, & Clilen;

IF (connfd <0) {

PERROR ("Connfd <0");

Exit (1);

}

SetnonBlocking (connfd);

Char * str = inet_ntoa (clientaddr.sin_addr);

Std :: cout << "Connec_ from >>" << str << std :: endl;

// Set the file descriptor for read operations

Ev.data.fd = connfd;

// Set read operating events for the injection

Ev.Events = EPOLLIN | EPOLLET;

// Register EV

EPOLL_CTL (EPFD, EPOLL_CTL_ADD, CONNFD, & EV);

}

Else IF (Events [i]. Events & EPOLLIN)

{

Printf ("Reading! / N");

IF ((sockfd = events [i] .data.fd) <0) Continue;

New_task = new task ();

NEW_TASK-> fd = sockfd;

NEW_TASK-> next = NULL;

// Add a new reading task

PTHREAD_MUTEX_LOCK (& MUTEX);

IF (readhead == null)

{

Readhead = New_TASK;

Readtail = New_TASK;

}

Else

{

Readtail-> next = new_task;

Readtail = New_TASK;

}

/ / Wake all threads waiting for COND1 condition

pthread_cond_broadcast (& cond1);

Pthread_mutex_unlock (& ​​mutex);

ELSE IF (Events [i]. Events & Epollout)

{

RDATA = (struct user_data *) Events [i] .data.ptr;

SOCKFD = RDATA-> FD;

Write (sockfd, rdata-> line, rdata-> n_size);

Delete rdata;

// Set the file descriptor for read operations

ev.Data.fd = SOCKFD;

// Set read operating events for the injection

Ev.Events = EPOLLIN | EPOLLET;

/ / Modify the event to be processed on SOCKFD to ePolin

EPOLL_CTL (EPFD, EPOLL_CTL_MOD, SOCKFD, & EV);

}

}

}

}

Void * readtask (void * args)

{

INT fd = -1;

Unsigned int N;

// is used to pass the read data out

Struct User_Data * Data = NULL;

While (1) {

PTHREAD_MUTEX_LOCK (& MUTEX);

// Waiting to the task queue is not empty

While (readhead == null)

Pthread_cond_wait (& cond1, & mutex);

FD = readhead-> fd;

// Take a reading task from the task queue

Struct Task * TMP = Readhead;

Readhead = readhead-> next;

DELETE TMP;

Pthread_Mutex_Unlock (& ​​Mutex);

Data = new user_data ();

Data-> fd = fd;

IF ((N = Read (FD, Data-> line, maxline) <0) {

IF (errno == ECONNRESET) {

Close (FD);

Else

Std :: cout << "Readline Error" << std :: endl;

IF (Data! = NULL) delete data;

} else if (n == 0) {

Close (FD);

Printf ("Client Close Connect! / N);

IF (Data! = NULL) delete data;

} else {

Data-> n_size = n;

/ / Set the data that needs to be passed

Ev.data.ptr = data;

// Set a write operation event for the injection

ev. Events = Epollout | EPOLLET;

/ / Modify the event to be processed on SOCKFD for ePollout

EPOLL_CTL (EPFD, EPOLL_CTL_MOD, FD, & EV);

}

}

}

转载请注明原文地址:https://www.9cbs.com/read-83083.html

New Post(0)