Reactor Pattern for Scaling I/O-Bound Servers

28 Dec 2019


Let’s say you run a chat server that suddenly becomes extremely popular, because Kim Kardashian decided that your incredible chat would be her new platform for keeping in touch with her fans. All of a sudden you have 10K concurrent connections, all trying to talk with Kim. How would one simple machine handle that?

We can think about it in two aspects:


First solution: a thread per connection using Blocking I/O

The echo server below does the following:

  1. Wait for incoming connections in a loop
  2. For each incoming connection, start its own thread
    • On the thread, receive and echo messages in an infinite loop

When we create a socket, it is in blocking mode by default (you can verify this with socket.getblocking()).

In blocking mode, operations block until complete or the system returns an error (such as connection timed out).

This means that whenever we call a socket operation that waits for I/O (for example, recv or send), the call blocks until the I/O can be performed (or an error occurs).

So in this implementation, the echo function simply blocks its thread inside recv until data arrives. If we want 1K concurrent connections, we have to create 1K threads.
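To see blocking mode in isolation, here is a small sketch that uses socket.socketpair() (a connected pair of sockets inside one process) instead of a real client and server. The short timeout is an assumption added only so the demo finishes: without it, the first recv() would wait forever.

```python
import socket

# A connected pair of sockets in one process, so no server is needed.
a, b = socket.socketpair()
initially_blocking = a.getblocking()
print(initially_blocking)  # True: new sockets start in blocking mode

# Nobody has written to b yet, so a.recv() would block forever.
# A short timeout lets us observe the wait instead of hanging.
a.settimeout(0.2)
timed_out = False
try:
    a.recv(1024)
except socket.timeout:
    timed_out = True
    print('recv() waited 0.2s for data and gave up')

b.sendall(b'hi Kim')
data = a.recv(1024)  # data is already there, so recv() returns right away
print(data)

a.close()
b.close()
```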


This is very inefficient, for several reasons:

  1. Context switching between threads takes precious CPU time. The more threads we have, the more time we spend switching instead of running our code.
  2. Most of the threads are blocked, doing nothing while waiting for I/O. The OS doesn’t schedule a blocked thread onto the CPU, but each one still costs a kernel thread, a stack and scheduler bookkeeping, just to sit and wait.
  3. Let’s say a message arrives for thread number 1000. That thread first has to be woken up and then wait for its turn on the CPU, competing with every other runnable thread, so under load the response to the user is delayed. That’s bad.
  4. Each thread has its own stack, which takes space in our limited RAM.

So even though this method is really simple for the developer, since each thread encapsulates the logic of one connection (assuming we avoid all the problems multithreading brings to the table), it’s not efficient at all.

BlockingEchoServer.py
import socket
import threading

def echo(conn):
   with conn:  # close the connection when the loop ends
       while True:
           # blocks the thread until data is received
           data = conn.recv(1024)
           if not data: break  # the client closed the connection
           conn.sendall(data)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.bind(('localhost', 1234))
   s.listen(1000)
   while True:
       conn, addr = s.accept()
       threading.Thread(target=echo, args=(conn,)).start()
BlockingEchoClient.py
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.connect(('localhost', 1234))
   while True:
       s.sendall(input().encode('utf-8'))
       print(s.recv(1024))
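Because the server and client above run forever in separate processes, here is a self-contained sketch that runs the same thread-per-connection design in one process, so you can watch it work end to end. Binding to port 0 (the OS picks a free port), the daemon threads and the three scripted clients are assumptions added for the demo.

```python
import socket
import threading

HOST = '127.0.0.1'

def echo(conn):
    # Runs in its own thread: recv() parks this thread until data arrives.
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:        # the client closed the connection
                break
            conn.sendall(data)

def serve(server):
    # Accept loop: one new thread per accepted connection.
    while True:
        conn, _ = server.accept()
        threading.Thread(target=echo, args=(conn,), daemon=True).start()

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((HOST, 0))          # port 0: the OS picks a free port
server.listen()
port = server.getsockname()[1]
threading.Thread(target=serve, args=(server,), daemon=True).start()

replies = []
for i in range(3):
    with socket.create_connection((HOST, port)) as client:
        client.sendall(f'hello {i}'.encode())
        replies.append(client.recv(1024))
print(replies)
```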


Second solution: Single Threaded Busy-Waiting on Multiple Non-Blocking I/O sockets

Since one of the issues with the previous implementation was that each thread had to block until recv returned some data, let’s use a different way of accessing sockets: non-blocking sockets.

In non-blocking mode, operations fail (with an error that is, unfortunately, system-dependent) if they cannot be completed immediately: functions from the select module can be used to know when and whether a socket is available for reading or writing.

Let’s set aside the mention of select and save it for later. We will also remove the threads: they give us no special benefit here (on the contrary, they only consume more RAM and waste time on context switches).

The server below does the following:

  1. Create a set of sockets
  2. Loop over the sockets
    • For each non-blocking socket, try to read and write. These calls return immediately, whether or not I/O is ready


The example below shows the implementation. We first create a non-blocking server socket that accepts incoming connections. It’s worth noting what happens when accept returns without a new connection (99% of the time): in this case accept (like recv, send, …) raises a BlockingIOError exception. That means our control flow is driven by exceptions, in a very busy busy-loop.

After we add some connections, we iterate over them and try to read a message and write it back (echo). How do we distinguish, on recv, between a closed connection and “no data available”?

  • When the connection is closed, recv returns an empty bytes object, b''.
  • When no data is available, recv raises BlockingIOError.

We need to remember which connections are closed and remove them before the next iteration. Generally speaking, removing items from a collection while iterating over it causes problems (for a Python set it raises RuntimeError), so we collect them in a separate set and remove them after the loop.
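The three outcomes of recv() on a non-blocking socket can be isolated in a small helper. This is a sketch: try_recv is a hypothetical name, and socket.socketpair() stands in for a real client connection (on a local Unix socket pair, sent data is visible to the other end immediately, while on a real TCP connection it arrives later).

```python
import socket

def try_recv(conn, bufsize=1024):
    """Classify one recv() attempt on a non-blocking socket.

    Returns ('data', bytes), ('empty', None) or ('closed', None).
    """
    try:
        data = conn.recv(bufsize)
    except BlockingIOError:    # nothing to read right now
        return ('empty', None)
    if not data:               # b'' means the peer closed the connection
        return ('closed', None)
    return ('data', data)

a, b = socket.socketpair()
a.setblocking(False)

r1 = try_recv(a)               # nobody has written yet
b.sendall(b'Hello Kim')
r2 = try_recv(a)               # data is waiting
b.close()
r3 = try_recv(a)               # the peer is gone
a.close()
print(r1, r2, r3)
```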

BusyWaitNonBlockingEchoServer.py

import socket

sockets = set()
remove_pending = set()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
   server.bind(('localhost', 2002))
   server.listen(1000)
   server.setblocking(False)      # set Non-Blocking socket
   while True:
       try:
           conn, addr = server.accept()
           conn.setblocking(False)
           sockets.add(conn)
       except BlockingIOError: # EAGAIN/EWOULDBLOCK ([Errno 35] on macOS, [Errno 11] on Linux): no pending connection
           pass

       remove_pending.clear()
       for conn in sockets:
           try:
               data = conn.recv(1024)
               if not data: # connection closed
                   remove_pending.add(conn)
               else:
                   print(data)
                   conn.sendall(data)
           except BlockingIOError:  # no data available yet (or the send buffer is full)
               pass

       # remove (and close) the closed connections
       for conn in remove_pending:
           sockets.remove(conn)
           conn.close()

BusyWaitNonBlockingEchoClient.py

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.connect(('localhost', 2002))
   while True:
       s.sendall(input().encode('utf-8'))
       print(s.recv(1024))

So why is this solution still very inefficient?

  1. We go over all of the sockets, all the time, regardless of their state. Each check costs at least one system call, so with 10K sockets we iterate over all of them only to find a message waiting impatiently on socket #9K. Incoming messages are kept waiting while we spend precious time and effort just checking socket states.
  2. Even though most of the time we are just polling each socket to check its state, we are constantly using the CPU, even when no client is sending anything. Nearly all of our CPU time goes to polling instead of executing our server logic.


So after considering everything above, we arrive at a secret wish: what if we had a magic function that, given the set of sockets above, returned only the sockets that are ready for an I/O operation (read/write)? That would be magnificent.

Let’s meet select.


Third solution: (Single Threaded) Synchronous Event Demultiplexer

Its name may differ between operating systems, but the concept is the same: when you want to monitor multiple files (pipes/sockets/…) without blocking on each one separately, you call select, which waits on all of them at once.
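Here is the “magic function” in action, as a minimal sketch in Python: three socket pairs from socket.socketpair() stand in for three users (an assumption to avoid a real server), only one of them sends a message, and select.select hands back just that socket.

```python
import select
import socket

# Three connected pairs: server[i] is our end, client[i] is the user's end.
pairs = [socket.socketpair() for _ in range(3)]
server = [p[0] for p in pairs]
client = [p[1] for p in pairs]

client[2].sendall(b'message for socket #3')  # only the third user writes

# select() waits (here at most 1 second) until at least one socket is
# readable, and returns only the ready ones: no need to try every socket.
readable, _, _ = select.select(server, [], [], 1.0)
ready_index = [server.index(s) for s in readable]
message = readable[0].recv(1024)
print(ready_index, message)  # [2] b'message for socket #3'

for s in server + client:
    s.close()
```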


The Select System Call

man select
SYNOPSIS

#include <sys/select.h>
int
     select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);

DESCRIPTION
select() and pselect() allow a program to monitor multiple file
       descriptors, waiting until one or more of the file descriptors become
       "ready" for some class of I/O operation (e.g., input possible).  A
       file descriptor is considered ready if it is possible to perform a
       corresponding I/O operation (e.g., read(2), or a sufficiently small
       write(2)) without blocking.  

select() examines the I/O descriptor sets whose addresses are passed in readfds, writefds, and errorfds to see if some of their descriptors are ready for reading, are ready
     for writing, or have an exceptional condition pending, respectively. On return, select()
     replaces the given descriptor sets with subsets consisting of those descriptors that are ready for the requested operation.  select() returns the total number of ready
     descriptors in all the sets.

select is a system call that monitors the I/O state of file descriptors and returns when some of them are ready for reading, ready for writing, or have an exceptional condition pending.
select takes 3 sets of file descriptors as input, and when it returns, it modifies the input sets in place to indicate the output:

  1. readfds - the set of file descriptors to monitor for reading. Those that are ready for reading remain in the set on return.
  2. writefds - the set of file descriptors to monitor for writing. Those that are ready for writing remain in the set on return.
  3. errorfds - the set of file descriptors to monitor for exceptional conditions. Those that have such a condition pending remain in the set on return.

select can block until a relevant event occurs on one of the monitored file descriptors, or until the optional timeout expires.
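Python’s select.select(rlist, wlist, xlist, timeout) wraps this call. Unlike the C version, it does not modify its arguments; it returns three new lists holding the ready objects. A small sketch on a socketpair() (used here only to avoid setting up a real server):

```python
import select
import socket

a, b = socket.socketpair()

# timeout=0: just report the current state, never wait.
r, w, x = select.select([a], [a], [a], 0)
state_before = (len(r), len(w), len(x))
print(state_before)   # (0, 1, 0): writable (send buffer has room), not readable

b.sendall(b'ping')
r, w, x = select.select([a], [a], [a], 0)
state_after = (len(r), len(w), len(x))
print(state_after)    # (1, 1, 0): now readable as well

# Nothing was sent to b, so this waits the full 0.1 s and returns empty lists.
r, w, x = select.select([b], [], [], 0.1)
state_idle = (len(r), len(w), len(x))
print(state_idle)     # (0, 0, 0)

a.close()
b.close()
```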

select has its limitations: the number of monitored file descriptors is bounded by FD_SETSIZE (usually 1024), every file descriptor number must be less than FD_SETSIZE, and each call takes O(n) time in the number of monitored descriptors. Depending on the platform, there are better system calls for checking whether I/O is possible on a set of file descriptors:

  • select - all platforms
  • poll - most POSIX platforms
  • epoll - Linux; uses a red-black tree to store the monitored file descriptors
  • kqueue - FreeBSD, macOS
  • IOCP (Input/Output Completion Port) - Windows
  • /dev/poll - Solaris
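poll, for example, takes descriptors one at a time, so it has no FD_SETSIZE cap on descriptor numbers (though each call still scans every registered descriptor, unlike epoll and kqueue). Here is a minimal sketch using Python’s select.poll, which exists only on POSIX systems; the socketpair() is just a stand-in for a client connection.

```python
import select
import socket

a, b = socket.socketpair()
a_fd = a.fileno()

poller = select.poll()               # POSIX only: not available on Windows
poller.register(a, select.POLLIN)    # we care about "ready for reading"

idle = poller.poll(0)                # timeout in milliseconds; 0 = don't wait
print(idle)                          # []

b.sendall(b'hello')
events = poller.poll(1000)           # wait at most 1 second
fd, mask = events[0]
print(fd == a_fd, bool(mask & select.POLLIN))   # True True

poller.unregister(a)
a.close()
b.close()
```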

Python Library Implementation

Python library offers us two modules that support synchronous I/O multiplexing: select and selectors.

While select is lower level and exposes you to the specific select-family system calls, selectors is a higher-level module that:

  1. Chooses the best implementation available on your system, roughly epoll|kqueue|devpoll > poll > select
  2. Defines a BaseSelector class that lets us register/unregister a file object for the specific I/O events we want to monitor. This spares us from maintaining our own sets of read/write file descriptors.
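A minimal sketch of that bookkeeping, with a socketpair() standing in for a client and the string 'user-1' as an arbitrary example of attached data:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
a.setblocking(False)

# register() returns a SelectorKey; `data` can be any object (here a label).
key = sel.register(a, selectors.EVENT_READ, data='user-1')
print(key.fd == a.fileno(), key.events == selectors.EVENT_READ, key.data)

# The selector keeps the bookkeeping for us: look up by file object...
print(sel.get_key(a).data)            # user-1
# ...or count everything currently monitored.
monitored = len(sel.get_map())
print(monitored)                      # 1

sel.unregister(a)
after = len(sel.get_map())
print(after)                          # 0

sel.close()
a.close()
b.close()
```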

In the next sections we will implement an Echo Server and a Chat Server using the selectors module.
selectors.py (excerpt from the CPython standard library)

# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
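DefaultSelector is simply whichever class this chain picked. A quick way to check on your own machine (the printed name depends on the platform: typically EpollSelector on Linux, KqueueSelector on macOS and SelectSelector on Windows):

```python
import selectors

# Which implementation did selectors pick on this machine?
sel = selectors.DefaultSelector()
name = type(sel).__name__
print(name)
sel.close()
```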

Python Echo Server and Client Implementation, using Selectors

Let’s examine a simple echo server and client taken from the python library documentation.


Python Echo Server

The full echo server code is in the selectors module documentation.

sel = selectors.DefaultSelector()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

First, we create a selector. Then we create a server socket, bind it to port 1234 and start listening. We set it to non-blocking mode, so that operations on the socket (accept/send/recv/…) never block:

  • In blocking mode, operations block until complete or the system returns an error (such as connection timed out).
  • In non-blocking mode, operations fail (with an error that is unfortunately system-dependent) if they cannot be completed immediately: functions from the select module can be used to know when and whether a socket is available for reading or writing.

So basically, we don’t need to block on any socket operation, since select will tell us when a socket is ready for reading/writing. All we have to do is register the socket with the selector and tell it which events to watch for on that socket. Here, when the socket is ready for reading (selectors.EVENT_READ), the accept function will be called.

Initially, the selector monitors the following files (sockets, pipes, …):

  1. [sock, selectors.EVENT_READ, accept] # monitor sock for read operation, and if available for read, call accept
while True:
   events = sel.select()
   for key, mask in events:
       callback = key.data
       callback(key.fileobj, mask)

Then we finally see the Event Loop, with the select call in it. Forever, we wait for select to return. select returns whenever any of the files it monitors is ready for its registered operation (read/write/both), and we then call the callback associated with each event. We will discuss later the kinds of data/callbacks that can be associated with an event when registering.

Let’s say a new user connects to our server. select detects that our server socket is ready for reading, and we call our accept function. This function accepts the incoming connection, which creates a new socket (conn) for the connection between the server and the user. Since it’s new, we need to set it to non-blocking mode, as before. Then we register conn with the selector, telling it to call the read function whenever the socket is ready for reading.
The selector now monitors the following files (sockets, pipes, …):

  1. [sock, selectors.EVENT_READ, accept] # monitor sock for read operation, and if available for read, call accept
  2. [conn, selectors.EVENT_READ,read] # monitor conn for read operation, and if available for read, call read


def accept(sock, mask):
   conn, addr = sock.accept()  # Should be ready
   print('accepted', conn, 'from', addr)
   conn.setblocking(False)
   sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
   data = conn.recv(1000)  # Should be ready
   if data:
       print('echoing', repr(data), 'to', conn)
       conn.send(data)  # Hope it won't block
   else:
       print('closing', conn)
       sel.unregister(conn)
       conn.close()

So let’s say the user sends “Hello Kim” on their conn. In the Event Loop, select returns, since it detected that the monitored conn is ready for reading, and we call its registered read function. read receives the data with recv and echoes it back. If recv returns no data (b''), the user has closed the connection on their side, so we unregister the socket from the selector and close it. Notice the comment # Hope it won’t block.

Why is the Pythonista who wrote this code hoping rather than sure? Since conn is non-blocking, send will not actually block; but if the socket’s send buffer is full, send may write only part of the data (it returns the number of bytes sent) or raise BlockingIOError. If we were pedantic (which we are not, since this is example code), we would register conn for selectors.EVENT_WRITE and call send only when select tells us that conn is ready for writing.
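For the curious, here is a sketch of that pedantic version, assuming we keep a per-connection outbox of bytes that were received but not yet echoed (outbox, read_write and run_once are hypothetical names). The connection is watched for EVENT_WRITE only while its outbox is non-empty, and partial sends are handled by keeping the unsent tail. The scripted client and port 0 are there only to make the demo self-contained.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
outbox = {}  # conn -> bytes received but not yet echoed back

def accept(sock, mask):
    conn, _ = sock.accept()
    conn.setblocking(False)
    outbox[conn] = b''
    sel.register(conn, selectors.EVENT_READ, read_write)

def read_write(conn, mask):
    if mask & selectors.EVENT_READ:
        data = conn.recv(1000)
        if not data:                       # peer closed: clean up and stop
            sel.unregister(conn)
            del outbox[conn]
            conn.close()
            return
        outbox[conn] += data
        # We now have something to send: also watch for "ready for writing".
        sel.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, read_write)
    if mask & selectors.EVENT_WRITE and outbox[conn]:
        sent = conn.send(outbox[conn])     # may send only part of the data
        outbox[conn] = outbox[conn][sent:]
        if not outbox[conn]:               # all sent: stop watching for writes
            sel.modify(conn, selectors.EVENT_READ, read_write)

def run_once(timeout=1.0):
    # One turn of the event loop, with the same dispatch as above.
    for key, mask in sel.select(timeout):
        key.data(key.fileobj, mask)

server = socket.socket()
server.bind(('127.0.0.1', 0))              # port 0: the OS picks a free port
server.listen()
server.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept)

client = socket.create_connection(server.getsockname())
client.sendall(b'Hello Kim')
for _ in range(3):   # 1: accept, 2: read and ask for EVENT_WRITE, 3: write
    run_once()
reply = client.recv(1024)
print(reply)         # b'Hello Kim'

client.close()
server.close()
sel.close()
```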

Python Echo Client

The echo client is adapted from the example in the socket module documentation (which also includes a classic implementation of an echo server).

# Echo client program
import socket

HOST = 'localhost'        # The remote host
PORT = 1234               # The same port as used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
   s.connect((HOST, PORT))
   while True:
       msg = input("Enter msg: ").encode('utf8')
       s.sendall(msg)

       data = s.recv(1024)
       print('Received: ', data.decode('utf-8'))

The client connects to the server, sends a message and waits for the response. Notice that here, recv blocks until we get our echo back from the server.
Here is the output of each process:

EchoClient.py
Enter msg: Hello
Received:  Hello
Enter msg: How r u?
Received:  How r u?
Enter msg: Great, Thanks for asking.
Received:  Great, Thanks for asking.
EchoServer.py
accepted <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)> from ('127.0.0.1', 65167)
echoing b'Hello' to <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)>
echoing b'How r u?' to <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)>
echoing b'Great, Thanks for asking.' to <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)>

Python Chat Server and Client Implementation using Selectors

Python Chat Server

So now that we have the basic idea of how to use select in order to handle multiple connections without blocking on I/O, how would we implement a basic chat server?
Basic implementation would include:

  1. Saving a list of connections
  2. Upon arrival of a new message, broadcast it to all the other connections
class ChatServer:

   def __init__(self, **kwargs):
       self._selector = selectors.DefaultSelector()
       self._connections_msg_queue = {}
       self._host = kwargs['host']
       self._port = kwargs['port']

We use the same data structure for both: self._connections_msg_queue is a dict whose keys are the connection sockets (sockets are hashable!) and whose values are collections.deque() queues of messages pending to be sent to that connection. Whenever a new message arrives, we append it to the deque of every connection, and send it once that socket is ready for writing.

We use collections.deque instead of a plain list because all we need is to append new messages and pop them from the front when we send them (deque.popleft()), and a Python list is not an efficient data structure for this use case:

Though list objects support similar operations, they are optimized for fast fixed-length operations and incur O(n) memory movement costs for pop(0) and insert(0, v) operations which change both the size and position of the underlying data representation.
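A tiny illustration of the queue discipline we rely on, using two messages from the sample chat as example data:

```python
import collections

# A per-connection outgoing queue: append new messages on the right,
# send them from the left.
pending = collections.deque()
pending.append('[Bella] Hi you')
pending.append('[Hila] Nice, My name is Hila')

first = pending.popleft()   # O(1); list.pop(0) would shift every other item
print(first)                # [Bella] Hi you
print(list(pending))        # ['[Hila] Nice, My name is Hila']
```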

Accepting a New Connection

In the run method, the Event Loop is pretty much the same as before. First, we create the server socket, set it to non-blocking mode, and register it with the selector for read events (selectors.EVENT_READ), together with the callback to call upon this event. Then select returns the sockets that are ready for I/O, and we go over them and execute their associated callbacks.

   def run(self):
       # create and register server socket for reading (accepting new connections)
       server_sock = socket.socket()
       server_sock.bind((self._host, self._port))
       server_sock.listen(SERVER_NUM_CONNECTIONS)
       server_sock.setblocking(False)
       self._selector.register(server_sock, selectors.EVENT_READ, self._accept)

       while True:
           events = self._selector.select()
           for key, mask in events:
               callback = key.data
               callback(key.fileobj, mask)

Notice a nice thing - let’s recall the register definition:

register(fileobj, events, data=None) data is an Optional opaque data object associated to this file object: for example, this could be used to store a per-client session ID.

Since data is an (optional) opaque object (for this discussion, any arbitrary Python object), we can attach anything related to this socket: a per-client session ID (as the documentation suggests), a reference to a callback (which is what we do in this code), additional parameters, and so on. Notice that while in the previous example we passed a plain function, here we pass self._accept, a method of the class. Luckily, we don’t need to pass self along, because self._accept is a bound method, which is pretty cool. As the session below shows, a.run is a callable object that holds a reference to its instance (__self__).

>>> class Hila:
...     def __init__(self, i):
...         self.i = i
...     def run(self):
...         print(self.i)
...
>>> a = Hila(333)
>>> a.run
<bound method Hila.run of <__main__.Hila object at 0x10b7906a0>>
>>> a.run.__call__()
333
>>> a.run.__self__
<__main__.Hila object at 0x10b7906a0>
>>> a
<__main__.Hila object at 0x10b7906a0>
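The same trick lets a callback carry per-client state. This is a sketch only: Session is a hypothetical class (not part of the chat server), and a socketpair() stands in for a client connection.

```python
import selectors
import socket

class Session:
    """Hypothetical per-client state; not part of the chat server."""

    def __init__(self, user):
        self.user = user
        self.received = []

    def on_read(self, conn, mask):
        # A bound method: `self` (this session) travels with the callback.
        self.received.append(conn.recv(1024))

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
a.setblocking(False)
session = Session('Bella')
sel.register(a, selectors.EVENT_READ, session.on_read)

b.sendall(b'Hi you')
for key, mask in sel.select(timeout=1.0):
    key.data(key.fileobj, mask)           # same dispatch as the chat server
    owner = key.data.__self__.user        # the bound method knows its object

print(owner, session.received)            # Bella [b'Hi you']

sel.close()
a.close()
b.close()
```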


Back to our code. Initially, the selector monitors the following files (sockets, pipes, …):

  1. [sock, selectors.EVENT_READ, self._accept] # monitor sock for read operation, and if available for read, call self._accept

Adding a New Connection

After _accept accepts the connection, it calls _add_connection to add it: we create a (currently empty) queue of messages to be sent to this connection in the future, set the socket to non-blocking mode, and register it with the selector for reading.

   def _accept(self, sock, mask):
       conn, addr = sock.accept()
       self._add_connection(conn)

   def _add_connection(self, conn):
       # register new client connection for reading (accepting new messages)
       print(f'{conn.getpeername()} hello!')
       self._connections_msg_queue[conn] = collections.deque()
       conn.setblocking(False)
       self._selector.register(conn, selectors.EVENT_READ, self._read)


The selector now monitors the following (let’s say we have 2 users connected to our chat):

  1. [sock, selectors.EVENT_READ, self._accept] # monitor server socket for read operation, and if available for read, call self._accept
  2. [conn_1, selectors.EVENT_READ, self._read] # monitor chat client connection for read operation, and if available for read, call self._read. We can have multiple of connections like this monitored by the selector
  3. [conn_2, selectors.EVENT_READ, self._read]

Receiving a new message from a connection

Let’s say user 1 (on conn_1) sends a message. After select returns, we execute self._read(), which calls self._read_message(conn_1). _read_message first receives the message, and then adds it to the message queue of every active connection (including the sender’s, which is why senders also see their own messages):

   def _read(self, conn, mask):
       self._read_message(conn)

   def _read_message(self, conn):
       data = conn.recv(BUFFER_SIZE)  # Should be ready
       if data:
           self._add_message(conn, data)
       else:
           self._remove_connection(conn)

   def _add_message(self, sender_conn, raw_msg):
       try:
           msg = json.loads(raw_msg)
           message = Message(msg['user'], msg['text'])
           print(f"{sender_conn.getpeername()}: [{msg['user']}] {msg['text']}")
       except (json.JSONDecodeError, KeyError,) as e:
           print(f"We got unknown type of message: {raw_msg}; error: {e}")
           return

       # register every client connection for writing (broadcast recent messages)
       for conn, messages in self._connections_msg_queue.items():
           self._selector.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write)
           messages.append(message)

Notice that now we need to change the type of events we monitor. Since we want to send the new pending message(s) to each connection, we need select to tell us when each socket is ready for writing. But we also want to keep monitoring read events, in case new messages arrive. So instead of calling unregister and then register with the new events, we use modify, which takes the same parameters as register.
We modify the registration to monitor both events (selectors.EVENT_READ | selectors.EVENT_WRITE, combined with bitwise OR) and to call self._read_write when either fires. In the Event Loop, the mask returned for each socket tells us which events are actually ready (read/write/both).

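   def _read_write(self, conn, mask):
       if mask & selectors.EVENT_READ:
           self._read(conn, mask)
       # _read may have removed (and closed) conn, so check before writing
       if mask & selectors.EVENT_WRITE and conn in self._connections_msg_queue:
           self._write(conn, mask)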


The selector is now monitoring the following (again with 2 users connected to our chat):

  1. [sock, selectors.EVENT_READ, self._accept] # monitor server socket for read operation, and if available for read, call self._accept
  2. [conn_1, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write] # monitor chat client connection for read and write operation, and if available, call self._read_write. We can have multiple of connections like this monitored by the selector
  3. [conn_2, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write]
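Here is the mask check of _read_write in isolation, as a sketch on a socketpair(): one registration covers both events, and the mask says which ones fired. Depending on the selector implementation, the two events may be reported in one entry or as two entries for the same socket, so the sketch collects them in a set.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
a.setblocking(False)
sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)

b.sendall(b'new message')   # a is now readable, and it is already writable
handled = set()
for key, mask in sel.select(timeout=1.0):
    # One registration, but the mask says which events actually fired.
    if mask & selectors.EVENT_READ:
        handled.add(('read', key.fileobj.recv(1024)))
    if mask & selectors.EVENT_WRITE:
        handled.add(('write', None))

print(sorted(handled, key=lambda item: item[0]))

sel.close()
a.close()
b.close()
```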

Broadcasting recent messages to all connections

   def _write(self, conn, mask):
       self._write_pending_messages(conn)

   def _write_pending_messages(self, conn):
       messages = self._connections_msg_queue[conn]
       while messages:
           msg = messages.popleft()
           try:
               conn.send(f'[{msg.user}] {msg.text}'.encode('utf-8'))
           except Exception as e:
               print('Error occurred', e)
               self._remove_connection(conn)
               return

       # if there are no more messages to send, stop monitoring writes
       self._selector.modify(conn, selectors.EVENT_READ, self._read)

So if the socket is ready for writing, we try to send all the pending messages in its queue. Once no messages are left, we don’t need to monitor writes anymore, so we modify the socket’s registration back to the previous state, where we only monitor whether it is ready for reading. (Note that this simple version ignores the return value of conn.send, which may be smaller than the message length if the socket’s send buffer is full.)
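A sketch of how the write path could respect partial sends, assuming the queue holds already-encoded bytes (the chat server stores Message tuples and encodes them at send time). flush is a hypothetical helper, and the 10 MB message exists only to force a partial send on a socketpair().

```python
import collections
import socket

def flush(conn, queue):
    """Send queued byte strings until the queue is empty or conn is full.

    Returns True if everything was sent. An unsent tail goes back to the
    front of the queue, so the next EVENT_WRITE continues where we stopped.
    """
    while queue:
        chunk = queue.popleft()
        try:
            sent = conn.send(chunk)
        except BlockingIOError:        # send buffer is full: try again later
            queue.appendleft(chunk)
            return False
        if sent < len(chunk):          # partial send: keep the remainder
            queue.appendleft(chunk[sent:])
            return False
    return True

a, b = socket.socketpair()
a.setblocking(False)

# Small messages fit in the socket buffer and are sent completely.
q = collections.deque([b'[Bella] Hi you', b'[Hila] No, I am!!!!'])
first_ok = flush(a, q)
print(first_ok, len(q))                  # True 0

# A 10 MB message cannot fit, so only part of it is sent for now.
big = collections.deque([b'x' * 10_000_000])
done = flush(a, big)
print(done, len(big[0]) < 10_000_000)   # False True

a.close()
b.close()
```

In the chat server, _write_pending_messages would switch the registration back to EVENT_READ only when such a helper reports that the queue is drained.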
So, we’re done with the server! Here is the full code.

import argparse
import collections
import json
import selectors
import socket


SERVER_NUM_CONNECTIONS = 1000
BUFFER_SIZE = 1000

Message = collections.namedtuple('Message', ['user', 'text'])


class ChatServer:

   def __init__(self, **kwargs):
       self._selector = selectors.DefaultSelector()
       self._connections_msg_queue = {}
       self._host = kwargs['host']
       self._port = kwargs['port']

   ##### SELECT FUNCTIONS ########################

   def _accept(self, sock, mask):
       conn, addr = sock.accept()
       self._add_connection(conn)

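   def _read_write(self, conn, mask):
       if mask & selectors.EVENT_READ:
           self._read(conn, mask)
       # _read may have removed (and closed) conn, so check before writing
       if mask & selectors.EVENT_WRITE and conn in self._connections_msg_queue:
           self._write(conn, mask)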

   def _read(self, conn, mask):
       self._read_message(conn)

   def _write(self, conn, mask):
       self._write_pending_messages(conn)

   ##### CHAT FUNCTIONS ########################

   def _add_connection(self, conn):
       # register new client connection for reading (accepting new messages)
       print(f'{conn.getpeername()} hello!')
       self._connections_msg_queue[conn] = collections.deque()
       conn.setblocking(False)
       self._selector.register(conn, selectors.EVENT_READ, self._read)

   def _remove_connection(self, conn):
       print(f'{conn.getpeername()} bye bye!')
       self._selector.unregister(conn)
       conn.close()
       del self._connections_msg_queue[conn]

   def _read_message(self, conn):
       data = conn.recv(BUFFER_SIZE)  # Should be ready
       if data:
           self._add_message(conn, data)
       else:
           self._remove_connection(conn)

   def _add_message(self, sender_conn, raw_msg):
       try:
           msg = json.loads(raw_msg)
           message = Message(msg['user'], msg['text'])
           print(f"{sender_conn.getpeername()}: [{msg['user']}] {msg['text']}")
       except (json.JSONDecodeError, KeyError,) as e:
           print(f"We got unknown type of message: {raw_msg}; error: {e}")
           return

       # register every client connection for writing (broadcast recent messages)
       for conn, messages in self._connections_msg_queue.items():
           self._selector.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write)
           messages.append(message)

   def _write_pending_messages(self, conn):
       messages = self._connections_msg_queue[conn]
       while messages:
           msg = messages.popleft()
           try:
               conn.send(f'[{msg.user}] {msg.text}'.encode('utf-8'))
           except Exception as e:
               print('Error occurred', e)
               self._remove_connection(conn)
               return

       # if there are no more messages to send, stop monitoring writes
       self._selector.modify(conn, selectors.EVENT_READ, self._read)

   def run(self):
       # create and register server socket for reading (accepting new connections)
       server_sock = socket.socket()
       server_sock.bind((self._host, self._port))
       server_sock.listen(SERVER_NUM_CONNECTIONS)
       server_sock.setblocking(False)
       self._selector.register(server_sock, selectors.EVENT_READ, self._accept)

       while True:
           events = self._selector.select()
           for key, mask in events:
               callback = key.data
               callback(key.fileobj, mask)


parser = argparse.ArgumentParser(description='Chat server arguments.')
parser.add_argument('-host', nargs='?', default='localhost')
parser.add_argument('-port', nargs='?', type=int, default=1234)
args = parser.parse_args()

chat = ChatServer(**vars(args))
chat.run()


Python Chat Client

Notice that here, for the first time, we use select with something other than sockets: stdin!

We want our code to be able to do both things:

  1. read new messages typed on stdin (and send them)
  2. print the recent messages from the server

So it’s a combination of reading stdin and writing stdout. One possible solution is to use threads: one thread reads the input, and another prints the incoming messages. But why use threads when we can do it so elegantly with select? stdin is a file, after all, and as such it can also be monitored by select. (This works on Unix-like systems; on Windows, select only supports sockets, so this client won’t run there as-is.)
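To see that a selector happily watches a non-socket file, here is a POSIX-only sketch in which the read end of an os.pipe() stands in for stdin (an assumption made so the demo needs no keyboard input):

```python
import os
import selectors

sel = selectors.DefaultSelector()
r_fd, w_fd = os.pipe()                  # the read end stands in for stdin
sel.register(r_fd, selectors.EVENT_READ, 'stdin-like')

nothing_yet = sel.select(timeout=0)     # nothing "typed" yet
print(nothing_yet)                      # []

os.write(w_fd, b'Hi you\n')             # simulate the user typing a line
(key, mask), = sel.select(timeout=1.0)
line = os.read(key.fd, 1024)
print(key.data, line)                   # stdin-like b'Hi you\n'

sel.unregister(r_fd)
os.close(r_fd)
os.close(w_fd)
sel.close()
```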

I won’t explain in detail the code below, but only describe the objects the selector is monitoring:

  1. [sys.stdin, selectors.EVENT_READ, self._read_stdin] # monitor stdin for read operation, and when input is available, call self._read_stdin
  2. [self._sock, selectors.EVENT_READ, self._read_msg] # monitor the server connection for read operation, and when a message arrives, call self._read_msg

We are a bit lazy here: when we get a new message from stdin, we simply send it to the server, without registering the socket for write events on the selector and waiting for it to become ready for writing. We just assume that it is.

import argparse
import collections
import json
import selectors
import socket
import sys


BUFFER_SIZE = 1000

Message = collections.namedtuple('Message', ['user', 'text'])


class ChatClient:

   def __init__(self, **kwargs):
       self._selector = selectors.DefaultSelector()
       self._sock = None
       self._host = kwargs['host']
       self._port = kwargs['port']
       self._name = kwargs['username'] or input('Enter username:')
       self._running = True

   def _read_stdin(self, stdin, mask):
       data = sys.stdin.readline().strip()
       if data:
           msg = json.dumps({'user': self._name, 'text': data}, ensure_ascii=False).encode('utf8')
           self._sock.send(msg) # We should wait for selector here, but it will work.

   def _read_msg(self, conn, mask):
       data = conn.recv(BUFFER_SIZE)  # Should be ready
       if data:
           print(data.decode("utf-8"))
       else:
           print('Connection to server has failed')
           self._running = False

   def run(self):
       self._selector.register(sys.stdin, selectors.EVENT_READ, self._read_stdin)

       self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       self._sock.connect((self._host, self._port))
       self._sock.setblocking(False)
       self._selector.register(self._sock, selectors.EVENT_READ, self._read_msg)

       while self._running:
           events = self._selector.select()
           for key, mask in events:
               callback = key.data
               callback(key.fileobj, mask)


parser = argparse.ArgumentParser(description='Chat client arguments.')
parser.add_argument('-host', nargs='?', default='localhost')
parser.add_argument('-port', nargs='?', default=1234)
parser.add_argument('-username', nargs='?')
args = parser.parse_args()

chat = ChatClient(**vars(args))
chat.run()
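A less lazy client would keep an outgoing buffer and only write when the selector reports the socket as writable, switching its registration between EVENT_READ and EVENT_READ | EVENT_WRITE with selector.modify. Here is a minimal sketch of that idea; BufferedConnection, queue and on_event are hypothetical names, not part of the chat client above, and a socketpair stands in for the server connection.

```python
import selectors
import socket


class BufferedConnection:
    """Hypothetical helper: queue outgoing bytes and flush them only when writable."""

    def __init__(self, sock, selector):
        self.sock = sock
        self.selector = selector
        self.outgoing = bytearray()
        self.received = bytearray()
        sock.setblocking(False)
        selector.register(sock, selectors.EVENT_READ, self.on_event)

    def queue(self, data):
        # Don't send yet; ask the selector to tell us when the socket is writable.
        self.outgoing += data
        self.selector.modify(self.sock, selectors.EVENT_READ | selectors.EVENT_WRITE, self.on_event)

    def on_event(self, mask):
        if mask & selectors.EVENT_READ:
            data = self.sock.recv(1000)
            if not data:  # peer closed the connection
                self.selector.unregister(self.sock)
                return
            self.received += data
        if mask & selectors.EVENT_WRITE and self.outgoing:
            sent = self.sock.send(self.outgoing)  # may write only part of the buffer
            del self.outgoing[:sent]
            if not self.outgoing:
                # Nothing left to write: go back to watching for reads only.
                self.selector.modify(self.sock, selectors.EVENT_READ, self.on_event)


def demo():
    sel = selectors.DefaultSelector()
    client_sock, server_sock = socket.socketpair()
    conn = BufferedConnection(client_sock, sel)
    conn.queue(b'{"user": "Bella", "text": "Hi you"}')

    # A few loop iterations; the write happens once the socket is reported writable.
    for _ in range(3):
        for key, mask in sel.select(timeout=0.1):
            key.data(mask)

    got = server_sock.recv(1000)
    sel.close()
    client_sock.close()
    server_sock.close()
    return got, conn.outgoing
```

The reason for toggling: a connected socket is writable almost all the time, so staying registered for EVENT_WRITE would make select() return immediately on every iteration. We only ask for it while there is something to send.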


Here is the output of Bella’s client and of the server (Hila was connected from a second client):

ReactorChatClient.py output

Enter username:Bella
Hi you
[Bella] Hi you
My Name is Bella
[Bella] My Name is Bella
[Hila] Nice, My name is Hila
Im Kim number #1 fan!
[Bella] Im Kim number #1 fan!
[Hila] No, I am!!!!

ReactorChatServer.py output

('127.0.0.1', 57534) hello!
('127.0.0.1', 57534): [Hila] Hi There
('127.0.0.1', 57543) hello!
('127.0.0.1', 57543): [Bella] Hi you
('127.0.0.1', 57543): [Bella] My Name is Bella
('127.0.0.1', 57534): [Hila] Nice, My name is Hila
('127.0.0.1', 57543): [Bella] Im Kim number #1 fan!
('127.0.0.1', 57534): [Hila] No, I am!!!!

Reactor Design Pattern

So if you got here, you already have a pretty good sense of the reactor design pattern, even though we haven’t mentioned it by name. Our echo server and chat server were designed using the reactor pattern, so if you didn’t read that section, go ahead and read it now (the echo server is enough).

From D. Schmidt’s paper about the Reactor pattern:

The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. Each service in an application may consist of several methods and is represented by a separate event handler that is responsible for dispatching service-specific requests. Dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. Demultiplexing of service requests is performed by a synchronous event demultiplexer.

[Figure from D. Schmidt’s paper about the Reactor pattern]

Let’s review the basic components that compose the reactor pattern and see where we used them in our chat server code:

  • (Concrete) Event Handlers - an object or a function responsible for handling a request from a specific type of resource. Each type of request can have its own event handler. In our code, we had 4 kinds of event handlers:
    1. _accept - resource type: server socket. Handles the server socket and accepts a new connection.
    2. _read - resource type: client connection. Responsible for reading a new message from a client connection.
    3. _write - resource type: client connection. Responsible for writing messages to a client connection.
    4. _read_write - resource type: client connection. A combination of the two handlers above, needed when the client connection requires both a read and a write operation.
  • Handles (Resources) - OS resources that can provide input or consume output, for example files, sockets, timers and synchronization objects. In our code, we had 2 kinds of resources:
    1. server socket
    2. client connection socket
  • Synchronous Event Demultiplexer - a fancy name for the select syscall, or any variant of it. The select function gets a set of I/O resources to monitor and returns when one or more of them are ready for reading or writing. In our Python code, we used selectors.DefaultSelector, which gives us the most efficient implementation available on the current platform (roughly: epoll|kqueue|devpoll > poll > select).
  • Initiation Dispatcher (or: the Reactor) - the one who runs the show. It is responsible for registering and unregistering event handlers and for running the event loop, which calls the synchronous event demultiplexer. In each loop iteration, once the demultiplexer returns the list of resources that are ready for I/O, the reactor calls the event handler registered for each of those resources. In our code, this role is played by our own event loop (e.g. the client’s run method) together with the selector’s register/unregister calls: each handler is stored as key.data when its resource is registered, and the loop calls it after select() returns.
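To make the mapping concrete, here is a minimal sketch of the participants in Python. The names Reactor, register_handler, remove_handler and handle_events are hypothetical, loosely echoing the roles in Schmidt’s paper; selectors.DefaultSelector plays the synchronous event demultiplexer, and plain functions are the concrete event handlers of a tiny echo server.

```python
import selectors
import socket


class Reactor:
    """Initiation dispatcher: owns the handler registry and runs the event loop."""

    def __init__(self):
        self._demux = selectors.DefaultSelector()  # synchronous event demultiplexer

    def register_handler(self, handle, events, handler):
        # Associate a handle (e.g. a socket) with the event handler that serves it.
        self._demux.register(handle, events, handler)

    def remove_handler(self, handle):
        self._demux.unregister(handle)

    def handle_events(self, timeout=None):
        # Block until some handles are ready, then dispatch each one to its handler.
        for key, mask in self._demux.select(timeout):
            key.data(key.fileobj, mask)

    def close(self):
        self._demux.close()


def demo():
    reactor = Reactor()
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    listener.setblocking(False)

    def on_readable(conn, mask):  # concrete event handler for client sockets
        data = conn.recv(1000)
        if data:
            conn.sendall(data)  # echo (assumes the small reply fits the send buffer)
        else:
            reactor.remove_handler(conn)
            conn.close()

    def on_accept(sock, mask):  # concrete event handler for the server socket
        conn, _ = sock.accept()
        conn.setblocking(False)
        reactor.register_handler(conn, selectors.EVENT_READ, on_readable)

    reactor.register_handler(listener, selectors.EVENT_READ, on_accept)

    client = socket.create_connection(listener.getsockname(), timeout=5)
    client.sendall(b'ping')
    for _ in range(2):  # one iteration to accept, one to echo
        reactor.handle_events(timeout=1)
    reply = client.recv(1000)
    client.close()
    reactor.handle_events(timeout=1)  # let on_readable see EOF and close its side
    listener.close()
    reactor.close()
    return reply
```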

The structure of the participants of the Reactor pattern is illustrated in the following OMT class diagram:

[Figure: OMT class diagram of the Reactor pattern participants, from D. Schmidt’s paper about the Reactor pattern]

So as long as you are running a single-threaded event loop that uses select to wait on non-blocking I/O and dispatches the ready events to callbacks (event handlers), congratulations: you are using the Reactor pattern!



Where can we find the Reactor Pattern: libuv, Node.js, Nginx, Twisted

libuv

libuv is the non-blocking I/O engine of Node.js. Since every operating system implements its own mechanism for asynchronous I/O (or synchronous I/O multiplexing, whatever you’d like to call it), such as epoll, kqueue, IOCP and event ports, there was a need for an abstraction layer that would provide a cross-platform, single-threaded asynchronous I/O event loop.

[Figure from libuv.org]

Here is a simple “Hello World” program that creates and runs a uv event loop:

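#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

int main() {
    // Allocate and initialize our own event loop
    uv_loop_t *loop = malloc(sizeof(uv_loop_t));
    uv_loop_init(loop);

    // Run the loop. No handles or requests are registered, so it returns right away.
    uv_run(loop, UV_RUN_DEFAULT);

    // Release the loop's resources
    uv_loop_close(loop);
    free(loop);
    return 0;
}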

We can see the event loop implementation for Unix systems (the uv_run function) in the following file. Pretty cool and neat:

libuv/src/unix/core.c
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  ...
  while (r != 0 && loop->stop_flag == 0) {
    ...
    ran_pending = uv__run_pending(loop);
    ...
    uv__io_poll(loop, timeout);
    ...
  }
}

From their docs:

The event loop follows the rather usual single threaded asynchronous I/O approach: all (network) I/O is performed on non-blocking sockets which are polled using the best mechanism available on the given platform: epoll on Linux, kqueue on OSX and other BSDs, event ports on SunOS and IOCP on Windows. As part of a loop iteration the loop will block waiting for I/O activity on sockets which have been added to the poller and callbacks will be fired indicating socket conditions (readable, writable hangup) so handles can read, write or perform the desired I/O operation.
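libuv’s loop also drives timers, not only sockets. Here is a loose Python analogy (not libuv’s actual code) of how one loop can combine the two: the poll timeout is computed from the nearest timer, so the loop blocks on I/O only until the next timer is due, and it exits once there is nothing left to wait for. TimerLoop, call_later and stop are hypothetical names.

```python
import heapq
import itertools
import selectors
import socket
import time


class TimerLoop:
    """Hypothetical reactor loop with timers, loosely modeled on libuv's loop stages."""

    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.timers = []               # heap of (deadline, seq, callback)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared
        self.stopped = False

    def call_later(self, delay, callback):
        heapq.heappush(self.timers, (time.monotonic() + delay, next(self._seq), callback))

    def stop(self):
        self.stopped = True

    def _run_due_timers(self):
        now = time.monotonic()
        while self.timers and self.timers[0][0] <= now:
            _, _, callback = heapq.heappop(self.timers)
            callback()

    def _poll_timeout(self):
        # Block no longer than the time left until the nearest timer fires.
        if not self.timers:
            return None
        return max(0.0, self.timers[0][0] - time.monotonic())

    def run(self):
        # Like uv_run: keep going while there are timers or registered handles.
        while not self.stopped and (self.timers or self.selector.get_map()):
            self._run_due_timers()
            if self.stopped:
                break
            for key, mask in self.selector.select(self._poll_timeout()):
                key.data(key.fileobj, mask)


def demo():
    loop = TimerLoop()
    events = []
    reader, writer = socket.socketpair()

    def on_readable(sock, mask):
        events.append(sock.recv(100).decode())
        loop.selector.unregister(sock)  # done with this handle

    loop.selector.register(reader, selectors.EVENT_READ, on_readable)
    loop.call_later(0.05, lambda: writer.sendall(b'io'))  # a timer that triggers I/O
    loop.call_later(0.01, lambda: events.append('timer'))
    loop.run()
    loop.selector.close()
    reader.close()
    writer.close()
    return events
```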

Node.js

As we’ve seen before, libuv was developed for Node.js. To clear things up, here is a high-level look at the Node.js architecture:

[Figure: high-level Node.js architecture]


libuv (with the reactor pattern) and V8, the JavaScript engine developed by Google for the Chrome browser, are the building blocks. On top of them sit the bindings that expose libuv and other low-level functionality to JavaScript. On top of that sits the Node.js core JavaScript library, which implements the Node.js API.

Nginx

Nginx is a web server, load balancer and reverse proxy that was designed to handle thousands of concurrent connections (but it can do more than that). Its architecture is composed of 4 types of processes:

  1. master process
  2. cache loader process
  3. cache manager process
  4. worker processes - the only processes that are active when the server is busy. Each (single-threaded) worker process implements a reactor-style event loop to handle I/O tasks (network, files, …). According to their documentation (and our understanding of the subject so far), the most efficient way to handle all the I/O tasks is to run one worker process per CPU core.
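The worker model can be sketched in Python with os.fork: the master binds one listening socket and forks one worker per CPU core (os.cpu_count()), and every worker runs its own single-threaded selector loop on the shared socket. This is a Unix-only, hypothetical sketch of the idea rather than how Nginx is implemented; worker_loop and start_workers are made-up names.

```python
import os
import selectors
import signal
import socket


def worker_loop(listener):
    """Single-threaded reactor loop run by each worker process (echo server)."""
    sel = selectors.DefaultSelector()

    def on_accept(sock, mask):
        try:
            conn, _ = sock.accept()
        except BlockingIOError:
            return  # another worker won the race for this connection
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, on_read)

    def on_read(conn, mask):
        data = conn.recv(1000)
        if data:
            conn.sendall(data)  # sketch: assumes small replies fit in the send buffer
        else:
            sel.unregister(conn)
            conn.close()

    sel.register(listener, selectors.EVENT_READ, on_accept)
    while True:
        for key, mask in sel.select():
            key.data(key.fileobj, mask)


def start_workers(listener, count=None):
    """Fork `count` workers (default: one per CPU core) and return their PIDs."""
    pids = []
    for _ in range(count or os.cpu_count() or 1):
        pid = os.fork()
        if pid == 0:  # child: become a worker and never return
            try:
                worker_loop(listener)
            finally:
                os._exit(0)
        pids.append(pid)
    return pids


def demo():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    listener.setblocking(False)  # so a worker that loses the accept race doesn't block
    pids = start_workers(listener, count=2)
    try:
        with socket.create_connection(listener.getsockname(), timeout=5) as client:
            client.sendall(b'hello nginx-style')
            reply = client.recv(1000)
    finally:
        for pid in pids:  # the master stops its workers
            os.kill(pid, signal.SIGTERM)
            os.waitpid(pid, 0)
        listener.close()
    return reply
```

All workers wake up when a new connection arrives, but only one accept() succeeds; the others get BlockingIOError and go back to waiting, which is why the listening socket is non-blocking.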

It’s worth mentioning that Nginx also has thread pools to deal with the problem of dependencies that use blocking calls:

But the asynchronous, event‑driven approach still has a problem. Or, as I like to think of it, an “enemy”. And the name of the enemy is: blocking. Unfortunately, many third‑party modules use blocking calls, and users (and sometimes even the developers of the modules) aren’t aware of the drawbacks. Blocking operations can ruin NGINX performance and must be avoided at all costs.
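The same idea can be sketched in Python: the event loop hands a blocking call to a concurrent.futures.ThreadPoolExecutor and keeps serving other events, and when the result is ready the pool thread wakes the loop through a socketpair registered with the selector. This only illustrates the technique (Nginx’s thread pools are written in C); blocking_lookup, offload_blocking and run_loop_with_pool are hypothetical names.

```python
import concurrent.futures
import queue
import selectors
import socket
import time


def blocking_lookup(name):
    """Stand-in for a blocking third-party call (e.g. a slow disk read)."""
    time.sleep(0.1)
    return name.upper()


def run_loop_with_pool():
    sel = selectors.DefaultSelector()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    wake_r, wake_w = socket.socketpair()  # lets worker threads wake up the loop
    finished = queue.Queue()              # results handed back to the loop thread
    results = []

    def offload_blocking(func, arg, on_done):
        future = pool.submit(func, arg)

        def notify(fut):  # usually runs in the pool thread
            finished.put((on_done, fut.result()))
            wake_w.send(b'x')  # makes wake_r readable for the selector

        future.add_done_callback(notify)

    def on_wakeup(sock, mask):
        sock.recv(1000)  # drain the wake-up bytes
        while not finished.empty():
            on_done, value = finished.get()
            on_done(value)  # the completion callback runs on the loop thread

    sel.register(wake_r, selectors.EVENT_READ, on_wakeup)
    offload_blocking(blocking_lookup, 'kim', results.append)
    offload_blocking(blocking_lookup, 'bella', results.append)

    # The loop never blocks on blocking_lookup itself; it only waits in select().
    while len(results) < 2:
        for key, mask in sel.select(timeout=1):
            key.data(key.fileobj, mask)

    pool.shutdown()
    sel.close()
    wake_r.close()
    wake_w.close()
    return sorted(results)
```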

Twisted

“Event-driven networking engine written in Python”, based on the reactor design pattern. They implement all kinds of servers (HTTP, mail, pub/sub and much more), and Twisted is considered to be cool (it’s cool enough for Twitch and Scrapy, anyway).

Here is their simple echo server implementation:

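from twisted.internet import protocol, reactor, endpoints

class Echo(protocol.Protocol):
    # Event handler: the reactor calls it whenever data arrives on this connection
    def dataReceived(self, data):
        self.transport.write(data)

class EchoFactory(protocol.Factory):
    # Builds a new Echo protocol instance for every accepted connection
    def buildProtocol(self, addr):
        return Echo()

# Listen on TCP port 1234, hand new connections to the factory, then run the event loop
endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory())
reactor.run()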
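For comparison, Python’s standard library offers a similar protocol/transport API in asyncio (PEP 3156 describes its transport and protocol abstractions as similar to Twisted’s). Here is a sketch of the same echo server on top of asyncio’s event loop, with a tiny client to exercise it; listening on port 0 (any free port) is our choice for the demo.

```python
import asyncio


class Echo(asyncio.Protocol):
    """Same shape as the Twisted Echo protocol: a callback per chunk of received data."""

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)


async def main():
    loop = asyncio.get_running_loop()
    # Port 0 lets the OS pick a free port for this demo (the Twisted example uses 1234).
    server = await loop.create_server(Echo, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b'Hi Kim')
    await writer.drain()
    reply = await reader.read(100)

    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply


print(asyncio.run(main()))  # b'Hi Kim'
```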

C10K

C10K was the problem we started this post with. In this problem (which Dan Kegel described in 1999), we ask how to handle 10K concurrent connections on the same server. There are many ways to approach it, some of which we’ve covered in this post: single- or multi-threaded? single- or multi-process? blocking or non-blocking sockets? a thread or process for each request? move some code into the kernel?
In his paper, Kegel lays out the pros and cons of each strategy and explains how to configure the operating system and write code to support thousands of clients. The bottom line: the reactor pattern is the solution (you can find the details in the original paper).

Today the goalposts have moved and people talk about C10M, since there are applications with millions of concurrent connections.

Fin

We’ve covered why and when to use the Reactor pattern, how to use it (with examples of Python echo and chat servers), and who is using it.

References:
  • C10K
  • Reactor - An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events, by Douglas C. Schmidt
  • Patterns in C – Part 5: REACTOR
  • Reactor Pattern Explained
  • Python Library Documentation