#!/usr/bin/env python3
"""Watchdog for a shared spreadsheet.

Downloads the current version of an online spreadsheet through a headless
Chrome session, compares it cell by cell with the previously downloaded
version and e-mails a list of the differences.

Configuration is read from ``settings.json`` in the current working directory.
Keys read by this script: ``files``, ``oo_url``, ``email_recipient``,
``smtp_user``, ``smtp_password``, ``smtp_server``, ``smtp_port``,
``loglevel``, ``logfile`` and the optional ``max_row`` / ``max_col``.

Command line:
    (no argument)  rotate files, download, compare
    -c             compare the files that are already on disk
    -t             send a test e-mail only
"""
import html
import json
import logging
import os
import smtplib
import ssl
import sys
import time
from contextlib import closing
from datetime import datetime
from email.message import EmailMessage
from itertools import zip_longest

try:
    from distutils.debug import DEBUG
except ImportError:
    # distutils was removed in Python 3.12; keep the name available without
    # crashing at start-up (this mirrors what distutils.debug defines).
    DEBUG = os.environ.get('DISTUTILS_DEBUG')

from openpyxl import load_workbook
from openpyxl.cell.read_only import ReadOnlyCell
from openpyxl.utils import get_column_letter
from pyvirtualdisplay import Display
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Name of the worksheet that is compared between the two workbooks.
SHEET_NAME = 'BewerberInnen'
# Seconds to wait for the downloaded file to appear on disk.
DOWNLOAD_TIMEOUT_SECONDS = 20
# Seconds Selenium waits for each page element.
ELEMENT_TIMEOUT_SECONDS = 10


def load_settings():
    """Return the parsed contents of ``settings.json`` (current directory)."""
    with open('settings.json', encoding='utf-8') as file:
        return json.load(file)


def fetch_via_browser(newfile):
    """Download the spreadsheet as XLSX through the web editor's UI.

    Opens ``settings['oo_url']`` in Chrome inside an Xvfb display, switches
    into the editor iframe, opens the "file" tab and clicks the XLSX download
    entry. Then polls until *newfile* exists or the download timeout expires.

    Args:
        newfile: Path at which the downloaded file is expected to appear.

    Raises:
        Exception: Any browser/driver error is logged and re-raised; the
            browser and the virtual display are always shut down first.
    """
    disp = Display(backend="xvfb", size=(800, 600))
    disp.start()
    browser = None
    try:
        options = webdriver.ChromeOptions()
        prefs = {
            "download.default_directory": settings['files'],
            "profile.default_content_settings.popups": 0,
        }
        logging.info(prefs)
        options.add_experimental_option("prefs", prefs)
        options.add_argument("--headless=new")

        browser = webdriver.Chrome(options=options)
        wait = WebDriverWait(browser, ELEMENT_TIMEOUT_SECONDS)
        browser.get(settings['oo_url'])

        iframe = wait.until(
            EC.element_to_be_clickable((By.TAG_NAME, 'iframe'))
        )
        browser.switch_to.frame(iframe)

        btn_file = wait.until(
            EC.element_to_be_clickable((By.XPATH, "//a[@data-tab='file']"))
        )
        # The loading mask covers the toolbar; wait until it is gone before
        # clicking.
        wait.until(
            EC.invisibility_of_element_located(
                (By.CSS_SELECTOR, 'div.asc-loadmask')
            )
        )
        btn_file.click()

        # Wait for the "save as" panel before looking for the format button.
        wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "div#panel-saveas"))
        )
        btn_download = wait.until(
            EC.element_to_be_clickable(
                (By.CSS_SELECTOR, "div.svg-format-xlsx")
            )
        )
        btn_download.click()

        # Poll for the file; warn only if it is really still missing when the
        # deadline passes.
        deadline = time.monotonic() + DOWNLOAD_TIMEOUT_SECONDS
        while not os.path.exists(newfile):
            if time.monotonic() >= deadline:
                logging.warning("Timeout beim Dateidownload erreicht.")
                break
            time.sleep(1)
    except Exception:
        # Record the traceback in the log file, then let the caller see the
        # failure.
        logging.exception("Exception raised")
        raise
    finally:
        if browser is not None:
            browser.quit()
        disp.stop()


def rotate_file(oldfile, newfile):
    """Make the last download the new comparison baseline.

    Moves *newfile* over *oldfile*. When *newfile* does not exist (for
    example because the previous download failed) the existing baseline is
    left untouched so the next comparison still has something to compare
    against.

    Args:
        oldfile: Path of the baseline file.
        newfile: Path of the most recently downloaded file.
    """
    if not os.path.exists(newfile):
        logging.warning(
            "Keine neue Datei zum Rotieren vorhanden, alte Datei bleibt erhalten."
        )
        return
    try:
        # os.replace overwrites an existing target in one step.
        os.replace(newfile, oldfile)
    except OSError as e:
        logging.error("Fehler beim Umbenennen")
        logging.error(e)


