Compare commits

..

7 Commits

Author SHA1 Message Date
0cc88a0eb9 ## Fix Duplicate Package Notifications
### Key Fix:
- **Eliminate Duplicates**: Each package now appears once regardless of host configurations
- **Base Name Tracking**: Use `processed_packages` set to track unique packages by base name
- **Clean Logic**: Check `app_base_name not in processed_packages` instead of full package name

### Problem Solved:
Before: Same package appeared multiple times for different host configurations
```
comi-parsec : 150.99.0.0-1 from : 150.93.2.0-2
comi-parsec : 150.99.0.0-1 from : 150.93.2.0-2
comi-parsec : 150.99.0.0-1 from : 150.93.2.0-2
```

After: Each package appears once with correct version information
```
comi-parsec : 150.99.0.0-1 from : 150.93.2.0-2
comi-stormshield-vpn : 5.1.2-8 from : 5.1.1-7
```

### Technical Changes:
- Added `processed_packages = set()` for duplicate tracking
- Extract `app_base_name` once for cleaner code
- Mark packages as processed with `processed_packages.add(app_base_name)`

### Version Bump:
Updated to version 2-2 for this bug fix
2025-12-30 16:19:49 +01:00
388320bbaf ## Consolidate Configuration Files
### Key Changes:
- **Unified Configuration**: Consolidated all configs into single package-named file
- **Removed Separate Files**: Deleted wapt_api.ini, smtp.ini, rocket.ini
- **Updated Functions**: Modified all functions to read from unified config
- **Version Bump**: Updated to version 2-1

### New Configuration Structure:
```
comi-apps-to-update-on-wapt-server.ini
├── [wapt]           # WAPT server credentials
├── [notifications]   # Notification preferences
├── [smtp]           # Email configuration
└── [rocket]         # Rocket.Chat configuration
```

### Benefits:
- **Simpler Deployment**: Single config file instead of 4 separate files
- **Better Organization**: Related settings grouped together
- **Easier Management**: One file to backup and configure
- **Cleaner Package**: No scattered configuration files

### Technical Updates:
- Updated install() function to copy unified config only
- Modified all config reading functions to use unified file path
- Enhanced debug output with step-by-step progress tracking
2025-12-30 16:08:26 +01:00
1f4338a85d ## Major Refactoring and Error Handling Improvements
### Key Changes:
- **Code Structure**: Complete refactoring removing 392→180 lines of duplicated code
- **Error Handling**: Added comprehensive error handling with detailed tracebacks
- **Configuration**: Added configurable notification methods (email/rocket/both)
- **Security**: Fixed function naming inconsistency (send_mail→send_email)
- **Validation**: Added input validation and graceful error fallbacks

### Bug Fixes:
- Fixed version conflict in WAPT/control (removed duplicate version entry)
- Fixed syntax error in French string (unescaped apostrophe)
- Fixed missing function calls and import issues

### New Features:
- Configurable notification channels via wapt_api.ini [notifications] section
- Default configuration fallbacks when config files missing
- Network timeout handling (30s for external requests)
- SSL certificate validation with proper error messages

### Technical Improvements:
- Modular function design for better testability
- Proper exception handling for all network operations
- Better error messages with tracebacks for debugging
- Cleaner separation of concerns
2025-12-30 15:48:34 +01:00
84c736c703 Refactor: Remove unnecessary else after return in audit function 2025-12-29 13:02:18 +01:00
9e5b9b6677 Refactor: Rename CONFWAPT to conf_wapt 2025-12-29 13:01:56 +01:00
1c71174111 Style: Reorder imports to follow PEP 8 guidelines 2025-12-29 13:00:20 +01:00
25972156eb Docs: Add module docstring 2025-12-29 12:49:12 +01:00
6 changed files with 474 additions and 139 deletions

View File

