The ace

xiaoxiao2021-03-06  50

Ru-BRD

6.3 The ace_task_class motivation the ace_MESSAGE_QUEUE CLASS Described in Section 6.2 Can Be Used To

Decouple the flow of information from its processing Link threads that execute producer / consumer services concurrently To use a producer / consumer concurrency model effectively in an object-oriented program, however, each thread should be associated with the message queue and any other service-related information. to preserve modularity and cohesion, and to reduce coupling, it's therefore best to encapsulate an ACE_Message_Queue with its associated data and methods into one class whose service threads can access it directly. Thread-spawning capabilities provided by popular OS platforms are based on each spawned thread invoking a C-style function call. The ACE_Thread_Manager wrapper facade class described in Chapter 9 of C NPv1 implements portable multithreading capabilities. However, programmers must still pass a C-style function to its spawn () and spawn_n () methods A spawned thread with access to a c Object Requires a bridge to the C Object Environment. The CLD_Handler: :. Open () method (page 172) illustrated this technique Since implementing this technique manually for each class is repetitive, it's a good candidate for reuse The ACE Task framework therefore defines ACE_Task to encapsulate a class's messaging capability and provide a portable way for. THREAD (S) TO EXECUTE IN The Context of an Object. Motivation describes the ACE_MESSAGE_QUEUE to: Release the information stream and its processing connection to the producer / consumer service thread executed in the object-oriented programming. The use of producers consumers concurrent models, each thread will associate with message queues and service related information. In order to retain modular and cohesive, reduce the coupling, so the ACE_MESSAGE_QUEUE and the data are encapsulated into a class, and the service thread can directly access the data. Class Capabilities Ace_Task Is The Basis of Ace's Object-Oriented Concurrency Framework. It provides the following capain

IT Uses An Instance of Ace_MESSAGE_QUEUE From Section 6.2 To Separate Data And Requests from Their Processing. Using ACE_MESSAGE_QUEUE to separate data and requests. It uses the ACE_Thread_Manager class to activate the task so it runs as an active object [POSA2] that processes its queued messages in one or more threads of control. Since each thread runs a designated class method, they can access all of the task's data members He uses ACE_THREAD_MANAGER to activate the task to run him as an active object, process one or more control threads. Because each thread runs in a specified class method, they can directly access the member data of the task.

It inherits from ACE_Service_Object, so its instances can be configured dynamically via the ACE Service Configurator framework from Chapter 5. he inherited from ACE_Service_Objcect, by ACE Service Configurator framework for dynamic configuration.

IT's a descendant of ace_event_handler, So ITS Instances CAN Also Serve As Event Handlers In The Ace Reactor Framework from Chapter 3. Inherits from ACE_EVENT_HANDLER, so it can be used as an event processor in the ACE Reactor framework.

IT Provides Virtual Hook Methods That Application Classes Can Reimpement for Task-Specific Service Execution and Message Handling. He offers multiple virtual hook methods, and application classes can re-implement task-specific service and message processing.

Our focus in this section is on the ACE_Task capabilities for queueing and processing messages. It obtains its event-handling, configuration, and dynamic linking / unlinking capabilities as a consequence of inheriting from ACE classes described in previous chapters. Our focus in this The section is the performance of the ACE_TASK Passage and Message Process. He or event handles, configuration, dynamic connection / release capabilities through the inheritance of some of the ACE classes described earlier. The interface for ACE_Task is shown in Figure 6.6. Since this class has a rich interface, we group the description of its methods into the three categories described below. Sidebar 60 (page 308) describes some additional ACE_Task methods that are related to the ACE Streams Framework.

Figure 6.6. Task Initialization Methods. Task Initialization Methods. The Methods Used to Initialize A Task Are Shown In The Following Table:

Method Description ACE_TASK () Constructor That CAN Assign Pointers to the ace_MAESSAGE_QUEUE and ACE_THREAD_MANAGER Used by the task constructor, can specify the ace_message_queue and ace_thread_manager. Open () Hook Method That Performs Application-Defined Initialization Activities hooks, performs initialization activities for application definitions. thr_mgr () Get and set a pointer to the task's ACE_Thread_Manager msg_queue () Get and set a pointer to the task's ACE_Message_Queue activate () Convert a task into an active object that runs in one or more threads to the task is converted to run on one or more Active object in a thread. Applications can customize the startup behavior of an ACE_Task by overriding its open () hook method. This method allocates resources used by a task, such as connection handlers, I / O handles, and synchronization locks. Since open () is generally called after the init () method that's inherited via ACE_Service_Object, any passed via options the ACE Service Configurator framework have already been processed. Therefore, open () can act on those configured preferences. The open () method is also often used to convert a task into an Active Object By Calling Ace_task :: ActiVate () (page 187). Application You can customize the starter behavior by redefining the Open hook method. The task allocates resources through this method, such as connecting processors, I / O handles, synchronous locks. Because the OPEN method is usually called after the init method, any options passed through the ACE_SERVICE Configurator framework have been processed. Therefore, you can work on those configured seals. The OPEN method converts the task to an active object by calling ACTIVATE. The thr_mgr () and msg_queue () methods make it possible to access and change the thread management and message queueing mechanisms used by a task. Alternative thread management and message queueing mechanisms can also be passed to the ACE_Task constructor when an instance of the class is Created. thr_mgr () and msg_queue () methods Access and change thread management and message queuing mechanisms. Thread management and message queuing mechanisms can also be incorporated in the ACE_TASK constructor.

