Files
dw-parser/parser.py

217 lines
6.7 KiB
Python

#!/usr/bin/env python3
import json, os, smtplib, ssl, logging
from openpyxl.cell.read_only import ReadOnlyCell
from pyvirtualdisplay import Display
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime
from openpyxl import load_workbook
from email.message import EmailMessage
import time
import sys
def load_settings():
    """Load the parser configuration from ``conf/conf.json``.

    The path is relative to the current working directory of the process.

    Returns:
        The decoded JSON document as returned by ``json.load``.

    Raises:
        OSError: If the configuration file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 so that non-ASCII values are read the same
    # way regardless of the locale the script is started under.
    with open('conf/conf.json', encoding='utf-8') as file:
        return json.load(file)
def fetch_via_browser(newfile, download_timeout=20):
    """Download the spreadsheet as XLSX by driving the web editor in Chrome.

    Starts a virtual X display and a headless Chrome whose download directory
    is ``set['files']``, opens ``set['oo_url']``, switches into the page's
    iframe, opens the "file" tab and clicks the XLSX tile. Afterwards it polls
    once per second until ``newfile`` exists or ``download_timeout`` seconds
    have passed.

    Args:
        newfile: Path at which the downloaded file is expected to appear.
        download_timeout: Maximum number of seconds to wait for ``newfile``.

    Raises:
        Exceptions raised by Selenium while starting the browser or waiting
        for page elements propagate to the caller. The browser and the
        virtual display are shut down in every case.
    """
    disp = Display(backend="xvfb", size=(800, 600))
    disp.start()
    browser = None
    try:
        options = webdriver.ChromeOptions()
        prefs = {
            "download.default_directory": set['files'],
            "profile.default_content_settings.popups": 0,
        }
        log.debug("CHROMIUM PREFS:")
        log.debug(prefs)
        options.add_experimental_option("prefs", prefs)
        options.add_argument("--headless")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--no-sandbox")
        browser = webdriver.Chrome(options=options)
        wait = WebDriverWait(browser, 10)
        browser.get(set['oo_url'])

        # The editor UI lives inside an iframe; all further lookups happen there.
        iframe = wait.until(
            EC.element_to_be_clickable((By.TAG_NAME, 'iframe'))
        )
        browser.switch_to.frame(iframe)
        btn_file = wait.until(
            EC.element_to_be_clickable((By.XPATH, "//a[@data-tab='file']"))
        )
        # Wait for the loading overlay to disappear before clicking the tab.
        wait.until(
            EC.invisibility_of_element_located((By.CSS_SELECTOR, 'div.asc-loadmask'))
        )
        btn_file.click()
        # The "save as" panel has to be usable before its XLSX tile is clicked.
        wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "div#panel-saveas"))
        )
        btn_download = wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "div.svg-format-xlsx"))
        )
        btn_download.click()

        # Poll for the downloaded file instead of sleeping a fixed time.
        waited = 0
        while not os.path.exists(newfile) and waited < download_timeout:
            time.sleep(1)
            waited += 1
        if not os.path.exists(newfile):
            log.warning("Timeout beim Dateidownload erreicht.")
    finally:
        # Always release the browser and the display, even when a Selenium
        # wait above timed out; shutdown problems are logged, not raised.
        if browser is not None:
            try:
                browser.quit()
            except Exception as exc:
                log.exception("Exception raised: %s", exc)
        try:
            disp.stop()
        except Exception as exc:
            log.exception("Exception raised: %s", exc)
def rotate_file(oldfile, newfile):
    """Make the most recent download the baseline for the next comparison.

    ``newfile`` is moved over ``oldfile`` in a single ``os.replace`` call, so
    the previous baseline is only discarded when a newer file actually takes
    its place. If ``newfile`` does not exist (for example because the last
    download failed), ``oldfile`` is left untouched and the error is logged.

    Args:
        oldfile: Path of the baseline file that gets overwritten.
        newfile: Path of the latest download that becomes the new baseline.
    """
    try:
        # os.replace overwrites an existing destination, so no separate
        # delete step (and no error when there is no baseline yet) is needed.
        os.replace(newfile, oldfile)
    except OSError as e:
        log.error("Fehler beim Umbenennen")
        log.error(e)
def compare_files(oldfile, newfile):
    """Compare the 'BewerberInnen' sheet of two workbooks cell by cell.

    Both workbooks are opened read-only. The compared range spans from the
    smaller minimum row/column of both sheets up to the larger maximum
    row/column; ``set['max_row']`` and ``set['max_col']``, when configured,
    replace the sheets' own maxima. Every differing cell is logged, and if
    there is at least one difference the list of messages is mailed to
    ``set['email_recipient']`` via ``send_email``.

    Args:
        oldfile: Path of the baseline workbook.
        newfile: Path of the freshly downloaded workbook.

    Returns:
        None. If either file is missing, a warning is logged and nothing
        is compared.
    """
    if not (os.path.exists(oldfile) and os.path.exists(newfile)):
        log.warning('Kann Dateien nicht vergleichen, Dateien nicht vorhanden?')
        return

    results = []
    new_wb = old_wb = None
    try:
        new_wb = load_workbook(filename=newfile, read_only=True)
        old_wb = load_workbook(filename=oldfile, read_only=True)
        new_sheet = new_wb['BewerberInnen']
        old_sheet = old_wb['BewerberInnen']
        # Recompute the used range rather than relying on stored dimensions.
        old_sheet.calculate_dimension(force=True)
        new_sheet.calculate_dimension(force=True)
        log.debug(f'Old-Sheet-Dimensions (mincol/maxcol:minrow/maxrow): {old_sheet.min_column}/{old_sheet.max_column}:{old_sheet.min_row}/{old_sheet.max_row}')
        log.debug(f'New-Sheet-Dimensions (mincol/maxcol:minrow/maxrow): {new_sheet.min_column}/{new_sheet.max_column}:{new_sheet.min_row}/{new_sheet.max_row}')
        global_min_row = min(old_sheet.min_row, new_sheet.min_row)
        global_max_row = max(set.get('max_row', old_sheet.max_row), set.get('max_row', new_sheet.max_row))
        global_min_col = min(old_sheet.min_column, new_sheet.min_column)
        global_max_col = max(set.get('max_col', old_sheet.max_column), set.get('max_col', new_sheet.max_column))

        # NOTE(review): per-cell .cell() access on read-only worksheets is
        # presumably what the "Performance Check" log below was added for;
        # streaming both sheets with iter_rows() may be faster - confirm
        # against the openpyxl read-only documentation before changing.
        for row in range(global_min_row, global_max_row + 1):
            for col in range(global_min_col, global_max_col + 1):
                new_cell = new_sheet.cell(row=row, column=col)
                old_cell = old_sheet.cell(row=row, column=col)
                # Lazy %-formatting: the string is only built when DEBUG is on.
                log.debug('Performance Check, Cell: %s:%s', col, row)
                if new_cell.value == old_cell.value:
                    continue
                # Use the coordinate of whichever side is a ReadOnlyCell;
                # fall back to the numeric position otherwise.
                if isinstance(new_cell, ReadOnlyCell):
                    coord = new_cell.coordinate
                elif isinstance(old_cell, ReadOnlyCell):
                    coord = old_cell.coordinate
                else:
                    coord = f'{col}/{row}'
                message = f'Veränderung in Zelle {coord}: {old_cell.value} ==> {new_cell.value}'
                log.info(message)
                results.append(message)
    finally:
        # Release the workbook file handles even if the comparison failed.
        for workbook in (new_wb, old_wb):
            if workbook is not None:
                workbook.close()

    if not results:
        log.info("Keine Änderungen gefunden.")
    else:
        send_email(results, set['email_recipient'])
def send_email(results, recipient):
    """Send the change report as a plain-text plus HTML e-mail.

    The message is sent from ``set['smtp_user']`` through
    ``set['smtp_server']``:``set['smtp_port']`` using STARTTLS and a login
    with ``set['smtp_user']`` / ``set['smtp_password']``. The HTML part
    starts with a link to ``set['oo_url']`` followed by one line per result.

    Args:
        results: Iterable of report lines; a single string is treated as
            one line rather than being split into characters.
        recipient: Address placed in the ``To`` header.

    Raises:
        smtplib.SMTPException / OSError: If connecting, authenticating or
            sending fails.
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    from html import escape

    if isinstance(results, str):
        results = [results]
    lines = [str(line) for line in results]

    msg = EmailMessage()
    msg['Subject'] = 'ISB-Vermittlung Watchdog'
    msg['From'] = set['smtp_user']
    msg['To'] = recipient
    # Plain-text part first; the HTML alternative is attached below.
    msg.set_content('\r\n'.join(lines))

    # Report lines contain spreadsheet cell values; escape them (and the
    # URL) so that characters like '<' or '&' cannot break the markup.
    html_lines = '<br />'.join(escape(line) for line in lines)
    link_target = escape(set['oo_url'], quote=True)
    html_msg = f"""\
<html>
<body>
<a href="{link_target}">Zur Vermittlungsliste des DW</a>
<br />
<br />
{html_lines}
</body>
</html>
"""
    msg.add_alternative(html_msg, subtype="html")

    context = ssl.create_default_context()
    # Leaving the with-block sends QUIT and closes the connection.
    with smtplib.SMTP(set['smtp_server'], set['smtp_port']) as server:
        server.starttls(context=context)
        server.login(set['smtp_user'], set['smtp_password'])
        server.send_message(msg)
