博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入tornado中的http1connection
阅读量:4353 次
发布时间:2019-06-07

本文共 19742 字,大约阅读时间需要 65 分钟。

前言

  tornado中http1connection文件的作用极其重要,他实现了http1.x协议。

  本模块基于gen模块和iostream模块实现异步的处理请求或者响应。

  阅读本文需要一些基础的http知识。

 

正文:

  http协议是建立在tcp基础上的应用层协议,tcp层由TCPServer,IOStream负责,对http报文的读取与解析则由http1connection负责。当http报文被解析完成再交由给某个delegate类实例负责进行后续处理。

接下来说一下大体的过程(tornado作为服务器端):

  如果客户端要向服务端发送http请求,首先要建立tcp连接,

  tornado在连接建立完成后会将连接封装为一个IOStream对象,这个对象可以异步的从连接中读写数据

  tornado中又实现了HTTP1ServerConnection与HTTP1Connection两个类,他们依赖于底层的IOStream从套接字中读写,并共同合作完成了http1.x协议。

  HTTP1Connection实际主要是用来处理http事务(http权威指南:http事务是由一条请求以及对应该请求的响应组成),当然他自己实现了前半部分,也就是对报文起始行、首部、主体进行读,解析;后半部分需要配合HTTPMessageDelegate进行工作。

  HTTPMessageDelegate对经过HTTP1Connection解析后的报文进行分配,然后由其他类(比如说RequestHandler)执行具体的业务逻辑。

  其他类生成响应,并且把响应发送到IOStream中,这时就表示着这条http事务已经完成,我们需要根据情况判断是否关闭连接。

  HTTP1ServerConnection则是不断的生成HTTP1Connection实例,也就是不断的处理http事务,直到连接关闭。

HTTP1Connection类:

  先看HTTP1Connection这个类。这个类实际上主要完成的工作可以概括为两个:

    1  读取并解析报文消息

    2  写入报文

1 读取并解析报文

  当请求或者响应到来的时候,read_response是解析消息的入口,尽管该方法读起来好像仅仅是针对响应,但因为不管是请求或者是响应格式是相差不大的,所以不管是请求或者是响应他都是可以处理的。read_response中主要调用了_read_message方法,解析报文的逻辑也都是在_read_message方法中,另外,本文主要是对其作为服务端时进行分析

先来说一下_read_mssage的大体逻辑:

  首先HTTP1Connection基于iostream读取请求报文,并对请求报文进行解析,分离出起始行 请求首部,并根据请求首部判定是否读取消息主体以及消息主体的长度。

  在这个过程中,分析起始行的信息然后委托给代理(HTTPMessageDelegate)获取对应的RequestHandler(这一步主要是_RequestDispatcher类实现的),并实例化,根据起始行的method调用相关方法,在调用方法执行业务逻辑时可能会用到模板语言,cookie,csrf等等其他东西,但最终会产生响应,并将响应发送到IOStream中。这些工作都是delegate干的。

  最后HTTP1Connection等待响应发送完成(这一步操作是异步的),根据是否支持keep-alive决定是否处理完后关闭连接