The activate () method uses the ACE_Thread_Manager pointer returned by the thr_mgr () accessor to spawn one or more threads that run within the task. This method converts the task into an active object whose thread (s) direct its own execution and response to events , rather than being driven entirely by passive method calls that borrow the thread of the caller. Sidebar 42 (page 186) describes how to avoid memory leaks when an activated task's thread (s) exit. activate () method using ACE_Thread_Manager to generate one or Multiple threads are running in the task. This method conversion task is an active object, his thread runs and responds directly, not the passive mode driver borrowed the caller thread. . 2. Task communication, processing, and synchronization methods The following table describes the methods used to communicate between tasks and to process messages passively and actively within a task: the task communications, processing and synchronization method.

Method Description SVC () A hook Method That Can IMPLEMENT A TASK'S Service Processing. It is executed by all threads spawned via the Activate () Method. Hook method to implement task processing. He is performed by all threads through the activate () method. Put () A hook method That Can Be buy to pass a message to a task, where it can be processed immediately or queued for subsequent processing by the svc () hook method. A hook method can be used to pass messages to tasks, he It can be processed immediately or in addition to the process of subsequent SVC hook methods. putq () getq () ungetq () Insert, remove, and replace messages from the task's message queue. The putq (), getq (), and ungetq () methods simplify access to the enqueue_tail (), dequeue_head (), and enqueue_head () Methods of a task's message queue, respectively. Insert, delete, and replaced a message from the message queue. PUTQ (), getq (), ungetQ (), simple access enqueue_tail (), degec (), and enqueue_head () method. A subclass of ACE_Task can perform application-defined processing on messages passed to it by overriding its put () and svc () hook methods to implement the following two processing models: a ACE_Task subclass may execute processing application defined, when the message transfer When it is given to him, the PUT () and SVC () hook methods achieve the following two processing models: 1. Passive processing. The put () Method is used to pass message to an ace_task. Pointers to ace_message_blocks area passed Between Tasks to Avoid Data Copying overhead. Task processing can be performed entirely in the context of put (), where the caller's thread is borrowed for the duration of its processing. A task's svc () hook method need not be used if the task only processes requests passively in put ( Voltage processing. The PUT () method is used to deliver a message to the ACE_TASK. ACE_MESSAGE_BLOCKS Pointers Pass between tasks to avoid copying overhead. Task processing can be executed in the PUT () context, and the caller thread will be processed. A task's SVC () hook does not need to use if the task processes the passive request in the PUT ().

Sidebar 42: Avoiding Memory Leaks When Threads Exit Calls to the ACE_Task :: activate () or the ACE_Thread_Manager's spawn () and spawn_n () methods can include either of the following flags: THR_DETACHED, which designates the spawned thread (s) as detached so that when the thread exits, the ACE_Thread_Manager ensures the storage used for the thread's state and exit status is reclaimed. THR_JOINABLE, which designates the spawned thread (s) as joinable so that ACE_Thread_Manager ensures the identity and exit status of an exiting thread is retained until another thread reaps its exit status. The terms detached and joinable stem from POSIX Pthreads [IEE96]. By default, ACE_Thread_Manager (and hence the ACE_Task class that uses it) spawns threads with the THR_JOINABLE flag. To avoid leaking resources that the OS holds for JoinAble Threads, An Application Must Call One of The Following Methods:

ACE_Task :: wait (), which waits for all threads to exit an ACE_Task object ACE_Thread_Manager :: wait_task (), which waits for all threads to exit in a specified ACE_Task object ACE_Thread_Manager :: join (), which waits for a designated thread to exit. If none of these methods are called, ACE and the OS will not reclaim the thread stack and exit status of a joinable thread, and the program will leak memory. If it's inconvenient to wait for threads explicitly in your program, you can simply pass THR_DETACHED when spawning threads or activating tasks. Many networked application tasks and long-running daemon threads can be simplified by using detached threads. However, an application can not wait for a detached thread to finish with ACE_Task :: wait () or obtain its exit status via join (). Applications can, however, use ACE_Thread_Manager :: wait () to wait for both joinable and detached threads managed by an ACE_Thread_Manager to finish. 2. Active processing. A task's application-defined processing can a lso be performed actively. In this case, one or more threads execute the task's svc () hook method to process messages concurrently with respect to other activities in an application. If a task's put () method does not perform all the processing on a Message, IT CAN Use Putq () TO Enqueue The Message and Return To ITS Caller Immediately. Active Processing. A task application definition can be active. In this case, one or more threads perform the SVC () hook of the task to concurrently. If the PUT () method cannot perform all processing on a message. He can use PUTQ () to enter the team immediately returns the caller.

