Source code for pycopia.asyncserver

# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab:fenc=utf-8
#    Copyright (C) 2012 Keith Dart <>
#    This library is free software; you can redistribute it and/or
#    modify it under the terms of the GNU Lesser General Public
#    License as published by the Free Software Foundation; either
#    version 2.1 of the License, or (at your option) any later version.
#    This library is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    Lesser General Public License for more details.

Asynchronous server core that supports protocol objects.

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division

import sys
import traceback

from pycopia import protocols
from pycopia import socket
from pycopia import asyncio

# Socket protocol handlers


poller = asyncio.Poll()

[docs]class AsyncServerHandler(asyncio.PollerInterface): """Generic asynchronous server handler. Register an instance of this, with a worker class and a protocol, with the poller. """ def __init__(self, sock, workerclass, protocol): self._sock = sock _host, self.server_port = sock.getsockname() self.server_name = socket.getfqdn(_host) self._workerclass = workerclass self.protocol = protocol sock.setblocking(0) poller.register(self) def __del__(self): self.close()
[docs] def fileno(self): return self._sock.fileno()
[docs] def close(self): if self._sock is not None: poller.unregister(self) s = self._sock self._sock = None s.close()
closed = property(lambda self: bool(self._sock))
[docs] def readable(self): return True
[docs] def writable(self): return False
[docs] def priority(self): return True
[docs] def read_handler(self): conn, addr = self._sock.accept() conn.setblocking(0) h = self._workerclass(conn, addr, self.protocol) poller.register(h) return h
[docs]class AsyncWorkerHandler(asyncio.PollerInterface): def __init__(self, sock, addr, proto): self.address = addr self._sock = sock self.protocol = proto self._state = CONNECTED self._writebuf = "" self.initialize()
[docs] def fileno(self): return self._sock.fileno()
[docs] def close(self): if self._sock is not None: poller.unregister(self) s = self._sock self._sock = None if self._writebuf: s.send(self._writebuf) self._writebuf = "" s.close() self._state = CLOSED
closed = property(lambda self: self._state == CLOSED)
[docs] def write(self, data): self._writebuf += data poller.modify(self) return len(data)
[docs] def readable(self): return self._state == CONNECTED
[docs] def writable(self): return self._state == CONNECTED and bool(self._writebuf)
[docs] def priority(self): return self._state == CONNECTED
[docs] def hangup_handler(self): poller.unregister(self) self.close()
[docs] def error_handler(self): poller.unregister(self)
[docs] def write_handler(self): writ = self._sock.send(self._writebuf) self._writebuf = self._writebuf[writ:] poller.modify(self)
[docs] def initialize(self): self.protocol.reset()
[docs] def read_handler(self): with self._sock.makefile("w+b", 0) as fo: try: self.protocol.step(fo, self.address) except protocols.ProtocolExit: self.close() except protocols.ProtocolError: self.close() raise
[docs] def pri_handler(self): log("unhandled priority message")
[docs] def exception_handler(self, ex, val, tb): traceback.print_exception(ex, val, tb) self.close()
[docs]def log(*args): """Print to stderr""" print(*args, file=sys.stderr)