Python 在 3 中我在版本 5 中添加了 asyncio,但在版本 3 中使用后似乎并不容易使用7、asyncio 有了很大的改进,用起来还不错,asyncio 引入了 async await 语法,有点类似于 nodejs。
最简单的例子:
import asyncioasync def main():print('hello ..') await asyncio.sleep(1) print('...world!')# python 3.7+asyncio.run(main())例如,PHP 的 Swoole 扩展也是基于协程的,但使用起来不如 Python 方便,而且 Swoole 不提供异步 await 语法。 Asyncio 提供了 Create Task 来创建协程任务,可以与 Swoole 的 go 函数进行比较
async def main():task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at ") await task1 await task2 print(f"finished at ")asyncio 还提供了许多帮助程序函数,例如 asyncioGather 用于同时运行多个协程任务,相当于之前使用 Swoole 实现的 groupwait。 对应的 swoole 通道也有一个版本的 asyncioqueue。还有多个队列,例如 priorityqueue 和 lifoqueue,因此 Asyncio 也完全适合 CSP 编程模型。
使用 asyncio 创建 TCP 服务:
import asyncioasync def handle echo(reader, writer): while true: 确定 EOF 终止符以关闭链接 if readerat_eof():break data = await reader.readline() message = data.decode() addr = writer.get_extra_info('peername') print(f"received from ") print(f"send: ") writer.write(data) await writer.drain() print("close the connection") writer.close()async def tcp_server_task():server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'serving on ') async with server: await server.serve_forever()async def main():task = asyncio.create_task(tcp_server_task())await taskasyncio.run(main())现在我们假设我们需要建立一个集中的TCP日志采集服务,并将TCP端口接收到的日志存储在PostgreSQL或者MySQL中,日志请求的数量会比较大,所以我们需要有良好的性能,使用以下简单的实现方式:
import asynciofrom psycopg2.pool import **connectionpoolfrom datetime import datetimeclass asynctask: def __init__(self): self.task_queue = asyncio.queue() self.db_pool = **connectionpool( 2, 20, dbname='echoes', host='192.168.2.10', user='twn39', password='tangweinan') async def handle echo(self, reader, writer): while true: 确定 EOF 终止符以关闭链接 if readerat_eof():break data = await reader.readline() message = data.decode() self.task_queue.put_nowait(message.strip())writer.write("ok".encode())await writer.drain() print("close the connection") writer.close() async def tcp_server_task(self): server = await asyncio.start_server(self.handle_echo, '127.0.0.1', 8888) addr = server.sockets[0].getsockname() print(f'serving on ') async with server: await server.serve_forever() async def consume_task(self, name: str): while true: data = await self.task_queue.get() conn = self.db_pool.getconn() print(conn) cur = conn.cursor() cur.execute("insert into logs (level, message, created_at) values (%s, %s, %s)", (200, 'log test', datetime.now() conn.commit() self.db_pool.putconn(conn) print(f'work: consume data: ') self.task_queue.task_done()if __name__ == '__main__': async def main():async_task = asynctask() task = asyncio.create_task(async_task.tcp_server_task())task1 = asyncio.create_task(async_task.consume_task('worker-1')) task2 = asyncio.create_task(async_task.consume_task('worker-2')) await asyncio.gather(task, task1, task2, return_exceptions=true) asyncio.run(main())程序中引入了 Queue,它可以异步写入数据库以提高高峰请求期间的稳定性,我们创建了三个主要任务,一个是监听 TCP 端口,获取日志数据,将日志放入队列中,另外两个是消耗队列,当队列中有数据时写入数据库, 并在没有数据时等待数据。
* 有两个无限循环(实际上是三个),程序运行时,相当于并行,但实际上它是单线程的,协程可以中断,当多个协程运行时,其实函数之间切换,但切换时间很短,所以反映在并行上。 多线程编程是由操作系统实现的,因此在它们之间切换的成本相对较高,而协程则由用户决定何时切换,也称为用户端线程。