A task's svc () hook method can use ACE_Task :: getq () to dequeue messages placed onto the message queue and process them concurrently. The getq () method blocks until either a message is available on the message queue or the specified absolute timeout elapses The blocking nature of getq () Allows the thread (s) of a task to block and only wake up when's work, a task SVC () hook can be used with ace_task :: getq () Messages and concurrently handle them. The getq () method blocks until there is a message available or the timeout that is available or established on the queue. Getq () allows the thread to block the task, and it is only waken only in the message queue. Unlike put (), the svc () method is never invoked by a client of a task directly. It's instead invoked by one or more threads when a task becomes an active object after its activate () method is called. The activate () method Uses the ace_thread_manager associated with anape_task to spawn one or more threads, as shown Below: Different from the PUT (), the SVC () method will never be called directly by the task. He is called by one or more threads when a task is turned to active object. Activate method uses ACE_THREAD_MANAGER to generate one or more threads: Template INTACE_TASK :: Activate (Long Flags,

INT N_THREADS,

/ * Other params omitted * /) {

// ...

THR_MGR () -> spawn_n (n_threads,

& Ace_task :: svc_run,

ACE_STATIC_CAST (Void *, this),

Flags, / * Other params omitted * /;

// ...

} The THR_SCOPE_SYSTEM, THR_SCOPE_PROCESS, THR_NEW_LWP, THR_DETACHED, and THR_JOINABLE flags in the table on page 190 of C NPv1 can be passed in the flags parameter to activate (). Sidebar 42 describes how THR_DETACHED and THR_JOINABLE can be used to avoid memory leaks when threads exit. ACE_Task :: svc_run () is a static method used by activate () as an adapter function. It runs in the newly spawned thread (s) of control, which provide an execution context for the svc () hook method. Figure 6.7 illustrates the steps associated with activating an ACE_Task using the Windows _beginthreadex () function to spawn the thread. Naturally, the ACE_Task class shields applications from OS-specific details. ACE_Task :: svc_run () is a static method, activate () Use him as an adapter function. He runs in the newly derived control thread, provides the SVC () method to perform the context. Figure 6.7 Explains the use of Windows _BeginThreadex () to activate threads to activate each of the steps of ACE_TASK. The ACE_TASK class will apply to the OS unique detail shield. Figure 6.7. Activating An Ace Task

When an ACE_Task subclass executes as an active object, its svc () method runs an event loop that uses its getq () method to wait for messages to arrive on the task's message queue. This queue can buffer a sequence of data messages and control messages for subsequent processing by a task's svc () method. As messages arrive and are enqueued by a task's put () method, its svc () method runs in separate thread (s), dequeueing the messages and performing application-defined processing concurrently, as Shown in Figure 6.8. When the ACE_TASK subclass is executed as an active object, his SVC method runs an event cycle using his getq () method to wait for the arrival. This queue can cache message tree and control messages processed by subsequent SVC () methods. The message arrives at the task PUT () method is placed in the queue, and his SVC () method is running in the thread, the queue message is sent, and the processing of the application definition is issued.

Figure 6.8. Passing Messages Between Ace_Task Objects

.. Sidebar 43 compares and contrasts the ACE_Task capabilities with Java's Runnable interface and Thread class 3. Task destruction The methods used in the destruction of all or parts of a task are shown in the following table: destructor.

Method Description ACE_Task () Deletes resources allocated in ACE_Task constructor, which includes the message queue, if it was not passed as a parameter to the constructor. To release the resources allocated in the constructor, includes a message queue, if he is not passed as a parameter to Constructor. Close () Hook Method That Performs Application-Defined Shutdown Activities. This Method Should Generally Not Be Called Directly By Applications, Particularly If The Task is an Active Object. Hook Method Performs a shutdown activity for application definitions. () Any on the task blocking flush Closes the message queue associated with the task, which frees all of its enqueued message blocks and releases any threads blocked on the queue. Close tasks related to message queues, to release all of his incoming team message block and release the rout. THR_Count () returns the number of threads currently active in the ace_task. Returns the number of threads of the current activity. Wait () A Barrier Synchronizer That Waits for All Joinse Threads Running In this task to exit before returbuning. A fence is synchronized to wait for all JoinAble threads to return. The lifetime of an ACE_Task object is not tied to the lifetime of any threads activated in that object. Since deleting an ACE_Task object does not shut down any active threads, the threads must therefore exit before the task object can be deleted. ACE provides various ways to request a task's threads to exit, including the cooperative cancellation mechanism (pages 190? 91 of C NPv1). A task's message queue can also be used to pass a shutdown message to the task's threads, as described in Sidebar 41 ( Page 167). The life cycle of the active thread in the ACE_TASK object and the object is not bundled. Because the delete ACE_TASK object does not end any activity thread, the thread must end before the task object is deleted. ACE_ offers a variety of methods to request the thread of the task, including cooperative cancellation mechanisms. Task Message Queuing can be used to pass end messages to threads.

Applications can customize an ACE_Task's destruction by overriding its close () hook method. This method can free application-defined resources allocated by a task, such as connection control blocks, I / O handles, and synchronization locks. Whereas the open () hook method should be called at most once per instance to initialize an object before it's activated, both the svc () and close () hook methods are called once in each thread. The svc () hook should therefore perform any needed per-thread initialization. The ACE Task framework will call the close () hook method in each thread after the svc () hook method returns, so avoid calling a task's close () hook directly, particularly if a task is an active object. This asymmetry between the open () And Close () Hook Methods Is Necessary Because The There's No Reliable Opportunity To Clean Up A Task's Resources Except in The Task's Threads. Application You can customize the quarter of ACE_TASK to redefine his close () hook. This method can release the resources allocated by the application defined task, such as the Connection Control Blocks, I / O handle, and synchronous locks. The OPEN () method can be called once when each instance is initialized. The SVC () hook should perform an initialization required for any thread. The ACE Task framework calls the close () hook method in any thread after the SVC () hook returns. Avoid direct calling the Task's Close () hook method, especially one task is an active object. This asymmetric Open () and Close () hook methods are necessary because there is no reliable opportunity to clear the task resource in addition to the task thread. Sidebar 43: ACE_TASK VS. Java Runnable and thread if You'Ve Used Java's Runnable Interface and thread class [lea), The ace_task design shop: Ace_Task Design Below:

ACE_Task :: activate () is similar to the Java Thread.start () method since they both spawn internal threads. The Java Thread.start () method spawns only one thread, whereas activate () can spawn multiple threads within the same ACE_Task, making it easy to implement thread pools as shown in the Example part of this section. ACE_Task :: svc () is similar to the Java Runnable.run () method since both methods are hooks that run in newly spawned thread (s) of control . The Java run () hook method executes in only a single thread per object, whereas the ACE_Task :: svc () method can execute in multiple threads per task object. ACE_Task contains a message queue that allows applications to exchange and buffer messages. in contrast, this type of queueing capability must be added by Java developers explicitly. If a task activates multiple threads, the close () method must not free resources (or delete the task object itself) if other threads are still executing. The thr_count () Method Returns the Number of THR eads still active in the task. ACE_Task decrements the thread count before calling close (), soifthr_count () returns a value greater than 0, the object is still active. The wait () method can be used to block until all threads running in this Task EXIT, AT Which Point THR_Count () Equals 0. Sidebar 44 (Page 190) Destribes The Steps To Follow When Destroying An ACE_TASK. A task activates multiple threads, and the close () method cannot release resources (or delete task objects) if Other threads If other threads are still being executed. THR_COUNT () method returns the number of active threads. ACE_TASK Reduce the number of threads before calling Close (), so if thr_count () returns a value greater than 0, the object is still alive. Wait () method can be used to block until the thread in all tasks is end, THR_COUNT () is equal to 0.

Example This example shows how to combine ACE_Task and ACE_Message_Queue with the ACE_Reactor from Chapter 3 and ACE_Service_Config from Chapter 5 to implement a concurrent logging server. This server design is based on the Half-Sync / Half-Async pattern [POSA2] and the eager spawning thread pool strategy described in Chapter 5 of C NPv1. Figure 6.9 (page 191) shows how a pool of worker threads is prespawned when the logging server is launched. Log records can be processed concurrently until the number of simultaneous client requests exceeds the number of worker threads. At this point, the main thread buffers additional requests in a synchronized ACE_Message_Queue until a worker thread becomes available or until the queue becomes full.Figure 6.9. Architecture of the Thread Pool Logging Server

Sidebar 44: Destroying an ACE_Task Special care must be taken when destroying an ACE_Task that runs as an active object Before destroying an active object, ensure that the thread (s) running its svc () hook method have exited Sidebar 41 (page 167.. ) describes several techniques to shut down svc () hook methods that are blocked on a task's message queue. If a task's life cycle is managed externally, whether dynamically allocated or instantiated on the stack, one way to ensure a proper destruction sequence looks like this : My_task * task = new task; // allocate a new task Dynamical.

Task-> Open (); // Initialize the task.

Task-> Activate (); // Run Task as an Active Object.

// ... do Work ...

// deactive the message queue so the svc () Method Unblocks

// and the thread exits.

Task-> msg_queue () -> deactivate ();

Task-> Wait (); // Wait for the thread to exit.

delete task;... // Reclaim the task memory This technique relies on the task to exit all of its threads when the task's message queue is deactivated This design introduces behavioral coupling, however, between the Task class and its users Users depend on particular behavior when the message queue is deactivated, so any change to this behavior would cause undesired ripple effects throughout all systems that use the Task class. If a task is allocated dynamically, it may therefore be better to have the task's close () hook delete itself when the last thread exits the task, rather than calling delete on a pointer to the task directly. you may still want to wait () on the threads to exit the task, however, particularly if you're preparing to shut down the process. On some OS platforms, when the main thread returns from main (), the entire process will be shut down immediately, whether there were other threads active or not. The ACE_Message_Queue plays several roles in our thread pool loggin G Server's Half-Sync / Half-Async Concurrency Design:

It decouples the main reactor thread from the thread pool. This design allows multiple worker threads to be active simultaneously. It also offloads the responsibility for maintaining queues of log record data from kernel space to user space, which has more virtual memory to queue log records than the kernel. It helps to enforce flow control between clients and the server. When the number of bytes in the message queue reaches its high watermark, its flow control protocol blocks the main thread. As the underlying TCP socket buffers fill up, the flow control propagates back to the server's clients. This prevents clients from establishing new connections or sending log records until the worker threads have a chance to catch up and unblock the main thread. Prespawning and queueing help to amortize the cost of thread creation, as well as Constrain The Use of Os Resources, Which Can Significantly Improve Server Scalability. The Following Table Outlines The Classes That We'll Cover In The THRE Ad Pool Logging Server Example Below:

Class Description TP_Logging_Task Runs as an active object, with a pool of threads that process and store log records inserted into its synchronized message queue TP_Logging_Handler Target of upcalls from the ACE_Reactor receives log records from clients and inserts them into the TP_Logging_Task's message queue TP_Logging_Acceptor A factory that that accepts connections and creates TP_Logging_Handler objects to process client requests TP_Logging_Server A facade class that integrates the other three classes together The relationship between these classes is shown in Figure 6.10 (page 192). The TP_Logging_Acceptor and TP_Logging_Handler classes play the reactive role in the half- Sync / Half-Async pattern and the TP_Logging_Task :: svc () method, which runs concurrently in the worker threads, plays the synchronous role in the pattern.Figure 6.10. The Thread Pool Logging Server Classes

Each class in Figure 6.10 is described Below. We start by incruding the next: #include "ace / os.h"

#include "ace / auto_ptr.h"

#include "ace / singleton.h"

#include "ace / synch.h"

#include "ace / task.h"

#include "logging_acceptor.h"

#include "logging_event_handler.h"

#include "Reactor_Logging_Server.h"

#include "tpls_export.h" TP logging task. this class provides the folloading capain

. It derives from ACE_Task, which it instantiates to provide a synchronized ACE_Message_Queue It spawns a pool of worker threads that all run the same svc () method to process and store log records inserted into its synchronized message queue The TP_Logging_Task is shown below:. Class TP_LOGGING_TASK: PUBLIC ACE_TASK {// Instantiated with an Mt Synchronization Trait.

PUBLIC:

ENUM {Max_threads = 4};

// ... Methods Defined Below ...

}; The tp_logging_task :: open () hook maeth Calls ace_task :: activate () to convers Task Into an Active Object, As Follows: Virtual Int Open (Void * = 0)

{Return activate (THR_NEW_LWP, MAX_THREADS);} If activate () returns successfully, the TP_Logging_Task :: svc () method will be running in MAX_THREADS separate threads We show the TP_Logging_Task :: svc () method (page 199) after we describe. the TP_Logging_Acceptor and TP_Logging_Handler classes. The TP_Logging_Task :: put () method inserts a message block containing a log record into the queue.virtual int put (ACE_Message_Block * mblk,

ACE_TIME_VALUE * TIMEOUT = 0)

{Return putq (mblk, timeout);} We need only one instance of TP_Logging_Task, so we convert it into a singleton using the singleton adapter template described in Sidebar 45 (page 194) Since the TP_Logging_Task will be located in a DLL, however. , we must use the ACE_Unmanaged_Singleton rather than ACE_Singleton. This design requires that we close the singleton explicitly when the logging task shuts down in TP_Logging_Server :: fini () (page 201) .typedef ACE_Unmanaged_Singleton

TP_LOGGING_TASK; Since TP_LOGGING_TASK :: instance () is only accessed from the main thread, we use ACE_Null_Mutex as the synchronization type parameter for ACE_Unmanaged_Singleton If this singleton were accessed concurrently by other threads we'd need to parameterize it with ACE_Recursive_Thread_Mutex (Chapter 10 of. .. C NPv1) to serialize access TP_Logging_Acceptor This class is a factory that provides the following capabilities: It accepts connections from client logging daemons It creates TP_Logging_Handlers that receive log records from connected clients The TP_Logging_Acceptor class is shown below..:

Sidebar 45: The ACE_Singleton Template Adapters Although it's possible to code the TP_Logging_Task explicitly to be a singleton, this approach is tedious and error-prone ACE therefore defines the following ACE_Singleton template adapter that applications can use to manage singleton life cycles:. Template

Class ace_singleton: public ace_cleanup {

PUBLIC:

Static Type * Instance (void) {

ACE_SIINGLETON * & S = Singleton_;

IF (s == 0) {

LOCK * LOCK = 0;

ACE_GUARD_RETURN (LOCK, Guard,

ACE_OBJECT_MANAGER :: get_singleton_lock (lock), 0);

IF (s == 0) {

ACE_NEW_RETURN (S, (ACE_SIINGLETON ), 0);

ACE_OBJECT_MANAGER :: at_exit (s);

}

}

Return & S-> Instance_;

}

protected:

ACE_SINGLETON (VOID); // Default Constructionor.

TYPE Instance_; // Contained Instance.

// Single Instance of the Adapter.

Static ace_singleton * singleton_;

};.. The ACE_Singleton :: instance () static method uses the Double-Checked Locking Optimization pattern [POSA2] to construct and access an instance of the type-specific ACE_Singleton It then registers the instance with the ACE_Object_Manager for cleanup at program termination As described in Sidebar 23 of C NPv1 (page 218), the ACE_ObjectManager assumes responsibility for destroying the ACE_Singleton instance, as well as the adapted TYPE instance. A program can crash during singleton cleanup if the object code implementing TYPE is unlinked before the ACE_Object_Manager cleans up singletons, which is often the case for singletons located in dynamically linked services. We therefore recommend using ACE_Unmanaged_Singleton when defining singletons in DLLs that will be linked and unlinked dynamically. This class offers the same double-checked locking optimization to create the singleton. To Destroy The Singleton, However, Requires An Explicit Call To Ace_unmanaged_singleton :: Close (). A DY Namic Service's Fini () Method Is A Good Place To Call This Close () Method, As Shown In The TP_Logging_Server :: Fini () Method (Page 201). Class TP_Logging_acceptor: public logging_acceptor {public:

TP_LOGGING_ACCEPTOR (ace_reactor * r = ace_reactor :: instance ())

: Logging_acceptor (r) {}

Virtual Int Handle_input (ace_handle) {

TP_LOGGING_HANDAL * peer_handler = 0;

ACE_NEW_RETURN (peer_handler,

TP_LOGGING_HANDLER (Reactor ()), -1);

IF (Acceptor_.Accept (peer_handler-> peer ()) == -1) {

Delete peer_handler;

Return -1;

} else if (peer_handler-> open () == -1)

Peer_Handler-> Handle_Close (ACE_INVALID_HANDLE, 0);

Return 0;

}

}; Since TP_Logging_Acceptor inherits from the Logging_Acceptor (page 54) it can override handle_input () to create instances of TP_Logging_Handler TP_Logging_Handler This class provides the following capabilities: It receives log records from a connected client It enqueues the log records into the TP_LOGGING_TASK... Singleton's synchronized message queue. The tp_logging_handler class is shown Below: Class TP_Logging_Handler: Public Logging_Event_Handler {

