// MessageParser.sqlj // part of the webmail ACS module // written by Jin Choi // 2000-03-01 // ported to openacs by Dan Wickstrom // 2000-06-18 // This class provides some static methods for parsing RFC822 messages // into a Postgresql database. import java.sql.*; import java.io.*; import javax.mail.*; import javax.mail.internet.*; import java.util.*; import java.text.DateFormat; import java.text.ParseException; import acspg.*; import nsjava.*; public class MessageParser { private static boolean runningOutsideOracleP = false; protected static Session s = null; private static NsLog log; private Pg_Query st; public MessageParser (String qdir) throws ClassNotFoundException, FileNotFoundException, IOException, SQLException, MessagingException { try { log.write("Notice", "Connected."); st = new Pg_Query("subquery"); // Set the flag to tell the system not to delete any messages // after parsing. runningOutsideOracleP = false; processQueue(qdir); log.write("Notice", "Parse succeeded."); st.releasehandle(); } catch (Exception e) { st.releasehandle(); log.write("Error", "Error running the example: " + e.getMessage()); e.printStackTrace(); } } // Process all files in a directory. public void processQueue(String queueDirectory) throws SQLException, ParseException, IOException, FileNotFoundException, MessagingException { // Go through all the files in the queue and attempt to parse them. File qdir = new File(queueDirectory); String[] filenamesToProcess = qdir.list(); for (int i = 0; i < filenamesToProcess.length; i++) { File currentFile = new File(qdir, filenamesToProcess[i]); log.write("Debug", "Processing " + currentFile.getAbsolutePath()); if (currentFile.isFile() == false) { log.write("Debug", "Not a file!"); continue; } try { // need to make sure that not holding a lock for parsing all // of the files is o.k. I think it was only done to act as // a kind of a mutex for detecting if the parse routine was // already running. I've moved the locking into tcl code, so // that only one parse job can run at a time. st.executeUpdate("begin transaction"); parseMessageFromFile(currentFile); if (runningOutsideOracleP == false) { st.executeUpdate("commit"); currentFile.delete(); } else { st.executeUpdate("rollback transaction"); } } catch (Exception e) { st.executeUpdate("rollback transaction"); recordParseError(currentFile, e); } } } protected void recordParseError(File currentFile, Exception e) throws SQLException { // We don't want to quit for parse errors. Integer n_already_reported = new Integer(0); String filename = currentFile.getAbsolutePath(); n_already_reported = new Integer(st.databaseToJavaString("select count(*) from wm_parse_errors where filename = '" + filename + "';")); if (n_already_reported.intValue() == 0) { String errmsg = stackTrace(e); st.executeUpdate("insert into wm_parse_errors (filename, error_message, first_parse_attempt) values ('" + filename + "','" + errmsg + "', sysdate())"); } } // Process a single file. public void parseMessageFromFile(String filename) throws MessagingException, SQLException, FileNotFoundException, IOException, ParseException { parseMessageFromFile(new File(filename)); } public void parseMessageFromFile(File file) throws MessagingException, SQLException, FileNotFoundException, IOException, ParseException { // Open the file. BufferedInputStream is = new BufferedInputStream(new FileInputStream(file)); // Get the headers as an enumeration of Header objects. InternetHeaders ih = new InternetHeaders(is); Enumeration headers = ih.getAllHeaders(); // Create new record in wm_messages and grab the CLOB to stuff with the body. Integer msgId; Integer lob_id; log.write("Debug", "Inserting into wm_messages..."); msgId = new Integer(st.databaseToJavaString("select nextval('wm_msg_id_sequence') from dual")); lob_id = new Integer(st.databaseToJavaString("select empty_lob()")); st.executeUpdate("insert into wm_message_lobs (msg_id,lob) values(" + msgId.intValue() + "," + lob_id.intValue() + ")"); st.executeUpdate("insert into wm_messages (msg_id,body) values(" + msgId.intValue() + "," + lob_id.intValue() + ")"); BLOB body = new BLOB(lob_id,st); // the blob handles inserting the string into the db. is = new BufferedInputStream(new FileInputStream(file)); copyInputStreamToBlob(is, body); log.write("Debug", "writing to the database"); body.writeToDatabase(); log.write("Debug", "done"); // Insert the headers into wm_headers. insertHeaders(msgId.intValue(), headers); String contentType = ih.getHeader("Content-Type", null); if (contentType != null) { contentType = contentType.toLowerCase(); if (true) { //if (contentType.indexOf("text") == -1) { // Reopen the file to pass to parseMIME. is = new BufferedInputStream(new FileInputStream(file)); // If message is a MIME message and is not plain text, save // text to wm_messages.mime_text and save attachments to directory. parseMIME(msgId.intValue(), is); } } // "Deliver" the message by inserting into wm_message_mailbox_map. log.write("Debug", "delivering the message"); deliverMessage(msgId.intValue(), ih.getHeader("Delivered-To", null)); } private void insertHeaders(int msgId, Enumeration headers) throws SQLException, ParseException { int sortOrder = 0; boolean receivedSeenP = false; while (headers.hasMoreElements()) { Header h = (Header) headers.nextElement(); String name = h.getName(); String lowerName = name.toLowerCase(); String value = h.getValue(); if ((lowerName.equals("from") || lowerName.equals("return-path")) && value.length() > 0) { try { // Stuff email_value and name_value fields. InternetAddress[] addresses = InternetAddress.parse(value); String email = addresses[0].getAddress(); String fullName = addresses[0].getPersonal(); st.executeUpdate("INSERT INTO wm_headers (msg_id, name, lower_name, value, email_value, name_value, sort_order) VALUES (" + msgId + ",'" + name + "','" + lowerName + "','" + value + "','" + email + "','" + fullName + "'," + sortOrder + ")"); } catch (AddressException ae) { // Couldn't parse it as an address; just store the value. st.executeUpdate("INSERT INTO wm_headers (msg_id, name, lower_name, value, sort_order) VALUES (" + msgId + ",'" + name + "','" + lowerName + "','" + value + "'," + sortOrder + ")"); } } else if (lowerName.equals("date") && value.length() > 0) { // Stuff date values into date_value field. try { Timestamp d = parseDate(value); st.executeUpdate("INSERT INTO wm_headers (msg_id, name, lower_name, value, time_value, sort_order) VALUES (" + msgId + ",'" + name + "','" + lowerName + "','" + value + "','" + d + "'," + sortOrder + ")"); } catch (Exception pe) { st.executeUpdate("INSERT INTO wm_headers (msg_id, name, lower_name, value, sort_order) VALUES (" + msgId + ",'" + name + "','" + lowerName + "','" + value + "','" + sortOrder + ")"); } } else if (lowerName.equals("received") && !receivedSeenP) { // Only parse the first Received header, the one qmail tacked on. // Others will often be nastily formatted. receivedSeenP = true; String timestamp = value.substring(value.lastIndexOf(";") + 1); try { Timestamp d = parseDate(timestamp); st.executeUpdate("INSERT INTO wm_headers (msg_id, name, lower_name, value, time_value, sort_order) VALUES (" + msgId + ",'" + name + "','" + lowerName + "','" + value + "','" + d + "'," + sortOrder + ")"); } catch (Exception pe) { } } else { if (lowerName.equals("message-id") && value.length() > 0) { st.executeUpdate("UPDATE wm_messages SET message_id = '" + value + "' WHERE msg_id = " + msgId); } // Random headers (most likely the "To" field) may be >4000 // bytes. Truncate if so. String insertValue = value; if (value.length() > 4000) { insertValue = value.substring(0, 4000); } st.executeUpdate("INSERT INTO wm_headers (msg_id, name, lower_name, value, sort_order) VALUES (" + msgId + ",'" + name + "','" + lowerName + "','" + insertValue + "'," + sortOrder + ")"); } // If this is a recipient field, then parse it and insert it into recipients. if (lowerName.equals("to") || lowerName.equals("cc")) { try { InternetAddress[] recipients = InternetAddress.parse(value); for (int i = 0; i < recipients.length; i++) { InternetAddress recipient = recipients[i]; String email = recipient.getAddress(); String fullName = recipient.getPersonal(); st.executeUpdate("INSERT INTO wm_recipients (msg_id, header, email, name) VALUES (" + msgId + ",'" + name + "','" + email + "','" + fullName + "')"); } } catch (Exception e) { // do nothing } } sortOrder++; } } // Map recipient to ACS user and insert row for that user's INBOX. private void deliverMessage(int msgId, String lastDeliveredTo) throws SQLException { if (lastDeliveredTo != null) { st.executeUpdate("insert into wm_message_mailbox_map (msg_id, mailbox_id) " + "select " + msgId + ", mailbox_id " + "from wm_email_user_map eum, wm_domains d, wm_mailboxes m " + "where d.short_name = eum.domain " + "and m.name = 'INBOX' " + "and m.creation_user = eum.user_id " + "and 'webmail-' || eum.domain || '-' || email_user_name || '@' || d.full_domain_name = '" + lastDeliveredTo + "'"); } } // Utility procedure to write an InputStream to a CLOB. protected void copyInputStreamToClob(InputStream is, CLOB to) throws IOException, SQLException { OutputStream os = to.getAsciiOutputStream(); int chunk = to.getChunkSize(); byte[] copyBuffer = new byte[chunk]; int bytesRead = 0; log.write("Debug", "Entering copyInputStreamToClob"); while ((bytesRead = is.read(copyBuffer)) > 0) { os.write(copyBuffer, 0, bytesRead); log.write("Debug", "wrote " + bytesRead + " bytes"); } log.write("Debug", "done copying"); os.flush(); os.close(); is.close(); log.write("Debug", "exiting copyInputStreamToClob"); } // Same, for BLOBs. public void copyInputStreamToBlob(InputStream is, BLOB to) throws IOException, SQLException { OutputStream os = to.getBinaryOutputStream(); int chunk = to.getChunkSize(); byte[] copyBuffer = new byte[chunk]; int bytesRead; log.write("Debug", "Entering copyInputStreamToBlob"); while ((bytesRead = is.read(copyBuffer)) > 0) { os.write(copyBuffer, 0, bytesRead); } os.flush(); os.close(); is.close(); } // Utility procedure for parsing timestamps. Java date parsing // wayyyy sucks; this is the simplest method that seems to work // most of the time. public Timestamp parseDate(String s) throws ParseException { // This DateFormat stuff doesn't work so great. // DateFormat df = DateFormat.getDateTimeInstance(DateFormat.MEDIUM, // DateFormat.FULL); log.write("Debug", "Attempting to parse date: " + s); return new java.sql.Timestamp(Timestamp.parse(s)); } // Parses a MIME message, inserts text into wm_messages.mime_text, and unpacks // attachments into wm_attachments. public void parseMIME(int msgId, InputStream is) throws MessagingException, SQLException, IOException { // Parse the message. if (s == null) { Properties props = new Properties(); s = Session.getDefaultInstance(props, null); } MimeMessage msg = new MimeMessage(s, is); is.close(); log.write("Debug", "Message type is " + msg.getContentType()); // Buffer we're going to store up text bits in. StringBuffer text = new StringBuffer(); // Wrap partNumber in an array so we can pass by reference. int[] partNumber = new int[1]; partNumber[0] = 0; try { dumpPart(msgId, msg, text, partNumber); } catch (Exception e) { // If dumpPart fails, then just treat the message as text. return; } String textStr = text.toString(); // System.out.println("text = " + textStr); //System.out.println("Parsed MIME text is:\n" + textStr); if (textStr.length() > 0) { BLOB mimeText = null; ByteArrayInputStream sbis = new ByteArrayInputStream(textStr.getBytes()); Integer lob_id = new Integer(st.databaseToJavaString("select empty_lob()")); mimeText = new BLOB(lob_id,st); log.write("Debug", "lob id = " + lob_id + ", msg id = " + msgId); st.executeUpdate("insert into wm_message_lobs (lob,msg_id) values(" + lob_id + "," + msgId + ")"); st.executeUpdate("update wm_messages set mime_text = " + lob_id + " where msg_id = " + msgId); copyInputStreamToBlob(sbis, mimeText); mimeText.writeToDatabase(); } } // Writes text representation of part to text buffer and saves // attachment data to wm_attachments. partNumber is for creating // unique identifiers if filename is not specified in the part. protected void dumpPart(int msgId, Part p, StringBuffer text, int[] partNumber) throws MessagingException, SQLException, IOException { Object o = p.getContent(); log.write("Debug", "Part is " + o.getClass().getName()); if (o instanceof java.lang.String) { //System.out.println("the string = " + o); text.append(o); return; } if (o instanceof javax.mail.Multipart) { Multipart mp = (Multipart) o; int count = mp.getCount(); for (int i = 0; i < count; i++) { dumpPart(msgId, mp.getBodyPart(i), text, partNumber); } return; } if (o instanceof javax.mail.internet.MimeMessage) { MimeMessage msg = (MimeMessage) o; text.append('\n'); Enumeration e = msg.getAllHeaderLines(); while (e.hasMoreElements()) { String line = (String) e.nextElement(); text.append(line); text.append('\n'); } dumpPart(msgId, msg, text, partNumber); return; } if (o instanceof java.io.InputStream ) { InputStream is = (InputStream) o; String filename = null; try { filename = p.getFileName(); } catch (MessagingException mex) { // System.out.println(mex.getMessage()); } log.write("Debug", "filename = " + filename); if (filename == null || filename.length() == 0) { filename = "" + partNumber[0]++; } // Write out place holders for links. if (p.isMimeType("image/*")) { text.append("##wm_image: " + filename + "\n"); } else { text.append("##wm_part: " + filename + "\n"); } String contentType = p.getContentType(); // use only primary type and sub type int firstSemicolonLocation = contentType.indexOf(";"); if (firstSemicolonLocation != -1) { contentType = contentType.substring(0, firstSemicolonLocation); } try { st.executeUpdate("insert into wm_attachments (msg_id, filename, content_type, lob) values (" + msgId + ",'" + filename + "','" + contentType + "', empty_lob())"); Integer data = new Integer(st.databaseToJavaString("select lob from wm_attachments where msg_id = " + msgId + " and filename = '" + filename + "'")); BLOB b = new BLOB(data,st); copyInputStreamToBlob(is, b); b.writeToDatabase(); try { b.finalize(); } catch (Throwable t) { throw new SQLException("temp file cleanup error: " + t.getMessage()); } } catch (SQLException e) { if (e.getErrorCode() == 1) { // Unique constraint violated. // Most likely, filename was same as another filename // in the message. Append number to it and try again. filename += " (" + partNumber[0]++ + ")"; st.executeUpdate("insert into wm_attachments (msg_id, filename, content_type, lob) values (" + msgId + ",'" + filename + "','" + contentType + "', empty_lob())"); Integer data = new Integer(st.databaseToJavaString("select lob from wm_attachments where msg_id = " + msgId + " and filename = '" + filename + "'")); BLOB b = new BLOB(data,st); copyInputStreamToBlob(is, b); b.writeToDatabase(); try { b.finalize(); } catch (Throwable t) { throw new SQLException("temp file cleanup error: " + t.getMessage()); } } else { throw(e); } } } } // Utility method to return stack trace from an Exception. public String stackTrace(Exception e) { CharArrayWriter caw = new CharArrayWriter(); PrintWriter pw = new PrintWriter(caw); e.printStackTrace(pw); pw.flush(); return caw.toString(); } public static void instructions() { log.write("Debug", "\nThis example tests the basic webmail message parsing\n"); log.write("Debug", "Useage:\n java MessageParser jdbc:postgresql:database user password [debug]\n\nThe debug field can be anything. It's presence will enable DriverManager's\ndebug trace. Unless you want to see screens of items, don't put anything in\nhere."); System.exit(1); } public static void process_queue(String args[]) { String queueDir = args[0] + "/new"; log = new NsLog(); log.write("Debug", "openacs test of webmail port\n"); // Now run the parser try { MessageParser parser = new MessageParser(queueDir); log.write("Debug", "done parsing in process_queue"); } catch(Exception ex) { log.write("Error", "Exception caught.\n"+ex); ex.printStackTrace(); } } }