def _diff_sheets(old_sheet, new_sheet):
    """Return one message per cell whose value differs between the sheets.

    Both worksheets are streamed once with ``iter_rows`` instead of being
    addressed cell by cell, because a read-only worksheet re-parses its XML
    for every ``cell()`` call.
    """
    old_sheet.calculate_dimension(force=True)
    new_sheet.calculate_dimension(force=True)
    logging.debug(
        'Old-Sheet-Dimensions (mincol/maxcol:minrow/maxrow): %s/%s:%s/%s',
        old_sheet.min_column, old_sheet.max_column,
        old_sheet.min_row, old_sheet.max_row,
    )
    logging.debug(
        'New-Sheet-Dimensions (mincol/maxcol:minrow/maxrow): %s/%s:%s/%s',
        new_sheet.min_column, new_sheet.max_column,
        new_sheet.min_row, new_sheet.max_row,
    )

    min_row = min(old_sheet.min_row, new_sheet.min_row)
    min_col = min(old_sheet.min_column, new_sheet.min_column)
    # A configured limit overrides the detected sheet size.
    max_row = settings.get(
        'max_row', max(old_sheet.max_row, new_sheet.max_row)
    )
    max_col = settings.get(
        'max_col', max(old_sheet.max_column, new_sheet.max_column)
    )

    bounds = {
        'min_row': min_row,
        'max_row': max_row,
        'min_col': min_col,
        'max_col': max_col,
        'values_only': True,
    }
    # zip_longest pads the shorter sheet, so a missing row or cell compares
    # as None, exactly like an empty cell.
    row_pairs = zip_longest(
        old_sheet.iter_rows(**bounds),
        new_sheet.iter_rows(**bounds),
        fillvalue=(),
    )

    results = []
    for row, (old_values, new_values) in enumerate(row_pairs, start=min_row):
        logging.debug('Performance Check, Row: %s', row)
        value_pairs = zip_longest(old_values, new_values)
        for col, (old_value, new_value) in enumerate(value_pairs, start=min_col):
            if new_value == old_value:
                continue
            coord = f'{get_column_letter(col)}{row}'
            message = (
                f'Veränderung in Zelle {coord}: {old_value} ==> {new_value}'
            )
            logging.info(message)
            results.append(message)
    return results


def compare_files(oldfile, newfile):
    """Compare two workbooks and e-mail the differences, if any.

    Only the worksheet named by ``SHEET_NAME`` is compared. Differences are
    sent to ``settings['email_recipient']``; when there are none, or when one
    of the files is missing, only a log entry is written.

    Args:
        oldfile: Path of the baseline workbook.
        newfile: Path of the freshly downloaded workbook.
    """
    if not (os.path.exists(oldfile) and os.path.exists(newfile)):
        logging.warning(
            'Kann Dateien nicht vergleichen, Dateien nicht vorhanden?'
        )
        return

    # closing() guarantees both workbooks are closed even if the comparison
    # raises.
    with closing(load_workbook(filename=newfile, read_only=True)) as new_wb, \
            closing(load_workbook(filename=oldfile, read_only=True)) as old_wb:
        results = _diff_sheets(old_wb[SHEET_NAME], new_wb[SHEET_NAME])

    if not results:
        logging.info("Keine Änderungen gefunden.")
    else:
        send_email(results, settings['email_recipient'])


def send_email(results, recipient):
    """Send the change messages as a plain-text + HTML e-mail.

    Args:
        results: Iterable of message strings; a single string is treated as
            one message.
        recipient: Address the e-mail is sent to.
    """
    # A bare string would otherwise be joined character by character.
    if isinstance(results, str):
        results = [results]
    lines = [str(item) for item in results]

    msg = EmailMessage()
    msg['Subject'] = 'ISB-Vermittlung Watchdog'
    msg['From'] = settings['smtp_user']
    msg['To'] = recipient
    msg.set_content('\r\n'.join(lines))

    # NOTE(review): the HTML markup of this template was lost in the source
    # as received; it is reconstructed here. The heading is presumably a link
    # to the spreadsheet (settings['oo_url']) -- confirm the intended target.
    heading = 'Zur Vermittlungsliste des DW'
    link_url = settings.get('oo_url')
    if link_url:
        heading = f'<a href="{html.escape(link_url, quote=True)}">{heading}</a>'
    # Cell contents come from the spreadsheet, so escape them for HTML.
    html_lines = '<br>\n'.join(html.escape(line) for line in lines)
    html_msg = f"""\
<html>
  <body>
    <p>{heading}</p>
    <p>
{html_lines}
    </p>
  </body>
</html>
"""
    msg.add_alternative(html_msg, subtype="html")

    context = ssl.create_default_context()
    # Leaving the with-block sends QUIT; no explicit quit() is needed.
    with smtplib.SMTP(settings['smtp_server'], settings['smtp_port']) as server:
        server.starttls(context=context)
        server.login(settings['smtp_user'], settings['smtp_password'])
        server.send_message(msg)


def main():
    """Dispatch on the command-line argument (see the module docstring)."""
    args = sys.argv[1:]
    oldfile = os.path.join(settings['files'], 'vermittlung-old.xlsx')
    newfile = os.path.join(settings['files'], 'ISB Vermittlung.xlsx')

    if len(args) == 1 and args[0] == '-t':
        # Test mode only sends a mail; it must not rotate or download.
        send_email(["test"], "marc-pascal.koenig@outlook.de")
    elif len(args) == 1 and args[0] == '-c':
        compare_files(oldfile, newfile)
    else:
        rotate_file(oldfile, newfile)
        fetch_via_browser(newfile)
        compare_files(oldfile, newfile)


# Loaded at import time because every function above reads this global.
settings = load_settings()

logging.basicConfig(
    level=settings['loglevel'],
    format='[%(asctime)s] %(levelname)s: %(message)s',
    filename=settings['logfile'],
)

if __name__ == '__main__':
    main()