def read_response(self, delegate):        """Read a single HTTP response.        """        if self.params.decompress:            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)        return self._read_message(delegate)    @gen.coroutine    def _read_message(self, delegate):    # self是HTTP1Connection实例对象,delegate是_ServerRequestAdapter实例对象        need_delegate_close = False        try:            header_future = self.stream.read_until_regex(b"\r?\n\r?\n", max_bytes=self.params.max_header_size)            # 两种方式来等待请求头(在服务器模式下是请求,客户端模式下是响应)的读,第一种是什么时候发送过来什么时候读,            # 第二种是设置超时时间时长的定时任务,如果这段时间内没有发送过来那么就关闭连接            if self.params.header_timeout is None: # 第一种                header_data = yield header_future  # 获取起始行以及头部信息的bytes流            else:    # 第二种                try:                    header_data = yield gen.with_timeout(                        self.stream.io_loop.time() + self.params.header_timeout,                        header_future,                        io_loop=self.stream.io_loop,                        quiet_exceptions=iostream.StreamClosedError)                except gen.TimeoutError:                    self.close()                    raise gen.Return(False)            start_line, headers = self._parse_headers(header_data)    # 获取起始行以及头部信息            if self.is_client:                start_line = httputil.parse_response_start_line(start_line)                self._response_start_line = start_line            else:                start_line = httputil.parse_request_start_line(start_line)                self._request_start_line = start_line                self._request_headers = headers            self._disconnect_on_finish = not self._can_keep_alive(start_line, headers) # 如果不是keep alive在响应结束后关闭连接            need_delegate_close = True            with _ExceptionLoggingContext(app_log):                # 这一步会做了很多东西,如果是服务器端,这一步会设置request对象,并根据请求中的url选择对应handler                header_future = delegate.headers_received(start_line, headers)                if header_future is not None:                    yield header_future            if self.stream is None:                # We've been detached.                need_delegate_close = False                raise gen.Return(False)            skip_body = False            if self.is_client:  # 作为client                if (self._request_start_line is not None and                        self._request_start_line.method == 'HEAD'):  # 如果方法是HEAD,那么默认是没有主体的。即使有主体也会被忽略掉                    skip_body = True                code = start_line.code                # 如果客户端发送了一个带条件的GET 请求且该请求已被允许,而文档的内容(自上次访问以来或者根据请求的条件)并没有改变,则服务器应当返回这个304状态码                if code == 304:                      # 304报文可能会包含content-length首部属性,但实际上是没有消息主体的                    # http://tools.ietf.org/html/rfc7230#section-3.3                    skip_body = True                if code >= 100 and code < 200: # 临时的响应。客户端在收到常规响应之前,应准备接收一个或多个1XX响应                    # 1xx 报文是不能包含主体信息的                    if ('Content-Length' in headers or                            'Transfer-Encoding' in headers):                        raise httputil.HTTPInputError(                            "Response code %d cannot have body" % code)                    # 我们所需要的真正的响应还没有接收到,所以继续接收                    yield self._read_message(delegate)            else:                # 1、Expect:100-continue 用于客户端在发送POST数据给服务器前,征询服务器情况,看服务器是否处理POST的数据,                # 如果不处理,客户端则不上传POST数据,如果处理,则POST上传数据。在现实应用中,通过在POST大数据时,才会使用100-continue协议。                # http://www.cnblogs.com/tekkaman/archive/2013/04/03/2997781.html                if (headers.get("Expect") == "100-continue" and not self._write_finished):                    # 默认是支持的,所以收到请求后,返回100。                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")            if not skip_body: # 这一步读取消息主体                body_future = self._read_body(start_line.code if self.is_client else 0, headers, delegate)                if body_future is not None:                    if self._body_timeout is None:                        yield body_future                    else:                        try:                            yield gen.with_timeout(                                self.stream.io_loop.time() + self._body_timeout,                                body_future, self.stream.io_loop,                                quiet_exceptions=iostream.StreamClosedError)                        except gen.TimeoutError:                            gen_log.info("Timeout reading body from %s", self.context)                            self.stream.close()                            raise gen.Return(False)            self._read_finished = True            if not self._write_finished or self.is_client:                need_delegate_close = False                with _ExceptionLoggingContext(app_log):                    # 如果是服务器端,这一步会生成对应的handler实例,然后执行业务逻辑,最后将响应写入IOStream中                    delegate.finish()            # If we're waiting for the application to produce an asynchronous            # response, and we're not detached, register a close callback            # on the stream (we didn't need one while we were reading)            # 等待异步响应完成,所有数据都写入 fd,才继续后续处理,详细见 _finish_request/finish 方法实现。            # 当异步写完成,在HTTPServerRequest中调用当前对象的finish方法,finish方法则会调用_finisth_request方法,该方法内部会对_finish_future对象set_result            if (not self._finish_future.done() and self.stream is not None and not self.stream.closed()):                self.stream.set_close_callback(self._on_connection_close)                yield self._finish_future            # 判定是否关闭连接,服务器端一般等待客户端主动关闭,而如果是客户端则根据是否持久连接进行关闭            if self.is_client and self._disconnect_on_finish:                    self.close()            if self.stream is None:                raise gen.Return(False)        except httputil.HTTPInputError as e:            gen_log.info("Malformed HTTP message from %s: %s", self.context, e)            self.close()            raise gen.Return(False)        finally:            if need_delegate_close:                with _ExceptionLoggingContext(app_log):                    delegate.on_connection_close()            self._clear_callbacks()        raise gen.Return(True)

