# Written by Bram Cohen
# see license.txt for license information
#
# File name: RawServer.py
# Code-reading diary: 2004-9-7
# Annotated by: ZFive5 (Drone Hummer [Lonely Boy Weng, Diwu Hanjiang Snow])
#
# Analysis: the RawServer class is mainly responsible for socket communication.
# poll.poll([timeout])
# Polls the set of registered file descriptors, and returns a possibly-empty
# list containing (fd, event) 2-tuples for the descriptors that have events or
# errors to report. fd is the file descriptor, and event is a bitmask with bits
# set for the reported events for that descriptor -- POLLIN for waiting input,
# POLLOUT to indicate that the descriptor can be written to, and so forth. An
# empty list indicates that the call timed out and no file descriptors had any
# events to report. If timeout is given, it specifies the length of time in
# milliseconds which the system will wait for events before returning. If
# timeout is omitted, negative, or None, the call will block until there is an
# event for this poll object.
from bisect import insort
import socket
try:
    from cStringIO import StringIO
except ImportError:
    # cStringIO does not exist on Python 3; io.StringIO is the equivalent.
    from io import StringIO
from traceback import print_exc
from errno import EWOULDBLOCK, EINTR
try:
    from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
    # select.poll() takes its timeout in milliseconds.
    timemult = 1000
except ImportError:
    from selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
    timemult = 1
from threading import Thread, Event
from time import time, sleep
import sys
from random import randrange

# Event mask for a socket that still has data waiting to be written.
# NOTE: the name shadows the builtin ``all``; it is kept because it is a
# module-level name that other code may already reference.
all = POLLIN | POLLOUT


def _error_code(exc):
    """Return the first argument (errno) of a socket error, or None."""
    args = getattr(exc, 'args', ())
    if args:
        return args[0]
    return None


class SingleSocket:
    """Wrap one non-blocking socket together with its outgoing buffer.

    raw_server -- the RawServer that owns the poll object and bookkeeping
    sock       -- the underlying socket object
    handler    -- object receiving data_came_in / connection_flushed /
                  connection_lost callbacks
    """

    def __init__(self, raw_server, sock, handler):
        self.raw_server = raw_server
        self.socket = sock
        self.handler = handler
        # Pending outgoing chunks, sent in order by try_write().
        self.buffer = []
        self.last_hit = time()
        # Cached because the descriptor is needed after the socket is closed.
        self.fileno = sock.fileno()
        self.connected = False

    def get_ip(self):
        """Return the peer's IP address, or 'no connection' on failure."""
        try:
            return self.socket.getpeername()[0]
        except socket.error:
            return 'no connection'

    def close(self):
        """Drop buffered data, unregister from the server and close."""
        sock = self.socket
        self.socket = None
        self.buffer = []
        del self.raw_server.single_sockets[self.fileno]
        self.raw_server.poll.unregister(sock)
        sock.close()

    def shutdown(self, val):
        """Forward to socket.shutdown(val)."""
        self.socket.shutdown(val)

    def is_flushed(self):
        """Return True when no outgoing data is waiting in the buffer."""
        return len(self.buffer) == 0

    def write(self, s):
        """Queue s for sending; start sending if the queue was empty."""
        assert self.socket is not None
        self.buffer.append(s)
        if len(self.buffer) == 1:
            self.try_write()

    def try_write(self):
        """Send as much buffered data as the socket accepts right now.

        Afterwards the socket is re-registered for POLLIN only when the
        buffer is empty, or for POLLIN | POLLOUT when data remains.
        """
        if self.connected:
            try:
                while self.buffer != []:
                    amount = self.socket.send(self.buffer[0])
                    if amount != len(self.buffer[0]):
                        # Partial send: keep the unsent tail and wait for
                        # the next POLLOUT event.
                        if amount != 0:
                            self.buffer[0] = self.buffer[0][amount:]
                        break
                    del self.buffer[0]
            except socket.error as e:
                if _error_code(e) != EWOULDBLOCK:
                    # Real failure: let the server close this socket later.
                    self.raw_server.dead_from_write.append(self)
                    return
        if self.buffer == []:
            self.raw_server.poll.register(self.socket, POLLIN)
        else:
            self.raw_server.poll.register(self.socket, all)


