小灯具网站建设方案提高关键词排名的软文案例
github地址:
https://github.com/CaLlMeErIC/MailDetective
因为需求需要写一个简单的邮件检测系统的框架,这里记录下思路
首先第一反应,这个检测系统不应该是各个邮件收件系统都有自带的吗,于是搜索了下是否有相关的邮件检测开源软件,发现有两个比较靠谱的:
1.spamassassin
https://spamassassin.apache.org/
2.mmpi
https://github.com/a232319779/mmpi
3.其他一大堆使用机器学习的nlp方法,论文保存在git的misc文件下了
发现实际mmpi比较符合我的需求,但是看了下感觉规则有点少了而且是几年前的,
spamassassin规则很丰富,而且一直在更新,但是也有问题:它的规则很大一部分是根据英文关键词和域名设计的,如果是中文的话需要再做适配,还有就是他的很多规则需要联网或者自定义黑白名单,我的想法是尽量做一个离线也能使用的脚本框架,于是借鉴
spamassassin的方式,把它的正则表达式规则移植过来。它的中心思想是,如果邮件命中了规则的话就给邮件加上对应的报警分数,当分数达到一定阈值的时候就报警。
我是尝试使用了python自带的email包来读取邮件,然后把各个字段以字典的形式保存下来,到时候对应的检测的时候直接检测对应字段即可
import email.header
import os
from email.parser import Parser
import json
import re
from OSutils import loadJsonclass MailReader(object):"""用于读取eml文件,并把各个字段保存下来使用python3.7内置的email包"""def __init__(self, eml_path="", debug=False):"""初始化属性"""self.raw_email = Noneself.email_content = Noneself.process_log = ""self.debug = debugself.attribute_dict = {}self.mail_text = ""self.all_links = []self.urls = []self.tag = set()self.flag = set()if eml_path:self.__MailReader(eml_path)@staticmethoddef decodeHeader(header_str):"""输入需要解码的header字符串,返回解码结果"""temp = email.header.decode_header(header_str)result = email.header.make_header(temp)return resultdef addTag(self, tag):"""给邮件添加标签,以字符串的形式存放在列表中"""self.tag.add(tag)return self.tagdef addFlag(self, flag):"""给邮件添加自定义的检测标记tag用于输出,flag用于规则检测"""self.flag.add(flag)return self.flagdef toString(self):"""打印整个邮件以及日志"""print("email内容:", self.email_content)if self.debug:print("process_log:", self.process_log)return self.email_contentdef toDict(self):"""把header转换为字典形式,From,To,Subject需要单独解码字典的键统一小写"""each_key: strall_str = []if self.attribute_dict != {}:return self.attribute_dictfor each_key in set(self.email_content.keys()):self.attribute_dict.update({each_key.lower(): self.email_content.get_all(each_key)})all_str += self.email_content.get_all(each_key)for each_key in ["From", "To", "Subject"]:temp = []if each_key not in self.attribute_dict:continuefor each_str in self.attribute_dict.get(each_key):each_str = str(self.decodeHeader(each_str))temp.append(each_str)self.attribute_dict.update({each_key.lower(): temp})self.attribute_dict.update({'body': self.getContent()})self.attribute_dict.update({'url': self.getUrls()})self.attribute_dict.update({'all': all_str})return self.attribute_dictdef toJson(self):"""把字典转换为json格式"""if self.attribute_dict == {}:self.attribute_dict = self.toDict()return json.dumps(self.attribute_dict)def __MailReader(self, eml_path):"""读取邮件,有些邮件开头会混入无用字符,需要去除才能提取信息"""try:if os.path.exists(eml_path):with open(eml_path, encoding='utf-8', errors='ignore') as fp:self.raw_email = fp.read()cut_len = 0for each_line in self.raw_email.split('\n'):if ':' not in each_line:cut_len += len(each_line) + 1else:breakif cut_len:self.raw_email = self.raw_email[cut_len:]self.email_content = Parser().parsestr(self.raw_email)except Exception as e:self.process_log += "读取邮件失败:" + str(e)self.toString()return selfdef parseMail(self, eml_path):"""输入邮件路径,用email库整理邮件"""self.attribute_dict = {}return self.__MailReader(eml_path)def getContent(self):"""循环遍历数据块并尝试解码,暂时只处理text数据"""all_content = []for par in self.email_content.walk():if not par.is_multipart(): # 这里要判断是否是multipart,是的话,里面的数据是无用的str_charset = par.get_content_charset(failobj=None) # 当前数据块的编码信息if str_charset is None:self.addTag("没有获取到部分内容的charset")self.addFlag("NO_CHARSET")continuestr_content_type = par.get_content_type()if str_content_type in ('text/plain', 'text/html'):try:content = par.get_payload(decode=True)all_content.append(content.decode(str_charset))except Exception as e:print(e)self.mail_text = all_contentreturn all_contentdef getUrls(self):"""获取所有的url链接,与getLinks不一样的是,getUrls的返回值是一个字符串列表"""if self.urls:return self.urlsself.getLinks()return self.urlsdef getLinks(self):"""通过正则表达式,匹配超链接以及显示的属性内容,格式如下[('https://rashangharper.com/wp-admin/user/welllz/display/login.html', 'wellsfargo.com')]"""if self.all_links:return self.all_linksall_links = []self.urls = []if self.mail_text == "":self.getContent()pattern = '<a.*?href="(.+)".*?>(.*?)</a>'for part in self.mail_text:links = re.findall(pattern, part, re.IGNORECASE)all_links += linksself.all_links = all_linksfor each_link in all_links:self.urls += list(each_link)return all_linksif __name__ == '__main__':a = MailReader("fakeherf.eml").toDict().get('date')[0]
然后是规则设计部分,也是参考spamassassin的规则, 分为简单规则和复杂规则,简单规则,直接用正则表达式检查邮件对应的部分即可
复杂规则的话,我是写成了python类的形式,然后在运行的时候动态加载,放在metarules文件夹里,类似于下面这样的:
class CheckMail(object):"""检查邮件中,From是否和存在的真实发件源不一样"""def __init__(self, input_mail):self.reader = input_mailself.score = 2.5self.description = "检查邮件中,From和存在的真实发件源不一样"@staticmethoddef list2str(data_list):"""把字符串列表转换成字符串"""if isinstance(data_list, list):result = ""for each_str in data_list:result += each_str + " "return result[:-1]return data_listdef getReport(self):"""检测邮件的From字段,option_sender中是几种可能的真实发件人字段"""header_dict = self.reader.toDict()if header_dict.get('from'):mail_from = self.list2str(header_dict.get('from'))else:self.reader.addTag("发件人缺失")return True, [self.score, "未检测到发件人字段"]for option_sender in ["x-mail-from", "return-path", "x-qq-orgsender", "sender"]:if option_sender in header_dict:for each_option_sender in header_dict.get(option_sender):if each_option_sender not in mail_from:self.reader.addTag("疑似伪造的发件人")return True, [self.score, self.description]return False, []
其他内容及readme详见github