值得注意的是,该方法中读取报文主体的几种方式:

  1 假设没有开启keep-alive,那么我们将连接结束作为报文终止的标志

  2 而如果开启了keep-alive,那么我们根据Content-Length确定当前报文的终止位置

  3 如果开启了分块传输编码(Transfer-Encoding:chunked,这时候Content-Length就不起作用了,实际上在tornado中如果Content-Length以及分块传输编码都指定则会返回错误)那么就会根据分块传输编码的格式一直读取,直到读取到b"0\r\n"时就可以确定当前报文已终止

_read_body方法则根据情况选择读取报文主体的方式,以上三种选择分别对应于以下三种方法:

  1 _read_body_until_close

  2 _read_fixed_body

  3 _read_chunked_body

 来看一下代码:

def _read_body(self, code, headers, delegate):        # https://imququ.com/post/transfer-encoding-header-in-http.html        if "Content-Length" in headers:            if "Transfer-Encoding" in headers:                # Response cannot contain both Content-Length and                # Transfer-Encoding headers.                # If a message is received with both a Transfer-Encoding and a                # Content-Length header field, the Transfer-Encoding overrides the                # Content-Length.                  # http://tools.ietf.org/html/rfc7230#section-3.3.3                raise httputil.HTTPInputError("Response with both Transfer-Encoding and Content-Length")            if "," in headers["Content-Length"]:                # Proxies sometimes cause Content-Length headers to get                # duplicated.  If all the values are identical then we can                # use them but if they differ it's an error.                pieces = re.split(r',\s*', headers["Content-Length"])                if any(i != pieces[0] for i in pieces):                    raise httputil.HTTPInputError(                        "Multiple unequal Content-Lengths: %r" %                        headers["Content-Length"])                headers["Content-Length"] = pieces[0]            try:                content_length = int(headers["Content-Length"])            except ValueError:                # Handles non-integer Content-Length value.                raise httputil.HTTPInputError(                    "Only integer Content-Length is allowed: %s" % headers["Content-Length"])            if content_length > self._max_body_size:                raise httputil.HTTPInputError("Content-Length too long")        else:            content_length = None        if code == 204:        # 状态码204(无内容)            # This response code is not allowed to have a non-empty body,            # and has an implicit length of zero instead of read-until-close.            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3            if ("Transfer-Encoding" in headers or content_length not in (None, 0)):                raise httputil.HTTPInputError(                    "Response with code %d should not have body" % code)            content_length = 0        if content_length is not None: # 而如果开启了keep-alive,那么我们根据Content-Length确定当前报文的终止位置            return self._read_fixed_body(content_length, delegate)        if headers.get("Transfer-Encoding") == "chunked": # 开启了分块传输编码            return self._read_chunked_body(delegate)        if self.is_client: # 非持久连接            return self._read_body_until_close(delegate)        return None    @gen.coroutine    def _read_fixed_body(self, content_length, delegate):        while content_length > 0:            body = yield self.stream.read_bytes(min(self.params.chunk_size, content_length), partial=True)            content_length -= len(body)            if not self._write_finished or self.is_client:                with _ExceptionLoggingContext(app_log):                    ret = delegate.data_received(body)                    if ret is not None:                        yield ret    @gen.coroutine    def _read_chunked_body(self, delegate):        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1        total_size = 0        while True:            # 先读取chunk长度            chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64)            chunk_len = int(chunk_len.strip(), 16)            # 如果chunk长度为0,表示分块传输的终止            if chunk_len == 0:                return            total_size += chunk_len            # 检测长度是否过大            if total_size > self._max_body_size:                raise httputil.HTTPInputError("chunked body too large")            bytes_to_read = chunk_len            while bytes_to_read:                # 读取该长度对应的data                chunk = yield self.stream.read_bytes(                    min(bytes_to_read, self.params.chunk_size), partial=True)                bytes_to_read -= len(chunk)                if not self._write_finished or self.is_client:                    with _ExceptionLoggingContext(app_log):                        # 读取的消息主体要交给代理(HTTPMessageDelegate)处理                        ret = delegate.data_received(chunk)                        if ret is not None:                            yield ret            # chunk ends with \r\n            # 每一个data后面都有一个CRLF            crlf = yield self.stream.read_bytes(2)            assert crlf == b"\r\n"    @gen.coroutine    def _read_body_until_close(self, delegate):        body = yield self.stream.read_until_close()        if not self._write_finished or self.is_client:            with _ExceptionLoggingContext(app_log):                delegate.data_received(body)
View Code

 2 写入报文

  写入报文主要可以分两步:

    1  写入报文起始行以及头部

    2  写入报文主体

