Python socket servers can drop received packets on exit
by Gianni Tedesco
Let’s say we have a datagram server written in python using the shiny new asyncio framework:
```python
import socket
import asyncio
import os


class PacketCounter(asyncio.DatagramProtocol):
    def __init__(self):
        self.counter = 0

    def datagram_received(self, data, addr):
        self.counter += 1


class Writer:
    def __init__(self, socket_path, nr_msgs):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
        self.sock.connect(socket_path)
        self.sock.setblocking(False)
        self.sent_msgs = 0
        self.nr_msgs = nr_msgs
        loop.add_writer(self.sock.fileno(), self.writable)
        self.wait = loop.create_future()

    def writable(self):
        while self.sent_msgs < self.nr_msgs:
            try:
                self.sock.send(b'Hello')
                self.sent_msgs += 1
            except BlockingIOError:
                return
        if self.sent_msgs >= self.nr_msgs:
            loop.remove_writer(self.sock.fileno())
            self.wait.set_result(None)


loop = asyncio.get_event_loop()
socket_path = 'hello'

# Create the server
try:
    os.unlink(socket_path)
except FileNotFoundError:
    pass
proto = PacketCounter()
t = loop.create_datagram_endpoint(lambda: proto,
                                  family=socket.AF_UNIX,
                                  local_addr=socket_path)
transport, _ = loop.run_until_complete(t)

# Create the client and run it
sender = Writer(socket_path, 1000)
loop.run_until_complete(sender.wait)

transport.close()
loop.close()

print('tx', sender.sent_msgs)
print('rx', proto.counter)
```
If we run it, we get:
```
tx 1000
rx 841
```
What’s going on?
When the kernel receives a packet, it gets placed into the relevant socket’s receive buffer and the socket becomes readable, potentially causing a wakeup for anyone sleeping in epoll_wait(), or in a synchronous recv(). In the case of a datagram socket, a blocking recv() will be woken up once for each received datagram. This is how the message boundaries are preserved.
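That per-datagram behaviour is easy to see with a connected pair of UNIX datagram sockets (socketpair() here is just for illustration, it isn’t part of the server above):

```python
import socket

# A connected pair of UNIX datagram sockets (no network needed).
a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)

# Send three separate datagrams...
for msg in (b'one', b'two', b'three'):
    a.send(msg)

# ...and each recv() hands back exactly one of them, even though the
# 4096-byte buffer could hold all three: message boundaries are kept.
msgs = [b.recv(4096) for _ in range(3)]
print(msgs)  # [b'one', b'two', b'three']
```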
So let’s say the kernel receives 3 messages for a datagram socket. The application will need to call recv() 3 times to receive all of those. Or, equivalently, sleep in poll() 3 times and do a non-blocking recv() after each wakeup. In any case, when it tries to read for a fourth time, it will either sleep if the socket is a blocking socket (the default), or it will fail with EAGAIN (a BlockingIOError in python). There is no more data remaining in the kernel’s buffer, so the socket is not ready.
Now consider python’s selector event loop (the default for UNIX-like operating systems). When a socket becomes readable, python wakes up from epoll_wait() (for example), eventually calls sock.recv(), and then calls the protocol’s datagram_received(), which will process that one single datagram. After that we’ll go through the other selector events and then go back to sleep in epoll_wait().
However, since the sender has finished its work, the program exits and now we try to close everything down.
But even though I gave the socket server every possible chance to grab everything in the kernel’s socket buffer, 159 packets never made it to the protocol. Those remaining packets that could have finished a perilous journey across the world to get into my kernel’s receive buffer just get dropped to the floor like so many crumpled up cigarette boxes.
Why this sucks
First of all, this confused me because I’m used to event frameworks in C, using epoll, where the mainloop doesn’t finish an iteration until all event sources have returned EAGAIN, thereby draining the kernel’s socket buffer of received data.
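For comparison, here’s roughly what that C-style pattern looks like transplanted onto Python’s select.epoll with EPOLLET (Linux-only; purely illustrative, this is not what asyncio does):

```python
import select
import socket

a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
b.setblocking(False)

ep = select.epoll()
ep.register(b.fileno(), select.EPOLLIN | select.EPOLLET)  # edge-triggered

for _ in range(3):
    a.send(b'msg')

received = 0
for fd, events in ep.poll(timeout=1):
    # Edge-triggered epoll reports readability ONCE for all three
    # queued datagrams, so we MUST drain until EAGAIN or lose data.
    while True:
        try:
            b.recv(4096)
        except BlockingIOError:
            break
        received += 1
print(received)  # 3
```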
In python asyncio, however, you need to be totally explicit if you want this behaviour. But that’s okay, right? These are all valid design choices, after all.
I’m not so sure. You see, in
BaseSelectorEventLoop._read_from_self() it looks like the right kind of behaviour happens when the self-pipe becomes readable:
```python
def _read_from_self(self):
    while True:
        try:
            data = self._ssock.recv(4096)
            if not data:
                break
            self._process_self_data(data)
        except InterruptedError:
            continue
        except BlockingIOError:
            break
```
But if I want this sort of behaviour for my datagram server I can’t easily mess with the event loop’s internals. I have to use some kind of workaround like this:
```python
# ... 8< ...

# Create the client and run it
sender = Writer(socket_path, 1000)
loop.run_until_complete(sender.wait)

# Now, drain the kernel socket-buffer.  `sock` is the server's
# underlying non-blocking socket (eg. transport.get_extra_info('socket')).
while True:
    try:
        buf = sock.recv(64 * 1024)
    except BlockingIOError:
        break
    proto.datagram_received(buf, None)

transport.close()
loop.close()

print('tx', sender.sent_msgs)
print('rx', proto.counter)
```
But this is going to get seriously annoying if you have multiple listening sockets all over your program.
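One way to tame the repetition is a small helper — entirely hypothetical, the name drain_datagrams is mine, not an asyncio API — that drains any non-blocking datagram socket into its protocol before shutdown:

```python
import socket


def drain_datagrams(sock, protocol, bufsize=64 * 1024):
    # Hypothetical helper: feed every datagram still queued in the
    # kernel buffer to the protocol.  `sock` must be non-blocking.
    # Returns the number of datagrams drained.
    drained = 0
    while True:
        try:
            buf = sock.recv(bufsize)
        except BlockingIOError:
            return drained
        protocol.datagram_received(buf, None)
        drained += 1


# Tiny demonstration with a stand-in protocol:
class Counter:
    def __init__(self):
        self.counter = 0

    def datagram_received(self, data, addr):
        self.counter += 1


a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
b.setblocking(False)
for _ in range(5):
    a.send(b'Hello')

proto = Counter()
n = drain_datagrams(b, proto)
print(n, proto.counter)  # 5 5
```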
On the other hand, if I am worried that my program won’t exit because I’ll be spending forever servicing a never-ending stream of datagrams, I think that’s a much easier problem to solve. You just close the socket; then there’s no way to keep receiving stuff.
Why not use edge-triggered epoll and implement the edge-triggered behaviour under the hood (re-trying callbacks until BlockingIOError) without the user having to know about it? It’s going to result in way fewer system calls, and way fewer gotchas like this. And if you want the old behaviour it’s easy to just break out of the loop immediately by calling loop.remove_reader().
There will be concerns about “starvation”, but starvation can be easily avoided by putting ready file-descriptors into a “ready queue” and looping over the list, calling each callback in round-robin (hell, priority-queue for all I care) fashion. Then you can just epoll_wait(..., 0) after some number of iterations has been done while the ready-queue is still non-empty. That will just get you any new fds which became ready in the meantime, and you can add those to the end of the “ready queue.”
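That ready-queue mainloop can be sketched like so — purely illustrative, the names and structure are mine, not asyncio’s:

```python
import select
import socket
from collections import deque


def run_ready_queue(ep, callbacks, poll_every=16):
    # Sketch of the proposed mainloop.  Each callback performs ONE
    # non-blocking operation and returns False once it hits
    # BlockingIOError; until then it keeps its place in the queue.
    ready = deque(fd for fd, _ in ep.poll())  # initial blocking wait
    iterations = 0
    while ready:
        fd = ready.popleft()
        if callbacks[fd]():
            ready.append(fd)  # still has data: gets another turn later
        iterations += 1
        if iterations % poll_every == 0:
            # A zero-timeout poll picks up fds that became ready while
            # we were working through the queue.
            for new_fd, _ in ep.poll(timeout=0):
                if new_fd not in ready:
                    ready.append(new_fd)


# Demo: one datagram socket with three queued messages.
a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
b.setblocking(False)
for _ in range(3):
    a.send(b'x')

received = []


def on_readable():
    try:
        received.append(b.recv(4096))
    except BlockingIOError:
        return False
    return True


ep = select.epoll()
ep.register(b.fileno(), select.EPOLLIN)
run_ready_queue(ep, {b.fileno(): on_readable})
print(len(received))  # 3
```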
Seriously guys… come on :)