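python: aiohttp

## Basic usage

```
# Run inside a coroutine (async def), e.g. via asyncio.run(main()).
async with aiohttp.request('GET', 'https://api.github.com/events') as resp:
    data = await resp.json()  # named `data` so the json module is not shadowed
    print(data)
```

## Complete GET/POST examples using a session

```
async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())
```

A complete example, including headers, a timeout, and error handling. It is wrapped in a function so that the early `return` is valid; the name `post_json` is only illustrative.

```python
import asyncio
import json

import aiohttp


async def post_json(url, body, headers=None):
    # POST example: send a JSON body and return the decoded JSON response.
    headers = dict(headers or {})
    headers["Content-Type"] = "application/json"
    timeout = aiohttp.ClientTimeout(total=3)  # whole request, in seconds
    try:
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.post(url, data=json.dumps(body), timeout=timeout) as resp:
                if resp.status != 200:
                    print("failed:{}".format(resp.status))
                    return None
                # html = await resp.text()
                return await resp.json()
    except asyncio.TimeoutError:
        print("Request failed: timed out")
    except Exception as e:
        print("Request failed: 500", str(e))
```

## HTTPS with two-way (mutual) verification

The code should look something like this:

```
import ssl
import aiohttp

# cert_file and client_key are paths to the client certificate and its private key.
sslcontext = ssl.create_default_context(cafile='/path_to_client_root_ca')
sslcontext.load_cert_chain(certfile=cert_file, keyfile=client_key)
# `ssl=` replaces the `ssl_context=` argument, deprecated since aiohttp 3.0.
conn = aiohttp.TCPConnector(ssl=sslcontext)
async with aiohttp.ClientSession(connector=conn) as session:
    pass
```

Not tested yet; I will try it when I actually need it.

By 大脸猫, written 2018-01-26 12:13, updated 2022-11-22 17:27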
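
As a follow-up to the two-way verification section, the SSL context setup can be pulled into a small helper that uses only the standard library. This is a minimal sketch, not a tested recipe: the function name `build_client_ssl_context` and its parameters are assumptions made for illustration, and the client certificate step is skipped when no certificate path is given so the helper can be tried without any files on disk.

```python
import ssl
from typing import Optional


def build_client_ssl_context(
    ca_file: Optional[str] = None,
    cert_file: Optional[str] = None,
    key_file: Optional[str] = None,
) -> ssl.SSLContext:
    """Build a client-side context for verifying a server (hypothetical helper).

    ca_file:   CA bundle used to verify the server; None falls back to the
               system's default trust store.
    cert_file: client certificate presented to the server (mutual TLS).
    key_file:  private key for cert_file; may be None if the key is stored
               in the same file as the certificate.
    """
    # Purpose.SERVER_AUTH is the client-side setting: it turns on
    # certificate verification and hostname checking for the server.
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    if cert_file is not None:
        # Only needed when the server also asks the client for a certificate.
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx


if __name__ == "__main__":
    ctx = build_client_ssl_context()
    print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```

The returned context could then be handed to aiohttp as `aiohttp.TCPConnector(ssl=ctx)`. Like the snippet in that section, this has not been run against a real mutual-TLS server.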