与之相关的方法主要有三个,来看源码:

def write_headers(self, start_line, headers, chunk=None, callback=None):        """Implements `.HTTPConnection.write_headers`.写入起始行和消息头"""        lines = []        if self.is_client:    # 客户端,那就是发送请求了            self._request_start_line = start_line            lines.append(utf8('%s %s HTTP/1.1' % (start_line[0], start_line[1])))            # Client requests with a non-empty body must have either a            # Content-Length or a Transfer-Encoding.            self._chunking_output = (                start_line.method in ('POST', 'PUT', 'PATCH') and                'Content-Length' not in headers and                'Transfer-Encoding' not in headers)        else:    # 服务端,那就是发送响应了            self._response_start_line = start_line            lines.append(utf8('HTTP/1.1 %d %s' % (start_line[1], start_line[2])))            self._chunking_output = (                # TODO: should this use                # self._request_start_line.version or                # start_line.version?                self._request_start_line.version == 'HTTP/1.1' and                # 304 responses have no body (not even a zero-length body), and so                # should not have either Content-Length or Transfer-Encoding.                # headers.                start_line.code not in (204, 304) and                # No need to chunk the output if a Content-Length is specified.                'Content-Length' not in headers and                # Applications are discouraged from touching Transfer-Encoding,                # but if they do, leave it alone.                'Transfer-Encoding' not in headers)            # If a 1.0 client asked for keep-alive, add the header.            if (self._request_start_line.version == 'HTTP/1.0' and                (self._request_headers.get('Connection', '').lower() ==                 'keep-alive')):                headers['Connection'] = 'Keep-Alive'        # tornado无论作为客户端还是服务端默认是支持分块传输的        if self._chunking_output:            headers['Transfer-Encoding'] = 'chunked'        # 这下这种情况消息主体应为空        if (not self.is_client and            (self._request_start_line.method == 'HEAD' or             start_line.code == 304)):            self._expected_content_remaining = 0        elif 'Content-Length' in headers:            self._expected_content_remaining = int(headers['Content-Length'])        else:            self._expected_content_remaining = None        # TODO: headers are supposed to be of type str, but we still have some        # cases that let bytes slip through. Remove these native_str calls when those        # are fixed.        header_lines = (native_str(n) + ": " + native_str(v) for n, v in headers.get_all())        if PY3:            lines.extend(l.encode('latin1') for l in header_lines)        else:            lines.extend(header_lines)        for line in lines:            if b'\n' in line:                raise ValueError('Newline in header: ' + repr(line))        future = None        if self.stream.closed():            future = self._write_future = Future()            future.set_exception(iostream.StreamClosedError())            future.exception()        else:            if callback is not None:                self._write_callback = stack_context.wrap(callback)            else:                future = self._write_future = Future()            data = b"\r\n".join(lines) + b"\r\n\r\n"            if chunk:                data += self._format_chunk(chunk)            self._pending_write = self.stream.write(data)            self._pending_write.add_done_callback(self._on_write_complete)        return future    def _format_chunk(self, chunk):        '''如果采用了分块传输编码,则将参数chunk转换为http中规定的格式。如果没有使用分块传输编码则原样返回'''        if self._expected_content_remaining is not None:            self._expected_content_remaining -= len(chunk)            if self._expected_content_remaining < 0:                # Close the stream now to stop further framing errors.                self.stream.close()                raise httputil.HTTPOutputError(                    "Tried to write more data than Content-Length")        if self._chunking_output and chunk:            # Don't write out empty chunks because that means END-OF-STREAM            # with chunked encoding            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"        else:            return chunk    def write(self, chunk, callback=None):        """Implements `.HTTPConnection.write`. 写入报文主体        For backwards compatibility is is allowed but deprecated to        skip `write_headers` and instead call `write()` with a        pre-encoded header block.        """        future = None        if self.stream.closed():            future = self._write_future = Future()            self._write_future.set_exception(iostream.StreamClosedError())            self._write_future.exception()        else:            if callback is not None:                self._write_callback = stack_context.wrap(callback)            else:                future = self._write_future = Future()            self._pending_write = self.stream.write(self._format_chunk(chunk))            self._pending_write.add_done_callback(self._on_write_complete)        return future
View Code

 