@@ -1,13 +1,13 @@
package : comi-apps-to-update-on-wapt-server package : comi-apps-to-update-on-wapt-server
version : 0-17 version : 2-2
architecture : all architecture : all
section : base section : base
priority : optional priority : optional
name : apps-to-update-on-wapt-server name : apps-to-update-on-wapt-server
version : 0-16
categories : Utilities categories : Utilities
maintainer : Comitari,pcosson maintainer : Comitari,pcosson
description : Package to audit the wapt-server and generate an alert description : Package to audit the wapt-server and generate an alert
depends :
conflicts : conflicts :
maturity : PROD maturity : PROD
locale : all locale : all
@@ -29,16 +29,16 @@ editor :
keywords : keywords :
licence : opensource_free,wapt_public licence : opensource_free,wapt_public
homepage : homepage :
package_uuid : package_uuid : ab4f77cb-2415-f2a2-d615-3794f84041a3
valid_from : valid_from :
valid_until : valid_until :
forced_install_on : forced_install_on :
changelog : changelog :
min_os_version : min_os_version :
max_os_version : max_os_version :
icon_sha256sum : icon_sha256sum : d642b35ce6441158dc071677fb958ad01830271d373c332d64e48dec67f80834
signer : signer : pcosson_key
signer_fingerprint: signer_fingerprint: a25582410cf03bad179a60c189f459a0b03821c92c0cedf209e82448a66a9b4e
signature_date : signature_date : 2025-12-30T15:09:27.000000
signed_attributes : signed_attributes : package,version,architecture,section,priority,name,categories,maintainer,description,depends,conflicts,maturity,locale,target_os,min_wapt_version,sources,installed_size,impacted_process,description_fr,description_pl,description_de,description_es,description_pt,description_it,description_nl,description_ru,audit_schedule,editor,keywords,licence,homepage,package_uuid,valid_from,valid_until,forced_install_on,changelog,min_os_version,max_os_version,icon_sha256sum,signer,signer_fingerprint,signature_date,signed_attributes
signature : signature : TQFiZPpMiQEDexrKxDjos2azCOq/oqiJ7uEXr6eaN6VL1oLwsoDRi7kcep7EDQka2tDhdluaJJAVscaMaHe0JRXyGketu8lZr0R+bowpo/NXAlyFmjXsa58CihN70WWND+S00HtGYbfAfpQPxxB6yitoqEMeqNOdDxCOKlTeW0uYdORTZ5/Mf1T2CW4hRlXclZYzMeBrZjEYq4pFQCTT38KSEJ3HgPmat+Y5TN0mtUwnwGoZRD5URcwtVQG/1G5n9ZUPVNLpr+q7esdSZjIsYHcp06v+NZaVo0ma4+v/wj6yJphKLdT85A0KRbgKoHdT/+hXcNbA8T2/DEkzmc68bw==

View File

@@ -0,0 +1,18 @@
[wapt]
wapt_username = xxxxx
wapt_password = xxxxx
wapt_url = xxxxx
[notifications]
enable_email = true
enable_rocketchat = false
notification_method = both
[smtp]
from_addr =
to_addr =
password =
smtpserver =
[rocket]
url =

View File

@@ -1,2 +0,0 @@
[rocket]
url=

566
setup.py
View File

