[Dev] Mock Reactor for tests?
John Anderson
john at osafoundation.org
Mon Jun 6 14:27:06 PDT 2005
I agree with Brian. It also seems like our tests would be more valid if
we were using real Twisted code instead of something else.
John
Brian Kirsch wrote:
> Hi Phillip,
> I am not sure if we really need the mock reactor. Twisted provides
> very good trial test support for creating local loopbacks between
> Twisted clients and Twisted servers. What is the issue with run and
> stop of a reactor? To my knowledge this should not be a problem.
> Also if all the unit tests were running in the same process why would
> you need to stop and start the reactor?
>
> I have attached my recent submissions to twisted core which include a
> pop3TestServer, a pop3Client, and a unittest illustrating how to set
> up local client server communication.
>
>
> Thoughts?
>
>
>
>
> Phillip J. Eby wrote:
>
>> Hi. I've heard recently that there are some tests that ideally would
>> need to run under the Twisted reactor, in order to properly exercise
>> the functionality under test. However, there are a number of issues
>> including the possible need for multiple uses of run/stop, dependency
>> on external servers, test duration, etc.
>>
>> There is, however, a relatively straightforward solution: use a mock
>> reactor. I've successfully used this approach in the past to test
>> event-driven libraries, although it was only with a subset of the
>> full Twisted reactor capabilities. A mock reactor can be stopped,
>> reset, and started as many times as you like, because it doesn't rely
>> on hooking a GUI event loop, running in a separate thread, or
>> anything like that.
>>
>> A mock reactor can run in "simulated time", which means that it uses
>> a time() function that runs faster than "real" time. For example, if
>> you schedule a callback, and there's no pending simulated I/O or
>> other scheduled calls, the simulated time jumps ahead to the next
>> scheduled callback.
>>
>> One additional side benefit of simulated time is that it's
>> deterministic and therefore can be reliably reproduced in repeated
>> tests. In PEAK, for example, I once wrote tests for some components
>> that might be compared to WakeupCallers in Chandler. I had several
>> scheduled to wake up on various intervals, and the test then verified
>> that they had run at all the times they should have. Since the time
>> is simulated, there were no rounding errors or clock precision to
>> take into account, and the tests could run instantaneously whether
>> were simulating seconds, minutes, or even hours of scheduling
>> operations.
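
To make the simulated-time idea concrete, here is a minimal sketch. It is not Twisted's or PEAK's API: SimulatedClock, call_later, and run are hypothetical names chosen for illustration, and only timed calls are modeled (no simulated I/O). The clock never sleeps; it jumps straight to the next scheduled call.

```python
import heapq
import itertools


class SimulatedClock:
    """Hypothetical stand-in for a reactor's timed-call machinery."""

    def __init__(self):
        self.now = 0.0
        self._calls = []                   # heap of (when, seq, func, args)
        self._seq = itertools.count()      # tie-breaker: same-time calls stay FIFO

    def time(self):
        return self.now

    def call_later(self, delay, func, *args):
        heapq.heappush(self._calls, (self.now + delay, next(self._seq), func, args))

    def run(self, until=None):
        # Jump from one scheduled call to the next instead of waiting.
        while self._calls:
            if until is not None and self._calls[0][0] > until:
                break
            when, _, func, args = heapq.heappop(self._calls)
            self.now = when
            func(*args)
        if until is not None:
            self.now = max(self.now, until)


def demo():
    clock = SimulatedClock()
    fired = []

    def wake(name, interval):
        # Record the simulated time, then reschedule like a periodic waker.
        fired.append((clock.time(), name))
        clock.call_later(interval, wake, name, interval)

    clock.call_later(60, wake, "every-minute", 60)
    clock.call_later(90, wake, "every-90s", 90)
    clock.run(until=180)   # three simulated minutes, no real waiting
    return fired
```

Because the time is only a number that the clock advances, a test can assert the exact wake-up times, and repeated runs produce the same sequence.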
>>
>> A mock reactor for Chandler tests could also use my "mockets" (fake
>> sockets) library in order to avoid doing any "real" network I/O,
>> allowing servers to listen and clients to connect to addresses on a
>> virtual network that exists only in the process' memory, thereby
>> avoiding the complexity of using external processes to set up and
>> tear down servers or depending on other servers being up and having
>> connectivity to them.
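
The virtual-network idea can be sketched in a few lines as well. This is not the mockets API, which is not shown in this thread: VirtualNetwork, Endpoint, and their methods are hypothetical names, and a realistic version would also need close handling and non-blocking semantics.

```python
from collections import deque


class Endpoint:
    """One end of an in-memory connection (hypothetical, for illustration)."""

    def __init__(self):
        self.inbox = deque()
        self.peer = None

    def send(self, data):
        # Delivery is just an append to the peer's queue; no OS socket involved.
        self.peer.inbox.append(data)

    def recv(self):
        return self.inbox.popleft() if self.inbox else b""


class VirtualNetwork:
    """Maps addresses to listeners that exist only in this process."""

    def __init__(self):
        self._listeners = {}

    def listen(self, address, on_connect):
        self._listeners[address] = on_connect

    def connect(self, address):
        if address not in self._listeners:
            raise ConnectionRefusedError(address)
        client, server = Endpoint(), Endpoint()
        client.peer, server.peer = server, client
        self._listeners[address](server)   # hand the server side to the listener
        return client


def demo():
    net = VirtualNetwork()
    address = ("pop.example.test", 110)    # never resolved or dialed
    net.listen(address, lambda conn: conn.send(b"+OK POP3 server ready\r\n"))
    client = net.connect(address)
    return client.recv()
```

A test can then start a "server", connect to it, and tear everything down by dropping the VirtualNetwork object, with no ports opened and no external process involved.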
>>
>> Although I don't have a complete mock reactor implementation, I do
>> have most of the prerequisites and experience that would be needed to
>> implement one, if anybody is interested. So, if you have things
>> (like Zanshin, Chandler client protocols, etc.) that need
>> reactor-based testing, and would be interested in helping me test a
>> mock reactor for your test cases, let me know. It's also possible
>> that this could be a joint project with the Twisted folks; as early
>> as last year, Itamar expressed an interest in allowing test reactors
>> to run using simulated time:
>>
>> http://twistedmatrix.com/pipermail/twisted-python/2004-January/006982.html
>>
>>
>> And I would be surprised if they're not interested in having a
>> mocket-based reactor as well. The last hacking I did on Twisted was
>> around 1.1, so it might take me some time to get familiar with the
>> 2.0 reactor interfaces. However, unless there are major differences
>> I don't expect it to be difficult to do; Twisted is designed to
>> isolate code from the underlying transport mechanism in use. About
>> the only "interesting" part would likely be SSL/TLS, since I doubt
>> M2Crypto and OpenSSL will want to talk to mocket objects instead of
>> real sockets. It might be necessary to create mock SSL "Transport"
>> objects as well as a mock reactor.
>>
>
>------------------------------------------------------------------------
>
># -*- test-case-name: twisted.mail.test.test_pop3client -*-
># Copyright (c) 2001-2004 Divmod Inc.
># See LICENSE for details.
>
>"""POP3 client protocol implementation
>
>Don't use this module directly. Use twisted.mail.pop3 instead.
>
>@author U{Jp Calderone<mailto:exarkun at twistedmatrix.com>}
>
>API Stability: Unstable
>"""
>
>import re, md5
>
>from twisted.python import log
>from twisted.internet import defer
>from twisted.protocols import basic
>from twisted.protocols import policies
>from twisted.internet import error
>from twisted.internet import interfaces
>
>OK = '+OK'
>ERR = '-ERR'
>
>class POP3ClientError(Exception):
>    """Base class for all exceptions raised by POP3Client.
>    """
>
>class InsecureAuthenticationDisallowed(POP3ClientError):
>    """Secure authentication was required but no mechanism could be found.
>    """
>
>class TLSError(POP3ClientError):
>    """TLS could not be started (no context factory or unsuitable transport).
>    """
>
>class TLSNotSupportedError(POP3ClientError):
>    """TLS was requested but the server does not support it.
>    """
>
>class OptionNotSupportedError(POP3ClientError):
>    """A requested option is not supported.
>    """
>
>
>class ServerErrorResponse(POP3ClientError):
>    """The server returned an error response to a request.
>    """
>    def __init__(self, reason, consumer=None):
>        POP3ClientError.__init__(self, reason)
>        self.consumer = consumer
>
>class LineTooLong(POP3ClientError):
>    """The server sent an extremely long line.
>    """
>
>class _ListSetter:
>    # Internal helper. POP3 responses sometimes occur in the
>    # form of a list of lines containing two pieces of data,
>    # a message index and a value of some sort. When a message
>    # is deleted, it is omitted from these responses. The
>    # setitem method of this class is meant to be called with
>    # these two values. In the cases where indexes are skipped,
>    # it takes care of padding out the missing values with None.
>    def __init__(self, L):
>        self.L = L
>    def setitem(self, (item, value)):
>        diff = item - len(self.L) + 1
>        if diff > 0:
>            self.L.extend([None] * diff)
>        self.L[item] = value
>
>
>def _statXform(line):
>    # Parse a STAT response
>    numMsgs, totalSize = line.split(None, 1)
>    return int(numMsgs), int(totalSize)
>
>
>def _listXform(line):
>    # Parse a LIST response
>    index, size = line.split(None, 1)
>    return int(index) - 1, int(size)
>
>
>def _uidXform(line):
>    # Parse a UIDL response
>    index, uid = line.split(None, 1)
>    return int(index) - 1, uid
>
>def _codeStatusSplit(line):
>    # Parse an +OK or -ERR response
>    parts = line.split(' ', 1)
>    if len(parts) == 1:
>        return parts[0], ''
>    return parts
>
>def _dotUnquoter(line):
>    """
>    '.' characters which begin a line of a message are doubled to avoid
>    confusing with the terminating '.\r\n' sequence. This function unquotes
>    them.
>    """
>    if line.startswith('..'):
>        return line[1:]
>    return line
>
>class POP3Client(basic.LineOnlyReceiver, policies.TimeoutMixin):
>    """POP3 client protocol implementation class
>
>    Instances of this class provide a convenient, efficient API for
>    retrieving and deleting messages from a POP3 server.
>    """
>
>    # Capabilities are not allowed to change during the session
>    # So cache the first response and use that for all later
>    # lookups
>    _capCache = None
>
>    # Whether STARTTLS has been issued successfully yet or not.
>    startedTLS = False
>
>    # Indicate whether login() should be allowed if the server
>    # offers no authentication challenge and if our transport
>    # does not offer any protection via encryption.
>    allowInsecureLogin = False
>
>    # Regular expression to search for in the challenge string in the server
>    # greeting line.
>    challengeMagicRe = re.compile('(<[^>]+>)')
>
>    # Challenge received from the server
>    serverChallenge = None
>
>    # List of pending calls.
>    # We are a pipelining API but don't actually
>    # support pipelining on the network yet.
>    _blockedQueue = None
>
>    # The Deferred to which the very next result will go.
>    waiting = None
>
>    # Number of seconds to wait before timing out a connection.
>    # If the number is <= 0 no timeout checking will be performed.
>    timeout = 0
>
>    # Overrides the LineOnlyReceiver default to allow longer lines.
>    MAX_LENGTH = 16384 * 2
>
>    def __init__(self, contextFactory = None):
>        self.context = contextFactory
>        self.timedOut = False
>
>
>    def _blocked(self, f, *a):
>        # Internal helper. If commands are being blocked, append
>        # the given command and arguments to a list and return a Deferred
>        # that will be chained with the return value of the function
>        # when it eventually runs. Otherwise, set up for commands to be
>        # blocked and return None.
>        if self._blockedQueue is not None:
>            d = defer.Deferred()
>            self._blockedQueue.append((d, f, a))
>            return d
>        self._blockedQueue = []
>        return None
>
>    def _unblock(self):
>        # Internal helper. Indicate that a function has completed.
>        # If there are blocked commands, run the next one. If there
>        # are not, set up for the next command to not be blocked.
>        if self._blockedQueue == []:
>            self._blockedQueue = None
>        elif self._blockedQueue is not None:
>            # Clear the queue before running f: f goes through _blocked(),
>            # and with the queue still set it would only re-queue itself
>            # instead of sending its command.
>            pending = self._blockedQueue
>            self._blockedQueue = None
>            d, f, a = pending.pop(0)
>
>            d2 = f(*a)
>            d2.chainDeferred(d)
>            # f has reset _blockedQueue to an empty list; put the
>            # remaining blocked commands back.
>            self._blockedQueue.extend(pending)
>
>    def sendShort(self, cmd, args):
>        # Internal helper. Send a command to which a short response
>        # is expected. Return a Deferred that fires when the response
>        # is received. Block all further commands from being sent until
>        # the response is received. Transition the state to SHORT.
>        d = self._blocked(self.sendShort, cmd, args)
>        if d is not None:
>            return d
>
>        if args:
>            self.sendLine(cmd + ' ' + args)
>        else:
>            self.sendLine(cmd)
>        self.state = 'SHORT'
>        self.waiting = defer.Deferred()
>        return self.waiting
>
>    def sendLong(self, cmd, args, consumer, xform):
>        # Internal helper. Send a command to which a multiline
>        # response is expected. Return a Deferred that fires when
>        # the entire response is received. Block all further commands
>        # from being sent until the entire response is received.
>        # Transition the state to LONG_INITIAL.
>        d = self._blocked(self.sendLong, cmd, args, consumer, xform)
>        if d is not None:
>            return d
>
>        if args:
>            self.sendLine(cmd + ' ' + args)
>        else:
>            self.sendLine(cmd)
>        self.state = 'LONG_INITIAL'
>        self.xform = xform
>        self.consumer = consumer
>        self.waiting = defer.Deferred()
>        return self.waiting
>
>    # Twisted protocol callback
>    def connectionMade(self):
>        if self.timeout > 0:
>            self.setTimeout(self.timeout)
>
>        self.state = 'WELCOME'
>
>    def timeoutConnection(self):
>        self.timedOut = True
>        self.transport.loseConnection()
>
>    def connectionLost(self, reason):
>        if self.timeout > 0:
>            self.setTimeout(None)
>
>        if self.timedOut:
>            reason = error.TimeoutError()
>            self.timedOut = False
>
>        d = []
>        if self.waiting is not None:
>            d.append(self.waiting)
>            self.waiting = None
>        if self._blockedQueue is not None:
>            d.extend([deferred for (deferred, f, a) in self._blockedQueue])
>            self._blockedQueue = None
>        for w in d:
>            w.errback(reason)
>
>    def lineReceived(self, line):
>        if self.timeout > 0:
>            self.resetTimeout()
>
>        state = self.state
>        self.state = None
>        state = getattr(self, 'state_' + state)(line) or state
>        if self.state is None:
>            self.state = state
>
>    def lineLengthExceeded(self, buffer):
>        # XXX - We need to be smarter about this
>        if self.waiting is not None:
>            waiting, self.waiting = self.waiting, None
>            waiting.errback(LineTooLong())
>        self.transport.loseConnection()
>
>    # POP3 Client state logic - don't touch this.
>    def state_WELCOME(self, line):
>        # WELCOME is the first state. The server sends one line of text
>        # greeting us, possibly with an APOP challenge. Transition the
>        # state to WAITING.
>        code, status = _codeStatusSplit(line)
>        if code != OK:
>            #XXX: Should raise some kind of error here
>            self.transport.loseConnection()
>        else:
>            m = self.challengeMagicRe.search(status)
>
>            if m is not None:
>                self.serverChallenge = m.group(1)
>
>            self.serverGreeting(self.serverChallenge)
>
>        return 'WAITING'
>
>    def state_WAITING(self, line):
>        # The server isn't supposed to send us anything in this state.
>        log.msg("Illegal line from server: " + repr(line))
>
>    def state_SHORT(self, line):
>        # This is the state we are in when waiting for a single
>        # line response. Parse it and fire the appropriate callback
>        # or errback. Transition the state back to WAITING.
>        deferred, self.waiting = self.waiting, None
>        self._unblock()
>        code, status = _codeStatusSplit(line)
>        if code == OK:
>            deferred.callback(status)
>        else:
>            deferred.errback(ServerErrorResponse(status))
>        return 'WAITING'
>
>    def state_LONG_INITIAL(self, line):
>        # This is the state we are in when waiting for the first
>        # line of a long response. Parse it and transition the
>        # state to LONG if it is an okay response; if it is an
>        # error response, fire an errback, clean up the things
>        # waiting for a long response, and transition the state
>        # to WAITING.
>        code, status = _codeStatusSplit(line)
>        if code == OK:
>            return 'LONG'
>        consumer = self.consumer
>        deferred = self.waiting
>        self.consumer = self.waiting = self.xform = None
>        self._unblock()
>        deferred.errback(ServerErrorResponse(status, consumer))
>        return 'WAITING'
>
>    def state_LONG(self, line):
>        # This is the state for each line of a long response.
>        # If it is the last line, finish things, fire the
>        # Deferred, and transition the state to WAITING.
>        # Otherwise, pass the line to the consumer.
>        if line == '.':
>            consumer = self.consumer
>            deferred = self.waiting
>            self.consumer = self.waiting = self.xform = None
>            self._unblock()
>            deferred.callback(consumer)
>            return 'WAITING'
>        else:
>            if self.xform is not None:
>                self.consumer(self.xform(line))
>            else:
>                self.consumer(line)
>            return 'LONG'
>
>    def serverGreeting(self, challenge):
>        """Called when the server has sent us a greeting.
>
>        @type challenge: C{str} or C{None}
>        @param challenge: The timestamp challenge that a POP3 server
>        implementing the APOP command includes in its banner greeting
>        (RFC 1939), or None if the greeting contained no challenge.
>        """
>
>    def startTLS(self, contextFactory=None):
>        """
>        Initiates a 'STLS' request and negotiates the TLS / SSL
>        Handshake.
>
>        @param contextFactory: The TLS / SSL Context Factory to
>        leverage. If the contextFactory is None the POP3Client will
>        either use the current TLS / SSL Context Factory or attempt to
>        create a new one.
>
>        @type contextFactory: C{ssl.ClientContextFactory}
>
>        @return: A Deferred which fires when the transport has been
>        secured according to the given contextFactory, or which fails
>        if the transport cannot be secured.
>        """
>
>        if self._capCache is None:
>            d = self.capabilities()
>
>        else:
>            d = defer.succeed(self._capCache)
>
>        d.addCallback(self._startTLS, contextFactory)
>        return d
>
>
>    def _startTLS(self, caps, contextFactory):
>        assert not self.startedTLS, "Client and Server are currently communicating via TLS"
>
>        if contextFactory is None:
>            contextFactory = self._getContextFactory()
>
>        if contextFactory is None:
>            return defer.fail(TLSError(
>                "POP3Client requires a TLS context to "
>                "initiate the STARTTLS handshake"))
>
>        if 'STLS' not in caps:
>            return defer.fail(TLSNotSupportedError(
>                "Server does not support secure communication "
>                "via TLS / SSL"))
>
>        tls = interfaces.ITLSTransport(self.transport, None)
>
>        if tls is None:
>            return defer.fail(TLSError(
>                "POP3Client transport does not implement "
>                "interfaces.ITLSTransport"))
>
>        d = self.sendShort('STLS', None)
>        d.addCallback(self._startedTLS, contextFactory)
>        d.addCallback(lambda _: self.capabilities())
>        return d
>
>    def _startedTLS(self, result, context):
>        self.transport.startTLS(context)
>        self._capCache = None
>        self.startedTLS = True
>        self.context = context
>        return result
>
>    def _getContextFactory(self):
>        if self.context is not None:
>            return self.context
>        try:
>            from twisted.internet import ssl
>        except ImportError:
>            return None
>        else:
>            context = ssl.ClientContextFactory()
>            context.method = ssl.SSL.TLSv1_METHOD
>            return context
>
>    # External hooks - call these (most of 'em anyway)
>    def login(self, username, password):
>        """Log into the server.
>
>        If APOP is available it will be used. Otherwise, if
>        TLS is available a 'STLS' session will be started and
>        plaintext login will proceed. Otherwise, if the
>        instance attribute allowInsecureLogin is set to True,
>        insecure plaintext login will proceed. Otherwise,
>        InsecureAuthenticationDisallowed will be raised
>        (asynchronously).
>
>        @param username: The username with which to log in.
>        @param password: The password with which to log in.
>
>        @rtype: C{Deferred}
>        @return: A deferred which fires when login has
>        completed.
>        """
>        if self._capCache is None:
>            d = self.capabilities()
>
>        else:
>            d = defer.succeed(self._capCache)
>
>        d.addCallback(self._login, username, password)
>        return d
>
>    def _login(self, caps, username, password):
>        if self.serverChallenge is not None:
>            return self._apop(username, password, self.serverChallenge)
>
>        tryTLS = 'STLS' in caps
>
>        # If our transport supports switching to TLS, we might want to try to switch to TLS.
>        tlsableTransport = interfaces.ITLSTransport(self.transport, default=None) is not None
>
>        # If our transport is not already using TLS, we might want to try to switch to TLS.
>        nontlsTransport = interfaces.ISSLTransport(self.transport, default=None) is None
>
>        if not self.startedTLS and tryTLS and tlsableTransport and nontlsTransport:
>            d = self.startTLS()
>
>            d.addCallback(self._loginTLS, username, password)
>            return d
>
>        elif self.startedTLS or self.allowInsecureLogin:
>            return self._plaintext(username, password)
>        else:
>            return defer.fail(InsecureAuthenticationDisallowed())
>
>    def _loginTLS(self, res, username, password):
>        return self._plaintext(username, password)
>
>    def _plaintext(self, username, password):
>        # Internal helper. Send a username/password pair, returning a Deferred
>        # that fires when both have succeeded or fails when the server rejects
>        # either.
>        return self.user(username).addCallback(lambda r: self.password(password))
>
>    def _apop(self, username, password, challenge):
>        # Internal helper. Computes and sends an APOP response. Returns
>        # a Deferred that fires when the server responds to the response.
>        digest = md5.new(challenge + password).hexdigest()
>        return self.apop(username, digest)
>
>    def apop(self, username, digest):
>        """Perform APOP login.
>
>        This should be used in special circumstances only, when it is
>        known that the server supports APOP authentication, and APOP
>        authentication is absolutely required. For the common case,
>        use L{login} instead.
>
>        @param username: The username with which to log in.
>        @param digest: The challenge response to authenticate with.
>        """
>        return self.sendShort('APOP', username + ' ' + digest)
>
>    def user(self, username):
>        """Send the user command.
>
>        This performs the first half of plaintext login. Unless this
>        is absolutely required, use the L{login} method instead.
>
>        @param username: The username with which to log in.
>        """
>        return self.sendShort('USER', username)
>
>    def password(self, password):
>        """Send the password command.
>
>        This performs the second half of plaintext login. Unless this
>        is absolutely required, use the L{login} method instead.
>
>        @param password: The plaintext password with which to authenticate.
>        """
>        return self.sendShort('PASS', password)
>
>    def delete(self, index):
>        """Delete a message from the server.
>
>        @type index: C{int}
>        @param index: The index of the message to delete.
>        This is 0-based.
>
>        @rtype: C{Deferred}
>        @return: A deferred which fires when the delete command
>        is successful, or fails if the server returns an error.
>        """
>        return self.sendShort('DELE', str(index + 1))
>
>    def _consumeOrSetItem(self, cmd, args, consumer, xform):
>        # Internal helper. Send a long command. If no consumer is
>        # provided, create a consumer that puts results into a list
>        # and return a Deferred that fires with that list when it
>        # is complete.
>        if consumer is None:
>            L = []
>            consumer = _ListSetter(L).setitem
>            return self.sendLong(cmd, args, consumer, xform).addCallback(lambda r: L)
>        return self.sendLong(cmd, args, consumer, xform)
>
>    def _consumeOrAppend(self, cmd, args, consumer, xform):
>        # Internal helper. Send a long command. If no consumer is
>        # provided, create a consumer that appends results to a list
>        # and return a Deferred that fires with that list when it is
>        # complete.
>        if consumer is None:
>            L = []
>            consumer = L.append
>            return self.sendLong(cmd, args, consumer, xform).addCallback(lambda r: L)
>        return self.sendLong(cmd, args, consumer, xform)
>
>    def capabilities(self, useCache=1):
>        """Retrieve the capabilities supported by this server.
>        """
>        if useCache and self._capCache is not None:
>            return defer.succeed(self._capCache)
>
>        # Reset the capabilities cache
>        self._capCache = {}
>
>        d = self._consumeOrAppend('CAPA', None, self._capsConsumer, None)
>        # CAPA is not supported by some POP3 servers. If the server
>        # returns an error we still want the same behavior.
>        d.addBoth(self._cbCapabilities)
>        return d
>
>    def _cbCapabilities(self, result):
>        """Returns the Capabilities to the caller"""
>        return self._capCache
>
>
>    def _capsConsumer(self, line):
>        tmp = line.split()
>
>        size = len(tmp)
>
>        if size == 0:
>            return
>
>        if size == 1:
>            self._capCache[tmp[0]] = None
>        else:
>            self._capCache[tmp[0]] = tmp[1:]
>
>    def noop(self):
>        return self.sendShort("NOOP", None)
>
>    def rset(self):
>        return self.sendShort("RSET", None)
>
>    def retrieve(self, index, consumer=None, lines=None):
>        """Retrieve a message from the server.
>
>        If L{consumer} is not None, it will be called with
>        each line of the message as it is received. Otherwise,
>        the returned Deferred will be fired with a list of all
>        the lines when the message has been completely received.
>        """
>        idx = str(index + 1)
>        if lines is None:
>            return self._consumeOrAppend('RETR', idx, consumer, _dotUnquoter)
>
>        return self._consumeOrAppend('TOP', '%s %d' % (idx, lines), consumer, _dotUnquoter)
>
>
>    def stat(self):
>        """Issues a 'STAT' request which is allowed in the TRANSACTION state (RFC 1939).
>        The returned Deferred will be fired with a tuple containing the
>        number of messages in the maildrop and the size of the
>        maildrop in octets.
>        """
>        return self.sendShort('STAT', None).addCallback(_statXform)
>
>    def listSize(self, consumer=None):
>        """Retrieve a list of the size of all messages on the server.
>
>        If L{consumer} is not None, it will be called with two-tuples
>        of message index number and message size as they are received.
>        Otherwise, a Deferred which will fire with a list of B{only}
>        message sizes will be returned. For messages which have been
>        deleted, None will be used in place of the message size.
>        """
>        return self._consumeOrSetItem('LIST', None, consumer, _listXform)
>
>    def listUID(self, consumer=None):
>        """Retrieve a list of the UIDs of all messages on the server.
>
>        If L{consumer} is not None, it will be called with two-tuples
>        of message index number and message UID as they are received.
>        Otherwise, a Deferred which will fire with a list of B{only}
>        message UIDs will be returned. For messages which have been
>        deleted, None will be used in place of the message UID.
>        """
>
>        return self._consumeOrSetItem('UIDL', None, consumer, _uidXform)
>
>    def quit(self):
>        """Disconnect from the server.
>        """
>        return self.sendShort('QUIT', None)
>
>__all__ = [
>    # Exceptions
>    'InsecureAuthenticationDisallowed', 'LineTooLong', 'POP3ClientError',
>    'ServerErrorResponse', 'TLSError', 'TLSNotSupportedError',
>    'OptionNotSupportedError',
>
>    # Protocol classes
>    'POP3Client']
>
>
>------------------------------------------------------------------------
>
>#!/usr/local/bin/python
>from twisted.internet.protocol import Factory
>from twisted.protocols import basic
>from twisted.internet import reactor
>import sys
>
>USER = "test"
>PASS = "twisted"
>
>PORT = 1100
>
>SSL_SUPPORT = True
>UIDL_SUPPORT = True
>INVALID_SERVER_RESPONSE = False
>INVALID_CAPABILITY_RESPONSE = False
>INVALID_LOGIN_RESPONSE = False
>DENY_CONNECTION = False
>DROP_CONNECTION = False
>BAD_TLS_RESPONSE = False
>TIMEOUT_RESPONSE = False
>TIMEOUT_DEFERRED = False
>SLOW_GREETING = False
>
>"""Commands"""
>CONNECTION_MADE = "+OK POP3 localhost v2003.83 server ready"
>
>CAPABILITIES = [
>"TOP",
>"LOGIN-DELAY 180",
>"USER",
>"SASL LOGIN"
>]
>
>CAPABILITIES_SSL = "STLS"
>CAPABILITIES_UIDL = "UIDL"
>
>
>INVALID_RESPONSE = "-ERR Unknown request"
>VALID_RESPONSE = "+OK Command Completed"
>AUTH_DECLINED = "-ERR LOGIN failed"
>AUTH_ACCEPTED = "+OK Mailbox open, 0 messages"
>TLS_ERROR = "-ERR server side error start TLS handshake"
>LOGOUT_COMPLETE = "+OK quit completed"
>NOT_LOGGED_IN = "-ERR Unknown AUTHORIZATION state command"
>STAT = "+OK 0 0"
>UIDL = "+OK Unique-ID listing follows\r\n."
>LIST = "+OK Mailbox scan listing follows\r\n."
>CAP_START = "+OK Capability list follows:"
>
>
>class POP3TestServer(basic.LineReceiver):
>    def __init__(self, contextFactory = None):
>        self.loggedIn = False
>        self.caps = None
>        self.tmpUser = None
>        self.ctx = contextFactory
>
>    def sendSTATResp(self, req):
>        self.sendLine(STAT)
>
>    def sendUIDLResp(self, req):
>        self.sendLine(UIDL)
>
>    def sendLISTResp(self, req):
>        self.sendLine(LIST)
>
>    def sendCapabilities(self):
>        if self.caps is None:
>            self.caps = [CAP_START]
>
>        if UIDL_SUPPORT:
>            self.caps.append(CAPABILITIES_UIDL)
>
>        if SSL_SUPPORT:
>            self.caps.append(CAPABILITIES_SSL)
>
>        for cap in CAPABILITIES:
>            self.caps.append(cap)
>        resp = '\r\n'.join(self.caps)
>        resp += "\r\n."
>
>        self.sendLine(resp)
>
>
>    def connectionMade(self):
>        if DENY_CONNECTION:
>            self.transport.loseConnection()
>            return
>
>        if SLOW_GREETING:
>            reactor.callLater(20, self.sendGreeting)
>
>        else:
>            self.sendGreeting()
>
>    def sendGreeting(self):
>        self.sendLine(CONNECTION_MADE)
>
>    def lineReceived(self, line):
>        """Error Conditions"""
>        if TIMEOUT_RESPONSE:
>            """Do not respond to clients request"""
>            return
>
>        if DROP_CONNECTION:
>            self.transport.loseConnection()
>            return
>
>        elif "CAPA" in line.upper():
>            if INVALID_CAPABILITY_RESPONSE:
>                self.sendLine(INVALID_RESPONSE)
>            else:
>                self.sendCapabilities()
>
>        elif "STLS" in line.upper() and SSL_SUPPORT:
>            self.startTLS()
>
>        elif "USER" in line.upper():
>            if INVALID_LOGIN_RESPONSE:
>                self.sendLine(INVALID_RESPONSE)
>                return
>
>            resp = None
>            try:
>                self.tmpUser = line.split(" ")[1]
>                resp = VALID_RESPONSE
>            except:
>                resp = AUTH_DECLINED
>
>            self.sendLine(resp)
>
>        elif "PASS" in line.upper():
>            resp = None
>            try:
>                pwd = line.split(" ")[1]
>
>                if self.tmpUser is None or pwd is None:
>                    resp = AUTH_DECLINED
>                elif self.tmpUser == USER and pwd == PASS:
>                    resp = AUTH_ACCEPTED
>                    self.loggedIn = True
>                else:
>                    resp = AUTH_DECLINED
>            except:
>                resp = AUTH_DECLINED
>
>            self.sendLine(resp)
>
>        elif "QUIT" in line.upper():
>            self.loggedIn = False
>            self.sendLine(LOGOUT_COMPLETE)
>            self.disconnect()
>
>        elif INVALID_SERVER_RESPONSE:
>            self.sendLine(INVALID_RESPONSE)
>
>        elif not self.loggedIn:
>            self.sendLine(NOT_LOGGED_IN)
>
>        elif "NOOP" in line.upper():
>            self.sendLine(VALID_RESPONSE)
>
>        elif "STAT" in line.upper():
>            if TIMEOUT_DEFERRED:
>                return
>            self.sendLine(STAT)
>
>        elif "LIST" in line.upper():
>            if TIMEOUT_DEFERRED:
>                return
>            self.sendLine(LIST)
>
>        elif "UIDL" in line.upper():
>            if TIMEOUT_DEFERRED:
>                return
>            elif not UIDL_SUPPORT:
>                self.sendLine(INVALID_RESPONSE)
>                return
>
>            self.sendLine(UIDL)
>
>    def startTLS(self):
>        if self.ctx is None:
>            self.getContext()
>
>        if SSL_SUPPORT and self.ctx is not None:
>            self.sendLine('+OK Begin TLS negotiation now')
>            self.transport.startTLS(self.ctx)
>        else:
>            # Report failure with -ERR so the client does not start a handshake.
>            self.sendLine('-ERR TLS not available')
>
>    def disconnect(self):
>        self.transport.loseConnection()
>
>    def getContext(self):
>        try:
>            from twisted.internet import ssl
>        except ImportError:
>            self.ctx = None
>        else:
>            self.ctx = ssl.ClientContextFactory()
>            self.ctx.method = ssl.SSL.TLSv1_METHOD
>
>
>usage = """popServer.py [arg] (default is Standard POP Server with no messages)
>no_ssl - Start with no SSL support
>no_uidl - Start with no UIDL support
>bad_resp - Send a non-RFC compliant response to the Client
>bad_cap_resp - send a non-RFC compliant response when the Client sends a 'CAPABILITY' request
>bad_login_resp - send a non-RFC compliant response when the Client sends a 'LOGIN' request
>deny - Deny the connection
>drop - Drop the connection after sending the greeting
>bad_tls - Send a bad response to a STARTTLS
>timeout - Do not return a response to a Client request
>to_deferred - Do not return a response on a 'Select' request. This
> will test Deferred callback handling
>slow - Wait 20 seconds after the connection is made to return a Server Greeting
>"""
>
>def printMessage(msg):
>    print "Server Starting in %s mode" % msg
>
>def processArg(arg):
>
>    if arg.lower() == 'no_ssl':
>        global SSL_SUPPORT
>        SSL_SUPPORT = False
>        printMessage("NON-SSL")
>
>    elif arg.lower() == 'no_uidl':
>        global UIDL_SUPPORT
>        UIDL_SUPPORT = False
>        printMessage("NON-UIDL")
>
>    elif arg.lower() == 'bad_resp':
>        global INVALID_SERVER_RESPONSE
>        INVALID_SERVER_RESPONSE = True
>        printMessage("Invalid Server Response")
>
>    elif arg.lower() == 'bad_cap_resp':
>        global INVALID_CAPABILITY_RESPONSE
>        INVALID_CAPABILITY_RESPONSE = True
>        printMessage("Invalid Capability Response")
>
>    elif arg.lower() == 'bad_login_resp':
>        global INVALID_LOGIN_RESPONSE
>        INVALID_LOGIN_RESPONSE = True
>        printMessage("Invalid Login Response")
>
>    elif arg.lower() == 'deny':
>        global DENY_CONNECTION
>        DENY_CONNECTION = True
>        printMessage("Deny Connection")
>
>    elif arg.lower() == 'drop':
>        global DROP_CONNECTION
>        DROP_CONNECTION = True
>        printMessage("Drop Connection")
>
>
>    elif arg.lower() == 'bad_tls':
>        global BAD_TLS_RESPONSE
>        BAD_TLS_RESPONSE = True
>        printMessage("Bad TLS Response")
>
>    elif arg.lower() == 'timeout':
>        global TIMEOUT_RESPONSE
>        TIMEOUT_RESPONSE = True
>        printMessage("Timeout Response")
>
>    elif arg.lower() == 'to_deferred':
>        global TIMEOUT_DEFERRED
>        TIMEOUT_DEFERRED = True
>        printMessage("Timeout Deferred Response")
>
>    elif arg.lower() == 'slow':
>        global SLOW_GREETING
>        SLOW_GREETING = True
>        printMessage("Slow Greeting")
>
>    elif arg.lower() == '--help':
>        print usage
>        sys.exit()
>
>    else:
>        print usage
>        sys.exit()
>
>def main():
>
>    if len(sys.argv) < 2:
>        printMessage("POP3 with no messages")
>    else:
>        args = sys.argv[1:]
>
>        for arg in args:
>            processArg(arg)
>
>    f = Factory()
>    f.protocol = POP3TestServer
>    reactor.listenTCP(PORT, f)
>    reactor.run()
>
>if __name__ == '__main__':
>    main()
>
>
>------------------------------------------------------------------------
>
># -*- test-case-name: twisted.mail.test.test_pop3client -*-
># Copyright (c) 2001-2004 Divmod Inc.
># See LICENSE for details.
>
>from twisted.mail.pop3 import AdvancedPOP3Client as POP3Client
>from twisted.mail.pop3 import InsecureAuthenticationDisallowed
>from twisted.mail.pop3 import ServerErrorResponse
>from twisted.protocols import loopback
>from twisted.internet import defer
>
>from twisted.trial import unittest
>from twisted.test.proto_helpers import StringTransport
>from twisted.protocols import basic
>import pop3TestServer
>
>try:
>    from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
>except ImportError:
>    ClientTLSContext = ServerTLSContext = None
>
>capCache = {"TOP": None, "LOGIN-DELAY": "180", "UIDL": None, \
>            "STLS": None, "USER": None, "SASL": "LOGIN"}
>def setUp():
>    p = POP3Client()
>
>    p._capCache = capCache
>
>    t = StringTransport()
>    p.makeConnection(t)
>    return p, t
>
>def strip(f):
>    return lambda result, f=f: f()
>
>class POP3ClientLoginTestCase(unittest.TestCase):
>    def testOkUser(self):
>        p, t = setUp()
>        d = p.user("username")
>        self.assertEquals(t.value(), "USER username\r\n")
>        p.dataReceived("+OK send password\r\n")
>        return d.addCallback(unittest.assertEqual, "send password")
>
>    def testBadUser(self):
>        p, t = setUp()
>        d = p.user("username")
>        self.assertEquals(t.value(), "USER username\r\n")
>        p.dataReceived("-ERR account suspended\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "account suspended")
>
>    def testOkPass(self):
>        p, t = setUp()
>        d = p.password("password")
>        self.assertEquals(t.value(), "PASS password\r\n")
>        p.dataReceived("+OK you're in!\r\n")
>        return d.addCallback(unittest.assertEqual, "you're in!")
>
>    def testBadPass(self):
>        p, t = setUp()
>        d = p.password("password")
>        self.assertEquals(t.value(), "PASS password\r\n")
>        p.dataReceived("-ERR go away\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "go away")
>
>    def testOkLogin(self):
>        p, t = setUp()
>        p.allowInsecureLogin = True
>        d = p.login("username", "password")
>        self.assertEquals(t.value(), "USER username\r\n")
>        p.dataReceived("+OK go ahead\r\n")
>        self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
>        p.dataReceived("+OK password accepted\r\n")
>        return d.addCallback(unittest.assertEqual, "password accepted")
>
>    def testBadPasswordLogin(self):
>        p, t = setUp()
>        p.allowInsecureLogin = True
>        d = p.login("username", "password")
>        self.assertEquals(t.value(), "USER username\r\n")
>        p.dataReceived("+OK waiting on you\r\n")
>        self.assertEquals(t.value(), "USER username\r\nPASS password\r\n")
>        p.dataReceived("-ERR bogus login\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "bogus login")
>
>    def testBadUsernameLogin(self):
>        p, t = setUp()
>        p.allowInsecureLogin = True
>        d = p.login("username", "password")
>        self.assertEquals(t.value(), "USER username\r\n")
>        p.dataReceived("-ERR bogus login\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "bogus login")
>
>    def testServerGreeting(self):
>        p, t = setUp()
>        # Make sure it *isn't* in the instance dict, just for sanity
>        self.failIfIn('serverChallenge', vars(p))
>        p.dataReceived("+OK lalala this has no challenge\r\n")
>        # Make sure it *is* in the instance dict and that it is None
>        self.assertEquals(p.serverChallenge, None)
>
>    def testServerGreetingWithChallenge(self):
>        p, t = setUp()
>        # Make sure it *isn't* in the instance dict, just for sanity
>        self.failIfIn('serverChallenge', vars(p))
>        p.dataReceived("+OK <here is the challenge>\r\n")
>        # Make sure it *is* in the instance dict and is what we sent
>        self.assertEquals(vars(p)['serverChallenge'], "<here is the challenge>")
>
>    def testAPOP(self):
>        p, t = setUp()
>        p.dataReceived("+OK <challenge string goes here>\r\n")
>        d = p.login("username", "password")
>        self.assertEquals(t.value(), "APOP username f34f1e464d0d7927607753129cabe39a\r\n")
>        p.dataReceived("+OK Welcome!\r\n")
>        return d.addCallback(unittest.assertEqual, "Welcome!")
>
>    def testInsecureLoginRaisesException(self):
>        p, t = setUp()
>        p.dataReceived("+OK Howdy")
>        d = p.login("username", "password")
>        self.failIf(t.value())
>        self.assertRaises(InsecureAuthenticationDisallowed, unittest.wait, d)
>
>class ListConsumer:
>    def __init__(self):
>        self.data = {}
>
>    def consume(self, (item, value)):
>        self.data.setdefault(item, []).append(value)
>
>class MessageConsumer:
>    def __init__(self):
>        self.data = []
>
>    def consume(self, line):
>        self.data.append(line)
>
>class POP3ClientListTestCase(unittest.TestCase):
>    def testListSize(self):
>        p, t = setUp()
>        d = p.listSize()
>        self.assertEquals(t.value(), "LIST\r\n")
>        p.dataReceived("+OK Here it comes\r\n")
>        p.dataReceived("1 3\r\n2 2\r\n3 1\r\n.\r\n")
>        return d.addCallback(unittest.assertEqual, [3, 2, 1])
>
>    def testListSizeWithConsumer(self):
>        p, t = setUp()
>        c = ListConsumer()
>        f = c.consume
>        d = p.listSize(f)
>        self.assertEquals(t.value(), "LIST\r\n")
>        p.dataReceived("+OK Here it comes\r\n")
>        p.dataReceived("1 3\r\n2 2\r\n3 1\r\n")
>        self.assertEquals(c.data, {0: [3], 1: [2], 2: [1]})
>        p.dataReceived("5 3\r\n6 2\r\n7 1\r\n")
>        self.assertEquals(c.data, {0: [3], 1: [2], 2: [1], 4: [3], 5: [2], 6: [1]})
>        p.dataReceived(".\r\n")
>        return d.addCallback(unittest.assertIdentical, f)
>
>    def testFailedListSize(self):
>        p, t = setUp()
>        d = p.listSize()
>        self.assertEquals(t.value(), "LIST\r\n")
>        p.dataReceived("-ERR Fatal doom server exploded\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "Fatal doom server exploded")
>
>    def testListUID(self):
>        p, t = setUp()
>        d = p.listUID()
>        self.assertEquals(t.value(), "UIDL\r\n")
>        p.dataReceived("+OK Here it comes\r\n")
>        p.dataReceived("1 abc\r\n2 def\r\n3 ghi\r\n.\r\n")
>        return d.addCallback(unittest.assertEqual, ["abc", "def", "ghi"])
>
>    def testListUIDWithConsumer(self):
>        p, t = setUp()
>        c = ListConsumer()
>        f = c.consume
>        d = p.listUID(f)
>        self.assertEquals(t.value(), "UIDL\r\n")
>        p.dataReceived("+OK Here it comes\r\n")
>        p.dataReceived("1 xyz\r\n2 abc\r\n5 mno\r\n")
>        self.assertEquals(c.data, {0: ["xyz"], 1: ["abc"], 4: ["mno"]})
>        p.dataReceived(".\r\n")
>        return d.addCallback(unittest.assertIdentical, f)
>
>    def testFailedListUID(self):
>        p, t = setUp()
>        d = p.listUID()
>        self.assertEquals(t.value(), "UIDL\r\n")
>        p.dataReceived("-ERR Fatal doom server exploded\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "Fatal doom server exploded")
>
>class POP3ClientMessageTestCase(unittest.TestCase):
>    def testRetrieve(self):
>        p, t = setUp()
>        d = p.retrieve(7)
>        self.assertEquals(t.value(), "RETR 8\r\n")
>        p.dataReceived("+OK Message incoming\r\n")
>        p.dataReceived("La la la here is message text\r\n")
>        p.dataReceived("..Further message text tra la la\r\n")
>        p.dataReceived(".\r\n")
>        return d.addCallback(
>            unittest.assertEqual,
>            ["La la la here is message text",
>             ".Further message text tra la la"])
>
>    def testRetrieveWithConsumer(self):
>        p, t = setUp()
>        c = MessageConsumer()
>        f = c.consume
>        d = p.retrieve(7, f)
>        self.assertEquals(t.value(), "RETR 8\r\n")
>        p.dataReceived("+OK Message incoming\r\n")
>        p.dataReceived("La la la here is message text\r\n")
>        p.dataReceived("..Further message text\r\n.\r\n")
>        self.assertIdentical(unittest.wait(d), f)
>        self.assertEquals(c.data, ["La la la here is message text",
>                                   ".Further message text"])
>
>    def testPartialRetrieve(self):
>        p, t = setUp()
>        d = p.retrieve(7, lines=2)
>        self.assertEquals(t.value(), "TOP 8 2\r\n")
>        p.dataReceived("+OK 2 lines on the way\r\n")
>        p.dataReceived("Line the first! Woop\r\n")
>        p.dataReceived("Line the last! Bye\r\n")
>        p.dataReceived(".\r\n")
>        return d.addCallback(
>            unittest.assertEqual,
>            ["Line the first! Woop",
>             "Line the last! Bye"])
>
>    def testPartialRetrieveWithConsumer(self):
>        p, t = setUp()
>        c = MessageConsumer()
>        f = c.consume
>        d = p.retrieve(7, f, lines=2)
>        self.assertEquals(t.value(), "TOP 8 2\r\n")
>        p.dataReceived("+OK 2 lines on the way\r\n")
>        p.dataReceived("Line the first! Woop\r\n")
>        p.dataReceived("Line the last! Bye\r\n")
>        p.dataReceived(".\r\n")
>        self.assertIdentical(unittest.wait(d), f)
>        self.assertEquals(c.data, ["Line the first! Woop",
>                                   "Line the last! Bye"])
>
>    def testFailedRetrieve(self):
>        p, t = setUp()
>        d = p.retrieve(0)
>        self.assertEquals(t.value(), "RETR 1\r\n")
>        p.dataReceived("-ERR Fatal doom server exploded\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "Fatal doom server exploded")
>
>class POP3ClientMiscTestCase(unittest.TestCase):
>    def testCapability(self):
>        p, t = setUp()
>        d = p.capabilities(useCache=0)
>        self.assertEquals(t.value(), "CAPA\r\n")
>        p.dataReceived("+OK Capabilities on the way\r\n")
>        p.dataReceived("X\r\nY\r\nZ\r\n.\r\n")
>        return d.addCallback(unittest.assertEqual, {"X": None, "Y": None, "Z": None})
>
>    def testCapabilityError(self):
>        p, t = setUp()
>        d = p.capabilities(useCache=0)
>        self.assertEquals(t.value(), "CAPA\r\n")
>        p.dataReceived("-ERR This server is lame!\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "This server is lame!")
>
>    def testStat(self):
>        p, t = setUp()
>        d = p.stat()
>        self.assertEquals(t.value(), "STAT\r\n")
>        p.dataReceived("+OK 1 1212\r\n")
>        return d.addCallback(unittest.assertEqual, (1, 1212))
>
>    def testStatError(self):
>        p, t = setUp()
>        d = p.stat()
>        self.assertEquals(t.value(), "STAT\r\n")
>        p.dataReceived("-ERR This server is lame!\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "This server is lame!")
>
>    def testNoop(self):
>        p, t = setUp()
>        d = p.noop()
>        self.assertEquals(t.value(), "NOOP\r\n")
>        p.dataReceived("+OK No-op to you too!\r\n")
>        return d.addCallback(unittest.assertEqual, "No-op to you too!")
>
>    def testNoopError(self):
>        p, t = setUp()
>        d = p.noop()
>        self.assertEquals(t.value(), "NOOP\r\n")
>        p.dataReceived("-ERR This server is lame!\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "This server is lame!")
>
>    def testRset(self):
>        p, t = setUp()
>        d = p.rset()
>        self.assertEquals(t.value(), "RSET\r\n")
>        p.dataReceived("+OK Reset state\r\n")
>        return d.addCallback(unittest.assertEqual, "Reset state")
>
>    def testRsetError(self):
>        p, t = setUp()
>        d = p.rset()
>        self.assertEquals(t.value(), "RSET\r\n")
>        p.dataReceived("-ERR This server is lame!\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "This server is lame!")
>
>    def testDelete(self):
>        p, t = setUp()
>        d = p.delete(3)
>        self.assertEquals(t.value(), "DELE 4\r\n")
>        p.dataReceived("+OK Hasta la vista\r\n")
>        return d.addCallback(unittest.assertEqual, "Hasta la vista")
>
>    def testDeleteError(self):
>        p, t = setUp()
>        d = p.delete(3)
>        self.assertEquals(t.value(), "DELE 4\r\n")
>        p.dataReceived("-ERR Winner is not you.\r\n")
>        exc = self.assertRaises(ServerErrorResponse, unittest.wait, d)
>        self.assertEquals(exc.args[0], "Winner is not you.")
>
>
>class SimpleClient(POP3Client):
>    def __init__(self, deferred, contextFactory = None):
>        POP3Client.__init__(self, contextFactory)
>        self.deferred = deferred
>        self.allowInsecureLogin = True
>
>    def serverGreeting(self, challenge):
>        self.deferred.callback(None)
>
>class POP3HelperMixin:
>    serverCTX = None
>    clientCTX = None
>
>    def setUp(self):
>        d = defer.Deferred()
>        self.server = pop3TestServer.POP3TestServer(contextFactory=self.serverCTX)
>        self.client = SimpleClient(d, contextFactory=self.clientCTX)
>        self.client.timeout = 30
>        self.connected = d
>
>    def tearDown(self):
>        del self.server
>        del self.client
>        del self.connected
>
>    def _cbStopClient(self, ignore):
>        self.client.transport.loseConnection()
>
>    def _ebGeneral(self, failure):
>        self.client.transport.loseConnection()
>        self.server.transport.loseConnection()
>        failure.printTraceback(open('failure.log', 'w'))
>        failure.printTraceback()
>        raise failure.value
>
>    def loopback(self):
>        loopback.loopbackTCP(self.server, self.client, noisy=False)
>
>class POP3TLSTestCase(POP3HelperMixin, unittest.TestCase):
>    serverCTX = ServerTLSContext and ServerTLSContext()
>    clientCTX = ClientTLSContext and ClientTLSContext()
>
>    def testStartTLS(self):
>        def login():
>            #this will startTLS automatically
>            return self.client.login('test', 'twisted')
>
>        def quit():
>            return self.client.quit()
>
>        # strip() discards the previous callback result, so the
>        # zero-argument login and quit can be chained as callbacks.
>        methods = [login, quit]
>        map(self.connected.addCallback, map(strip, methods))
>        self.connected.addCallbacks(self._cbStopClient, self._ebGeneral)
>        self.loopback()
>
>class POP3TimeoutTestCase(POP3HelperMixin, unittest.TestCase):
>    def testTimeout(self):
>        def login():
>            #this will startTLS automatically
>            d = self.client.login('test', 'twisted')
>            d.addErrback(timedOut)
>            return d
>
>        def timedOut(failure):
>            self._cbStopClient(None)
>            failure.trap(error.TimeoutError)
>
>        def quit():
>            return self.client.quit()
>
>        self.client.timeout = 3
>        #No need to leverage SSL for timeout test
>        pop3TestServer.SSL_SUPPORT = False
>
>        #Tell the server to not return a response to client.
>        #This will trigger a timeout.
>        pop3TestServer.TIMEOUT_RESPONSE = True
>
>        # strip() discards the previous callback result, so the
>        # zero-argument login and quit can be chained as callbacks.
>        methods = [login, quit]
>        map(self.connected.addCallback, map(strip, methods))
>        self.connected.addCallback(self._cbStopClient)
>        self.connected.addErrback(self._ebGeneral)
>        self.loopback()
>
>if ClientTLSContext is None:
>    for case in (POP3TLSTestCase,):
>        case.skip = "OpenSSL not present"
>
>