HTTP1ServerConnection类

HTTP1ServerConnection比较简单,主要实现了服务端处理逻辑:

  在本条tcp连接上,不停的处理http事务(当然也有可能客户端不支持持久连接所以处理完一条http事务后,tcp连接被关闭)

  当发生异常时,关闭连接。

其中start_serving方法是入口,其内部调用了_server_request_loop,来看代码

def start_serving(self, delegate):        """            Starts serving requests on this connection.            :arg delegate: a `.HTTPServerConnectionDelegate`        """        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)         self._serving_future = self._server_request_loop(delegate)        # Register the future on the IOLoop so its errors get logged.        self.stream.io_loop.add_future(self._serving_future, lambda f: f.result())    @gen.coroutine    def _server_request_loop(self, delegate):        try:            while True:                # 不断处理http事务,知道连接关闭或者出现异常                conn = HTTP1Connection(self.stream, False, self.params, self.context)                request_delegate = delegate.start_request(self, conn)                    try:                    ret = yield conn.read_response(request_delegate)                except (iostream.StreamClosedError, iostream.UnsatisfiableReadError): # 连接关闭                    return                except _QuietException:                    # This exception was already logged.                    conn.close()                    return                except Exception:                    gen_log.error("Uncaught exception", exc_info=True)                    conn.close()                    return                if not ret:                    return                yield gen.moment        finally:            delegate.on_close(self)
View Code

 

参考:

  

  

 

转载于:https://www.cnblogs.com/MnCu8261/p/6888904.html

你可能感兴趣的文章
七尖记
查看>>
[翻译] VBFPopFlatButton
查看>>
PlaceholderImageView
查看>>
Ubuntu14.04一直进入guest session解决办法
查看>>
BZOJ2462[Beijing2011]矩阵模板(二维Hash)
查看>>
SAP(最短增广路算法) 最大流模板
查看>>
用极大化思想解决矩形问题学习笔记
查看>>
Django REST Framework 简单入门
查看>>
Hibernate中fetch和lazy介绍
查看>>
修改ip脚本
查看>>
解析xlsx与xls--使用2012poi.jar
查看>>
java5,java6新特性
查看>>
【LOJ】#2290. 「THUWC 2017」随机二分图
查看>>
SSL-ZYC 活动安排
查看>>
Git clone 报错 128
查看>>
在Python中执行普通除法
查看>>
编译原理(第三版) 语法分析器
查看>>
c# 动态绘制直线和曲线
查看>>
Spring理解?
查看>>
删除无限循环的文件夹-删除递归文件夹
查看>>