python 一次性大量请求api

原本的api:

def sendRequest(param):                response = sess.post(url,data=param,verify=False)                return response.json()

一次api需要0.9秒,如果1000次的话,就需要900秒;
这裏有两个做法:多线程打api,异步打api;
通常这种多io操作的情况下,使用异步打api;

    try:            # sess = requests.Session()            successCount= 0            failCount = 0            payloadList = []            for params in paramsList:                ownerID = params['ownerID']                ownerName = params['ownerName']                doveID = params['doveID']                orgRing = params['orgRing']                dovecoteID = params['dovecoteID']                rfID = params['rfID']                raceID = params['raceID']                payload = {                'updatetype' : 'ownerDove',                'ownerID' : ownerID,                'ownerName' : ownerName,                'doveID' : doveID,                'orgRing' : orgRing,                'dovecoteID' : dovecoteID,                'rfid' : rfID,                'raceID' : raceID                }                payloadList.append(payload)                # payloadList = payloadList[:200]            async def sendRequest(session,param):                async with session.post(url,data=param,verify_ssl =False) as response:                    return await response.json()            async def loopAskapi():                async with aiohttp.ClientSession() as session:                    tasks = []                    for payload in payloadList:                        # task = asyncio.ensure_future(sendRequest(session, payload))                        task = sendRequest(session,payload)                        tasks.append(task)                    responses = await asyncio.gather(*tasks)                    print(len(responses))            def start_loop(loop):                asyncio.set_event_loop(loop)                loop.run_until_complete(loopAskapi())            loop = asyncio.new_event_loop()            t = threading.Thread(target=start_loop, args=(loop,))            t.start()            t.join()            return successCount            # print(responseList)        except Exception as e:            print(e)            return 0

这样使用postman测试是20s打完;
另外还有使用requests.Session的做法:时间差不多:

           sess = requests.Session()            successCount= 0            totalCallCount = 0            for params in paramsList:                ownerID = params['ownerID']                ownerName = params['ownerName']                doveID = params['doveID']                orgRing = params['orgRing']                dovecoteID = params['dovecoteID']                rfID = params['rfID']                raceID = params['raceID']                payload = {                'updatetype' : 'ownerDove',                'ownerID' : ownerID,                'ownerName' : ownerName,                'doveID' : doveID,                'orgRing' : orgRing,                'dovecoteID' : dovecoteID,                'rfid' : rfID,                'raceID' : raceID                }                r = sess.post(url, data=payload, verify=False)                j = json.loads(r.text)                status = j['results']                print('status', status)                if status !=0:                    successCount += 1                totalCallCount+=1                print('totalCallCount:',totalCallCount)            sess.close()            return successCount

关于作者: 网站小编

码农网专注IT技术教程资源分享平台,学习资源下载网站,58码农网包含计算机技术、网站程序源码下载、编程技术论坛、互联网资源下载等产品服务,提供原创、优质、完整内容的专业码农交流分享平台。

热门文章