"""Watch Exchange folders and forward parsed alarm mails to the configured hooks.

For every configured folder a subscriber thread searches for recent mails that
this parser instance has not marked as processed yet and then listens for
streaming notifications. Each mail is handled in its own thread by
``eventHandler``.

The runtime configuration (``a``, ``parser_id``, ``filter_from``, ``IS_DEV``)
is read from environment variables in the ``__main__`` section and stored as
module globals that the functions below read.
"""
import datetime
import logging
import os
import time
from threading import Thread

import exchangelib.errors
from exchangelib import (
    Account,
    Configuration,
    Credentials,
    DELEGATE,
    ExtendedProperty,
    FaultTolerance,
    Folder,
    Message,
)

from hooks import alarminator_api, cups_print, webhook
from securecad_parser import parse_securecad_message


class alarmfax_parser_verarbeitet(ExtendedProperty):
    """Boolean extended property flagging a mail as processed by the parser."""

    property_set_id = "64901230-f5f2-4b07-a032-58fb3970a09e"
    property_name = "vom Alarmfax Parser verarbeitet"
    property_type = "Boolean"


class alarmfax_parser_id(ExtendedProperty):
    """String extended property holding the concatenated parser ids that processed a mail."""

    property_set_id = "0c595b56-c79d-4ecb-9e9b-c94fcef86816"
    property_name = "ID des Alarmfax Parser"
    property_type = "String"


Message.register("alarmfax_parser_verarbeitet", alarmfax_parser_verarbeitet)
Message.register("alarmfax_parser_id", alarmfax_parser_id)

# Debug aid: list the searchable Message fields.
#print([f.name for f in Message.FIELDS if f.is_searchable])

# Subscriber threads, keyed by folder name.
threads = {}

LOG_FORMAT = "%(asctime)s|%(threadName)s: %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")


def eventHandler(ELEMENT_NAME, item_id, item_changekey):
    """Fetch one mail, mark it as processed and run the hooks on its parsed body.

    Args:
        ELEMENT_NAME: Event type. 'NewMailEvent' and 'SearchFolderEvent' are
            always handled, 'ModifiedEvent' only when ``IS_DEV`` is set; every
            other event type is ignored.
        item_id: Exchange item id of the mail.
        item_changekey: Exchange changekey of the mail.
    """
    is_relevant = (
        ELEMENT_NAME in ('NewMailEvent', 'SearchFolderEvent')
        or (ELEMENT_NAME == 'ModifiedEvent' and IS_DEV)
    )
    if not is_relevant:
        return

    logging.info(ELEMENT_NAME + " - get Mail")
    m: Message = a.inbox.get(id=item_id, changekey=item_changekey)

    previous_ids = "" if m.alarmfax_parser_id is None else m.alarmfax_parser_id
    if m.alarmfax_parser_verarbeitet and parser_id in previous_ids:
        logging.info("Mail {} bereits verarbeitet.. ignoriere".format(m.id))
        # In dev mode already processed mails are deliberately handled again.
        if not IS_DEV:
            return
    else:
        # Mark the mail before running the hooks; the id is appended so that
        # ids already stored on the mail are kept.
        m.alarmfax_parser_verarbeitet = True
        m.alarmfax_parser_id = previous_ids + parser_id
        m.save(update_fields=["alarmfax_parser_verarbeitet", "alarmfax_parser_id"])

    # Guard against items without a sender instead of raising AttributeError.
    sender_address = m.sender.email_address if m.sender is not None else None
    logging.info("got Mail {} von {}".format(m.subject, sender_address))
    if sender_address not in filter_from:
        return

    parsed_body = parse_securecad_message(m.body)
    logging.debug(parsed_body)
    if parsed_body is None:
        return

    if 'ALARMDEPESCHE' in parsed_body:
        logging.info("Alarm für: {}".format(parsed_body['ALARMDEPESCHE']))
    # NOTE(review): the hooks run for every successfully parsed body, not only
    # for bodies containing 'ALARMDEPESCHE' - confirm this is intended.
    webhook(parsed_body)
    alarminator_api(parsed_body)
    cups_print(parsed_body, m.body)


