[Wind] ACE Note (6) Asynchronous Socket Development under Proactor Mechanism

xiaoxiao2021-03-05  25

ACE Note (6) Asynchronous Socket Development under Proactor Mechanism

The Proactor mechanism and the Reactor mechanism are different. Under the Reactor mechanism, all I / O requests are synchronized, that is, after the signal request is received, the signal processing is executed immediately, and the signal request will be continued, and the signal request is received. The mechanism is passive and under the Proactor mechanism, the I / O request is asynchronous, that is, after the signal request is received, the signal processing is not executed immediately (but the processing is performed at a time), and then continue listening to the signal request, The mechanism for receiving signal requests is active 2. To meet the signal processing of the Proactor mechanism, you need to derive from the ACE_SERVICE_HANDLER, and the Reactor mechanism signal processing class is derived from ACE_EVENT_HANDLER

ACE_Service_Handler to common callback method definition: /// is called virtual void handle_read_stream (const ACE_Asynch_Read_Stream :: Result & result) when an asynchronous read to complete; /// in UDP SOCKET, when the asynchronous write completion will be called virtual void handle_write_dgram (const ACE_Asynch_Write_Dgram :: Result & result); /// in UDP SOCKET, when the asynchronous read to complete is called virtual void handle_read_dgram (const ACE_Asynch_Read_Dgram :: Result & result); /// when the asynchronous write completion will be called virtual void handle_write_stream (const ACE_Asynch_Write_Stream :: Result & result); /// when asynchronous read complete file will be called virtual void handle_read_file (const ACE_Asynch_Read_File :: Result & result); /// when the asynchronous write to complete the file will be called virtual void handle_write_file ( const ACE_Asynch_Write_File :: Result & result); /// when the asynchronous reception is completed is called virtual void handle_accept (const ACE_Asynch_Accept :: Result & result); /// when the asynchronous connection is completed is called virtual void handle_connect (const ACE_Asynch_Connect :: Result & result); /// asynchronous transfer files when complete will be called virtual void handle_transmit_file (const ACE_Asynch_Transmit_File :: Result & result); is called virtual void handle_time_out (const ACE_Time_Value & tv when /// timeout, Const void * ACT = 0); ACE_SERVICE_HANDLER class Open method User definition: Open (ACE_HANDLE HANDLE, ACE_MESSAGE_BLOCK & Message_block) When the client connection is triggered, this method Message_block parameter comes with a message block sent by the client connection. So In the OPEN method, you should pay attention to whether the message_block parameter comes with a message. If you come with it, if you don't want to change the existing event data unified processing mode, you need to simulate a read completion action, as follows: if (Message_Block.length ()! = 0) {// Copy message block (reference) ACE_MESSAGE_BLOCK & DUPLICATE = * message_block.duplicate ();

// read complete event disguised a target ACE_Asynch_Read_Stream_Result_Impl * fake_result = ACE_Proactor :: instance () -> create_asynch_read_stream_result (* this, this-> handle_, duplicate, initial_read_size, 0, ACE_INVALID_HANDLE, 0, 0); // not to move the write pointer location that is written, because the read to complete the action will automatically move the write pointer size_t bytes_transferred = message_block.length (); duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred); // event completion callback command issued fake_result-> complete (message_block .length (), 1, 0); // Delete the object DELETE FAKE_RESULT;

ACE_ASYNCH_READ_STREAM Class FAQ: Initialization Reading Operation Read method: Read operation, put the data on an ACE_MESSAGE_BLOCK data structure, which automatically moves write pointer (WR_PTR)

ACE_ASYNCH_WRITE_STREAM Class FAM: Initialization Write Write Method: Write an operation, write the specified Handle on the specified ACE_MESSAGE_BLOCK data structure, which automatically moves the read pointer (RD_PTR)

ACE_MESSAGE_BLOCK class FAQ: ACE_MESSAGE_BLOCK (length) RD_PTR (): Returns Wr_PTR (): Return Write Pointer Release (): Release Memory INIT (DATA, LEN): Assign Memory WR_PTR (LEN): Pointer Pointer forward Move the LEN Position WR_PTR (×): Pointing the Pointer to the current pointer Duplicate (): Copy the current message block ACE_ASYNCH_READ_STREAM :: Result class FAQ, the property is used to get the related completion information when the callback is complete, the class BYTES_TO_READ (): Want to read Number of bytes bytes_transferred (): How many bytes are received to be (): The action on that handle (): Operation Success Message_Block (): Returns an asynchronous I / O process under the message block ( The example is derived from the ACE comes with an example, a slightly changed), which is used to receive the client request asynchronously, and display the information requested on the console in the console #include "ace / os_main.h" #include "ACE / service_config.h "#include" ace / proactor.h "#include" ace / ask_IO.H "#include" ace / askYNCH_IO_IMPL.H "#include" ace / ask_ACCEPTOR.H "#include" ace / inet_addr.h "#include" ACE /Sock_connector.h"#include "ACE / SOCK_ACCEPTOR.H" #include "ace / sock_stream.h" #include "ace / message_block.h" #include "ace / get_opt.h" #include "ace / os_ns_sys_stat.h"

