""" open/DurusWorks/qp/mail/send.py """ from base64 import encodestring from smtplib import SMTP from qp.lib.spec import unicode_string from qp.lib.util import rand_str from qp.mail.rfc822_mailbox import rfc822_mailbox, encode_header from qp.pub.common import get_publisher import socket import sys class Email (object): def __init__(self): self.set_subject('') self.set_body('') webmaster = self.get_webmaster_address() self.set_from_address(webmaster) self.set_smtp_sender(webmaster) self.set_to([webmaster]) self.set_cc([]) self.set_bcc([]) self.set_reply_to([]) self.set_extra_headers([]) self.set_smtp_recipients([]) self.set_timeout(10) self.sent = False self.mime_type = None self.charset = None self.boundary = None self.max_header_recipients = 10 def set_subject(self, subject): self.subject = encode_header(subject) def get_subject(self): return self.subject def set_max_header_recipients(self, number): self.max_header_recipients = number def set_mime_type(self, mime_type): self.mime_type = mime_type def get_mime_type(self): return self.mime_type def set_charset(self, charset): self.charset = charset def get_charset(self): return self.charset def set_boundary(self, boundary): self.boundary = boundary def get_boundary(self): return self.boundary def set_body(self, body): if type(body) is type(''): self.body = body elif isinstance(body, unicode_string): self.body = body.encode('utf-8') elif isinstance(body, list): # The elements of the list are complete mime parts including headers # as,for example: # Content-Type: text/html; charset="utf-8" # Content-Disposition: inline # # if not self.boundary: self.set_boundary("--%s-MultiPart-Mime-Boundary" % rand_str(20)) mark = '\n--%s\n' % self.get_boundary() end_mark = '\n--%s--\n' % self.get_boundary() self.body = mark + mark.join(body) + end_mark else: self.body = str(body) def get_body(self): return self.body def set_from_address(self, address): self.from_address = rfc822_mailbox(address) def get_from_address(self): return self.from_address def set_reply_to(self, addresses): self.reply_to = self.mailboxes(addresses) def get_reply_to(self): return self.reply_to def set_smtp_sender(self, address): self.smtp_sender = rfc822_mailbox(address) def get_smtp_sender(self): return self.smtp_sender def set_to(self, addresses): self.to = self.mailboxes(addresses) def get_to(self): return self.to def set_cc(self, addresses): self.cc = self.mailboxes(addresses) def get_cc(self): return self.cc def set_bcc(self, addresses): self.bcc = self.mailboxes(addresses) def get_bcc(self): return self.bcc def set_extra_headers(self, headers): self.extra_headers = [str(header) for header in headers] def get_extra_headers(self): return self.extra_headers def set_smtp_recipients(self, addresses): self.smtp_recipients = self.mailboxes(addresses) def get_smtp_recipients(self): if self.smtp_recipients: return self.smtp_recipients else: return self.get_to() + self.get_cc() + self.get_bcc() def get_bulk_headers(self): to = self.get_to() if len(to) == 1 and to == [self.get_webmaster_address()]: return [] else: return ["Precedence: junk", "X-No-Archive: yes"] def get_smtp_server(self): return get_publisher().get_smtp_server() def format_headers(self): headers = ["From: %s" % self.get_from_address().format(), "Subject: %s" % self.get_subject()] def address_headers(field_name, addresses): if not addresses: return if len(addresses) == 1: headers.append("%s: %s" % (field_name, addresses[0].format())) elif len(addresses) <= self.max_header_recipients: headers.append("%s: %s," % (field_name, addresses[0].format())) for address in addresses[1:-1]: headers.append(" %s," % address.format()) headers.append(" %s" % addresses[-1].format()) else: headers.append( "%s: (long recipient list suppressed) : ;" % field_name) address_headers("To", self.get_to()) address_headers("Cc", self.get_cc()) address_headers("Bcc", self.get_bcc()) address_headers("Reply-To", self.get_reply_to()) if self.get_mime_type(): content_type = "Content-Type: %s" % self.get_mime_type() if self.get_charset(): content_type += '; charset="%s"' % self.get_charset() if self.get_boundary(): content_type += ';\n\tboundary="%s"' % self.get_boundary() headers.append(content_type) headers += self.get_extra_headers() headers += self.get_bulk_headers() return '\n'.join(headers) + '\n\n' def get_smtp_sender(self): return self.smtp_sender def set_smtp_sender(self, address): self.smtp_sender = rfc822_mailbox(address) def set_timeout(self, timeout): self.timeout = timeout def is_email_enabled(self): return get_publisher().is_email_enabled() def get_webmaster_address(self): return rfc822_mailbox(get_publisher().get_webmaster_address()) def get_debug_address(self): return rfc822_mailbox(get_publisher().get_debug_address()) def mailboxes(self, addresses): return [rfc822_mailbox(address) for address in addresses if address] def send_as_is_without_conditions(self, recipients): message = self.format_headers() + self.get_body() sender = self.get_smtp_sender().get_addr_spec() server = self.get_smtp_server() assert server assert sender assert recipients assert message try: smtp = SMTP(server, timeout=self.timeout) smtp.sendmail(sender, recipients, message) smtp.quit() except socket.error: e = sys.exc_info()[1] print("Email.send() failed: %s" % e) raise e self.sent = True return True def send(self): assert not self.sent # Make sure duplicates are not sent. if not self.is_email_enabled(): return False debug_address = self.get_debug_address() smtp_recipients = self.get_smtp_recipients() if debug_address: recipients = [debug_address.get_addr_spec()] self.subject = '[DEBUG] ' + self.subject self.extra_headers.append('X-Original-Recipients: %s' % ( ', '.join([address.format() for address in smtp_recipients]))) else: assert get_publisher().is_live_host() recipients = [address.get_addr_spec() for address in smtp_recipients] return self.send_as_is_without_conditions(recipients) def sendmail(subject, msg_body, to_addrs, from_addr=None, reply_to=None, smtp_sender=None, cc_addrs=None, bcc_addrs=None, extra_headers=None, mime_type="text/plain", charset='utf-8', smtp_recipients=None, max_header_recipients=None): email = Email() email.set_subject(subject) email.set_body(msg_body) email.set_to(to_addrs) if from_addr: email.set_from_address(from_addr) if reply_to: email.set_reply_to(reply_to) if smtp_sender: email.set_smtp_sender(smtp_sender) if cc_addrs: email.set_cc(cc_addrs) if bcc_addrs: email.set_bcc(bcc_addrs) if mime_type: email.set_mime_type(mime_type) if charset: email.set_charset(charset) if extra_headers: email.set_extra_headers(extra_headers) if smtp_sender: email.set_smtp_sender(smtp_sender) if smtp_recipients: email.set_smtp_recipients(smtp_recipients) if max_header_recipients: email.set_max_header_recipients(max_header_recipients) return email.send() def msg_part( mime_type='text/plain', charset='utf-8', encode=True, filename=None, body=''): result = '' if mime_type: result += "Content-Type: %s" % mime_type if charset: result += '; charset="%s"' % charset if filename: result += ';\n\tname="%s"' % filename result += '\n' result += 'Content-Disposition: ' if filename is None: result += 'inline' else: result += 'attached; filename="%s"' % filename result += '\n' if encode: result += 'Content-Transfer-Encoding: base64\n' body = encodestring(body) result += '\n' result += body return result