class RawServer:
    """Poll-based event loop for one listening socket and its connections.

    doneflag               -- object with isSet(); the loop exits once set
    timeout_check_interval -- seconds between scans for idle sockets
    timeout                -- idle seconds after which a socket is closed
    noisy                  -- report exceptions raised by scheduled tasks
    errorfunc              -- callable receiving traceback text
    """

    def __init__(self, doneflag, timeout_check_interval, timeout,
                 noisy=True, errorfunc=None):
        self.timeout_check_interval = timeout_check_interval
        self.timeout = timeout
        self.poll = poll()
        # {file descriptor: SingleSocket}
        self.single_sockets = {}
        self.dead_from_write = []
        self.doneflag = doneflag
        self.noisy = noisy
        self.errorfunc = errorfunc
        # Sorted list of (due time, callable) pairs.
        self.funcs = []
        self.externally_added = []
        self.add_task(self.scan_for_timeouts, timeout_check_interval)

    def add_task(self, func, delay):
        """Schedule func to run delay seconds from now."""
        insort(self.funcs, (time() + delay, func))

    def external_add_task(self, func, delay=0):
        """Queue a task; it is scheduled on the next pop_external()."""
        self.externally_added.append((func, delay))

    def scan_for_timeouts(self):
        """Close sockets idle longer than self.timeout, then reschedule."""
        self.add_task(self.scan_for_timeouts, self.timeout_check_interval)
        t = time() - self.timeout
        tokill = []
        # NOTE(review): the source is truncated after ``if s.last_hit``;
        # the comparison and the kill loop are reconstructed from the
        # surviving ``t`` / ``tokill`` setup -- confirm against upstream.
        for s in self.single_sockets.values():
            if s.last_hit < t:
                tokill.append(s)
        for k in tokill:
            if k.socket is not None:
                self._close_socket(k)

    def bind(self, port, bind='', reuse=False):
        """Create the non-blocking listening socket and register it."""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if reuse:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.setblocking(0)
        server.bind((bind, port))
        server.listen(5)
        self.poll.register(server, POLLIN)
        self.server = server

    def start_connection(self, dns, handler=None):
        """Start a non-blocking connect to dns and return its SingleSocket.

        dns is passed straight to socket.connect_ex().  Any failure is
        raised as socket.error; the half-made socket is closed first.
        """
        if handler is None:
            handler = self.handler
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(0)
        try:
            sock.connect_ex(dns)
        except socket.error:
            sock.close()
            raise
        except Exception as e:
            sock.close()
            raise socket.error(str(e))
        self.poll.register(sock, POLLIN)
        s = SingleSocket(self, sock, handler)
        self.single_sockets[sock.fileno()] = s
        return s

    def handle_events(self, events):
        """Dispatch (fd, event) pairs as returned by poll()."""
        for sock, event in events:
            if sock == self.server.fileno():
                if (event & (POLLHUP | POLLERR)) != 0:
                    # Error on the listening socket: shut it down.
                    self.poll.unregister(self.server)
                    self.server.close()
                    print('lost server socket')
                else:
                    try:
                        newsock, addr = self.server.accept()
                        newsock.setblocking(0)
                        nss = SingleSocket(self, newsock, self.handler)
                        self.single_sockets[newsock.fileno()] = nss
                        self.poll.register(newsock, POLLIN)
                        self.handler.external_connection_made(nss)
                    except socket.error:
                        # accept failed; back off briefly.
                        sleep(1)
            else:
                s = self.single_sockets.get(sock)
                if s is None:
                    continue
                s.connected = True
                if (event & (POLLHUP | POLLERR)) != 0:
                    self._close_socket(s)
                    continue
                if (event & POLLIN) != 0:
                    try:
                        s.last_hit = time()
                        data = s.socket.recv(100000)
                        # Empty read means the peer closed the connection
                        # ('' on Python 2, b'' on Python 3).
                        if not data:
                            self._close_socket(s)
                        else:
                            s.handler.data_came_in(s, data)
                    except socket.error as e:
                        if _error_code(e) != EWOULDBLOCK:
                            self._close_socket(s)
                            continue
                if ((event & POLLOUT) != 0 and s.socket is not None
                        and not s.is_flushed()):
                    s.try_write()
                    if s.is_flushed():
                        s.handler.connection_flushed(s)

    def pop_external(self):
        """Move every externally queued task into the schedule."""
        try:
            while True:
                (a, b) = self.externally_added.pop()
                self.add_task(a, b)
        except IndexError:
            pass

    def listen_forever(self, handler):
        """Run the event loop until doneflag is set.

        NOTE(review): the ``def`` line, the handler assignment, the outer
        ``while`` loop and the handle_events() call were destroyed in the
        source; they are reconstructed from the surviving body (which uses
        ``events`` and ``self.handler``) -- confirm against upstream.
        """
        self.handler = handler
        try:
            while not self.doneflag.isSet():
                try:
                    self.pop_external()
                    if len(self.funcs) == 0:
                        period = 2 ** 30
                    else:
                        period = self.funcs[0][0] - time()
                    if period < 0:
                        period = 0
                    events = self.poll.poll(period * timemult)
                    if self.doneflag.isSet():
                        return
                    # Run every task that is due.
                    while len(self.funcs) > 0 and self.funcs[0][0] <= time():
                        garbage, func = self.funcs[0]
                        del self.funcs[0]
                        try:
                            func()
                        except KeyboardInterrupt:
                            print_exc()
                            return
                        except:
                            # Deliberately broad: one failing task must
                            # not stop the loop.
                            if self.noisy:
                                data = StringIO()
                                print_exc(file=data)
                                self.errorfunc(data.getvalue())
                    self._close_dead()
                    self.handle_events(events)
                    if self.doneflag.isSet():
                        return
                    self._close_dead()
                except error:
                    if self.doneflag.isSet():
                        return
                except KeyboardInterrupt:
                    print_exc()
                    return
                except:
                    data = StringIO()
                    print_exc(file=data)
                    self.errorfunc(data.getvalue())
        finally:
            # Copy first: SingleSocket.close() deletes from single_sockets.
            for ss in list(self.single_sockets.values()):
                ss.close()
            self.server.close()

    def _close_dead(self):
        """Close every socket that failed during a write."""
        while len(self.dead_from_write) > 0:
            old = self.dead_from_write
            self.dead_from_write = []
            for s in old:
                if s.socket is not None:
                    self._close_socket(s)

    def _close_socket(self, s):
        """Close s, forget it, and notify its handler."""
        sock = s.socket.fileno()
        s.socket.close()
        self.poll.unregister(sock)
        del self.single_sockets[sock]
        s.socket = None
        s.handler.connection_lost(s)