Static u_short port = ACE_DEFAULT_SERVER_PORT;

Static int done = 0;

Static int initial_read_size = bufsiz;

Class Receiver: public ace_service_handler {public: Receiver; ~ Receiver (Void); Virtual Void Open (ACE_HANDLE HANDLE, ACE_MESSAGE_BLOCK & Message_block);

Protected: Virtual Void Handle_read_Stream (const a_asynch_read_stream :: result); virtual void handle_write_file (const ace_asynch_write_file :: result& ";

Private: int initiate_read_stream (void); ace_asynch_read_stream rs_; ace_handle handle_; // handle for o o remote peer.};

Receiver :: Receiver (Void): dump_file_ (ace_invalid_handle), Handle_ (ACE_INVALID_HANDLE) {}

Receiver :: ~ Receiver (void) {}

videceiver :: Open (ACE_HANDLE HANDLE, ACE_MESSAGE_BLOCK & Message_block {ace_debug ((lm_debug, "% N:% l: receiver :: open called / n")); this-> handle_ = handle; // Open Socket Reading Flow IF (this-> rs_.open (* this, this-> Handle_) == -1) {ACE_ERROR ((LM_ERROR, "% P / N", "ACE_ASYNCH_READ_STREAM :: Open")); Return;}

IF (message_block.length ()! = 0) {// Copy message block (reference) ACE_MESSAGE_BLOCK & DUPLICATE = * message_block.duplicate ();

// read complete event disguised a target ACE_Asynch_Read_Stream_Result_Impl * fake_result = ACE_Proactor :: instance () -> create_asynch_read_stream_result (* this, this-> handle_, duplicate, initial_read_size, 0, ACE_INVALID_HANDLE, 0, 0); // not to move the write pointer location that is written, because the read to complete the action will automatically move the write pointer size_t bytes_transferred = message_block.length (); duplicate.wr_ptr (duplicate.wr_ptr () - bytes_transferred); // event completion callback command issued fake_result-> complete (message_block .length (), 1, 0);

/ / Delete the camouflaged object delete fake_result;} else // does not come with data, start a new read operation if (this-> Ite_read_stream () == -1) Return;}

INTRECEIVER :: Initiate_read_stream (void) {ace_message_block * MB = 0; ACE_NEW_RETURN (MB, ACE_MESSAGE_BLOCK (BUFSIZ 1), -1); // Start Readup IF (this-> rs_.read (* MB, MB-> size () - 1) == -1) ACE_ERROR_RETURN ((LM_ERROR, "% p / n", "ACE_Asynch_Read_Stream :: read"), -1); return 0;} voidReceiver :: handle_read_stream (const ACE_Asynch_Read_Stream :: Result & result) {// Start reading ACE_DEBUG ((LM_Debug, "Handle_Read_Stream Called / N")))); // Displaying the information Result.Message_Block () .rd_ptr () [Result.bytes_TransferRed ()] = '/ 0'; ACE_DEBUG ((LM_Debug, "********************* / n")); ACE_DEBUG ((LM_DEBUG, "% s =% D / N", "bytes_to_read", Result.bytes_to_read ())); ACE_DEBUG ((LM_Debug, "% s =% D / N", "Handle", Result.Handle ()); ACE_DEBUG ((LM_Debug, "% s =% D / N", "BYTES_TRANSFERED", Result.bytes_Transferred ()); ACE_DEBUG ((LM_Debug, "% s =% D / N", "ACT", (U_LONG) Result.act () ))); ACE_DEBUG ((LM_Debug, "% S =% D / N", "Success", Result.Success ())); ACE_DEBUG ((LM_DEBUG, "% s =% D / N", "completion_key", U_long) result.completion_key ())); ACE_DEBUG ((LM_DEBUG, "% S =% D / N", "error", result.error ())); ACE_DEBUG ((lm_debug, "% s =% s / n "," Message: ", Result.Message_block () .rd_ptr ()); ACE_DEBUG ((LM_Debug," ****************** / n "))))) ;

IF (Result.Success () && results ()! = 0) {result.Message_block () .release (); // If there is still not read data, continue reading if (this-> itiate_read_stream () == -1) return;} else {// does not exist, release the message block and close the socket connection ACE_DEBUG ((LM_Debug, "Receiver CompleteD / N)); Result.Message_Block () .release (); DONE = 0 ACE_OS :: CloseSocket (this-> Handle_);}}

INTACE_TMAIN (int Argc, ACE_TCHAR * ARGV []) {

ACE_ASYNCH_ACCEPTOR Acceptor; // Open Socket Port IF (Acceptor.open (ACE_INET_ADDR (Port), INITIAL_READ_SIZE, 1) == -1) Return -1;

INT Success = 1;

While (Success> 0 &&! done) // Processing and Distributing Events Success = ACE_Proactor :: Instance () -> Handle_Events ();

return 0;} // This helps when the compiler to parse the above template #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION) template class ACE_Asynch_Acceptor ; # elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA) #pragma instantiate ACE_Asynch_Acceptor #endif / * ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION * /

转载请注明原文地址:https://www.9cbs.com/read-36016.html

New Post(0)