def main():
    """Run the watchdog according to the single command-line switch.

    Modes (selected by exactly one argument):
        ``-t``: only send a test e-mail.
        ``-c``: only compare the files already on disk.
        otherwise: rotate the previous download to the baseline, download
        a fresh copy through the browser, then compare both.
    """
    args = sys.argv[1:]
    oldfile = os.path.join(set['files'], 'vermittlung-old.xlsx')
    newfile = os.path.join(set['files'], 'ISB Vermittlung.xlsx')
    mode = args[0] if len(args) == 1 else None

    if mode == '-t':
        # Pass a list: the report lines are joined, and a bare string would
        # be joined character by character.
        send_email(["test"], "marc-pascal.koenig@outlook.de")
    elif mode == '-c':
        compare_files(oldfile, newfile)
    else:
        rotate_file(oldfile, newfile)
        fetch_via_browser(newfile)
        compare_files(oldfile, newfile)
# Module-level bootstrap: runs on import as well as on direct execution.
# NOTE(review): the name `set` shadows the built-in `set` type for the whole
# module; every function above reads its configuration through this global.
set = load_settings()

# Root logger, used by all functions in this file through the global `log`.
log = logging.getLogger()
log.setLevel(set['loglevel'])
log_format = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')

# Disabled StreamHandler variant, kept for reference.
# handler_stdout = logging.StreamHandler(stream="/proc/1/fd/1")
# handler_stdout.setLevel(set['loglevel'])
# handler_stdout.setFormatter(log_format)

# Log records are written to /proc/1/fd/1 - on Linux that is file descriptor 1
# of PID 1; presumably chosen so output ends up in the container's stdout
# (TODO confirm against the deployment).
handler_file = logging.FileHandler("/proc/1/fd/1")
handler_file.setLevel(set['loglevel'])
handler_file.setFormatter(log_format)
# log.addHandler(handler_stdout)
log.addHandler(handler_file)

# Only start the watchdog when the file is executed as a script.
if __name__ == '__main__':
    main()