friend class TP_Logging_Acceptor; Since this class derives from Logging_Event_Handler (page 59 in Section 3.3), it can receive log records when dispatched by a reactor The destructor is defined as protected to ensure dynamic allocation However, since the TP_Logging_Acceptor :: handle_input ().. method deletes objects of this type, TP_Logging_Acceptor must be a friend of this class. Declaring a friend class is appropriate in this case because these classes exhibit a high degree of cohesion, and it's important to maintain the restriction on dynamic allocation of TP_Logging_Handler. The three Data Members Are Used TO Implement The Protocol for Closing TP_Logging_Handler Objects Concurrently, AS Described in Sidebar 46 (Page 196) .protacected:

Virtual tp_logging_handler () {} // no-op destructor.

// Number of Pointers to this class instance That Currently

// Reside in the Singleton's Message Queue.

INT Queued_count_;

// indeicates WHETER

// must be caled to cleanup and delete this object.

INT deferred_close_;

// Serialize Access to and .

ACE_THREAD_MUTEX LOCK_;

Sidebar 46: Closing TP_Logging_Handlers Concurrently A challenge with thread pool servers is closing objects that can be accessed concurrently by multiple threads In our thread pool logging server, TP_Logging_Handler pointers are used by TP_LOGGING_TASK threads These service threads are separate from the thread running the reactor.. event loop that's driving callbacks to TP_Logging_Handler. We must therefore ensure that a TP_Logging_Handler object is not destroyed while there are still pointers to it in use by TP_LOGGING_TASK. When a logging client closes a connection, TP_Logging_Handler :: handle_input () (page 197) returns -1. The reactor then calls the handler's handle_close () method, which ordinarily cleans up resources and deletes the handler. Unfortunately, that would wreak havoc if one or more pointers to that handler were still enqueued or being used by threads in the TP_LOGGING_TASK Pool. we therefore use a reason the alling protocol to ensure the handler isn't destroyed while a pointer to it is still in use The UML activity diagram below illustrates the behavior this protocol enforces:.. The protocol counts how many times a handler resides in the TP_LOGGING_TASK singleton's message queue If the count is greater than 0 when the logging client socket is closed , TP_Logging_Handler :: handle_close () can not yet destroy the handler. Later, as the TP_LOGGING_TASK processes each log record, the handler's reference count is decremented. When the count reaches 0, the handler can finish processing the close request that was deferred earlier ................................ ..

