diff --git a/WAPT/control b/WAPT/control index 6267774..e089c4a 100644 --- a/WAPT/control +++ b/WAPT/control @@ -4,10 +4,10 @@ architecture : all section : base priority : optional name : apps-to-update-on-wapt-server -version : 0-16 categories : Utilities maintainer : Comitari,pcosson description : Package to audit the wapt-server and generate an alert +depends : conflicts : maturity : PROD locale : all @@ -29,7 +29,7 @@ editor : keywords : licence : opensource_free,wapt_public homepage : -package_uuid : +package_uuid : 081d68fb-fd04-4bbc-a1f8-734c1ca909eb valid_from : valid_until : forced_install_on : @@ -40,5 +40,4 @@ icon_sha256sum : signer : signer_fingerprint: signature_date : -signed_attributes : -signature : \ No newline at end of file +signed_attributes : \ No newline at end of file diff --git a/setup.py b/setup.py index 864e55d..df9b80c 100644 --- a/setup.py +++ b/setup.py @@ -1,19 +1,15 @@ -""" -This script handles the installation and auditing of WAPT packages for updating applications. -It checks for new package versions on a WAPT server and sends notifications. -""" # -*- coding: utf-8 -*- +from setuphelpers import * +import requests import json import smtplib -from configparser import ConfigParser - -import requests import waptlicences -from waptpackage import HostCapabilities, WaptRemoteRepo, PackageVersion +from configparser import ConfigParser, NoSectionError, NoOptionError +from waptpackage import HostCapabilities +from waptpackage import WaptRemoteRepo +from waptpackage import PackageVersion from common import get_requests_client_cert_session -from setuphelpers import * - def install(): plugin_inifiles = glob.glob("*.ini") @@ -22,81 +18,225 @@ def install(): print(f"copie de {file} dans {WAPT.private_dir}") filecopyto(file, WAPT.private_dir) -def audit(): +def _get_notification_config(): + """Returns notification configuration from wapt_api.ini.""" + try: + CONFWAPT = ConfigParser() + config_path = makepath(WAPT.private_dir, "wapt_api.ini") + + if not isfile(config_path): + print(f"Warning: Configuration file not found: {config_path}, using defaults") + return _get_default_notification_config() + + CONFWAPT.read(config_path) + + try: + enable_email = CONFWAPT.getboolean("notifications", "enable_email", fallback=True) + except (NoSectionError, NoOptionError, ValueError) as e: + print(f"Warning: Error reading enable_email config: {e}, using default") + enable_email = True + + try: + enable_rocketchat = CONFWAPT.getboolean("notifications", "enable_rocketchat", fallback=False) + except (NoSectionError, NoOptionError, ValueError) as e: + print(f"Warning: Error reading enable_rocketchat config: {e}, using default") + enable_rocketchat = False + + try: + notification_method = CONFWAPT.get("notifications", "notification_method", fallback="both") + if notification_method not in ["both", "email", "rocketchat"]: + print(f"Warning: Invalid notification_method: {notification_method}, using default") + notification_method = "both" + except (NoSectionError, NoOptionError) as e: + print(f"Warning: Error reading notification_method config: {e}, using default") + notification_method = "both" + + return { + "enable_email": enable_email, + "enable_rocketchat": enable_rocketchat, + "notification_method": notification_method + } + + except Exception as e: + print(f"Error: Unexpected error reading notification config: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return _get_default_notification_config() - conf_wapt = ConfigParser() - conf_wapt.read(makepath(WAPT.private_dir, "wapt_api.ini")) - username_wapt = conf_wapt.get("wapt", "wapt_username") - password_wapt = conf_wapt.get("wapt", "wapt_password") +def _get_default_notification_config(): + """Returns default notification configuration.""" + return { + "enable_email": True, + "enable_rocketchat": False, + "notification_method": "both" + } - dict_host_capa = {} +def _get_wapt_session(): + """Initializes and returns a WAPT session.""" + try: + CONFWAPT = ConfigParser() + config_path = makepath(WAPT.private_dir, "wapt_api.ini") + + if not isfile(config_path): + raise FileNotFoundError(f"WAPT configuration file not found: {config_path}") + + CONFWAPT.read(config_path) + + try: + username_wapt = CONFWAPT.get("wapt", "wapt_username") + password_wapt = CONFWAPT.get("wapt", "wapt_password") + except (NoSectionError, NoOptionError) as e: + raise ValueError(f"Missing WAPT credentials in configuration: {e}") - t = waptlicences.waptserver_login(WAPT.config_filename,username_wapt,password_wapt) - if not 'session' in t['session_cookies']: - session_cookies = [u for u in t['session_cookies'] if u['Domain'] == WAPT.waptserver.server_url.split('://')[-1]][0] - else: - session_cookies = t['session_cookies']['session'] - session_cookies['Name'] = 'session' + print("Info: Attempting to connect to WAPT server...") + t = waptlicences.waptserver_login(WAPT.config_filename, username_wapt, password_wapt) + + if not t or 'session_cookies' not in t: + raise ConnectionError("Failed to authenticate with WAPT server") + + if not 'session' in t['session_cookies']: + session_cookies = [u for u in t['session_cookies'] if u['Domain'] == WAPT.waptserver.server_url.split('://')[-1]] + if not session_cookies: + raise ValueError("No valid session cookies found") + session_cookies = session_cookies[0] + else: + session_cookies = t['session_cookies']['session'] + session_cookies['Name'] = 'session' - client_private_key_password = t["client_private_key_password"] + client_private_key_password = t.get("client_private_key_password", "") + + try: + sessionwapt = get_requests_client_cert_session( + WAPT.waptserver.server_url, + cert=(t['client_certificate'], t['client_private_key'], client_private_key_password), + verify=WAPT.waptserver.verify_cert + ) + sessionwapt.cookies.set(session_cookies['Name'], session_cookies['Value'], domain=session_cookies['Domain']) + sessionwapt.verify = WAPT.waptserver.verify_cert + + print("Info: Successfully authenticated with WAPT server") + return sessionwapt, t, client_private_key_password + + except Exception as e: + print(f"Error: Failed to create WAPT session: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + raise + + except Exception as e: + print(f"Error: Error creating WAPT session: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + raise - sessionwapt = get_requests_client_cert_session(WAPT.waptserver.server_url,cert=(t['client_certificate'],t['client_private_key'],t['client_private_key_password']),verify=WAPT.waptserver.verify_cert) - sessionwapt.cookies.set(session_cookies['Name'], session_cookies['Value'], domain=session_cookies['Domain']) - sessionwapt.verify = WAPT.waptserver.verify_cert +def _get_host_capabilities(sessionwapt): + """Fetches and returns host capabilities from the WAPT server.""" + try: + print("Info: Fetching host capabilities from WAPT server...") + dict_host_capa = {} + + api_url = "%s/api/v3/hosts?columns=host_capabilities&limit=1000000" % WAPT.waptserver.server_url + + try: + response = sessionwapt.get(api_url, timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + raise ConnectionError(f"Failed to fetch host capabilities: {e}") + + try: + data = json.loads(response.content) + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON response from WAPT server: {e}") + + if "result" not in data: + raise ValueError("Unexpected response format from WAPT server") + + for pc in data["result"]: + if not pc or not pc.get('host_capabilities'): + continue - for pc in json.loads(sessionwapt.get("%s/api/v3/hosts?columns=host_capabilities&limit=1000000" % WAPT.waptserver.server_url).content)["result"]: - if not pc['host_capabilities']: - continue + try: + dict_capa = dict( + architecture=pc['host_capabilities']['architecture'], + language=pc['host_capabilities']['language'], + os=pc['host_capabilities']['os'], + packages_locales=sorted(pc['host_capabilities']['packages_locales']), + tags=sorted(pc['host_capabilities']['tags']), + os_version=pc['host_capabilities']['os_version'] + ) - dict_capa = dict(architecture= pc['host_capabilities']['architecture'], - language=pc['host_capabilities']['language'], - os=pc['host_capabilities']['os'], - packages_locales= sorted(pc['host_capabilities']['packages_locales']), - tags=sorted(pc['host_capabilities']['tags']), - os_version=pc['host_capabilities']['os_version']) + tempo_capa = HostCapabilities(**dict_capa) + dict_host_capa[str(dict_capa)] = tempo_capa + + except (KeyError, TypeError) as e: + print(f"Warning: Skipping invalid host capability entry: {e}") + continue + + print(f"Info: Successfully retrieved capabilities for {len(dict_host_capa)} host configurations") + return dict_host_capa + + except Exception as e: + print(f"Error: Error fetching host capabilities: {e}") + raise - tempo_capa = HostCapabilities(**dict_capa) - - dict_host_capa[str(dict_capa)] = tempo_capa - - store = WaptRemoteRepo(name="main", url='https://wapt.tranquil.it/wapt', timeout=4, verify_cert=True) - localstore = WaptRemoteRepo(name="main", url= WAPT.waptserver.server_url + '/wapt', timeout=4, verify_cert=WAPT.waptserver.verify_cert) - - store_packages = store.packages() - - localstore.client_certificate = t['client_certificate'] - localstore.client_private_key = t['client_private_key'] - - def give_password(location=None,identity=None): - return client_private_key_password - - localstore.private_key_password_callback = give_password - - store_localstore = localstore.packages() - - # Download JSON data from the URL - online_package_list = {} - local_package_list = {} - for hc in dict_host_capa: - online_package_version = {} - for packageentry in store_packages: - if dict_host_capa[hc].is_matching_package(packageentry): - if not packageentry.package in online_package_version: - online_package_version[packageentry.package] = "0" - if PackageVersion(online_package_version[packageentry.package]) < PackageVersion(packageentry.version): - online_package_version[packageentry.package] = packageentry.version - online_package_list[hc] = online_package_version - - for hc in dict_host_capa: - local_package_version = {} - for packageentry in store_localstore: - if dict_host_capa[hc].is_matching_package(packageentry): - if not packageentry.package in local_package_version: - local_package_version[packageentry.package] = "0" - if PackageVersion(local_package_version[packageentry.package]) < PackageVersion(packageentry.version): - local_package_version[packageentry.package] = packageentry.version - local_package_list[hc] = local_package_version +def _get_packages_versions(store, dict_host_capa): + """Returns a dictionary of package versions for a given store.""" + try: + print(f"Info: Fetching package versions from store: {store.name}") + + try: + packages = store.packages() + except Exception as e: + raise ConnectionError(f"Failed to fetch packages from store {store.name}: {e}") + + if not packages: + print(f"Warning: No packages found in store: {store.name}") + return {} + + online_package_list = {} + + for hc in dict_host_capa: + online_package_version = {} + + try: + for packageentry in packages: + if not packageentry: + continue + + try: + if dict_host_capa[hc].is_matching_package(packageentry): + if not packageentry.package in online_package_version: + online_package_version[packageentry.package] = "0" + + try: + current_version = PackageVersion(online_package_version[packageentry.package]) + new_version = PackageVersion(packageentry.version) + + if current_version < new_version: + online_package_version[packageentry.package] = packageentry.version + except (ValueError, AttributeError) as e: + print(f"Warning: Invalid package version for {packageentry.package}: {e}") + continue + + except Exception as e: + print(f"Warning: Error processing package entry: {e}") + continue + + online_package_list[hc] = online_package_version + + except Exception as e: + print(f"Warning: Error processing host capabilities {hc}: {e}") + continue + + print(f"Info: Successfully processed package versions for {len(online_package_list)} host configurations") + return online_package_list + + except Exception as e: + print(f"Error: Error fetching package versions: {e}") + raise +def _compare_and_notify(local_package_list, online_package_list, dict_host_capa): + """Compares local and online packages and sends notifications based on configuration.""" list_app_to_update = [] for hc in dict_host_capa: for app in local_package_list[hc]: @@ -115,21 +255,126 @@ def audit(): ) WAPT.write_audit_data_if_changed("apps_to_upgrade", "list", list_app_to_update, max_count=3) + # Get notification configuration + notification_config = _get_notification_config() if not list_app_to_update: message="your repository seems up to date" print(message) - #send_to_rocket(message) + + # Send notification based on configuration + if notification_config["notification_method"] == "both": + if notification_config["enable_email"]: + send_email("WAPT Repository Status", message) + if notification_config["enable_rocketchat"]: + send_to_rocket(message) + elif notification_config["notification_method"] == "email" and notification_config["enable_email"]: + send_email("WAPT Repository Status", message) + elif notification_config["notification_method"] == "rocketchat" and notification_config["enable_rocketchat"]: + send_to_rocket(message) + return "OK" + else: + message=f"You need to update some packages :\n" + for app in list_app_to_update: + message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n" + print(message) + + # Send notification based on configuration + if notification_config["notification_method"] == "both": + if notification_config["enable_email"]: + send_email("Some application need to be updated on your wapt server", message) + if notification_config["enable_rocketchat"]: + send_to_rocket(message) + elif notification_config["notification_method"] == "email" and notification_config["enable_email"]: + send_email("Some application need to be updated on your wapt server", message) + elif notification_config["notification_method"] == "rocketchat" and notification_config["enable_rocketchat"]: + send_to_rocket(message) + + return "WARNING" - message=f"You need to update some packages :\n" - for app in list_app_to_update: - message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n" - print(message) - #send_to_rocket(message) - send_email("Some application need to be updated on your wapt server",message) - return "WARNING" +def audit(): + """Main audit function with comprehensive error handling.""" + try: + print("Info: Starting WAPT package audit...") + + # Initialize WAPT session + try: + sessionwapt, t, client_private_key_password = _get_wapt_session() + except Exception as e: + print(f"Error: Failed to initialize WAPT session: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return "ERROR" + + # Get host capabilities + try: + dict_host_capa = _get_host_capabilities(sessionwapt) + except Exception as e: + print(f"Error: Failed to get host capabilities: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return "ERROR" + + if not dict_host_capa: + print("Warning: No host capabilities found, audit completed") + return "OK" + + # Get online packages + try: + store = WaptRemoteRepo(name="main", url='https://wapt.tranquil.it/wapt', timeout=10, verify_cert=True) + online_package_list = _get_packages_versions(store, dict_host_capa) + except Exception as e: + print(f"Error: Failed to fetch online packages: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return "ERROR" + + # Get local packages + try: + localstore = WaptRemoteRepo( + name="main", + url=WAPT.waptserver.server_url + '/wapt', + timeout=10, + verify_cert=WAPT.waptserver.verify_cert + ) + + if not all(key in t for key in ['client_certificate', 'client_private_key']): + print("Error: Missing required certificates for local store access") + return "ERROR" + + localstore.client_certificate = t['client_certificate'] + localstore.client_private_key = t['client_private_key'] + def give_password(location=None, identity=None): + return client_private_key_password + + localstore.private_key_password_callback = give_password + local_package_list = _get_packages_versions(localstore, dict_host_capa) + + except Exception as e: + print(f"Error: Failed to fetch local packages: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return "ERROR" + + # Compare and notify + try: + result = _compare_and_notify(local_package_list, online_package_list, dict_host_capa) + print(f"Info: Audit completed with result: {result}") + return result + + except Exception as e: + print(f"Error: Failed to compare packages and send notifications: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return "ERROR" + + except Exception as e: + print(f"Error: Unexpected error during audit: {e}") + import traceback + print(f"Error: Traceback: {traceback.format_exc()}") + return "ERROR" def send_to_rocket(message_text, attachments=None): """ @@ -138,49 +383,111 @@ def send_to_rocket(message_text, attachments=None): :param message_text: Texte du message à envoyer :param attachments: Liste de pièces jointes (facultatif) """ - smtp_inifile = makepath(WAPT.private_dir, "rocket.ini") - conf_wapt = ConfigParser() - conf_wapt.read(smtp_inifile) + try: + print("Info: Attempting to send Rocket.Chat notification...") + + rocket_config_file = makepath(WAPT.private_dir, "rocket.ini") + + if not isfile(rocket_config_file): + print("Warning: Rocket.Chat configuration file not found, skipping notification") + return False + + conf_wapt = ConfigParser() + conf_wapt.read(rocket_config_file) - webhook_url = conf_wapt.get("rocket", "url") - - # Construire le message + try: + webhook_url = conf_wapt.get("rocket", "url") + if not webhook_url: + raise ValueError("Webhook URL is empty") + except (NoSectionError, NoOptionError) as e: + raise ValueError(f"Rocket.Chat webhook URL not configured: {e}") + + # Construire le message + message = { + 'text': message_text + } + if attachments: + message['attachments'] = attachments - message = { - 'text': message_text - } - if attachments: - message['attachments'] = attachments - - # Envoyer la requête POST - response = requests.post(webhook_url, data=json.dumps(message), headers={'Content-Type': 'application/json'}) - - # Vérifier la réponse - if response.status_code == 200: - print('Message envoyé avec succès.') - else: - print(f'Échec de l\'envoi du message. Statut de la réponse : {response.status_code}') - print(f'Erreur : {response.text}') + # Envoyer la requête POST + try: + response = requests.post( + webhook_url, + data=json.dumps(message), + headers={'Content-Type': 'application/json'}, + timeout=30 + ) + + # Vérifier la réponse + if response.status_code == 200: + print('Info: Rocket.Chat message sent successfully') + return True + else: + print(f'Error: Failed to send Rocket.Chat message. Status: {response.status_code}, Error: {response.text}') + return False + + except requests.exceptions.RequestException as e: + print(f"Error: Network error sending Rocket.Chat message: {e}") + return False + + except Exception as e: + print(f"Error: Unexpected error sending Rocket.Chat notification: {e}") + return False -def send_mail(body,subject): +def send_email(body, subject): + """Send email notification with error handling.""" + try: + print("Info: Attempting to send email notification...") + + smtp_config_file = makepath(WAPT.private_dir, "smtp.ini") + + if not isfile(smtp_config_file): + print("Warning: SMTP configuration file not found, skipping email notification") + return False + + conf_wapt = ConfigParser() + conf_wapt.read(smtp_config_file) - smtp_inifile = makepath(WAPT.private_dir, "smtp.ini") - conf_wapt = ConfigParser() - conf_wapt.read(smtp_inifile) + try: + from_addr = conf_wapt.get("smtp", "from_addr") + to_addr = conf_wapt.get("smtp", "to_addr") + password = conf_wapt.get("smtp", "password") + smtpserver = conf_wapt.get("smtp", "smtpserver") + + if not all([from_addr, to_addr, password, smtpserver]): + raise ValueError("Missing required SMTP configuration") + + except (NoSectionError, NoOptionError) as e: + raise ValueError(f"SMTP configuration incomplete: {e}") - from_addr = conf_wapt.get("smtp", "from_addr") - to_addr = conf_wapt.get("smtp", "to_addr") - password = conf_wapt.get("smtp", "password") - smtpserver = conf_wapt.get("smtp", "smtpserver") - - print(from_addr) - - - message = f"Subject: {subject}\n\n{body}" - server = smtplib.SMTP(smtpserver, 587) - server.starttls() - server.login(from_addr, password) - server.sendmail(from_addr, to_addr, message) - server.quit() - return "OK" \ No newline at end of file + print(f"Info: Sending email from {from_addr} to {to_addr}") + + message = f"Subject: {subject}\n\n{body}" + + try: + server = smtplib.SMTP(smtpserver, 587, timeout=30) + server.starttls() + server.login(from_addr, password) + server.sendmail(from_addr, to_addr, message) + server.quit() + + print("Info: Email sent successfully") + return True + + except smtplib.SMTPAuthenticationError as e: + print(f"Error: SMTP authentication failed: {e}") + return False + except smtplib.SMTPConnectError as e: + print(f"Error: Failed to connect to SMTP server: {e}") + return False + except smtplib.SMTPException as e: + print(f"Error: SMTP error occurred: {e}") + return False + except Exception as e: + print(f"Error: Unexpected error sending email: {e}") + return False + + except Exception as e: + print(f"Error: Error in email configuration: {e}") + return False \ No newline at end of file diff --git a/wapt_api.ini b/wapt_api.ini index 3f2be2f..b376a1e 100644 --- a/wapt_api.ini +++ b/wapt_api.ini @@ -1,4 +1,9 @@ [wapt] wapt_username = xxxxx wapt_password = xxxxx -wapt_url =xxxxx \ No newline at end of file +wapt_url =xxxxx + +[notifications] +enable_email = true +enable_rocketchat = false +notification_method = both \ No newline at end of file