# -*- coding: utf-8 -*- from setuphelpers import * import requests import json import smtplib import waptlicences from configparser import ConfigParser, NoSectionError, NoOptionError from waptpackage import HostCapabilities from waptpackage import WaptRemoteRepo from waptpackage import PackageVersion from common import get_requests_client_cert_session def install(): # Copy the unified configuration file package_config_file = "comi-apps-to-update-on-wapt-server.ini" if isfile(package_config_file): target_path = makepath(WAPT.private_dir, package_config_file) if not isfile(target_path): print(f"copie de {package_config_file} dans {WAPT.private_dir}") filecopyto(package_config_file, WAPT.private_dir) else: print(f"Warning: Configuration file {package_config_file} not found") def _get_notification_config(): """Returns notification configuration from unified package config file.""" try: CONFWAPT = ConfigParser() config_path = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini") if not isfile(config_path): print(f"Warning: Configuration file not found: {config_path}, using defaults") return _get_default_notification_config() CONFWAPT.read(config_path) try: enable_email = CONFWAPT.getboolean("notifications", "enable_email", fallback=True) except (NoSectionError, NoOptionError, ValueError) as e: print(f"Warning: Error reading enable_email config: {e}, using default") enable_email = True try: enable_rocketchat = CONFWAPT.getboolean("notifications", "enable_rocketchat", fallback=False) except (NoSectionError, NoOptionError, ValueError) as e: print(f"Warning: Error reading enable_rocketchat config: {e}, using default") enable_rocketchat = False try: notification_method = CONFWAPT.get("notifications", "notification_method", fallback="both") if notification_method not in ["both", "email", "rocketchat"]: print(f"Warning: Invalid notification_method: {notification_method}, using default") notification_method = "both" except (NoSectionError, NoOptionError) as e: print(f"Warning: Error reading notification_method config: {e}, using default") notification_method = "both" return { "enable_email": enable_email, "enable_rocketchat": enable_rocketchat, "notification_method": notification_method } except Exception as e: print(f"Error: Unexpected error reading notification config: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return _get_default_notification_config() def _get_default_notification_config(): """Returns default notification configuration.""" return { "enable_email": True, "enable_rocketchat": False, "notification_method": "both" } def _get_wapt_session(): """Initializes and returns a WAPT session.""" try: CONFWAPT = ConfigParser() config_path = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini") if not isfile(config_path): raise FileNotFoundError(f"WAPT configuration file not found: {config_path}") CONFWAPT.read(config_path) try: username_wapt = CONFWAPT.get("wapt", "wapt_username") password_wapt = CONFWAPT.get("wapt", "wapt_password") except (NoSectionError, NoOptionError) as e: raise ValueError(f"Missing WAPT credentials in configuration: {e}") print("Info: Attempting to connect to WAPT server...") t = waptlicences.waptserver_login(WAPT.config_filename, username_wapt, password_wapt) if not t or 'session_cookies' not in t: raise ConnectionError("Failed to authenticate with WAPT server") if not 'session' in t['session_cookies']: session_cookies = [u for u in t['session_cookies'] if u['Domain'] == WAPT.waptserver.server_url.split('://')[-1]] if not session_cookies: raise ValueError("No valid session cookies found") session_cookies = session_cookies[0] else: session_cookies = t['session_cookies']['session'] session_cookies['Name'] = 'session' client_private_key_password = t.get("client_private_key_password", "") try: sessionwapt = get_requests_client_cert_session( WAPT.waptserver.server_url, cert=(t['client_certificate'], t['client_private_key'], client_private_key_password), verify=WAPT.waptserver.verify_cert ) sessionwapt.cookies.set(session_cookies['Name'], session_cookies['Value'], domain=session_cookies['Domain']) sessionwapt.verify = WAPT.waptserver.verify_cert print("Info: Successfully authenticated with WAPT server") return sessionwapt, t, client_private_key_password except Exception as e: print(f"Error: Failed to create WAPT session: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") raise except Exception as e: print(f"Error: Error creating WAPT session: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") raise def _get_host_capabilities(sessionwapt): """Fetches and returns host capabilities from the WAPT server.""" try: print("Info: Fetching host capabilities from WAPT server...") dict_host_capa = {} api_url = "%s/api/v3/hosts?columns=host_capabilities&limit=1000000" % WAPT.waptserver.server_url try: response = sessionwapt.get(api_url, timeout=30) response.raise_for_status() except requests.exceptions.RequestException as e: raise ConnectionError(f"Failed to fetch host capabilities: {e}") try: data = json.loads(response.content) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON response from WAPT server: {e}") if "result" not in data: raise ValueError("Unexpected response format from WAPT server") for pc in data["result"]: if not pc or not pc.get('host_capabilities'): continue try: dict_capa = dict( architecture=pc['host_capabilities']['architecture'], language=pc['host_capabilities']['language'], os=pc['host_capabilities']['os'], packages_locales=sorted(pc['host_capabilities']['packages_locales']), tags=sorted(pc['host_capabilities']['tags']), os_version=pc['host_capabilities']['os_version'] ) tempo_capa = HostCapabilities(**dict_capa) dict_host_capa[str(dict_capa)] = tempo_capa except (KeyError, TypeError) as e: print(f"Warning: Skipping invalid host capability entry: {e}") continue print(f"Info: Successfully retrieved capabilities for {len(dict_host_capa)} host configurations") return dict_host_capa except Exception as e: print(f"Error: Error fetching host capabilities: {e}") raise def _get_packages_versions(store, dict_host_capa): """Returns a dictionary of package versions for a given store.""" try: print(f"Info: Fetching package versions from store: {store.name}") try: packages = store.packages() except Exception as e: raise ConnectionError(f"Failed to fetch packages from store {store.name}: {e}") if not packages: print(f"Warning: No packages found in store: {store.name}") return {} online_package_list = {} for hc in dict_host_capa: online_package_version = {} try: for packageentry in packages: if not packageentry: continue try: if dict_host_capa[hc].is_matching_package(packageentry): if not packageentry.package in online_package_version: online_package_version[packageentry.package] = "0" try: current_version = PackageVersion(online_package_version[packageentry.package]) new_version = PackageVersion(packageentry.version) if current_version < new_version: online_package_version[packageentry.package] = packageentry.version except (ValueError, AttributeError) as e: print(f"Warning: Invalid package version for {packageentry.package}: {e}") continue except Exception as e: print(f"Warning: Error processing package entry: {e}") continue online_package_list[hc] = online_package_version except Exception as e: print(f"Warning: Error processing host capabilities {hc}: {e}") continue print(f"Info: Successfully processed package versions for {len(online_package_list)} host configurations") return online_package_list except Exception as e: print(f"Error: Error fetching package versions: {e}") raise def _compare_and_notify(local_package_list, online_package_list, dict_host_capa): """Compares local and online packages and sends notifications based on configuration.""" list_app_to_update = [] processed_packages = set() # Track already processed packages to avoid duplicates for hc in dict_host_capa: for app in local_package_list[hc]: if "-" in app: app_base_name = app.split("-", 1)[1] tis_app_name = "tis-" + app_base_name if tis_app_name in online_package_list[hc]: if PackageVersion(local_package_list[hc][app]) < PackageVersion(online_package_list[hc][tis_app_name]) and app_base_name not in processed_packages: print( f'{app} new version detected from {local_package_list[hc][app]} to {online_package_list[hc][tis_app_name]} for {hc}' ) list_app_to_update.append( { "package": app, "old_version": local_package_list[hc][app], "new_version": online_package_list[hc][tis_app_name], } ) processed_packages.add(app_base_name) # Mark as processed to avoid duplicates WAPT.write_audit_data_if_changed("apps_to_upgrade", "list", list_app_to_update, max_count=3) # Get notification configuration notification_config = _get_notification_config() if not list_app_to_update: message="your repository seems up to date" print(message) # Send notification based on configuration if notification_config["notification_method"] == "both": if notification_config["enable_email"]: send_email("WAPT Repository Status", message) if notification_config["enable_rocketchat"]: send_to_rocket(message) elif notification_config["notification_method"] == "email" and notification_config["enable_email"]: send_email("WAPT Repository Status", message) elif notification_config["notification_method"] == "rocketchat" and notification_config["enable_rocketchat"]: send_to_rocket(message) return "OK" else: message=f"You need to update some packages :\n" for app in list_app_to_update: message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n" print(message) # Send notification based on configuration if notification_config["notification_method"] == "both": if notification_config["enable_email"]: send_email("Some application need to be updated on your wapt server", message) if notification_config["enable_rocketchat"]: send_to_rocket(message) elif notification_config["notification_method"] == "email" and notification_config["enable_email"]: send_email("Some application need to be updated on your wapt server", message) elif notification_config["notification_method"] == "rocketchat" and notification_config["enable_rocketchat"]: send_to_rocket(message) return "WARNING" def audit(): """Main audit function with comprehensive error handling.""" try: print("Info: Starting WAPT package audit...") # Initialize WAPT session print("Info: Step 1 - Initializing WAPT session...") try: sessionwapt, t, client_private_key_password = _get_wapt_session() print("Info: ✓ WAPT session initialized successfully") except Exception as e: print(f"Error: Failed to initialize WAPT session: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return "ERROR" # Get host capabilities print("Info: Step 2 - Getting host capabilities...") try: dict_host_capa = _get_host_capabilities(sessionwapt) print(f"Info: ✓ Host capabilities retrieved: {len(dict_host_capa)} configurations") except Exception as e: print(f"Error: Failed to get host capabilities: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return "ERROR" if not dict_host_capa: print("Warning: No host capabilities found, audit completed") return "OK" # Get online packages print("Info: Step 3 - Getting online packages...") try: store = WaptRemoteRepo(name="main", url='https://wapt.tranquil.it/wapt', timeout=10, verify_cert=True) online_package_list = _get_packages_versions(store, dict_host_capa) print(f"Info: ✓ Online packages retrieved for {len(online_package_list)} configurations") except Exception as e: print(f"Error: Failed to fetch online packages: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return "ERROR" # Get local packages print("Info: Step 4 - Getting local packages...") try: localstore = WaptRemoteRepo( name="main", url=WAPT.waptserver.server_url + '/wapt', timeout=10, verify_cert=WAPT.waptserver.verify_cert ) if not all(key in t for key in ['client_certificate', 'client_private_key']): print("Error: Missing required certificates for local store access") return "ERROR" localstore.client_certificate = t['client_certificate'] localstore.client_private_key = t['client_private_key'] def give_password(location=None, identity=None): return client_private_key_password localstore.private_key_password_callback = give_password local_package_list = _get_packages_versions(localstore, dict_host_capa) print(f"Info: ✓ Local packages retrieved for {len(local_package_list)} configurations") except Exception as e: print(f"Error: Failed to fetch local packages: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return "ERROR" # Compare and notify try: result = _compare_and_notify(local_package_list, online_package_list, dict_host_capa) print(f"Info: Audit completed with result: {result}") return result except Exception as e: print(f"Error: Failed to compare packages and send notifications: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return "ERROR" except Exception as e: print(f"Error: Unexpected error during audit: {e}") import traceback print(f"Error: Traceback: {traceback.format_exc()}") return "ERROR" def send_to_rocket(message_text, attachments=None): """ Envoie un message à Rocket.Chat via un webhook. :param message_text: Texte du message à envoyer :param attachments: Liste de pièces jointes (facultatif) """ try: print("Info: Attempting to send Rocket.Chat notification...") config_file = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini") if not isfile(config_file): print("Warning: Configuration file not found, skipping Rocket.Chat notification") return False conf_wapt = ConfigParser() conf_wapt.read(config_file) try: webhook_url = conf_wapt.get("rocket", "url") if not webhook_url: raise ValueError("Webhook URL is empty") except (NoSectionError, NoOptionError) as e: raise ValueError(f"Rocket.Chat webhook URL not configured: {e}") # Construire le message message = { 'text': message_text } if attachments: message['attachments'] = attachments # Envoyer la requête POST try: response = requests.post( webhook_url, data=json.dumps(message), headers={'Content-Type': 'application/json'}, timeout=30 ) # Vérifier la réponse if response.status_code == 200: print('Info: Rocket.Chat message sent successfully') return True else: print(f'Error: Failed to send Rocket.Chat message. Status: {response.status_code}, Error: {response.text}') return False except requests.exceptions.RequestException as e: print(f"Error: Network error sending Rocket.Chat message: {e}") return False except Exception as e: print(f"Error: Unexpected error sending Rocket.Chat notification: {e}") return False def send_email(body, subject): """Send email notification with error handling.""" try: print("Info: Attempting to send email notification...") config_file = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini") if not isfile(config_file): print("Warning: Configuration file not found, skipping email notification") return False conf_wapt = ConfigParser() conf_wapt.read(config_file) try: from_addr = conf_wapt.get("smtp", "from_addr") to_addr = conf_wapt.get("smtp", "to_addr") password = conf_wapt.get("smtp", "password") smtpserver = conf_wapt.get("smtp", "smtpserver") if not all([from_addr, to_addr, password, smtpserver]): raise ValueError("Missing required SMTP configuration") except (NoSectionError, NoOptionError) as e: raise ValueError(f"SMTP configuration incomplete: {e}") print(f"Info: Sending email from {from_addr} to {to_addr}") message = f"Subject: {subject}\n\n{body}" try: server = smtplib.SMTP(smtpserver, 587, timeout=30) server.starttls() server.login(from_addr, password) server.sendmail(from_addr, to_addr, message) server.quit() print("Info: Email sent successfully") return True except smtplib.SMTPAuthenticationError as e: print(f"Error: SMTP authentication failed: {e}") return False except smtplib.SMTPConnectError as e: print(f"Error: Failed to connect to SMTP server: {e}") return False except smtplib.SMTPException as e: print(f"Error: SMTP error occurred: {e}") return False except Exception as e: print(f"Error: Unexpected error sending email: {e}") return False except Exception as e: print(f"Error: Error in email configuration: {e}") return False