Eine TCP sockets library für (aktuell) Haskell und Python
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

188 行
5.8KB

  1. import socket
  2. import select
  3. import ssl
  4. import struct
  5. import threading
  6. from threading import Thread
  7. import logging as log
  8. class SocketServer:
  9. NO_BUFFERING = 0
  10. LENGTH_BUFFERING = 1
  11. DELIMITER_BUFFERING = 2
  12. def __init__(self, ssl_cert=None, ssl_key=None, buffering=LENGTH_BUFFERING,
  13. delimiter='\n'):
  14. self.terminated = False
  15. self.connected = False
  16. self.buffering = buffering
  17. self.ssl_cert, self.ssl_key = ssl_cert, ssl_key
  18. self.delimiter = delimiter
  19. log.debug("Initialized socket server")
  20. def connect(self, host, port):
  21. self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  22. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  23. if self.ssl_cert and self.ssl_key:
  24. self.socket = ssl.wrap_socket(self.socket,
  25. certfile=self.ssl_cert,
  26. keyfile=self.ssl_key,
  27. server_side=True)
  28. try:
  29. self.socket.bind((host, port))
  30. except socket.error as e:
  31. log.critical("Error: %s", str(e))
  32. self.socket.listen(5)
  33. def start(self):
  34. Thread(target=self.wait_for_connections).start()
  35. def wait_for_connections(self):
  36. while not self.terminated:
  37. self.accept_connection()
  38. def accept_connection(self):
  39. if self.terminated:
  40. return
  41. try:
  42. conn, addr = self.socket.accept()
  43. except OSError as e:
  44. log.error("OSError while accepting connection: %s", str(e))
  45. return
  46. Thread(target=self.pre_handle_client, args=(conn, addr)).start()
  47. def clean_up(self):
  48. self.terminated = True
  49. self.socket.shutdown(socket.SHUT_RDWR)
  50. self.socket.close()
  51. def pre_handle_client(self, conn, addr):
  52. log.info("Established connection to %s", addr)
  53. client = ClientConnection(conn, addr, self.buffering, self.delimiter)
  54. connected = self.on_connect(client)
  55. while not self.terminated and connected:
  56. try:
  57. read, write, error = select.select([client.conn, ],
  58. [client.conn, ],
  59. [], 5)
  60. except (select.error, ValueError):
  61. log.warn("FATAL ERROR: Connection error %s", addr)
  62. self.on_disconnect(client)
  63. break
  64. if len(read) > 0:
  65. text = client.recv()
  66. if text is None:
  67. log.critical("no text --> disconnecting")
  68. self.on_disconnect(client)
  69. break
  70. log.debug("Received: %s %s", text, addr)
  71. try:
  72. reply = self.on_receive(client, text)
  73. except TypeError:
  74. pass
  75. else:
  76. if reply is not None:
  77. client.send(reply)
  78. log.critical("Terminating connection with %s", addr)
  79. client.quit()
  80. def on_receive(self, client, query):
  81. return query
  82. def on_connect(self, client):
  83. return True
  84. def on_disconnect(self, client):
  85. pass
  86. class ClientConnection:
  87. def __init__(self, conn, addr, buffering, delimiter):
  88. self.conn = conn
  89. self.addr = addr
  90. self.lock = threading.Lock()
  91. self.buffering, self.delimiter = buffering, delimiter
  92. self.left = b''
  93. def ask(self, key, question):
  94. self.send(key, question)
  95. self.conn.settimeout(2)
  96. return self.recv()
  97. def recv(self):
  98. try:
  99. if self.buffering is SocketServer.DELIMITER_BUFFERING:
  100. recv, self.left = recvuntil(self.conn, self.delimiter,
  101. self.left)
  102. elif self.buffering is SocketServer.LENGTH_BUFFERING:
  103. raw_msglen = recvall(self.conn, 4)
  104. if not raw_msglen:
  105. log.debug("%s No msglen: EOF", self.addr)
  106. return None
  107. msglen = struct.unpack('>I', raw_msglen)[0]
  108. recv = recvall(self.conn, msglen)
  109. else:
  110. recv = self.conn.recv(1024)
  111. return recv.decode("utf-8")
  112. except Exception as e:
  113. log.critical("%s Error while receiving: %s", self.addr, str(e))
  114. return None
  115. def send(self, query):
  116. if self.buffering is SocketServer.DELIMITER_BUFFERING:
  117. msg = (query + self.delimiter).encode()
  118. elif self.buffering is SocketServer.LENGTH_BUFFERING:
  119. msg = struct.pack('>I', len(query)) + query.encode()
  120. else:
  121. msg = query.encode()
  122. with self.lock:
  123. try:
  124. self.conn.sendall(msg)
  125. except Exception as e:
  126. log.debug("%s Failed to send %s: %s", self.addr, msg, e)
  127. def on_recv(self):
  128. pass
  129. def quit(self):
  130. self.conn.close()
  131. def recvall(sock, n):
  132. data = b''
  133. while len(data) < n:
  134. packet = sock.recv(n - len(data))
  135. if not packet:
  136. return None
  137. data += packet
  138. return data
  139. def recvuntil(sock, delimiter, left_over):
  140. data = left_over
  141. left = b''
  142. while True:
  143. packet = sock.recv(512)
  144. if not packet:
  145. return None
  146. data += packet
  147. idx = data.find(delimiter.encode())
  148. if idx >= 0:
  149. data = data[:idx]
  150. left = data[idx+len(delimiter):]
  151. break
  152. return data, left
  153. if __name__ == '__main__':
  154. log.basicConfig(level=log.DEBUG)
  155. server = SocketServer(buffering=SocketServer.DELIMITER_BUFFERING,
  156. delimiter='\r\n')
  157. server.connect('localhost', 4242)
  158. server.start()