def _start_event_handler(element_name, item_id, item_changekey):
    """Run ``eventHandler`` for one item in a new, named thread."""
    handler = Thread(
        target=eventHandler,
        args=(element_name, item_id, item_changekey),
        name="eventHandler: {} ({})".format(element_name, item_id),
    )
    handler.start()


def folder_event_subscriber(folder: Folder):
    """Process unhandled mails of ``folder`` and listen for new events, forever.

    Each pass first searches the folder for recently received mails that are
    not marked as processed by this parser, then waits on a streaming
    subscription before searching again.

    Args:
        folder: The Exchange folder to watch.
    """
    logging.info('folder_event_subscriber startet for Folder: {}'.format(folder.name))
    one_day = datetime.timedelta(days=1)
    while True:
        # Search the folder for mails received between one day ago and one day
        # ahead that have not been processed by this parser.
        now = datetime.datetime.now(a.default_timezone)
        pending = folder.filter(
            datetime_received__range=(now - one_day, now + one_day)
        ).exclude(
            alarmfax_parser_verarbeitet=True,
            alarmfax_parser_id__contains=parser_id
        )
        cnt = pending.count()
        if cnt > 0:
            logging.info("{} Mails nicht verarbeitet in den letzten 2 Tagen in ordner: {}".format(cnt, folder.name))
            for item in pending.values("id", "changekey"):
                _start_event_handler('SearchFolderEvent', item["id"], item["changekey"])

        # Actively wait for streaming events, for at most one minute; then the
        # folder is searched again in case mails arrived during a
        # timeout/cooldown.
        subscription_id = folder.subscribe_to_streaming()
        try:
            for notification in folder.get_streaming_events(subscription_id, connection_timeout=1):
                for event in notification.events:
                    if event.item_id is not None:
                        _start_event_handler(event.ELEMENT_NAME, event.item_id.id, event.item_id.changekey)
        finally:
            # A new subscription is created on every pass, so release the old
            # one instead of leaving it open on the server.
            try:
                folder.unsubscribe(subscription_id)
            except exchangelib.errors.EWSError as e:
                logging.warning("unsubscribe failed for folder: {}".format(folder.name), exc_info=e)


if __name__ == "__main__":
    try:
        username = os.environ.get('username')
        password = os.environ.get('password')
        server = os.environ.get('server')
        folders = os.environ.get('folders', "")
        parser_id = os.environ.get('alarmfax_parser_id', "")
        primary_smtp_address = os.environ.get('primary_smtp_address')
        # Semicolon separated list of sender addresses whose mails are parsed.
        raw_filter_from = os.environ.get('filter_from')
        filter_from = raw_filter_from.split(";") if raw_filter_from else []
        IS_DEV = os.environ.get('IS_DEV') == "True"

        if IS_DEV:
            logging.getLogger().setLevel(logging.DEBUG)

        credentials = Credentials(username=username, password=password)
        config = Configuration(
            server=server,
            credentials=credentials,
            retry_policy=FaultTolerance(max_wait=5),
            max_connections=10,
        )
        a = Account(
            primary_smtp_address=primary_smtp_address,
            config=config,
            autodiscover=False,
            access_type=DELEGATE,
        )

        # An empty entry selects the inbox itself, any other entry a subfolder
        # of the inbox.
        folders_to_subscribe = [
            a.inbox if f == "" else a.inbox / f
            for f in folders.split(";")
        ]

        # Supervise the subscriber threads: start them and restart any that died.
        while True:
            for f in folders_to_subscribe:
                if f.name not in threads or not threads[f.name].is_alive():
                    logging.info("folder_event_subscriber for folder \"{}\" not alive, starting".format(f.name))
                    t = Thread(
                        target=folder_event_subscriber,
                        args=(f,),
                        daemon=True,
                        name="folder_event_subscriber {}".format(f.name),
                    )
                    threads[f.name] = t
                    t.start()
            time.sleep(1)
    except exchangelib.errors.RateLimitError as e:
        logging.error('', exc_info=e)
        if e.status_code == 401:
            # Report the failure through the webhook, then wait 12 hours
            # before the script ends.
            webhook({"ERROR": {
                "status_code": e.status_code,
                "url": e.url
            }})
            time.sleep(60*60*12)
        else:
            logging.info("rate limit!!!! sleep 60s")
            time.sleep(60)