博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tornado和subprocess实现程序的非堵塞异步处理
阅读量:6933 次
发布时间:2019-06-27

本文共 5099 字,大约阅读时间需要 16 分钟。

tornado     是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。

源码组织:

  |---__init__.py

   ---auth.py

   ---......

   ---epoll.c

   ---ioloop.py

   ---iostream.py

   ---...

  tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。

  ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理

  iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.

这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。

下面是个例子,有tornado基础的朋友,一看就懂的~

import subprocessimport tornado.ioloopimport timeimport fcntlimport functoolsimport osclass GenericSubprocess (object):    def __init__ ( self, timeout=-1, **popen_args ):        self.args = dict()        self.args["stdout"] = subprocess.PIPE        self.args["stderr"] = subprocess.PIPE        self.args["close_fds"] = True        self.args.update(popen_args)        self.ioloop = None        self.expiration = None        self.pipe = None        self.timeout = timeout        self.streams = []        self.has_timed_out = False    def start(self):        """Spawn the task.        Throws RuntimeError if the task was already started."""        if not self.pipe is None:            raise RuntimeError("Cannot start task twice")        self.ioloop = tornado.ioloop.IOLoop.instance()        if self.timeout > 0:            self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )        self.pipe = subprocess.Popen(**self.args)        self.streams = [ (self.pipe.stdout.fileno(), []),                             (self.pipe.stderr.fileno(), []) ]        for fd, d in self.streams:            flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY            fcntl.fcntl( fd, fcntl.F_SETFL, flags)            self.ioloop.add_handler( fd,                                     self.stat,                                     self.ioloop.READ|self.ioloop.ERROR)    def on_timeout(self):        self.has_timed_out = True        self.cancel()    def cancel (self ) :        """Cancel task execution        Sends SIGKILL to the child process."""        try:            self.pipe.kill()        except:            pass    def stat( self, *args ):        '''Check process completion and consume pending I/O data'''        self.pipe.poll()        if not self.pipe.returncode is None:            '''cleanup handlers and timeouts'''            if not self.expiration is None:                self.ioloop.remove_timeout(self.expiration)            for fd, dest in  self.streams:                self.ioloop.remove_handler(fd)            '''schedulle callback (first try to read all pending data)'''            self.ioloop.add_callback(self.on_finish)        for fd, dest in  self.streams:            while True:                try:                    data = os.read(fd, 4096)                    if len(data) == 0:                        break                    dest.extend([data])                except:                    break    @property    def stdout(self):        return self.get_output(0)    @property    def stderr(self):        return self.get_output(1)    @property    def status(self):        return self.pipe.returncode    def get_output(self, index ):        return "".join(self.streams[index][1])    def on_finish(self):        raise NotImplemented()class Subprocess (GenericSubprocess):    def __init__ ( self, callback, *args, **kwargs):        self.callback = callback        self.done_callback = False        GenericSubprocess.__init__(self, *args, **kwargs)    def on_finish(self):        if not self.done_callback:            self.done_callback = True            '''prevent calling callback twice'''            self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))if __name__ == "__main__":    ioloop = tornado.ioloop.IOLoop.instance()    def print_timeout( status, stdout, stderr, has_timed_out) :        assert(status!=0)        assert(has_timed_out)        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)    def print_ok( status, stdout, stderr, has_timed_out) :        assert(status==0)        assert(not has_timed_out)        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)    def print_error( status, stdout, stderr, has_timed_out):        assert(status!=0)        assert(not has_timed_out)        print "OK status:", repr(status), "stdout:", repr(stdout), "stderr:", repr(stderr), "timeout:", repr(has_timed_out)    def stop_test():        ioloop.stop()    t1 = Subprocess( print_timeout, timeout=3, args=[ "sleep","5"] )    t2 = Subprocess( print_ok, timeout=3, args=[ "ip", "a" ] )    t3 = Subprocess( print_ok, timeout=3, args=[ "sleepdsdasdas", "1" ] )    t4 = Subprocess( print_error, timeout=3, args=[ "cat", "/etc/sdfsdfsdfsdfsdfsdfsdf" ] )    t1.start()    t2.start()    try:        t3.start()        assert(false)    except:        print "OK"    t4.start()    ioloop.add_timeout(time.time() + 10, stop_test)    ioloop.start()

转载地址:http://zfgjl.baihongyu.com/

你可能感兴趣的文章
spring框架使用Quartz执行定时任务实例详解
查看>>
全链路跟踪系统设计与实践(转载)
查看>>
支付接口教程,详解支付宝接口(二)
查看>>
SourceTree 教程文档(了解界面)
查看>>
wpf 依赖属性和附加属性
查看>>
rocketMq-producer介绍
查看>>
谨慎的Waymo CEO:未来几十年,自动驾驶无法做到无处不在
查看>>
Django搭建个人博客(二)
查看>>
SSM+maven实现答题管理系统(二)
查看>>
玩转报表排名
查看>>
SQL Server 默认跟踪(Default Trace)
查看>>
[剑指offer] 字符流中第一个不重复的字符
查看>>
平面上给定n条线段,找出一个点,使这个点到这n条线段的距离和最小。
查看>>
Source Insight 3.X 标签插件v1.0发布
查看>>
百度AI生态方法论升级,AI开放平台深入7大细分领域
查看>>
Linux下配置Golang开发环境
查看>>
AI技术出海 - 阿里云GPU服务器助力旷视勇夺4项世界第一
查看>>
《Learning Scrapy》(中文版)第11章 Scrapyd分布式抓取和实时分析
查看>>
[Python]一行代码判断请求参数是否正确
查看>>
gulp前端自动化工具的快速入门案例
查看>>