Source code for antispam

#!/usr/bin/env python
# Copyright (c) 2015 Peixuan Ding
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import print_function

import re
import os
import sys
import json
from functools import reduce

__version__ = "0.0.10"


class Model(object):
    """Save & load the model in/from the file system using Python's json
    module.

    The model is stored as a three-element JSON array:
    ``[spam_count_total, ham_count_total, token_table]``.
    """

    # Model file used when the caller does not supply a path; it sits next
    # to this source file.
    DEFAULT_DATA_PATH = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "model.json")

    def __init__(self, file_path=None, create_new=False):
        """Construct a Model object from the indicated ``file_path``. If the
        file does not exist, create a new file and construct an empty model.

        :param file_path: (optional) Path of the model file. If no path is
            given, ``DEFAULT_DATA_PATH`` (``model.json`` next to this source
            file) is used.
        :param create_new: (optional) Boolean. If ``True``, start from an
            empty model without reading anything from disk. ``file_path``
            is still used when saving the model, so an existing model file
            at that path will be overwritten by :meth:`save`.
        """
        self.file_path = file_path if file_path else self.DEFAULT_DATA_PATH
        self.create_new = create_new
        if self.create_new:
            self.spam_count_total = 0
            self.ham_count_total = 0
            self.token_table = {}
        else:
            (self.spam_count_total,
             self.ham_count_total,
             self.token_table) = self.load(file_path)

    def load(self, file_path=None):
        """Load the serialized file from the specified ``file_path`` and
        return ``spam_count_total``, ``ham_count_total`` and ``token_table``.

        :param file_path: (optional) Path of the model file. Defaults to
            ``DEFAULT_DATA_PATH``. If the path does not exist, an empty file
            is created there.
        :returns: The decoded JSON content of the file, or ``(0, 0, {})``
            when the file is empty or does not contain valid JSON.
        """
        file_path = file_path if file_path else self.DEFAULT_DATA_PATH
        if not os.path.exists(file_path):
            # "Touch" the file so that later loads and saves find it.
            with open(file_path, 'a'):
                os.utime(file_path, None)
        with open(file_path, 'rb') as f:
            try:
                return json.load(f)
            except ValueError:
                # Empty or malformed file (JSON decode errors and Unicode
                # decode errors are both ValueError subclasses): fall back
                # to an empty model instead of failing.
                return (0, 0, {})

    def save(self):
        """Serialize the model using Python's json module and save the
        serialized model to the file indicated by ``self.file_path``.
        """
        # Text mode is required: json.dump() writes ``str`` objects, which a
        # file opened in binary mode rejects on Python 3.
        with open(self.file_path, 'w') as f:
            json.dump(
                (self.spam_count_total, self.ham_count_total,
                 self.token_table),
                f)
