[Commits] (bkirsch) second pass added auth and starttls support
commits at osafoundation.org
commits at osafoundation.org
Thu Sep 30 14:17:35 PDT 2004
Commit by: bkirsch
Modified files:
chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py 1.2 1.3
Log message:
second pass added auth and starttls support
ViewCVS links:
http://cvs.osafoundation.org/index.cgi/chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py.diff?r1=text&tr1=1.2&r2=text&tr2=1.3
Index: chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py
diff -u chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py:1.2 chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py:1.3
--- chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py:1.2 Thu Sep 30 12:19:43 2004
+++ chandler/parcels/osaf/mail/tests/smtp_test_server/smtpTestServer.py Thu Sep 30 14:17:34 2004
@@ -3,7 +3,6 @@
from twisted.protocols import basic
from twisted.internet import reactor
-
"""
TESTS:
1. Connection error
@@ -16,13 +15,21 @@
8. No TLS
9. No EHLO
10. Transfer Error
+11. Invalid login
+12. Bad auth string from client
"""
PORT = 2500
ACCEPT_LIST = ["brian at test.com", "osafuser at code-bear.com", "brian at localhost"]
+"""Base 64 encoding of username: testuser passworf: testuser
+[0] = username\0password form
+[1] = username\0username\0password
+"""
+LOGIN_PLAIN_BASE_64 = ["DGVZDHVZZXIADGVZDHVZZXI=", "dGVzdHVzZXIAdGVzdHVzZXIAdGVzdHVzZXI="]
+
"""Support Flags"""
-ESMTP_SUPPORT = True
+EHLO_SUPPORT = True
SSL_SUPPORT = True
AUTH_SUPPORT = True
@@ -34,11 +41,20 @@
DISCONNECT_STRING = "221 Bye"
OK_STRING = "250 OK"
DATA_STRING = "354 End data with <CR><LF>.<CR><LF>"
+AUTH_SUCCESS = "235 Authenication Successful"
+
+"""ERRORS"""
+NO_RELAY = "550 relaying prohibited by administrator"
+NO_USER = "550 recipient address denied"
+UNKNOWN_COMMAND = "502 command not implemented"
+MALFORMED_AUTH = "501 Authenication Failed: mailformed initial response"
+UNSUPPORTED_AUTH = "504 Unsupported authenication mechanism"
+AUTH_DECLINED = "535 authentication failed"
+
CAPABILITIES = [
"250-localhost",
-"250-PIPELINING",
"250-SIZE",
"250-VRFY",
"250-ETRN",
@@ -47,47 +63,51 @@
]
CAPABILITIES_SSL = "250-STARTTLS"
-CAPABILITIES_AUTH = "250-AUTH LOGIN PLAIN"
+CAPABILITIES_AUTH = "250-AUTH PLAIN"
+
+def config(ehlo, ssl, auth):
+ global EHLO_SUPPORT, SSL_SUPPORT, AUTH_SUPPORT
+ EHLO_SUPPORT = ehlo
+ SSL_SUPPORT = ssl
+ AUTH_SUPPORT = auth
-"""MAY want to be a line reciever"""
class SMTPTestServer(basic.LineReceiver):
def __init__(self):
self.in_data = False
+ self.caps = None
def sendCapabilities(self, helo=False):
- for cap in CAPABILITIES:
- self.sendLine(cap)
+ if self.caps is None:
+ self.caps = []
+
+ if AUTH_SUPPORT and not helo:
+ self.caps.append(CAPABILITIES_AUTH)
- if AUTH_SUPPORT and not helo:
- self.sendLine(CAPABILITIES_AUTH)
+ if SSL_SUPPORT and not helo:
+ self.caps.append(CAPABILITIES_SSL)
- if SSL_SUPPORT and not helo:
- self.sendLine(CAPABILITIES_SSL)
+ for cap in CAPABILITIES:
+ self.caps.append(cap)
+ self.sendLine('\r\n'.join(self.caps))
def connectionMade(self):
self.sendLine(CONNECTION_MADE)
def lineReceived(self, line):
- print "I got ", line
-
if self.in_data:
- print "IN DATA"
if TERMINATOR == line:
self.in_data = False
self.sendLine(OK_STRING)
return
- """COMMAND HANDLING"""
-
if "EHLO" in line.upper():
- if ESMTP_SUPPORT:
+ if EHLO_SUPPORT:
self.sendCapabilities()
else:
- """XXX: Put in bad command"""
- print "NO ELHO SUPPORT"
+ self.sendLine(UNKNOWN_COMMAND)
elif "HELO" in line.upper():
self.sendCapabilities(True)
@@ -104,8 +124,11 @@
self.sendLine(OK_STRING)
else:
- """XXX PUT IN BAD CODE HERE"""
- print "NO ALLOWED USERS"
+ if "MAIL FROM:" in line.upper():
+ self.sendLine(NO_USER)
+
+ else:
+ self.sendLine(NO_RELAY)
elif "DATA" in line.upper():
self.sendLine(DATA_STRING)
@@ -115,8 +138,29 @@
self.sendLine(DISCONNECT_STRING)
self.disconnect()
+ elif "STARTTLS" in line.upper() and SSL_SUPPORT:
+ #XXX: Send a bad TLS handshake
+ pass
+
+ elif "AUTH" in line.upper() and AUTH_SUPPORT:
+ if "PLAIN" in line.upper():
+
+ if line[-1] != '=':
+ """It is not a base64 encoded string"""
+ self.sendLine(MALFORMED_AUTH)
+ return
+
+ for key in LOGIN_PLAIN_BASE_64:
+ if key in line:
+ self.sendLine(AUTH_SUCCESS)
+ return
+
+ self.sendLine(AUTH_DECLINED)
+ else:
+ self.sendLine(UNSUPPORTED_AUTH)
+
else:
- print "UNKNOWN COMMAND RECEIVED: ", line
+ self.sendLine(UNKNOWN_COMMAND)
def disconnect(self):
self.transport.loseConnection()
More information about the Commits
mailing list