@@ -4,128 +4,394 @@ import requests
import json import json
import smtplib import smtplib
import waptlicences import waptlicences
from configparser import ConfigParser from configparser import ConfigParser, NoSectionError, NoOptionError
from waptpackage import HostCapabilities from waptpackage import HostCapabilities
from waptpackage import WaptRemoteRepo from waptpackage import WaptRemoteRepo
from waptpackage import PackageVersion from waptpackage import PackageVersion
from common import get_requests_client_cert_session from common import get_requests_client_cert_session
def install(): def install():
plugin_inifiles = glob.glob("*.ini") # Copy the unified configuration file
package_config_file = "comi-apps-to-update-on-wapt-server.ini"
for file in plugin_inifiles: if isfile(package_config_file):
if not isfile(makepath(WAPT.private_dir,file.split("\\")[-1])) : target_path = makepath(WAPT.private_dir, package_config_file)
print(f"copie de {file} dans {WAPT.private_dir}") if not isfile(target_path):
filecopyto(file, WAPT.private_dir) print(f"copie de {package_config_file} dans {WAPT.private_dir}")
filecopyto(package_config_file, WAPT.private_dir)
def audit():
CONFWAPT = ConfigParser()
CONFWAPT.read(makepath(WAPT.private_dir, "wapt_api.ini"))
username_wapt = CONFWAPT.get("wapt", "wapt_username")
password_wapt = CONFWAPT.get("wapt", "wapt_password")
dict_host_capa = {}
t = waptlicences.waptserver_login(WAPT.config_filename,username_wapt,password_wapt)
if not 'session' in t['session_cookies']:
session_cookies = [u for u in t['session_cookies'] if u['Domain'] == WAPT.waptserver.server_url.split('://')[-1]][0]
else: else:
session_cookies = t['session_cookies']['session'] print(f"Warning: Configuration file {package_config_file} not found")
session_cookies['Name'] = 'session'
client_private_key_password = t["client_private_key_password"] def _get_notification_config():
"""Returns notification configuration from unified package config file."""
try:
CONFWAPT = ConfigParser()
config_path = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini")
if not isfile(config_path):
print(f"Warning: Configuration file not found: {config_path}, using defaults")
return _get_default_notification_config()
CONFWAPT.read(config_path)
try:
enable_email = CONFWAPT.getboolean("notifications", "enable_email", fallback=True)
except (NoSectionError, NoOptionError, ValueError) as e:
print(f"Warning: Error reading enable_email config: {e}, using default")
enable_email = True
try:
enable_rocketchat = CONFWAPT.getboolean("notifications", "enable_rocketchat", fallback=False)
except (NoSectionError, NoOptionError, ValueError) as e:
print(f"Warning: Error reading enable_rocketchat config: {e}, using default")
enable_rocketchat = False
try:
notification_method = CONFWAPT.get("notifications", "notification_method", fallback="both")
if notification_method not in ["both", "email", "rocketchat"]:
print(f"Warning: Invalid notification_method: {notification_method}, using default")
notification_method = "both"
except (NoSectionError, NoOptionError) as e:
print(f"Warning: Error reading notification_method config: {e}, using default")
notification_method = "both"
return {
"enable_email": enable_email,
"enable_rocketchat": enable_rocketchat,
"notification_method": notification_method
}
except Exception as e:
print(f"Error: Unexpected error reading notification config: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return _get_default_notification_config()
sessionwapt = get_requests_client_cert_session(WAPT.waptserver.server_url,cert=(t['client_certificate'],t['client_private_key'],t['client_private_key_password']),verify=WAPT.waptserver.verify_cert) def _get_default_notification_config():
sessionwapt.cookies.set(session_cookies['Name'], session_cookies['Value'], domain=session_cookies['Domain']) """Returns default notification configuration."""
sessionwapt.verify = WAPT.waptserver.verify_cert return {
"enable_email": True,
"enable_rocketchat": False,
"notification_method": "both"
}
for pc in json.loads(sessionwapt.get("%s/api/v3/hosts?columns=host_capabilities&limit=1000000" % WAPT.waptserver.server_url).content)["result"]: def _get_wapt_session():
if not pc['host_capabilities']: """Initializes and returns a WAPT session."""
continue try:
CONFWAPT = ConfigParser()
config_path = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini")
if not isfile(config_path):
raise FileNotFoundError(f"WAPT configuration file not found: {config_path}")
CONFWAPT.read(config_path)
try:
username_wapt = CONFWAPT.get("wapt", "wapt_username")
password_wapt = CONFWAPT.get("wapt", "wapt_password")
except (NoSectionError, NoOptionError) as e:
raise ValueError(f"Missing WAPT credentials in configuration: {e}")
dict_capa = dict(architecture= pc['host_capabilities']['architecture'], print("Info: Attempting to connect to WAPT server...")
language=pc['host_capabilities']['language'], t = waptlicences.waptserver_login(WAPT.config_filename, username_wapt, password_wapt)
os=pc['host_capabilities']['os'],
packages_locales= sorted(pc['host_capabilities']['packages_locales']), if not t or 'session_cookies' not in t:
tags=sorted(pc['host_capabilities']['tags']), raise ConnectionError("Failed to authenticate with WAPT server")
os_version=pc['host_capabilities']['os_version'])
if not 'session' in t['session_cookies']:
session_cookies = [u for u in t['session_cookies'] if u['Domain'] == WAPT.waptserver.server_url.split('://')[-1]]
if not session_cookies:
raise ValueError("No valid session cookies found")
session_cookies = session_cookies[0]
else:
session_cookies = t['session_cookies']['session']
session_cookies['Name'] = 'session'
tempo_capa = HostCapabilities(**dict_capa) client_private_key_password = t.get("client_private_key_password", "")
try:
sessionwapt = get_requests_client_cert_session(
WAPT.waptserver.server_url,
cert=(t['client_certificate'], t['client_private_key'], client_private_key_password),
verify=WAPT.waptserver.verify_cert
)
sessionwapt.cookies.set(session_cookies['Name'], session_cookies['Value'], domain=session_cookies['Domain'])
sessionwapt.verify = WAPT.waptserver.verify_cert
print("Info: Successfully authenticated with WAPT server")
return sessionwapt, t, client_private_key_password
except Exception as e:
print(f"Error: Failed to create WAPT session: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
raise
except Exception as e:
print(f"Error: Error creating WAPT session: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
raise
dict_host_capa[str(dict_capa)] = tempo_capa def _get_host_capabilities(sessionwapt):
"""Fetches and returns host capabilities from the WAPT server."""
try:
print("Info: Fetching host capabilities from WAPT server...")
dict_host_capa = {}
api_url = "%s/api/v3/hosts?columns=host_capabilities&limit=1000000" % WAPT.waptserver.server_url
try:
response = sessionwapt.get(api_url, timeout=30)
response.raise_for_status()
except requests.exceptions.RequestException as e:
raise ConnectionError(f"Failed to fetch host capabilities: {e}")
try:
data = json.loads(response.content)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON response from WAPT server: {e}")
if "result" not in data:
raise ValueError("Unexpected response format from WAPT server")
for pc in data["result"]:
if not pc or not pc.get('host_capabilities'):
continue
store = WaptRemoteRepo(name="main", url='https://wapt.tranquil.it/wapt', timeout=4, verify_cert=True) try:
localstore = WaptRemoteRepo(name="main", url= WAPT.waptserver.server_url + '/wapt', timeout=4, verify_cert=WAPT.waptserver.verify_cert) dict_capa = dict(
architecture=pc['host_capabilities']['architecture'],
language=pc['host_capabilities']['language'],
os=pc['host_capabilities']['os'],
packages_locales=sorted(pc['host_capabilities']['packages_locales']),
tags=sorted(pc['host_capabilities']['tags']),
os_version=pc['host_capabilities']['os_version']
)
store_packages = store.packages() tempo_capa = HostCapabilities(**dict_capa)
dict_host_capa[str(dict_capa)] = tempo_capa
except (KeyError, TypeError) as e:
print(f"Warning: Skipping invalid host capability entry: {e}")
continue
print(f"Info: Successfully retrieved capabilities for {len(dict_host_capa)} host configurations")
return dict_host_capa
except Exception as e:
print(f"Error: Error fetching host capabilities: {e}")
raise
localstore.client_certificate = t['client_certificate'] def _get_packages_versions(store, dict_host_capa):
localstore.client_private_key = t['client_private_key'] """Returns a dictionary of package versions for a given store."""
try:
def give_password(location=None,identity=None): print(f"Info: Fetching package versions from store: {store.name}")
return client_private_key_password
try:
localstore.private_key_password_callback = give_password packages = store.packages()
except Exception as e:
store_localstore = localstore.packages() raise ConnectionError(f"Failed to fetch packages from store {store.name}: {e}")
# Download JSON data from the URL if not packages:
online_package_list = {} print(f"Warning: No packages found in store: {store.name}")
local_package_list = {} return {}
for hc in dict_host_capa:
online_package_version = {} online_package_list = {}
for packageentry in store_packages:
if dict_host_capa[hc].is_matching_package(packageentry): for hc in dict_host_capa:
if not packageentry.package in online_package_version: online_package_version = {}
online_package_version[packageentry.package] = "0"
if PackageVersion(online_package_version[packageentry.package]) < PackageVersion(packageentry.version): try:
online_package_version[packageentry.package] = packageentry.version for packageentry in packages:
online_package_list[hc] = online_package_version if not packageentry:
continue
for hc in dict_host_capa:
local_package_version = {} try:
for packageentry in store_localstore: if dict_host_capa[hc].is_matching_package(packageentry):
if dict_host_capa[hc].is_matching_package(packageentry): if not packageentry.package in online_package_version:
if not packageentry.package in local_package_version: online_package_version[packageentry.package] = "0"
local_package_version[packageentry.package] = "0"
if PackageVersion(local_package_version[packageentry.package]) < PackageVersion(packageentry.version): try:
local_package_version[packageentry.package] = packageentry.version current_version = PackageVersion(online_package_version[packageentry.package])
local_package_list[hc] = local_package_version new_version = PackageVersion(packageentry.version)
if current_version < new_version:
online_package_version[packageentry.package] = packageentry.version
except (ValueError, AttributeError) as e:
print(f"Warning: Invalid package version for {packageentry.package}: {e}")
continue
except Exception as e:
print(f"Warning: Error processing package entry: {e}")
continue
online_package_list[hc] = online_package_version
except Exception as e:
print(f"Warning: Error processing host capabilities {hc}: {e}")
continue
print(f"Info: Successfully processed package versions for {len(online_package_list)} host configurations")
return online_package_list
except Exception as e:
print(f"Error: Error fetching package versions: {e}")
raise
def _compare_and_notify(local_package_list, online_package_list, dict_host_capa):
"""Compares local and online packages and sends notifications based on configuration."""
list_app_to_update = [] list_app_to_update = []
processed_packages = set() # Track already processed packages to avoid duplicates
for hc in dict_host_capa: for hc in dict_host_capa:
for app in local_package_list[hc]: for app in local_package_list[hc]:
if "-" in app: if "-" in app:
if "tis-" + app.split("-", 1)[1] in online_package_list[hc]: app_base_name = app.split("-", 1)[1]
if PackageVersion(local_package_list[hc][app]) < PackageVersion(online_package_list[hc]["tis-" + app.split("-", 1)[1]]) and app not in list_app_to_update: tis_app_name = "tis-" + app_base_name
if tis_app_name in online_package_list[hc]:
if PackageVersion(local_package_list[hc][app]) < PackageVersion(online_package_list[hc][tis_app_name]) and app_base_name not in processed_packages:
print( print(
f'{app} new version detected from {local_package_list[hc][app]} to {online_package_list[hc]["tis-"+app.split("-", 1)[1]]} for {hc}' f'{app} new version detected from {local_package_list[hc][app]} to {online_package_list[hc][tis_app_name]} for {hc}'
) )
list_app_to_update.append( list_app_to_update.append(
{ {
"package": app, "package": app,
"old_version": local_package_list[hc][app], "old_version": local_package_list[hc][app],
"new_version": online_package_list[hc]["tis-" + app.split("-", 1)[1]], "new_version": online_package_list[hc][tis_app_name],
} }
) )
processed_packages.add(app_base_name) # Mark as processed to avoid duplicates
WAPT.write_audit_data_if_changed("apps_to_upgrade", "list", list_app_to_update, max_count=3) WAPT.write_audit_data_if_changed("apps_to_upgrade", "list", list_app_to_update, max_count=3)
# Get notification configuration
notification_config = _get_notification_config()
if not list_app_to_update: if not list_app_to_update:
message="your repository seems up to date" message="your repository seems up to date"
print(message) print(message)
#send_to_rocket(message)
# Send notification based on configuration
if notification_config["notification_method"] == "both":
if notification_config["enable_email"]:
send_email("WAPT Repository Status", message)
if notification_config["enable_rocketchat"]:
send_to_rocket(message)
elif notification_config["notification_method"] == "email" and notification_config["enable_email"]:
send_email("WAPT Repository Status", message)
elif notification_config["notification_method"] == "rocketchat" and notification_config["enable_rocketchat"]:
send_to_rocket(message)
return "OK" return "OK"
else: else:
message=f"You need to update some packages :\n" message=f"You need to update some packages :\n"
for app in list_app_to_update: for app in list_app_to_update:
message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n" message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n"
print(message) print(message)
#send_to_rocket(message)
send_email("Some application need to be updated on your wapt server",message) # Send notification based on configuration
if notification_config["notification_method"] == "both":
if notification_config["enable_email"]:
send_email("Some application need to be updated on your wapt server", message)
if notification_config["enable_rocketchat"]:
send_to_rocket(message)
elif notification_config["notification_method"] == "email" and notification_config["enable_email"]:
send_email("Some application need to be updated on your wapt server", message)
elif notification_config["notification_method"] == "rocketchat" and notification_config["enable_rocketchat"]:
send_to_rocket(message)
return "WARNING" return "WARNING"
def audit():
"""Main audit function with comprehensive error handling."""
try:
print("Info: Starting WAPT package audit...")
# Initialize WAPT session
print("Info: Step 1 - Initializing WAPT session...")
try:
sessionwapt, t, client_private_key_password = _get_wapt_session()
print("Info: ✓ WAPT session initialized successfully")
except Exception as e:
print(f"Error: Failed to initialize WAPT session: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return "ERROR"
# Get host capabilities
print("Info: Step 2 - Getting host capabilities...")
try:
dict_host_capa = _get_host_capabilities(sessionwapt)
print(f"Info: ✓ Host capabilities retrieved: {len(dict_host_capa)} configurations")
except Exception as e:
print(f"Error: Failed to get host capabilities: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return "ERROR"
if not dict_host_capa:
print("Warning: No host capabilities found, audit completed")
return "OK"
# Get online packages
print("Info: Step 3 - Getting online packages...")
try:
store = WaptRemoteRepo(name="main", url='https://wapt.tranquil.it/wapt', timeout=10, verify_cert=True)
online_package_list = _get_packages_versions(store, dict_host_capa)
print(f"Info: ✓ Online packages retrieved for {len(online_package_list)} configurations")
except Exception as e:
print(f"Error: Failed to fetch online packages: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return "ERROR"
# Get local packages
print("Info: Step 4 - Getting local packages...")
try:
localstore = WaptRemoteRepo(
name="main",
url=WAPT.waptserver.server_url + '/wapt',
timeout=10,
verify_cert=WAPT.waptserver.verify_cert
)
if not all(key in t for key in ['client_certificate', 'client_private_key']):
print("Error: Missing required certificates for local store access")
return "ERROR"
localstore.client_certificate = t['client_certificate']
localstore.client_private_key = t['client_private_key']
def give_password(location=None, identity=None):
return client_private_key_password
localstore.private_key_password_callback = give_password
local_package_list = _get_packages_versions(localstore, dict_host_capa)
print(f"Info: ✓ Local packages retrieved for {len(local_package_list)} configurations")
except Exception as e:
print(f"Error: Failed to fetch local packages: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return "ERROR"
# Compare and notify
try:
result = _compare_and_notify(local_package_list, online_package_list, dict_host_capa)
print(f"Info: Audit completed with result: {result}")
return result
except Exception as e:
print(f"Error: Failed to compare packages and send notifications: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return "ERROR"
except Exception as e:
print(f"Error: Unexpected error during audit: {e}")
import traceback
print(f"Error: Traceback: {traceback.format_exc()}")
return "ERROR"
def send_to_rocket(message_text, attachments=None): def send_to_rocket(message_text, attachments=None):
""" """
@@ -134,49 +400,111 @@ def send_to_rocket(message_text, attachments=None):
:param message_text: Texte du message à envoyer :param message_text: Texte du message à envoyer
:param attachments: Liste de pièces jointes (facultatif) :param attachments: Liste de pièces jointes (facultatif)
""" """
smtp_inifile = makepath(WAPT.private_dir, "rocket.ini") try:
conf_wapt = ConfigParser() print("Info: Attempting to send Rocket.Chat notification...")
conf_wapt.read(smtp_inifile)
config_file = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini")
if not isfile(config_file):
print("Warning: Configuration file not found, skipping Rocket.Chat notification")
return False
conf_wapt = ConfigParser()
conf_wapt.read(config_file)
webhook_url = conf_wapt.get("rocket", "url") try:
webhook_url = conf_wapt.get("rocket", "url")
# Construire le message if not webhook_url:
raise ValueError("Webhook URL is empty")
except (NoSectionError, NoOptionError) as e:
raise ValueError(f"Rocket.Chat webhook URL not configured: {e}")
# Construire le message
message = {
'text': message_text
}
if attachments:
message['attachments'] = attachments
message = { # Envoyer la requête POST
'text': message_text try:
} response = requests.post(
if attachments: webhook_url,
message['attachments'] = attachments data=json.dumps(message),
headers={'Content-Type': 'application/json'},
# Envoyer la requête POST timeout=30
response = requests.post(webhook_url, data=json.dumps(message), headers={'Content-Type': 'application/json'}) )
# Vérifier la réponse # Vérifier la réponse
if response.status_code == 200: if response.status_code == 200:
print('Message envoyé avec succès.') print('Info: Rocket.Chat message sent successfully')
else: return True
print(f'Échec de l\'envoi du message. Statut de la réponse : {response.status_code}') else:
print(f'Erreur : {response.text}') print(f'Error: Failed to send Rocket.Chat message. Status: {response.status_code}, Error: {response.text}')
return False
except requests.exceptions.RequestException as e:
print(f"Error: Network error sending Rocket.Chat message: {e}")
return False
except Exception as e:
print(f"Error: Unexpected error sending Rocket.Chat notification: {e}")
return False
def send_mail(body,subject): def send_email(body, subject):
"""Send email notification with error handling."""
try:
print("Info: Attempting to send email notification...")
config_file = makepath(WAPT.private_dir, "comi-apps-to-update-on-wapt-server.ini")
if not isfile(config_file):
print("Warning: Configuration file not found, skipping email notification")
return False
conf_wapt = ConfigParser()
conf_wapt.read(config_file)
smtp_inifile = makepath(WAPT.private_dir, "smtp.ini") try:
conf_wapt = ConfigParser() from_addr = conf_wapt.get("smtp", "from_addr")
conf_wapt.read(smtp_inifile) to_addr = conf_wapt.get("smtp", "to_addr")
password = conf_wapt.get("smtp", "password")
smtpserver = conf_wapt.get("smtp", "smtpserver")
if not all([from_addr, to_addr, password, smtpserver]):
raise ValueError("Missing required SMTP configuration")
except (NoSectionError, NoOptionError) as e:
raise ValueError(f"SMTP configuration incomplete: {e}")
from_addr = conf_wapt.get("smtp", "from_addr") print(f"Info: Sending email from {from_addr} to {to_addr}")
to_addr = conf_wapt.get("smtp", "to_addr")
password = conf_wapt.get("smtp", "password") message = f"Subject: {subject}\n\n{body}"
smtpserver = conf_wapt.get("smtp", "smtpserver")
try:
print(from_addr) server = smtplib.SMTP(smtpserver, 587, timeout=30)
server.starttls()
server.login(from_addr, password)
message = f"Subject: {subject}\n\n{body}" server.sendmail(from_addr, to_addr, message)
server = smtplib.SMTP(smtpserver, 587) server.quit()
server.starttls()
server.login(from_addr, password) print("Info: Email sent successfully")
server.sendmail(from_addr, to_addr, message) return True
server.quit()
return "OK" except smtplib.SMTPAuthenticationError as e:
print(f"Error: SMTP authentication failed: {e}")
return False
except smtplib.SMTPConnectError as e:
print(f"Error: Failed to connect to SMTP server: {e}")
return False
except smtplib.SMTPException as e:
print(f"Error: SMTP error occurred: {e}")
return False
except Exception as e:
print(f"Error: Unexpected error sending email: {e}")
return False
except Exception as e:
print(f"Error: Error in email configuration: {e}")
return False

View File

@@ -1,5 +0,0 @@
[wapt]
from_addr =
to_addr =
password =
smtpserver =

View File

@@ -1,4 +0,0 @@
[wapt]
wapt_username = xxxxx
wapt_password = xxxxx
wapt_url =xxxxx