Python socket servers can drop received packets on exit
by Gianni Tedesco
Let’s say we have a datagram server written in python using the shiny new asyncio framework:
```python
import socket
import asyncio
import os


class PacketCounter(asyncio.DatagramProtocol):
    def __init__(self):
        self.counter = 0

    def datagram_received(self, data, addr):
        self.counter += 1


class Writer:
    def __init__(self, socket_path, nr_msgs):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
        self.sock.connect(socket_path)
        self.sock.setblocking(False)
        self.sent_msgs = 0
        self.nr_msgs = nr_msgs
        loop.add_writer(self.sock.fileno(), self.writable)
        self.wait = loop.create_future()

    def writable(self):
        while self.sent_msgs < self.nr_msgs:
            try:
                self.sock.send(b'Hello')
                self.sent_msgs += 1
            except BlockingIOError:
                return
        if self.sent_msgs >= self.nr_msgs:
            loop.remove_writer(self.sock.fileno())
            self.wait.set_result(None)


loop = asyncio.get_event_loop()
socket_path = 'hello'

# Create the server
try:
    os.unlink(socket_path)
except FileNotFoundError:
    pass
proto = PacketCounter()
t = loop.create_datagram_endpoint(lambda: proto,
                                  family=socket.AF_UNIX,
                                  local_addr=socket_path)
transport, _ = loop.run_until_complete(t)

# Create the client and run it
sender = Writer(socket_path, 1000)
loop.run_until_complete(sender.wait)

transport.close()
loop.close()

print('tx', sender.sent_msgs)
print('rx', proto.counter)
```
If we run it, we get:
```
tx 1000
rx 841
```
What’s going on?
When the kernel receives a packet, it gets placed into the relevant socket’s receive buffer and the socket becomes readable, potentially causing a wakeup for anyone sleeping in epoll_wait(), or in a synchronous recv(). In the case of a datagram socket, a blocking recv() will be woken up once for each received datagram. This is how the message boundaries are preserved.
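That per-datagram behaviour is easy to see with a connected pair of UNIX datagram sockets (socketpair() here is just for illustration, it isn’t part of the server above):

```python
import socket

# A connected pair of UNIX datagram sockets (no network needed).
a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)

# Send three separate datagrams...
for msg in (b'one', b'two', b'three'):
    a.send(msg)

# ...and each recv() hands back exactly one of them, even though the
# 4096-byte buffer could hold all three: message boundaries are kept.
msgs = [b.recv(4096) for _ in range(3)]
print(msgs)  # [b'one', b'two', b'three']
```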
So let’s say the kernel receives 3 messages for a datagram socket. The application will need to call recv() 3 times to receive all of those. Or, equivalently, sleep in poll() 3 times and do a non-blocking recv() after each wakeup. In any case, when it tries to read for a fourth time, it will either sleep if the socket is a blocking socket (the default), or it will fail with EAGAIN (a BlockingIOError in python). There is no more data remaining in the kernel’s buffer, so the socket is not ready.
Now consider python’s selector event loop (the default for UNIX-like operating systems). When a socket becomes readable, python wakes up from epoll_wait() (for example), eventually calls sock.recv(), and then calls the protocol’s datagram_received(), which will process that one single datagram. After that we’ll go through the other selector events and then go back to sleep in epoll_wait().
However, since the sender has finished its work, the program exits and now we try to close everything down.
But even though I gave the socket server every possible chance to grab everything in the kernel’s socket buffer, 159 packets never made it to the protocol. Those remaining packets that could have finished a perilous journey across the world to get into my kernel’s receive buffer just get dropped to the floor like so many crumpled up cigarette boxes.
Why this sucks
First of all, this confused me because I’m used to event frameworks in C, using epoll, where the mainloop doesn’t finish an iteration until all event sources have returned EAGAIN, thereby draining the kernel’s socket buffer of received data.
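For comparison, here’s roughly what that C-style pattern looks like transplanted onto Python’s select.epoll with EPOLLET (Linux-only; purely illustrative, this is not what asyncio does):

```python
import select
import socket

a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
b.setblocking(False)

ep = select.epoll()
ep.register(b.fileno(), select.EPOLLIN | select.EPOLLET)  # edge-triggered

for _ in range(3):
    a.send(b'msg')

received = 0
for fd, events in ep.poll(timeout=1):
    # Edge-triggered epoll reports readability ONCE for all three
    # queued datagrams, so we MUST drain until EAGAIN or lose data.
    while True:
        try:
            b.recv(4096)
        except BlockingIOError:
            break
        received += 1
print(received)  # 3
```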
In python asyncio, however, you need to be totally explicit if you want this behaviour. But that’s okay, right? These are all valid design choices, after all.
I’m not so sure. You see, in
BaseSelectorEventLoop._read_from_self() it looks like the right kind of behaviour happens when the self-pipe becomes readable:
```python
def _read_from_self(self):
    while True:
        try:
            data = self._ssock.recv(4096)
            if not data:
                break
            self._process_self_data(data)
        except InterruptedError:
            continue
        except BlockingIOError:
            break
```
But if I want this sort of behaviour for my datagram server I can’t easily mess with the event loop’s internals. I have to use some kind of workaround like this:
```python
# ... 8< ...

# Create the client and run it
sender = Writer(socket_path, 1000)
loop.run_until_complete(sender.wait)

# Now, drain the kernel socket-buffer.  `sock` is the server's
# underlying non-blocking socket (eg. transport.get_extra_info('socket')).
while True:
    try:
        buf = sock.recv(64 * 1024)
    except BlockingIOError:
        break
    proto.datagram_received(buf, None)

transport.close()
loop.close()

print('tx', sender.sent_msgs)
print('rx', proto.counter)
```
But this is going to get seriously annoying if you have multiple listening sockets all over your program.
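One way to tame the repetition is a small helper — entirely hypothetical, the name drain_datagrams is mine, not an asyncio API — that drains any non-blocking datagram socket into its protocol before shutdown:

```python
import socket


def drain_datagrams(sock, protocol, bufsize=64 * 1024):
    # Hypothetical helper: feed every datagram still queued in the
    # kernel buffer to the protocol.  `sock` must be non-blocking.
    # Returns the number of datagrams drained.
    drained = 0
    while True:
        try:
            buf = sock.recv(bufsize)
        except BlockingIOError:
            return drained
        protocol.datagram_received(buf, None)
        drained += 1


# Tiny demonstration with a stand-in protocol:
class Counter:
    def __init__(self):
        self.counter = 0

    def datagram_received(self, data, addr):
        self.counter += 1


a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
b.setblocking(False)
for _ in range(5):
    a.send(b'Hello')

proto = Counter()
n = drain_datagrams(b, proto)
print(n, proto.counter)  # 5 5
```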
On the other hand, if I am worried that my program won’t exit because I’ll be spending forever servicing a never-ending stream of datagrams, I think that’s a much easier problem to solve. You just close the socket; then there’s no way to keep receiving stuff.
Why not use edge-triggered epoll and implement the edge-triggered behaviour under the hood (re-trying callbacks until BlockingIOError) without the user having to know about it? It’s going to result in way fewer system calls, and way fewer gotchas like this. And if you want the old behaviour it’s easy to just break out of the loop immediately by calling loop.remove_reader().
There will be concerns about “starvation”, but starvation can be easily avoided by putting ready file-descriptors into a “ready queue” and looping over the list, calling each callback in round-robin (hell, priority-queue for all I care) fashion. Then you can just epoll_wait(..., 0) after some number of iterations has been done while the ready-queue is still non-empty. That will just get you any new fds which became ready in the meantime, and you can add those to the end of the “ready queue.”
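That ready-queue mainloop can be sketched like so — purely illustrative, the names and structure are mine, not asyncio’s:

```python
import select
import socket
from collections import deque


def run_ready_queue(ep, callbacks, poll_every=16):
    # Sketch of the proposed mainloop.  Each callback performs ONE
    # non-blocking operation and returns False once it hits
    # BlockingIOError; until then it keeps its place in the queue.
    ready = deque(fd for fd, _ in ep.poll())  # initial blocking wait
    iterations = 0
    while ready:
        fd = ready.popleft()
        if callbacks[fd]():
            ready.append(fd)  # still has data: gets another turn later
        iterations += 1
        if iterations % poll_every == 0:
            # A zero-timeout poll picks up fds that became ready while
            # we were working through the queue.
            for new_fd, _ in ep.poll(timeout=0):
                if new_fd not in ready:
                    ready.append(new_fd)


# Demo: one datagram socket with three queued messages.
a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
b.setblocking(False)
for _ in range(3):
    a.send(b'x')

received = []


def on_readable():
    try:
        received.append(b.recv(4096))
    except BlockingIOError:
        return False
    return True


ep = select.epoll()
ep.register(b.fileno(), select.EPOLLIN)
run_ready_queue(ep, {b.fileno(): on_readable})
print(len(received))  # 3
```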
Seriously guys… come on :)