class Detector(object):
    """A Bayesian spam filter.

    :param path: (optional) Path of the model file; passed to ``Model`` to
        construct the ``Model`` object backing this detector.
    :param create_new: (optional) Boolean, passed to ``Model`` unchanged.
    """

    # Token alternatives, tried in this order: a number made of digit groups
    # joined by "." or "," with an optional leading "$"; two word runs
    # joined by a hyphen; a plain run of word characters.
    TOKENS_RE = re.compile(r"\$?\d*(?:[.,]\d+)+|\w+-\w+|\w+", re.U)
    # Rating given to tokens for which the model has no usable evidence.
    INIT_RATING = 0.4

    def __init__(self, path=None, create_new=False):
        self.model = Model(path, create_new)

    def _get_word_list(self, msg):
        """Return the list of tokens found in ``msg``.

        The message is lower-cased, split with ``TOKENS_RE``, and only
        tokens longer than 2 characters are kept.

        :param msg: Message in string format.
        """
        # Build a real list so the result can be iterated more than once and
        # compared, on Python 2 and Python 3 alike.
        return [token for token in self.TOKENS_RE.findall(msg.lower())
                if len(token) > 2]

    def save(self):
        """Save ``self.model`` based on ``self.model.file_path``."""
        self.model.save()

    def train(self, msg, is_spam):
        """Train the model.

        :param msg: Message in string format.
        :param is_spam: Boolean. If True, train the message as a spam,
            if False, train the message as a ham.
        """
        token_table = self.model.token_table
        if is_spam:
            self.model.spam_count_total += 1
        else:
            self.model.ham_count_total += 1

        # Each table entry is ``[ham_count, spam_count]``.
        column = 1 if is_spam else 0
        # _get_word_list() lower-cases the message itself.
        for word in self._get_word_list(msg):
            if word in token_table:
                token_table[word][column] += 1
            else:
                token_table[word] = [0, 1] if is_spam else [1, 0]

    def score(self, msg):
        """Calculate and return the spam score of a msg. The higher the
        score, the stronger the likelihood that the msg is a spam.

        :param msg: Message in string format.
        :returns: ``0`` when the message yields no tokens, otherwise a float
            combining the per-token ratings.
        """
        token_table = self.model.token_table
        spam_total = self.model.spam_count_total
        ham_total = self.model.ham_count_total

        ratings = []
        for word in self._get_word_list(msg):
            if word not in token_table:
                ratings.append(self.INIT_RATING)
                continue

            ham_count, spam_count = token_table[word]
            if spam_count > 0 and ham_count == 0:
                rating = 0.99
            elif spam_count == 0 and ham_count > 0:
                rating = 0.01
            elif spam_count == 0 and ham_count == 0:
                # An entry without any observation (possible in a loaded
                # model file) carries no evidence; treating it as unknown
                # also avoids a 0 / 0 division below.
                rating = self.INIT_RATING
            elif spam_total > 0 and ham_total > 0:
                ham_prob = float(ham_count) / float(ham_total)
                spam_prob = float(spam_count) / float(spam_total)
                # NOTE: only the lower bound is clamped here.
                rating = max(spam_prob / (ham_prob + spam_prob), 0.01)
            else:
                rating = self.INIT_RATING
            ratings.append(rating)

        if not ratings:
            return 0

        if len(ratings) > 20:
            # Keep only the 10 lowest and the 10 highest ratings.
            ratings.sort()
            ratings = ratings[:10] + ratings[-10:]

        product = 1.0
        alt_product = 1.0
        for rating in ratings:
            product *= rating
            alt_product *= 1.0 - rating
        return product / (product + alt_product)

    def is_spam(self, msg):
        """Decide whether the message is a spam or not.

        :param msg: Message in string format.
        :returns: ``True`` when the score of ``msg`` is greater than 0.9.
        """
        return self.score(msg) > 0.9
# Handle to this module object. The module-level score() function stores its
# lazily created Detector on it as the attribute ``obj``.
module = sys.modules[__name__]
def score(msg):
    """Score the message based on the built-in model.

    The ``Detector`` is created on the first call and kept on this module
    as the attribute ``obj``; later calls reuse it.

    :param msg: Message to be scored in string format.
    """
    try:
        detector = module.obj
    except AttributeError:
        # First call: no detector has been stored on the module yet.
        detector = Detector()
        module.obj = detector
    return detector.score(msg)
def is_spam(msg):
    """Tell whether the message is a spam according to the built-in model.

    :param msg: Message to be classified in string format.
    :returns: ``True`` when the message's score is greater than 0.9.
    """
    spam_score = score(msg)
    return spam_score > 0.9
if __name__ == "__main__":
    # Small demo: train an empty in-memory model on one spam and one ham
    # message, then print the score of two unseen messages.
    detector = Detector(create_new=True)

    labelled_messages = (
        ("Super cheap octocats for sale at GitHub.", True),
        ("Hi John, could you please come to my office by 3pm? Ding", False),
    )
    for text, spam_flag in labelled_messages:
        detector.train(text, spam_flag)

    unseen_messages = (
        "Cheap shoes for sale at DSW shoe store!",
        "Hi mark could you please send me a copy of your machine learning homework? thanks",
    )
    for text in unseen_messages:
        print(detector.score(text))