Python Coroutines: Words of Advice
The Initial Project
Around 2018 Sirepo had outgrown Celery as a job management system. We decided to implement a tailored solution to our problem, which involves jobs running from a few seconds to a few days. We also had a requirement to integrate with job managers on 3rd party supercomputers with independent authentication.
To manage the events in a job manager, we would need to manage asynchrony. Python supports several mechanisms for asynchrony: coroutines, threads, and multiprocessing. We had used Flask, which relies on threads (and multiple processes) for asynchrony. Our experience with Flask was not great.
We also reasoned that multi-threading is hard to implement correctly. With Python, the global interpreter lock always looms large in threading discussions, anyway. We thought fully separate memory spaces (processes) or cooperative multi-tasking (coroutines) would be more reliable.
We had good experience in BOP relying on PostgreSQL for locking and inter-process communication for jobs. However, Sirepo is different. It’s all about the jobs whereas BOP applications are mostly about the database.
We considered a publish-subscribe database like Redis to solve the queuing/locking problem with multiple processes. Even with pub-sub, we realized we would need a process manager so that jobs could be started, awaited on, cancelled, and timed out. An important part of this change was being able to charge for different levels of CPU resources.
Cancellation and timeouts led me to Nathaniel J Smith’s Timeouts and cancellation for humans. It’s well-written and worth a read. Nathaniel and I struck up a conversation about coroutines. This further convinced me that coroutines were the way to go, especially because they were cancellable. I did not have experience with modern day coroutines so we hired Nathaniel, who helped us get started on the project. This was a good call. I am very grateful to Nathaniel for helping me (in particular) understand the ins and outs of Python’s asyncio.
Fast forward six years, the Sirepo job system has been in production for several years. Suffice it to say, we have learned a thing or two about job management, coroutines, cancellations, timeouts, locking, etc. This article collects our experience.
Job Supervisor and Coroutines
Sirepo jobs run on our cluster and at NERSC, which uses Slurm for job management. On our cluster, we control CPU and memory utilization with containers. We have paying customers with few complaints about the job system. The “one-click” 3rd party supercomputer integration is a godsend to many of our users. It is very nice to have happy customers.
And, we have had many issues with the job supervisor. Many are the natural outcome of emergent design, and others are intrinsic to coroutines in combination with Python’s easier to ask forgiveness than permission approach to cancellation, timeouts, errors, and other exceptional conditions.
We have expanded our use of coroutines to other projects. All new projects, which require asynchrony, use Tornado. We replaced Flask with Tornado for the API server so now all services rely on Tornado. Coroutines work well. We have not encountered many bugs with Tornado or asyncio. We think at this point we also know a bit better how to write reliable services based on coroutines.
Cooperative vs Preemptive Multitasking
In order to understand coroutines, we need to separate concurrency from parallelism, and cooperative multitasking from preemptive multitasking.
For most programmers, concurrency implies preemptive multitasking. Threads and multiprocessing are the most common forms of concurrent programming we encounter. Coroutines, while an old invention in programming, only recently started to become popular. They weren’t added (formally) to Python until 2015. Programming language courses do not emphasize them. This is why they are tricksy, and why we can easily confuse concurrency with parallelism.
I think the confusion starts with the word concurrent, which Merriam-Webster defines as “operating or occurring at the same time.” Python coroutines execute concurrently, but they do not execute “at the same time”.
A better definition is found on the Wikipedia Concurrency page:
In computer science, concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in partial order, without affecting the outcome. This allows for parallel execution of the concurrent units.
The word allows is key. Concurrency allows for out of order execution. Coroutines can execute out of order, but their execution does not happen simultaneously as with (preemptible) threads.
Parallelism is overlapping execution. Coroutines do not run in parallel. They execute in a single Python thread.
Coroutine execution order is controlled by the asyncio event loop. Coroutines are cooperative, not preemptive, multitasking. That’s their main attraction: there can be no race conditions.
Rob Pike’s talk Concurrency is not parallelism is worth a watch. The caveat here is that Goroutines are not coroutines, because they are allowed to execute in parallel, which Python coroutines cannot. Still, the talk explains concurrency and parallelism clearly and with some good examples.
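To make the distinction concrete, here is a minimal, self-contained sketch (not from Sirepo; the names are made up) of two coroutines that interleave only at await points and never run at the same time:

import asyncio


async def worker(name):
    for i in range(3):
        print(f"{name} step {i}")
        # Control passes to the other coroutine only at this await.
        await asyncio.sleep(0)


async def main():
    # Concurrent, but never simultaneous: the steps interleave.
    await asyncio.gather(worker("a"), worker("b"))


asyncio.run(main())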
Concurrency Requires Logging
Let’s move on to something practical: Debugging concurrent code is hard, precisely because execution is out of order. Good logging is essential in order to make it easy to debug, especially in production.
We have had numerous failures due to concurrency, most of which are easily explainable in hindsight. Debugging consists of staring at logs for hours on end, because the difficult to find defects only occur in production, in real-time. They are difficult to reproduce.
Over the years, we have had to improve logging in the job system. Here are some guidelines we use:
- Always catch and log exceptions in coroutines with sufficient context. Sufficient will become apparent over time.
- Use something like __repr__ or the more robust pkdebug_str to create consistent context for objects in log messages.
- Include detailed logs with timestamps in issues (bug reports). In public repos, like Sirepo, use a log trimmer to avoid exposing personally identifiable information (PII).
Here’s how job_supervisor._Op logs its context:
def pkdebug_str(self):
    def _internal_error():
        if not self.get("internal_error"):
            return ""
        return f", internal_error={self.internal_error}"

    return pkdformat(
        "_Op({}{}, {:.4}{})",
        "DESTROYED, " if self.get("is_destroyed") else "",
        self.get("op_name"),
        self.get("op_id"),
        _internal_error(),
    )
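As a companion to the first guideline above, here is a minimal sketch, not the Sirepo code, of catching and logging an exception in a coroutine with context; _run_op and op.run are hypothetical names, and pkdlog and pkdexc come from pykern.pkdebug:

import asyncio

from pykern.pkdebug import pkdlog, pkdexc


async def _run_op(op):
    try:
        await op.run()
    except asyncio.CancelledError:
        # Cancellation is normal control flow; let it propagate.
        raise
    except Exception as e:
        # Log the op object and the stack so the context is in the message.
        pkdlog("error={} op={} stack={}", e, op, pkdexc())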
Track Object Life Cycle
Note the is_destroyed flag in the previous code snippet. In shared-memory, asynchronous code, an object can be destroyed by one coroutine while it is still being used by another. This is the cause of numerous failures: using state when it is no longer valid.
For example, in the supervisor, a job might be canceled asynchronously by an API call triggered by a user pressing a cancel button. The coroutine handling the request that cancels the job is not the coroutine which is monitoring the job. The coroutine monitoring the job holds a copy of the job object, which is destroyed by the coroutine canceling the job.
That’s why in Sirepo asynchronous code, objects that are referenced concurrently by two coroutines are destroyed explicitly. Coroutines are obligated to check an object’s is_destroyed flag to determine the object’s validity after an await. This means state management can get complicated.
In _send, is_destroyed is checked:
async def _send(op):
    if not await op.prepare_send() or op.is_destroyed:
        return None, False
    if not op.send():
        return None, False
    if (r := await op.reply_get()) is None:
        return None, False
This check ensures that op has not been canceled. Once checked, we know that op can’t be destroyed until after the await op.reply_get.
asyncio.Task.cancel
That’s a lot of code and logic just to send and get a reply. Why not just Task.cancel the _send coroutine? After all, as the documentation says, “Tasks can easily and safely be cancelled.”
We did not find Task.cancel to be easy to manage. It is very hard to write correct cancellation code.
We found one defect
related to cancel in Tornado itself. Admittedly, Tornado was written
well before asyncio, and before tasks could be canceled.
Cancelling is hard. For example, you can only cancel a concurrent.futures.Future when it is pending. Already executing futures cannot be canceled, even when they are waiting on another coroutine.
Another example is threading.Timer.cancel, which “will only work if the timer is still in its waiting stage.”
You cannot cancel a threading.Thread: “threads cannot be destroyed, stopped, suspended, resumed, or interrupted.”
And, you guessed it, you can’t kill a Goroutine, a Rust thread, a Java thread, etc. Indeed, Java had Thread.stop, and it was removed for good reasons.
That’s why canceling a Python coroutine is problematic, too. Cancelling coroutines is hard. In Python 3.11, Task.uncancel and Task.cancelling were added, which to me is yet another clue that it is hard to implement cancelling. Piling on more methods doesn’t fix the fundamental issue.
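A contrived sketch of why: CancelledError can arrive at any await, so every coroutine in the call chain needs its own cleanup logic, and swallowing the exception silently breaks Task.cancel. The names here are illustrative, not from our code:

import asyncio


async def fetch_and_store(source, store):
    data = await source.read()      # cancellation can land here...
    try:
        await store.write(data)     # ...or here, mid-update
    except asyncio.CancelledError:
        # Every awaiting frame needs cleanup like this to stay consistent.
        await store.rollback()
        raise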
Use asyncio.Queue
We have settled on asyncio.Queue as the sole means of communication and synchronization in coroutines. The advantage of Queues over Locks and Events is that a Queue allows you to pass values. As you saw in the example above, reply_get returns None when there is no reply, and that only happens when the send was canceled. This is a clean way to communicate out-of-band state. With Events and Locks, there’s no value – just one state change. This means you need some other shared value to release a Lock or set an Event in a way that clearly communicates this alternative state.
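Here is a minimal sketch, not the pykern code, of that pattern: putting None on the queue is the out-of-band signal, and the consumer can tell a cancellation apart from a real reply:

import asyncio


async def reply_wait(reply_q):
    r = await reply_q.get()
    if r is None:
        # Out-of-band state: the send was canceled, there is no reply.
        return None, False
    return r, True


async def cancel(reply_q):
    # A Lock or Event could only wake the waiter, not tell it why.
    await reply_q.put(None)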
Python 3.13 added Queue.shutdown, which makes this communication even clearer. Here’s the code in pykern.api.client to destroy an API call:
if x := getattr(self._reply_q, "shutdown", None):
    x(immediate=True)
else:
    # Inferior to shutdown, but necessary pre-Python 3.13
    self._reply_q.put_nowait(None)
And the corresponding code in result_get:
try:
    rv = await self._reply_q.get()
except Exception as e:
    if (x := getattr(asyncio, "QueueShutDown", None)) and isinstance(e, x):
        raise util.APIDisconnected()
    raise
This code cleanly communicates, via APIDisconnected, that the connection was shut down and the API call did not complete.
Coroutines Block on I/O
Calls to open, read, etc. are blocking, that is, they block all coroutines until the operating system fulfills the I/O operation(s). You need parallelism in order to implement asynchronous I/O in Python. (I have no idea why they call the coroutine module asyncio, since it does not support I/O.)
A Tornado specific problem is that reply functions block and are not thread safe. This means a single response blocks the entire server. Sirepo has an outstanding issue with sending data without blocking. With websockets, we will eventually chunk messages to avoid this issue.
We use aiohttp and aiofiles to avoid blocking on other types of I/O. The implementation uses ThreadPoolExecutor to parallelize the I/O operations.
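For example, reading a file without blocking the event loop looks roughly like this with aiofiles (a sketch; the file name is made up):

import aiofiles


async def read_config():
    # aiofiles runs the blocking open/read calls in a thread pool.
    async with aiofiles.open("config.json") as f:
        return await f.read()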
To achieve true parallelism in production, we run Tornado in multiple processes behind a proxy. This is true for any asyncio-based web server.
Backfitting is Hard
When an existing function is converted into a coroutine, all callers have to be modified. This makes updating existing code very difficult. This is by design. It also is annoying, let’s face it.
One way to fix this problem is to create a new function that calls the async function with asyncio.run, which allows non-async functions to call coroutines. This allows you to deprecate the usage and migrate your code slowly. Here’s a simple example from Sirepo:
def call_api_sync(self, *args, **kwargs):
    import asyncio

    return asyncio.run(self.call_api(*args, **kwargs))
This allows non-async code to use call_api, which is async.
Programmers Infer Parallelism
The way coroutines work is more than semantics. It directly affects what is going on in programs that use them. I think programmers infer parallelism from the asyncio objects, e.g. Lock and Semaphore. These are words we learn in operating system courses. Coroutines execute in a single thread.
When you write some asyncio code that reads from a file in an asyncio-based web server such as Tornado, no other coroutine can preempt the read loop unless there is a call to await.
This may seem obvious in the context of this article, but the language of preemption is implied in the asyncio documentation:
asyncio synchronization primitives are designed to be similar to those of the threading module with two important caveats:
- asyncio primitives are not thread-safe, therefore they should not be used for OS thread synchronization (use threading for that);
- methods of these synchronization primitives do not accept the timeout argument; use the asyncio.wait_for() function to perform operations with timeouts.
To me, the most important caveat is that coroutines are not parallel. The language implies an equivalence to threads, which are totally unrelated. No asyncio operation is thread safe except call_soon_threadsafe.
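For completeness, here is a minimal sketch of the one safe direction: handing a value from a real thread back to the event loop with call_soon_threadsafe. Everything here is illustrative:

import asyncio
import threading


def from_worker_thread(loop, result_q, value):
    # The only thread safe way to touch the loop from another thread.
    loop.call_soon_threadsafe(result_q.put_nowait, value)


async def main():
    result_q = asyncio.Queue()
    loop = asyncio.get_running_loop()
    threading.Thread(
        target=from_worker_thread, args=(loop, result_q, 42)
    ).start()
    print(await result_q.get())


asyncio.run(main())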
Yield to the Event Loop
You have to pay attention when coding coroutines. Cooperative
multitasking requires yielding to the event loop whenever real work is
being done. By real work, this could be blocking I/O or
computation. Blocking I/O is solved by aiofiles
(discussed
above).
If a coroutine has a loop, it needs to yield to the event loop in its loop unless the loop is “fast” or calls await in the loop. The meaning of “fast” is obviously in the eye of the beholder.
In the job_supervisor we yield to the event loop when garbage collecting old jobs:
for u, jids in (await _uids_to_jids(too_old, qcall)).items():
    with qcall.auth.logged_in_user_set(u):
        for j in jids:
            _purge_job(jid=j, too_old=too_old, qcall=qcall)
            await sirepo.util.yield_to_event_loop()
sirepo.util.yield_to_event_loop is a wrapper, which allows us to document this magic:
async def yield_to_event_loop():
    await asyncio.sleep(0)
asyncio.sleep(0) has special semantics:
Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.
(Nit: I hate tricks like this. They should just have provided asyncio.yield_to_event_loop.)
await does not Always Yield
The use of await does not always mean “yield to the event loop”. In the following code await some_func() executes completely synchronously:
async def some_func():
    for i in range(1000):
        await sync_func()


async def sync_func():
    pass
This code would execute the same in the event loop if we removed the async and await keywords. This may seem obvious in this simple example, but it’s not obvious in any non-trivial coroutine.
This is important, because unless you know exactly what the coroutine being awaited on does, there’s no guarantee that a loop which contains an await actually releases the processor. This means you have to break the abstraction of any coroutines you call, or you have to always call yield_to_event_loop. This would be annoying, and it’s a real problem in asyncio-based code. This is a Python specific issue, and is a design flaw in my opinion.
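Building on the some_func example above, the fix is to yield explicitly inside the loop (a sketch; in Sirepo we would call yield_to_event_loop):

async def some_func():
    for i in range(1000):
        await sync_func()
        # Explicitly release the processor each iteration.
        await asyncio.sleep(0)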
Tornado.on_message
When we first wrote the supervisor, we assumed that WebSocketHandler.on_message is a coroutine, because it can be defined that way. This is another example of inferring something about the behavior of coroutines, which I believe would be true for a thread-based web server.
There’s an open issue about this in Tornado so we aren’t the only ones who made this assumption.
The fix is to simply create a task in on_message as is done in pykern.api.server:
async def on_message(self, msg):
    try:
        # WebSocketHandler only allows one on_message at a time
        pykern.pkasyncio.create_task(
            self.pykern_api_connection.handle_on_message(msg)
        )
    except Exception as e:
        pkdlog("exception={} stack={}", e, pkdexc())
I want to emphasize again the importance of logging at this level. If
there is an exception in handle_on_message
, a stack trace will be
logged. The exception is not raised, because there’s no value in
having Tornado process the exception. WebSocket messages are
simple. There are no replies so there’s nothing for Tornado to do, and
it has no useful context for this problem (in our code).
Debugging
In our experience, debugging coroutines is just as hard as threads, and possibly harder. A coroutine is not visible to the operating system. If a thread is stuck in an infinite loop, for example, you can easily see that with an operating system thread monitoring tool like top. Coroutines can’t be monitored externally. All you see is that the whole process is busy computing.
Coroutines do not have race conditions, which eliminates one very difficult class of defect. However, they can deadlock. When two or more coroutines are deadlocked, there’s no insight with normal operating system tools. There are tools to see threads which are blocked, which greatly helps debugging.
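A contrived example of such a deadlock: two coroutines each waiting on a queue that only the other can fill. The process looks idle, and no operating system tool will tell you why:

import asyncio


async def relay(q_in, q_out):
    await q_out.put(await q_in.get())


async def main():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    # Each coroutine waits for the other to put something first: deadlock.
    await asyncio.gather(relay(q1, q2), relay(q2, q1))


asyncio.run(main())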
The way we debug is to have very good logging. Every new hard-to-debug defect usually results in improved log messages, either adding more context or adding messages in places where they were missing.
Sirepo uses
pykern.pkdebug
which allows (real-time) control of logging on a per line, function,
or module basis. A regular expression controls the output. If the
controlling regular expression is not set (normal case), the log
function does nothing, which is efficient. This type of logging can be
useful in particularly difficult defects, where you don’t want to
flood the production logs when the system is running normally.
asyncio has a static environment variable PYTHONASYNCIODEBUG, which logs interesting information about coroutines, e.g. long running coroutines, coroutines that were never awaited (a common problem), and exceptions raised when calling asyncio APIs from the wrong thread.
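Debug mode can also be turned on programmatically, which is handy in tests (a sketch):

import asyncio


async def main():
    ...


# Enables the same debug mode as PYTHONASYNCIODEBUG=1 for this run.
asyncio.run(main(), debug=True)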
Summary
asyncio is part of Python now, and more and more code will use it. Hopefully this article helps you write more effective coroutines. It’s not as easy as it looks, but as you develop your own coroutine coding patterns, it becomes manageable.
Other people have written extensively about coroutines. Here are some useful references in alphabetical order: