# wnd_ils_alarmfax_parser/app/exchange_connect.py
#
# 124 lines
# 6.1 KiB
# Python

from threading import Thread
from exchangelib import Credentials, Configuration, Account, DELEGATE, FaultTolerance, Folder, Message, ExtendedProperty
import exchangelib.errors
import logging, time, os
from securecad_parser import parse_securecad_message
import os
import datetime
from hooks import webhook, alarminator_api, cups_print
class alarmfax_parser_verarbeitet(ExtendedProperty):
    """Boolean extended property: the mail was processed by the alarm fax parser.

    The property is addressed by its property-set GUID together with its
    name, and is registered on ``Message`` elsewhere in this module.
    """

    property_type = "Boolean"
    property_name = "vom Alarmfax Parser verarbeitet"
    property_set_id = "64901230-f5f2-4b07-a032-58fb3970a09e"
class alarmfax_parser_id(ExtendedProperty):
    """String extended property holding the ID(s) of the alarm fax parser.

    The property is addressed by its property-set GUID together with its
    name, and is registered on ``Message`` elsewhere in this module.
    """

    property_type = "String"
    property_name = "ID des Alarmfax Parser"
    property_set_id = "0c595b56-c79d-4ecb-9e9b-c94fcef86816"
# Expose both custom extended properties as attributes on every Message.
Message.register("alarmfax_parser_verarbeitet", alarmfax_parser_verarbeitet)
Message.register("alarmfax_parser_id", alarmfax_parser_id)

# Folder name -> subscriber thread currently watching that folder.
threads = {}

# Root logger: timestamp, thread name and message; INFO unless raised later.
format = "%(asctime)s|%(threadName)s: %(message)s"
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format=format,
)
def eventHandler(ELEMENT_NAME, item_id, item_changekey):
    """Fetch the mail referenced by a folder event and pass it to the hooks.

    ``NewMailEvent`` and ``SearchFolderEvent`` are always handled;
    ``ModifiedEvent`` is handled only when ``IS_DEV`` is set. Any other
    event name is ignored.

    A mail already flagged as processed by this parser instance is skipped
    (in dev mode it is processed again). Otherwise the mail is flagged and
    saved before it is parsed. Only mails whose sender address is listed in
    ``filter_from`` are parsed and forwarded.

    Args:
        ELEMENT_NAME: Name of the event that triggered this call.
        item_id: Exchange item id of the mail.
        item_changekey: Change key belonging to ``item_id``.
    """
    is_handled = (
        ELEMENT_NAME in ('NewMailEvent', 'SearchFolderEvent')
        or (ELEMENT_NAME == 'ModifiedEvent' and IS_DEV)
    )
    if not is_handled:
        return

    logging.info("%s - get Mail", ELEMENT_NAME)
    m: Message = a.inbox.get(id=item_id, changekey=item_changekey)

    # The property is unset (None) on mails no parser has touched yet.
    seen_by = m.alarmfax_parser_id if m.alarmfax_parser_id is not None else ""
    if m.alarmfax_parser_verarbeitet and parser_id in seen_by:
        logging.info("Mail {} bereits verarbeitet.. ignoriere".format(m.id))
        if not IS_DEV:
            return
    else:
        # Flag first, so a concurrent folder scan does not pick the mail up again.
        m.alarmfax_parser_verarbeitet = True
        m.alarmfax_parser_id = seen_by + parser_id
        m.save(update_fields=["alarmfax_parser_verarbeitet", "alarmfax_parser_id"])

    # A message may carry no sender at all; treat that as "not in the filter".
    sender_address = m.sender.email_address if m.sender is not None else None
    logging.info("got Mail {} von {}".format(m.subject, sender_address))
    if sender_address not in filter_from:
        return

    parsed_body = parse_securecad_message(m.body)
    logging.debug(parsed_body)
    if parsed_body is None:
        return

    if 'ALARMDEPESCHE' in parsed_body:
        logging.info("Alarm für: {}".format(parsed_body['ALARMDEPESCHE']))
    # NOTE(review): the original nesting was lost; the hooks are assumed to run
    # for every successfully parsed mail, not only when 'ALARMDEPESCHE' is
    # present -- confirm against the deployed version.
    webhook(parsed_body)
    alarminator_api(parsed_body)
    cups_print(parsed_body, m.body)
