Предположим, у меня есть некоторые задачи, работающие асинхронно. Они могут быть полностью независимыми, но я все же хочу установить точки, в которых задачи будут приостанавливаться, чтобы они могли выполняться одновременно.
Как правильно выполнять задачи одновременно? В настоящее время я использую await asyncio.sleep(0), но я чувствую, что это добавляет много накладных расходов.
import asyncio
async def do(name, amount):
for i in range(amount):
# Do some time-expensive work
print(f'{name}: has done {i}')
await asyncio.sleep(0)
return f'{name}: done'
async def main():
res = await asyncio.gather(do('Task1', 3), do('Task2', 2))
print(*res, sep='\n')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Выход
Task1: has done 0
Task2: has done 0
Task1: has done 1
Task2: has done 1
Task1: has done 2
Task1: done
Task2: done
Если бы мы использовали простые генераторы, пустое значение yieldприостановило бы выполнение задачи без каких-либо накладных расходов, но пустое значение awaitнедопустимо.
Как правильно установить такие точки останова без накладных расходов?
Решение проблемы
Как упоминалось в комментариях, обычно сопрограммы asyncio автоматически приостанавливаются при вызовах, которые блокируются или засыпают в эквивалентном синхронном коде. В вашем случае сопрограмма привязана к процессору, поэтому ожидания блокирующих вызовов недостаточно, ей необходимо время от времени передавать управление циклу событий, чтобы остальная часть системы могла работать.
Явные выходы не редкость в совместной многозадачности, и использование await asyncio.sleep(0)для этой цели будет работать по назначению, оно сопряжено с риском: слишком часто спите, и вы замедляете вычисления из-за ненужных переключений; спите слишком редко, и вы перегружаете цикл событий, проводя слишком много времени в одной сопрограмме.
Решение, предоставляемое asyncio, состоит в том, чтобы разгрузить код, привязанный к ЦП, в пул потоков с помощью run_in_executor. Ожидание автоматически приостанавливает сопрограмму до тех пор, пока не будет выполнена задача с интенсивным использованием ЦП, без какого-либо промежуточного опроса. Например:
import asyncio
def do(id, amount):
for i in range(amount):
# Do some time-expensive work
print(f'{id}: has done {i}')
return f'{id}: done'
async def main():
loop = asyncio.get_event_loop()
res = await asyncio.gather(
loop.run_in_executor(None, do, 'Task1', 5),
loop.run_in_executor(None, do, 'Task2', 3))
print(*res, sep='\n')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Комментариев нет:
Отправить комментарий