TP_LOGGING_HANDLER (ACE_RAACTOR * Reactor): Logging_Event_Handler (Reactor),

Queued_count_ (0),

DeferRed_close_ (0) {}

// Called WHEN INPUTS OCCUR, E.G., Connection or Data.

Virtual Int Handle_input (ace_handle);

// Called When this Object IS Destroyed, E.G., When IT'S

// Removed from a reactor.

Virtual Int Handle_close (ACE_HANDLE, ACE_RAACTOR_MASK);

}

ACE_FACTORY_DECLARE (TPLS, TP_Logging_Handler) TP_Logging_Handler :: handle_input () plays the reactive role in the HalfSync / Half-Async pattern. It differs from the Logging_Event_Handler :: handle_input () method (page 59) since it does not process a log record immediately after receiving it. Instead, it inserts each log record into the TP_LOGGING_TASK singleton's message queue, where it will be processed concurrently. However, while processing the log record, the TP_LOGGING_TASK needs to access this handler's log file (inherited from Logging_Event_Handler). to facilitate this, handle_input () combines the log record with a message block containing the handler's pointer and inserts the resulting composite message at the end of the TP_LOGGING_TASK singleton's message queue, as shown below: 1 int TP_Logging_Handler :: handle_input (ACE_HANDLE) {

2 ACE_MESSAGE_BLOCK * MBLK = 0;

3 IF (logging_handler_.recv_log_record (mblk)! = -1) {

4 ACE_MESSAGE_BLOCK * log_blk = 0;

5 ACE_NEW_RETURN

6 (log_blk, ACE_MESSAGE_BLOCK

7 (ACE_REINTERPRET_CAST (CHAR *, THIS)), -1);

8 log_blk-> cont (MBLK);

9 ACE_GUARD_RETURN (ACE_THREAD_MUTEX, Guard, Lock_, -1);

10 IF (TP_Logging_Task :: Instance () -> Put (log_blk) == -1)

11 {log_blk-> release (); return -1;}

12 queued_count_; 13 return 0;

Else return -1;

? 15} Lines 2 Read a log record from a connected socket into a dynamically allocated ACE_Message_Block Lines 4 Create an ACE_Message_Block called log_blk that contains a pointer to this handler The ACE_Message_Block constructor simply "borrows" the this pointer and sets the ACE_Message_Block.?.: : DONT_DELETE flag internally to ensure that the handler itself is not destroyed when the message block is released in TP_Logging_Task :: svc () (page 199) The mblk is attached to log_blk's continuation chain to form a composite message Lines 9 0..? Use an ACE_Guard to acquire the lock_ that serializes access to queued_count_. to avoid a race condition in which a service thread processes the record before queued_count_ can be incremented, the lock is acquired before calling put () to insert the composite message block into the TP_LOGGING_TASK Singleton's Message Queue. LINES 11? 3 Release the log_blk resources and return -1 if put () fails. if it succeeds, increment the country of the number of time handler's in the TP_LOGGING_TASK's queue. In either case, the return statement causes the guard to release lock_. Line 14 If the client closes the connection, or a serious error occurs, handle_input () returns -1. This value causes the reactor to call TP_Logging_Handler :: handle_close (), which implements a key portion of the protocol for closing TP_Logging_Handlers concurrently, as described in Sidebar 46 (page 196) The handle_close () method is shown below:. 1 int TP_Logging_Handler :: handle_close (ACE_HANDLE handle,

2 ace_reactor_mask {

3 INT CLOSE_NOW = 0;

4 IF (Handle! = ACE_INVALID_HANDLE) {5 ACE_GUARD_RETURN (ACE_THREAD_MUTEX, Guard, Lock_, -1);

6 IF (Queued_count_ == 0) Close_Now = 1;

7 else deferred_close_ = 1;

Else {

9 ACE_GUARD_RETURN (ACE_THREAD_MUTEX, Guard, Lock_, -1);

10 queued_count _--;

11 IF (Queued_Count_ == 0) Close_Now = Deferred_Close_;

12}

13

14 IF (close_now) Return Logging_Event_Handler :: Handle_close ();

15 return 0;

16} Line 3 The close_now variable records whether the Logging_Event_Handler :: handle_close () method should be called on line 14. This decision depends on the reference count decisions made in the rest of this method. Lines 4? This code runs when handle_close () is called when handle_input () returns -1. Lines 6? are performed within a critical section protected by an ACE_Guard that acquires and releases the lock_ automatically within the scope of the if clause. If queue_count_ equals 0, there are no references to this object remaining in the TP_LOGGING_TASK, so we set the close_now local variable to 1. Otherwise, we set the deferred_close_ data member to note that as soon as the reference count reaches 0 this handler should be destroyed since the client has already closed its socket endpoint. As Log Records Are Processed, The TP_Logging_Task :: SVC () Method () Calls Handle_close () Calls Handle_close () Again, AND WILL EXECUTE The LINES DESCRIBED BELOW. LINES 8? 2 The Handle Equals ACE_INVALID_HENDLE WHE n handle_close () is called in TP_Logging_Task :: svc () (page 199) after the handler has been removed from the task's message queue. We therefore decrement the queue_count_. If the count is now 0, we record whether we need to finish processing a close request that was deferred earlier. As in lines 5 ?, we use an ACE_Guard to acquire and release the lock_ automatically within the scope of the else clause. Line 14 Call Logging_Event_Handler :: handle_close () (page 60) if the local variable Close_now is true to close the socket and log file, and then delete itself. Now That We '

ve shown the TP_Logging_Handler class, we can show the TP_Logging_Task :: svc () method, which runs concurrently in each of the worker threads and implements the synchronous role in the Half-Sync / Half-Asynch pattern. This method runs its own event loop that blocks on the synchronized message queue. After a message is enqueued by TP_Logging_Handler :: handle_input () (page 197), it will be dequeued by an available worker thread and written to the appropriate log file corresponding to the client, as shown below. 1 int TP_logging_task :: SVC () {2 for (ACE_MESSAGE_BLOCK * log_blk; getq (log_blk)! = -1;) {

3 tp_logging_handler * tp_handler = ace_reinterpret_cast

4 (tp_logging_handler *, log_blk-> rd_ptr ());

5 logging_handler logging_handler (tp_handler-> log_file ());

6 logging_handler.write_log_record (log_blk-> up ());

7 log_blk-> release ();

8 TP_HANDLER-> HANDLE_CLOSE (ACE_INVALID_HANDLE, 0);

9 }

10 returnograph;

? 11} Lines 2 Call the getq () method, which blocks until a message block is available As shown in Figure 6.11, each message block is a composite message containing three message blocks chained together via their continuation pointers in the following order.:

Figure 6.11. Message Block Chain of Log Record Information

A pointer to the TP_Logging_Handler that contains the log file where the log record will be written The hostname of the connected client The marshaled log record contents Lines 5? Initialize logging_handler with the log_file and then call Logging_Handler :: write_log_record (), which writes the log record to the log file. The write_log_record () method is responsible for releasing its message block chain, as shown in Chapter 4 of C NPv1. Lines 7? Call log_blk-> release () to reclaim allocated resources. Since the TP_Logging_Handler pointer is borrowed rather than allocated dynamically, however, we must explicitly call TP_Logging_Handler :: handle_close () (page 198) on tp_handler. This method decreases the TP_Logging_Handler reference count and cleans up the object properly, using the protocol described in Sidebar 46 (page 196 ). TP_LOGGING_SERVER. THIS FACADE CLASS INHERITS from ACE_SERVICE_OBJECT, Contains a Reactor_logging_server, and uses the tp_logging_task singleton, as shown Below.class TP_Logging_Server: Public ace_service_object {protected: 0000-00-00: PUBLIC ACE_SERVICE:

// Contains the Reactor, Acceptor, And Handlers.

TYPEDEF Reactor_logging_server

Logging_dispatcher;

Logging_dispatcher * logging_dispatcher_;

PUBLIC:

TP_Logging_server (): Logging_dispatcher_ (0) {}

// other methods defined Below ...

}; The tp_logging_server :: init () hook method enhances the Reactive Logging Server Implementation from Chapter 3 As Follows: Virtual Int Init (int Argc, ACE_TCHAR * Argv []) {

INT I;

Char ** array = 0;

ACE_NEW_RETURN (Array, Char * [Argc], -1);

ACE_AUTO_ARRAY_PTR char_argv (array);

For (i = 0; i

CHAR_ARGV [I] = ace :: strnew (ACE_TEXT_ALWAYS_CHAR (Argv [i]));

ACE_NEW_NORETURN (logging_dispatcher_,

TP_LOGGING_SERVER :: Logging_Dispatcher

(i, char_argv.get (), ace_reactor :: instance ()))));

For (i = 0; i

IF (logging_dispatcher_ == 0) return -1;

Else Return TP_Logging_Task :: Instance () -> open ();

} This init () method is similar to the one on page 122. It allocates an instance of TP_Logging_Server :: LOGGING_DISPATCHER and stores its pointer in the logging_dispatcher_ member. It also calls TP_Logging_Task :: open () to prespawn a pool of worker threads that Process log records concurrently. The tp_logging_server :: Fini () Method is shown next: 1 Virtual int fini () {

2 tp_logging_task :: instance () -> flush ();

3 tp_logging_task :: instance () -> wait ();

4 tp_logging_task :: close ();

5 delete logging_dispatcher_;

6 returnograph;

7} Line 2 Call the flush () method of the TP_LOGGING_TASK singleton, thereby closing the message queue associated with the task, which deletes all the queued messages and signals the threads in the pool to exit. Lines 3? Use the ACE_Thread_Manager's barrier synchronization feature to wait for the pool of threads spawned by TP_Logging_Task :: open () to exit and then explicitly close the singleton because this DLL is about to be unlinked. Lines 5? Delete the logging_dispatcher_ allocated in init () and return. for brevity, we omit the suspend (), resume (), and info () hook methods, which are similar to those shown in earlier examples. Finally, we place ACE_FACTORY_DEFINE into TP_Logging_Server.cpp.ACE_FACTORY_DEFINE (TPLS, TP_Logging_Server) This macro automatically defines the _make_TP_Logging_Server ( ) factory function that's used in the following svc.conf file: dynamic TP_Logging_Server Service_Object * TPLS: _make_TP_Logging_Server () "$ TP_LOGGING_SERVER_PORT" This file directs the ACE Serv Ice Configurator Framework to Configure The Thread Pool Logging Server Via the Following Steps:

Dynamically link the TPLS DLL into the address space of the process. Use the ACE_DLL class to extract the _make_TP_Logging_Server () factory function from the TPLS DLL symbol table. This function is called to obtain a pointer to a dynamically allocated TP_Logging_Server. The Service Configurator framework calls the TP_Logging_Server :: init () hook method through this pointer, passing the value of the TP_LOGGING_SERVER_PORT environment variable as its single argument. This string designates the port number where the logging server listens for client connection requests. If init () succeeds, the TP_Logging_Server pointer is stored in the ACE_Service_Repository under the name "TP_Logging_Server" Once again, the ACE Service Configurator framework enables us to reuse the main () program from Configurable_Logging_Server.cpp (page 147) .Ru-Brd