def folder_event_subscriber(folder: Folder):
    """Watch one folder forever and start an ``eventHandler`` thread per mail.

    Each pass of the endless loop does two things:

    1. Query the folder for mails received between one day ago and one day
       ahead that are not excluded as already processed by this parser, and
       start a handler thread (event name ``SearchFolderEvent``) for each.
    2. Open a streaming subscription, start a handler thread for every
       received event that carries an item id, and release the subscription
       again once the streaming call ends.

    The function only returns by raising; the caller is expected to restart
    the thread when it dies.

    Args:
        folder: The Exchange folder to watch.
    """
    logging.info('folder_event_subscriber startet for Folder: {}'.format(folder.name))
    while True:
        # Catch-up scan: mails that arrived while no subscription was active
        # (e.g. during a timeout/cooldown) would otherwise be missed.
        now = datetime.datetime.now(a.default_timezone)
        one_day = datetime.timedelta(days=1)
        filtered_items = folder.filter(
            datetime_received__range=(now - one_day, now + one_day)
        ).exclude(
            alarmfax_parser_verarbeitet=True,
            alarmfax_parser_id__contains=parser_id
        )
        cnt = filtered_items.count()
        if cnt > 0:
            logging.info("{} Mails nicht verarbeitet in den letzten 2 Tagen in ordner: {}".format(cnt, folder.name))
            for m in filtered_items.values("id", "changekey"):
                Thread(
                    target=eventHandler,
                    args=('SearchFolderEvent', m["id"], m["changekey"]),
                    name="eventHandler: SearchFolderEvent ({})".format(m["id"]),
                ).start()

        # Block on streaming events; after the connection timeout the loop
        # starts over with another catch-up scan.
        subscription_id = folder.subscribe_to_streaming()
        try:
            for notification in folder.get_streaming_events(subscription_id, connection_timeout=1):
                for event in notification.events:
                    if event.item_id is not None:
                        Thread(
                            target=eventHandler,
                            args=(event.ELEMENT_NAME, event.item_id.id, event.item_id.changekey),
                            name="eventHandler: {} ({})".format(event.ELEMENT_NAME, event.item_id.id),
                        ).start()
        finally:
            # A new subscription is created on every pass; release this one
            # so subscriptions do not pile up on the server.
            folder.unsubscribe(subscription_id)
if __name__ == "__main__":
    try:
        # Configuration comes from environment variables.
        username = os.getenv('username')
        password = os.getenv('password')
        server = os.getenv('server')
        folders = os.getenv('folders', "")
        parser_id = os.getenv('alarmfax_parser_id', "")
        primary_smtp_address = os.getenv('primary_smtp_address')
        # ";"-separated sender addresses; unset or empty means no sender passes.
        filter_from = os.getenv('filter_from').split(";") if os.getenv('filter_from') else []
        IS_DEV = os.getenv('IS_DEV') == "True"
        if IS_DEV:
            logging.getLogger().setLevel(logging.DEBUG)

        credentials = Credentials(username=username, password=password)
        config = Configuration(
            server=server,
            credentials=credentials,
            retry_policy=FaultTolerance(max_wait=5),
            max_connections=10,
        )
        a = Account(
            primary_smtp_address=primary_smtp_address,
            config=config,
            autodiscover=False,
            access_type=DELEGATE,
        )

        # An empty entry selects the inbox itself, any other entry a direct
        # child folder of the inbox with that name.
        folders_to_subscribe = [
            a.inbox if f == "" else a.inbox / f
            for f in folders.split(";")
        ]

        # Supervisor loop: once per second, (re)start the subscriber thread of
        # every folder that has none or whose thread has died.
        while True:
            for f in folders_to_subscribe:
                running = threads.get(f.name)
                if running is not None and running.is_alive():
                    continue
                logging.info("folder_event_subscriber for folder \"{}\" not alive, starting".format(f.name))
                t = Thread(
                    target=folder_event_subscriber,
                    args=(f,),
                    daemon=True,
                    name="folder_event_subscriber {}".format(f.name),
                )
                threads[f.name] = t
                t.start()
            time.sleep(1)
    except exchangelib.errors.RateLimitError as e:
        logging.error('', exc_info=e)
        if e.status_code != 401:
            logging.info("rate limit!!!! sleep 60s")
            time.sleep(60)
        else:
            # Report the 401 through the webhook, then wait twelve hours.
            webhook({"ERROR": {
                "status_code": e.status_code,
                "url": e.url
            }})
            time.sleep(60 * 60 * 12)