shadowsocks源码解读(二):SS Server工作流程
上一节中写到shadowsocks由SS Local和SS Server组成,因为工作原理相似,且SS Server方便调试跟踪,所以以SS Server来分析其中具体流程。
运行ssserver -p 10001 -k pass -m aes-256-cfb
,运行SS Server
* 监听本地10001端口,密码pass,加密方式aes-256-cfb
在server.py的main()
下设置断点方便调试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
def main(): # ....省略配置处理相关代码 tcp_servers = [] udp_servers = [] dns_resolver = asyncdns.DNSResolver() port_password = config['port_password'] del config['port_password'] for port, password in port_password.items(): a_config = config.copy() a_config['server_port'] = int(port) a_config['password'] = password logging.info("starting server at %s:%d" % (a_config['server'], int(port))) 实例化一个tcprelay.TCPRelay类放入到tcp_servers tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False)) udp_servers.append(udprelay.UDPRelay(a_config, dns_resolver, False)) |
程序一开始先实例化一个TCPRelay类,我们看看TCPRelay在初始化的时候干了什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
def __init__(self, config, dns_resolver, is_local, stat_callback=None): # .....省略 # 判断是SS Local还是SS Server if is_local: listen_addr = config['local_address'] listen_port = config['local_port'] else: listen_addr = config['server'] listen_port = config['server_port'] self._listen_port = listen_port addrs = socket.getaddrinfo(listen_addr, listen_port, 0, socket.SOCK_STREAM, socket.SOL_TCP) if len(addrs) == 0: raise Exception("can't get addrinfo for %s:%d" % (listen_addr, listen_port)) af, socktype, proto, canonname, sa = addrs[0] # 创建一个socket server_socket = socket.socket(af, socktype, proto) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 绑定到监听的端口 server_socket.bind(sa) server_socket.setblocking(False) if config['fast_open']: try: server_socket.setsockopt(socket.SOL_TCP, 23, 5) except socket.error: logging.error('warning: fast open is not available') self._config['fast_open'] = False # 监听端口 server_socket.listen(1024) self._server_socket = server_socket |
初始化的时候根据配置项新建一个socket并绑定到端口进行监听,之后进入到run_server()
函数
1 2 3 4 5 6 7 8 9 10 11 12 13 |
def run_server(): # ...... try: # 实例化一个EventLoop类事件循环 # 之后将上面初始化的dns、tcp和udp类加入循环中 loop = eventloop.EventLoop() dns_resolver.add_to_loop(loop) list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers)) daemon.set_user(config.get('user', None)) loop.run() # ... |
再来看看EventLoop初始化干了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
class EventLoop(object): def __init__(self): # 根据操作系统选择不同的方法,将select、poll都包装成了epoll的用法 if hasattr(select, 'epoll'): self._impl = select.epoll() model = 'epoll' elif hasattr(select, 'kqueue'): self._impl = KqueueLoop() model = 'kqueue' elif hasattr(select, 'select'): self._impl = SelectLoop() model = 'select' else: raise Exception('can not find any available functions in select ' 'package') # 这个很重要,建立一个以fd为key,(f,handler)为值的字典,之后会根据这个映射分发事件到各自的handler self._fdmap = {} # (f, handler) self._last_time = time.time() self._periodic_callbacks = [] self._stopping = False logging.debug('using event model: %s', model) |
然后是add_to_loop()
函数
1 2 3 4 5 6 7 8 9 10 11 |
def add_to_loop(self, loop): if self._eventloop: raise Exception('already add to loop') if self._closed: raise Exception('already closed') self._eventloop = loop # 加入到事件循环中,并监听socket的可读和错误事件 self._eventloop.add(self._server_socket, eventloop.POLL_IN | eventloop.POLL_ERR, self) self._eventloop.add_periodic(self.handle_periodic) |
在调用self._eventloop.add()
函数时,会在EventLoop类中建立一个映射
1 2 3 4 5 6 |
def add(self, f, mode, handler): fd = f.fileno() # 这个很重要 self._fdmap[fd] = (f, handler) self._impl.register(fd, mode) |
到这边总结下程序做的工作,初始化一个TCPRelay类监听本地端口,将它加入事件循环,监听socket的可读和错误事件,事件循环中有一个映射,将fd(文件描述符)映射到对应的(socket,handler),以便之后检测到有请求发生的时候,将请求分发给各自的handler处理,事件循环只管监听事件。
接下来进入到核心的loop.run()
函数中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
def run(self): events = [] while not self._stopping: asap = False try: # 当有新的请求发送到监听的本地端口时,TCPRelay可读 events = self.poll(TIMEOUT_PRECISION) # ...... for sock, fd, event in events: # 查询fd对应的handler,这个handler是最开始初始化的TCPRelay handler = self._fdmap.get(fd, None) if handler is not None: handler = handler[1] try: 直接交给handler处理事件 handler.handle_event(sock, fd, event) # ...... |
然后事件循环继续监听事件。
继续看handler.handle_event()
函数怎么处理请求的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def handle_event(self, sock, fd, event): # 这边注意下 if sock == self._server_socket: try: logging.debug('accept') # 接受到新请求之后会生成一个conn conn = self._server_socket.accept() TCPRelayHandler(self, self._fd_to_handlers, self._eventloop, conn[0], self._config, self._dns_resolver, self._is_local) else: if sock: handler = self._fd_to_handlers.get(fd, None) if handler: handler.handle_event(sock, event) |
然后交给TCPRelayHandler处理请求
TCPRelayHandler在初始化的时候会将conn这个socket加入到loop循环中监听可读或者错误事件
if sock == self._server_socket
判断当前socket是客户端第一次请求连接还是客户端连接之后发送的请求。
如果是客户端第一次连接,调用TCPRelayHandler,会将新的socket加入事件循环,并且在TCPRelay._fd_to_handlers添加映射,这个等下再讲用途。
如果是客户端在连接之后发来的请求的话,会在self._fd_to_handlers.get()
查找handler,也就是客户端第一次连接创建的映射,此时handler是TCPRelayHandler,进入handle_event()
函数后,这时候如果请求正常的话会进入到self._on_local_read()
函数
1 2 3 4 5 6 7 8 9 10 11 12 13 |
def _on_local_read(self): # ...... data = None try: # 读取请求内容 data = self._local_sock.recv(BUF_SIZE) # ...... if not is_local: # 将请求解密 data = self._encryptor.decrypt(data) # ...... self._handle_stage_addr(data) |
之后就是TCPRelayHandler在负责处理数据了。
所以处理的流程大致理清了,我画了一张流程图,更好地理清思路。
总结
shadowsocks原理并不难,主要是程序一直处在EventLoop的循环中,需要在循环里面观察各个模块的运行情况,而且SS Server和SS Local共用同一个TCPRelay,所以就增加了代码的复杂度,只要耐心看下去,摸清楚哪个模块干了些什么事之后就简单了。
链接:https://www.ioiogoo.cn/2016/12/22/shadowsocks源码解读(二):ss-server工作流程/
本站所有文章除特殊说明外均为原创,转载请注明出处!