查看: 321|回复: 0

ESP32 Picoweb教程:修改返回的HTTP代码

[复制链接]
  • TA的每日心情

    2018-1-11 13:58
  • 签到天数: 8 天

    [LV.3]偶尔看看II

    发表于 2019-5-16 12:03 | 显示全部楼层 |阅读模式
    本帖最后由 dfrobot 于 2019-5-13 18:19 编辑

    引 言
    本文主要说明如何在MicroPython Picoweb应用程序中返回特定的HTTP代码。
    我们将使用一个叫做http_error的函数。这个函数其实也可以返回其他类型的HTTP代码,比如在2xx范围内的代码(成功代码)。可用的HTTP响应代码可参见此处(https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html)。
    测试使用的是一个集成在ESP32开发板中的DFRobot的sp - wroom -32设备。代码开发是在MicroPython IDE uPyCraft上完成的。你可以在前一篇文章:ESP32 MicroPython教程:uPyCraft IDE入门 查看如何使用uPyCraft。
    尽管本文将使用一个特殊的函数来返回状态代码,但是上一篇教程中用来将第一部分HTTP响应发送给客户端的start_response函数也有一个可选的状态参数可用于返回特殊的HTTP代码。默认情况下,其返回值是200,对应于OK [1]。


    start_response函数代码如下:

    kittenblock中小学创客名师推荐的图形化编程软件

    
    # Picoweb web pico-framework for MicroPython
    # Copyright (c) 2014-2018 Paul Sokolovsky
    # SPDX-License-Identifier: MIT
    import sys
    import gc
    import micropython
    import utime
    import uio
    import ure as re
    import uerrno
    import uasyncio as asyncio
    import pkg_resources
    
    from .utils import parse_qs
    
    
    def get_mime_type(fname):
        # Provide minimal detection of important file
        # types to keep browsers happy
        if fname.endswith(".html"):
            return "text/html"
        if fname.endswith(".css"):
            return "text/css"
        if fname.endswith(".png") or fname.endswith(".jpg"):
            return "image"
        return "text/plain"
    
    def sendstream(writer, f):
        buf = bytearray(64)
        while True:
            l = f.readinto(buf)
            if not l:
                break
            yield from writer.awrite(buf, 0, l)
    
    
    def jsonify(writer, dict):
        import ujson
        yield from start_response(writer, "application/json")
        yield from writer.awrite(ujson.dumps(dict))
    
    def start_response(writer, content_type="text/html", status="200", headers=None):
        yield from writer.awrite("HTTP/1.0 %s NA\r\n" % status)
        yield from writer.awrite("Content-Type: ")
        yield from writer.awrite(content_type)
        if not headers:
            yield from writer.awrite("\r\n\r\n")
            return
        yield from writer.awrite("\r\n")
        if isinstance(headers, bytes) or isinstance(headers, str):
            yield from writer.awrite(headers)
        else:
            for k, v in headers.items():
                yield from writer.awrite(k)
                yield from writer.awrite(": ")
                yield from writer.awrite(v)
                yield from writer.awrite("\r\n")
        yield from writer.awrite("\r\n")
    
    def http_error(writer, status):
        yield from start_response(writer, status=status)
        yield from writer.awrite(status)
    
    
    class HTTPRequest:
    
        def __init__(self):
            pass
    
        def read_form_data(self):
            size = int(self.headers[b"Content-Length"])
            data = yield from self.reader.read(size)
            form = parse_qs(data.decode())
            self.form = form
    
        def parse_qs(self):
            form = parse_qs(self.qs)
            self.form = form
    
    
    class WebApp:
    
        def __init__(self, pkg, routes=None, serve_static=True):
            if routes:
                self.url_map = routes
            else:
                self.url_map = []
            if pkg and pkg != "__main__":
                self.pkg = pkg.split(".", 1)[0]
            else:
                self.pkg = None
            if serve_static:
                self.url_map.append((re.compile("^/(static/.+)"), self.handle_static))
            self.mounts = []
            self.inited = False
            # Instantiated lazily
            self.template_loader = None
            self.headers_mode = "parse"
    
        def parse_headers(self, reader):
            headers = {}
            while True:
                l = yield from reader.readline()
                if l == b"\r\n":
                    break
                k, v = l.split(b":", 1)
                headers[k] = v.strip()
            return headers
    
        def _handle(self, reader, writer):
            if self.debug > 1:
                micropython.mem_info()
    
            close = True
            try:
                request_line = yield from reader.readline()
                if request_line == b"":
                    if self.debug >= 0:
                        self.log.error("%s: EOF on request start" % reader)
                    yield from writer.aclose()
                    return
                req = HTTPRequest()
                # TODO: bytes vs str
                request_line = request_line.decode()
                method, path, proto = request_line.split()
                if self.debug >= 0:
                    self.log.info('%.3f %s %s "%s %s"' % (utime.time(), req, writer, method, path))
                path = path.split("?", 1)
                qs = ""
                if len(path) > 1:
                    qs = path[1]
                path = path[0]
    
                #print("================")
                #print(req, writer)
                #print(req, (method, path, qs, proto), req.headers)
    
                # Find which mounted subapp (if any) should handle this request
                app = self
                while True:
                    found = False
                    for subapp in app.mounts:
                        root = subapp.url
                        #print(path, "vs", root)
                        if path[:len(root)] == root:
                            app = subapp
                            found = True
                            path = path[len(root):]
                            if not path.startswith("/"):
                                path = "/" + path
                            break
                    if not found:
                        break
    
                # We initialize apps on demand, when they really get requests
                if not app.inited:
                    app.init()
    
                # Find handler to serve this request in app's url_map
                found = False
                for e in app.url_map:
                    pattern = e[0]
                    handler = e[1]
                    extra = {}
                    if len(e) > 2:
                        extra = e[2]
    
                    if path == pattern:
                        found = True
                        break
                    elif not isinstance(pattern, str):
                        # Anything which is non-string assumed to be a ducktype
                        # pattern matcher, whose .match() method is called. (Note:
                        # Django uses .search() instead, but .match() is more
                        # efficient and we're not exactly compatible with Django
                        # URL matching anyway.)
                        m = pattern.match(path)
                        if m:
                            req.url_match = m
                            found = True
                            break
    
                if not found:
                    headers_mode = "skip"
                else:
                    headers_mode = extra.get("headers", self.headers_mode)
    
                if headers_mode == "skip":
                    while True:
                        l = yield from reader.readline()
                        if l == b"\r\n":
                            break
                elif headers_mode == "parse":
                    req.headers = yield from self.parse_headers(reader)
                else:
                    assert headers_mode == "leave"
    
                if found:
                    req.method = method
                    req.path = path
                    req.qs = qs
                    req.reader = reader
                    close = yield from handler(req, writer)
                else:
                    yield from start_response(writer, status="404")
                    yield from writer.awrite("404\r\n")
                #print(req, "After response write")
            except Exception as e:
                if self.debug >= 0:
                    self.log.exc(e, "%.3f %s %s %r" % (utime.time(), req, writer, e))
    
            if close is not False:
                yield from writer.aclose()
            if __debug__ and self.debug > 1:
                self.log.debug("%.3f %s Finished processing request", utime.time(), req)
    
        def mount(self, url, app):
            "Mount a sub-app at the url of current app."
            # Inspired by Bottle. It might seem that dispatching to
            # subapps would rather be handled by normal routes, but
            # arguably, that's less efficient. Taking into account
            # that paradigmatically there's difference between handing
            # an action and delegating responisibilities to another
            # app, Bottle's way was followed.
            app.url = url
            self.mounts.append(app)
    
        def route(self, url, **kwargs):
            def _route(f):
                self.url_map.append((url, f, kwargs))
                return f
            return _route
    
        def add_url_rule(self, url, func, **kwargs):
            # Note: this method skips Flask's "endpoint" argument,
            # because it's alleged bloat.
            self.url_map.append((url, func, kwargs))
    
        def _load_template(self, tmpl_name):
            if self.template_loader is None:
                import utemplate.source
                self.template_loader = utemplate.source.Loader(self.pkg, "templates")
            return self.template_loader.load(tmpl_name)
    
        def render_template(self, writer, tmpl_name, args=()):
            tmpl = self._load_template(tmpl_name)
            for s in tmpl(*args):
                yield from writer.awrite(s)
    
        def render_str(self, tmpl_name, args=()):
            #TODO: bloat
            tmpl = self._load_template(tmpl_name)
            return ''.join(tmpl(*args))
    
        def sendfile(self, writer, fname, content_type=None, headers=None):
            if not content_type:
                content_type = get_mime_type(fname)
            try:
                with pkg_resources.resource_stream(self.pkg, fname) as f:
                    yield from start_response(writer, content_type, "200", headers)
                    yield from sendstream(writer, f)
            except OSError as e:
                if e.args[0] == uerrno.ENOENT:
                    yield from http_error(writer, "404")
                else:
                    raise
    
        def handle_static(self, req, resp):
            path = req.url_match.group(1)
            print(path)
            if ".." in path:
                yield from http_error(resp, "403")
                return
            yield from self.sendfile(resp, path)
    
        def init(self):
            """Initialize a web application. This is for overriding by subclasses.
            This is good place to connect to/initialize a database, for example."""
            self.inited = True
    
        def run(self, host="127.0.0.1", port=8081, debug=False, lazy_init=False, log=None):
            if log is None and debug >= 0:
                import ulogging
                log = ulogging.getLogger("picoweb")
                if debug > 0:
                    log.setLevel(ulogging.DEBUG)
            self.log = log
            gc.collect()
            self.debug = int(debug)
            self.init()
            if not lazy_init:
                for app in self.mounts:
                    app.init()
            loop = asyncio.get_event_loop()
            if debug > 0:
                print("* Running on http://%s:%s/" % (host, port))
            loop.create_task(asyncio.start_server(self._handle, host, port))
            loop.run_forever()
            loop.close()
    



    代 码
    首先,我们要导入HTTP服务器设置所需要的picoweb模块,以及建立ESP32 WiFi网络连接所需要的网络模块。


    然后,连接到WiFi网络。这一段代码是通用代码,在上一篇教程:ESP32 Picoweb教程:提供JSON内容 中有详细说明。您只需要把ssid和密码变量换成您的WiFi网络信息,ESP32就能连接到WiFi网络。
    请注意,我们将在程序末尾把分配给ESP32的IP地址存储到一个变量中,以便稍后将其作为参数传递给app(应用程序)实例的run(运行)方法。

    kittenblock中小学创客名师推荐的图形化编程软件

    ssid = "yourNetworkName"
    password =  "yourPassword"
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(ssid, password)
    while station.isconnected() == False:
    pass
    ip = station.ifconfig()

    连接到WiFi网络之后,我们就需要创建app(应用程序)实例,并声明它要监听的路由。在本例中,我们只使用一个路由对服务器返回的HTTP内部服务器错误(https://www.lifewire.com/500-internal-server-error-explained-2622938)进行测试。我们将监听“/internalerror”端点。

    kittenblock中小学创客名师推荐的图形化编程软件

    app = picoweb.WebApp(__name__)
    @app.route("/internalerror")
    def internalError(req, resp):
    ##Handling function code

    为简单起见,我们的路由处理函数会立即返回HTTP错误。当然,在实际的应用中,在向某个端点返回一条错误之前,肯定会有一些相关的控制逻辑。
    只需调用picoweb模块的http_error函数(https://github.com/pfalcon/picoweb/blob/master/picoweb/__init__.py#L42)即可返回错误代码,函数参数是以字符串格式表示的客户端数据流写入器对象和错误代码。内部服务器错误对应的代码是500 [1]。您也可以对其他状态代码进行测试。
    由于该函数内部使用了数据流写入器的awrite方法,所以我们需要使用yield from关键字。

    kittenblock中小学创客名师推荐的图形化编程软件

    yield from picoweb.http_error(resp, "500")

    最后,只需调用app(应用程序)实例的run(运行)方法启动服务器即可。最终的源代码如下所示,其中包含了函数调用(使用分配给ESP32的IP地址)。

    kittenblock中小学创客名师推荐的图形化编程软件

    import picoweb
    import network
    ssid = "yourNetworkName"
    password =  "yourPassword"
    station = network.WLAN(network.STA_IF)
    station.active(True)
    station.connect(ssid, password)
    while station.isconnected() == False:
      pass
    ip = station.ifconfig()
    app = picoweb.WebApp(__name__)
    @app.route("/internalerror")
    def internalError(req, resp):
        yield from picoweb.http_error(resp, "500")
    app.run(debug=True, host =ip[0])
    


    测试代码
    要对代码进行测试,只需将脚本上传到ESP32开发板运行即可。脚本执行时,控制台上会显示一条消息,表示服务器正在监听的根路径。
    您只需将URL复制到一个网页浏览器中并在后面加上internalerror字样(对应于应用程序正在监听的路由)。所得到的输出结果如图1所示。请注意,我打开了Chaome浏览器的开发者工具选项,以查看更详细的请求信息。


    图1 - 返回的内部服务器错误HTTP代码。


    如引言部分所述,尽管该函数名为http_error,但是实际上我们也可以用它来返回非错误代码(比如对应于新建资源的代码201 [1])。被返回的HTTP代码如图2所示。注意,我仅仅在之前的代码中把传递给http_error的状态代码从500改成了201,所以路由仍将其称为“internalerror”。

    图2 - 修改的HTTP代码。


    查看更多ESP32/ESP8266教程和项目,请点击 :ESP32教程汇总贴
    英文版教程 : ESP32 tutorial


    打赏作者鼓励一下!
    您需要登录后才可以回帖 登录 | 立即注册  

    本版积分规则

    热门推荐

    Arduino lcd屏幕亮了但是不显示字符
    Arduino lcd屏幕亮了但是
    Arduino的lcd屏亮了但是上面没有字符显示,串进去的滑动变阻器也旋过了,但是还是没有
    【原创】全球最小口袋3D打印机mini one直播教程贴
    【原创】全球最小口袋3D打
    最近闲得蛋疼,没事搞个掌上3D打印机,先放效果图吧。 搞了半天,终于能正常打印,
    [限时福利]5分钟带你快速了解新一代开发板:M5STACK
    [限时福利]5分钟带你快速
    一、什么是M5Stack M5Stack是一种模块化、可堆叠扩展的开发板,每个模块
    【Arduino】108种传感器模块系列实验(98)---L298N电机驱动板
    【Arduino】108种传感器模
    37款传感器与模块的提法,在网络上广泛流传,其实Arduino能够兼容的传感器模块肯定是
    两个ESP8266通过云端实现远程数据交互
    两个ESP8266通过云端实现
    原理简述:利用发布订阅模式。一个ESP8266作为消息发布者,另一个ESP8266作为消息订阅
    Copyright   ©2015-2016  Arduino中文社区  Powered by©Discuz!   ( 蜀ICP备14017632号-3 )
    快速回复 返回顶部 返回列表