Actualiser setup.py

This commit is contained in:
2025-12-29 10:03:53 +00:00
parent c95e799036
commit 2f436f8384

368
setup.py
View File

@@ -1,188 +1,182 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from setuphelpers import * from setuphelpers import *
import requests import requests
import json import json
import smtplib import smtplib
from configparser import ConfigParser import waptlicences
from waptpackage import HostCapabilities from configparser import ConfigParser
from waptpackage import WaptRemoteRepo from waptpackage import HostCapabilities
from waptpackage import PackageVersion from waptpackage import WaptRemoteRepo
from waptpackage import PackageVersion
all_package = {} from common import get_requests_client_cert_session
dict_host_capa = { def install():
"ubuntu22frx64": HostCapabilities( plugin_inifiles = glob.glob("*.ini")
architecture="x64",
language="fr", for file in plugin_inifiles:
os="ubuntu", if not isfile(makepath(WAPT.private_dir,file.split("\\")[-1])) :
packages_locales=["fr", "en", "es", "de", "it"], print(f"copie de {file} dans {WAPT.private_dir}")
tags=["debian", "debian_based", "linux", "unix", "debian11", "ubuntu-22"], filecopyto(file, WAPT.private_dir)
os_version="11",
), def audit():
"ubuntu20frx64": HostCapabilities(
architecture="x64", CONFWAPT = ConfigParser()
language="fr", CONFWAPT.read(makepath(WAPT.private_dir, "wapt_api.ini"))
os="ubuntu", username_wapt = CONFWAPT.get("wapt", "wapt_username")
packages_locales=["fr", "en", "es", "de", "it"], password_wapt = CONFWAPT.get("wapt", "wapt_password")
tags=["debian", "debian_based", "linux", "unix", "debian11", "ubuntu-20"],
os_version="11", dict_host_capa = {}
),
"debian11frx64": HostCapabilities( t = waptlicences.waptserver_login(WAPT.config_filename,username_wapt,password_wapt)
architecture="x64", if not 'session' in t['session_cookies']:
language="fr", session_cookies = [u for u in t['session_cookies'] if u['Domain'] == WAPT.waptserver.server_url.split('://')[-1]][0]
os="debian", else:
packages_locales=["fr", "en", "es", "de", "it"], session_cookies = t['session_cookies']['session']
tags=["debian-bullseye", "debian", "debian_based", "linux", "unix", "debian11", "debian-11"], session_cookies['Name'] = 'session'
os_version="11",
), client_private_key_password = t["client_private_key_password"]
"debian12frx64": HostCapabilities(
architecture="x64", sessionwapt = get_requests_client_cert_session(WAPT.waptserver.server_url,cert=(t['client_certificate'],t['client_private_key'],t['client_private_key_password']),verify=WAPT.waptserver.verify_cert)
language="fr", sessionwapt.cookies.set(session_cookies['Name'], session_cookies['Value'], domain=session_cookies['Domain'])
os="debian", sessionwapt.verify = WAPT.waptserver.verify_cert
packages_locales=["fr", "en", "es", "de", "it"],
tags=["debian-bookworm", "debian", "debian_based", "linux", "unix", "debian12", "debian-12"], for pc in json.loads(sessionwapt.get("%s/api/v3/hosts?columns=host_capabilities&limit=1000000" % WAPT.waptserver.server_url).content)["result"]:
os_version="11", if not pc['host_capabilities']:
), continue
"win10x64fr": HostCapabilities(
architecture="x64", dict_capa = dict(architecture= pc['host_capabilities']['architecture'],
language="fr", language=pc['host_capabilities']['language'],
os="windows", os=pc['host_capabilities']['os'],
packages_locales=["fr", "en", "es", "de", "it"], packages_locales= sorted(pc['host_capabilities']['packages_locales']),
tags=["windows-10", "win-10", "w-10", "windows10", "win10", "w10", "windows", "win", "w"], tags=sorted(pc['host_capabilities']['tags']),
os_version="10.0.19043", os_version=pc['host_capabilities']['os_version'])
),
} tempo_capa = HostCapabilities(**dict_capa)
def install(): dict_host_capa[str(dict_capa)] = tempo_capa
plugin_inifiles = glob.glob("*.ini")
store = WaptRemoteRepo(name="main", url='https://wapt.tranquil.it/wapt', timeout=4, verify_cert=True)
for file in plugin_inifiles: localstore = WaptRemoteRepo(name="main", url= WAPT.waptserver.server_url + '/wapt', timeout=4, verify_cert=WAPT.waptserver.verify_cert)
if not isfile(makepath(WAPT.private_dir,file.split("\\")[-1])) :
print(f"copie de {file} dans {WAPT.private_dir}") store_packages = store.packages()
filecopyto(file, WAPT.private_dir)
localstore.client_certificate = t['client_certificate']
def audit(): localstore.client_private_key = t['client_private_key']
plugin_inifile = makepath(WAPT.private_dir, "wapt_api.ini")
conf_wapt = ConfigParser() def give_password(location=None,identity=None):
conf_wapt.read(plugin_inifile) return client_private_key_password
wapt_url = conf_wapt.get("wapt", "wapt_url")
wapt_user = conf_wapt.get("wapt", "wapt_username") localstore.private_key_password_callback = give_password
wapt_password = conf_wapt.get("wapt", "wapt_password")
store_localstore = localstore.packages()
app_to_update_json_path = makepath(WAPT.private_dir, "app_to_update.json")
if isfile(app_to_update_json_path): # Download JSON data from the URL
print("suppression de l'ancienne version du fichier json") online_package_list = {}
remove_file(app_to_update_json_path) local_package_list = {}
for hc in dict_host_capa:
store = WaptRemoteRepo(name="main", url="https://wapt.tranquil.it/wapt", timeout=4, verify_cert=False) online_package_version = {}
localstore = WaptRemoteRepo(name="main", url="https://srvwapt.comitari.fr/wapt", timeout=4, verify_cert=False) for packageentry in store_packages:
# Download JSON data from the URL if dict_host_capa[hc].is_matching_package(packageentry):
online_package_list = {} if not packageentry.package in online_package_version:
local_package_list = {} online_package_version[packageentry.package] = "0"
for hc in dict_host_capa: if PackageVersion(online_package_version[packageentry.package]) < PackageVersion(packageentry.version):
online_package_version = {} online_package_version[packageentry.package] = packageentry.version
for packageentry in store.packages(): online_package_list[hc] = online_package_version
if dict_host_capa[hc].is_matching_package(packageentry):
if not packageentry.package in online_package_version: for hc in dict_host_capa:
online_package_version[packageentry.package] = "0" local_package_version = {}
if PackageVersion(online_package_version[packageentry.package]) < PackageVersion(packageentry.version): for packageentry in store_localstore:
online_package_version[packageentry.package] = packageentry.version if dict_host_capa[hc].is_matching_package(packageentry):
online_package_list[hc] = online_package_version if not packageentry.package in local_package_version:
local_package_version[packageentry.package] = "0"
for hc in dict_host_capa: if PackageVersion(local_package_version[packageentry.package]) < PackageVersion(packageentry.version):
local_package_version = {} local_package_version[packageentry.package] = packageentry.version
for packageentry in localstore.packages(): local_package_list[hc] = local_package_version
if dict_host_capa[hc].is_matching_package(packageentry):
if not packageentry.package in local_package_version: list_app_to_update = []
local_package_version[packageentry.package] = "0" for hc in dict_host_capa:
if PackageVersion(local_package_version[packageentry.package]) < PackageVersion(packageentry.version): for app in local_package_list[hc]:
local_package_version[packageentry.package] = packageentry.version if "-" in app:
local_package_list[hc] = local_package_version if "tis-" + app.split("-", 1)[1] in online_package_list[hc]:
if PackageVersion(local_package_list[hc][app]) < PackageVersion(online_package_list[hc]["tis-" + app.split("-", 1)[1]]) and app not in list_app_to_update:
list_app_to_update = [] print(
for hc in dict_host_capa: f'{app} new version detected from {local_package_list[hc][app]} to {online_package_list[hc]["tis-"+app.split("-", 1)[1]]} for {hc}'
for app in local_package_list[hc]: )
if "-" in app: list_app_to_update.append(
if "tis-" + app.split("-", 1)[1] in online_package_list[hc]: {
if PackageVersion(local_package_list[hc][app]) < PackageVersion(online_package_list[hc]["tis-" + app.split("-", 1)[1]]) and app not in list_app_to_update: "package": app,
print( "old_version": local_package_list[hc][app],
f'{app} new version detected from {local_package_list[hc][app]} to {online_package_list[hc]["tis-"+app.split("-", 1)[1]]} for {hc}' "new_version": online_package_list[hc]["tis-" + app.split("-", 1)[1]],
) }
list_app_to_update.append( )
{ WAPT.write_audit_data_if_changed("apps_to_upgrade", "list", list_app_to_update, max_count=3)
"package": app,
"old_version": local_package_list[hc][app],
"new_version": online_package_list[hc]["tis-" + app.split("-", 1)[1]], if not list_app_to_update:
} message="your repository seems up to date"
) print(message)
WAPT.write_audit_data_if_changed("apps_to_upgrade", "list", list_app_to_update, max_count=3) #send_to_rocket(message)
return "OK"
else:
if not list_app_to_update: message=f"You need to update some packages :\n"
message="your repository seems up to date" for app in list_app_to_update:
print(message) message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n"
#send_to_rocket(message) print(message)
return "OK" #send_to_rocket(message)
else: send_email("Some application need to be updated on your wapt server",message)
message=f"You need to update some packages :\n" return "WARNING"
for app in list_app_to_update:
message += f"**{app['package']}** : {app['new_version']} from : {app['old_version']}\n"
print(message) def send_to_rocket(message_text, attachments=None):
#send_to_rocket(message) """
send_email("Some application need to be updated on your wapt server",message) Envoie un message à Rocket.Chat via un webhook.
return "WARNING"
:param message_text: Texte du message à envoyer
:param attachments: Liste de pièces jointes (facultatif)
def send_to_rocket(message_text, attachments=None): """
""" smtp_inifile = makepath(WAPT.private_dir, "rocket.ini")
Envoie un message à Rocket.Chat via un webhook. conf_wapt = ConfigParser()
conf_wapt.read(smtp_inifile)
:param message_text: Texte du message à envoyer
:param attachments: Liste de pièces jointes (facultatif) webhook_url = conf_wapt.get("rocket", "url")
"""
smtp_inifile = makepath(WAPT.private_dir, "rocket.ini") # Construire le message
conf_wapt = ConfigParser()
conf_wapt.read(smtp_inifile) message = {
'text': message_text
webhook_url = conf_wapt.get("rocket", "url") }
if attachments:
# Construire le message message['attachments'] = attachments
message = { # Envoyer la requête POST
'text': message_text response = requests.post(webhook_url, data=json.dumps(message), headers={'Content-Type': 'application/json'})
}
if attachments: # Vérifier la réponse
message['attachments'] = attachments if response.status_code == 200:
print('Message envoyé avec succès.')
# Envoyer la requête POST else:
response = requests.post(webhook_url, data=json.dumps(message), headers={'Content-Type': 'application/json'}) print(f'Échec de l\'envoi du message. Statut de la réponse : {response.status_code}')
print(f'Erreur : {response.text}')
# Vérifier la réponse
if response.status_code == 200:
print('Message envoyé avec succès.') def send_mail(body,subject):
else:
print(f'Échec de l\'envoi du message. Statut de la réponse : {response.status_code}') smtp_inifile = makepath(WAPT.private_dir, "smtp.ini")
print(f'Erreur : {response.text}') conf_wapt = ConfigParser()
conf_wapt.read(smtp_inifile)
def send_mail(body,subject): from_addr = conf_wapt.get("smtp", "from_addr")
to_addr = conf_wapt.get("smtp", "to_addr")
smtp_inifile = makepath(WAPT.private_dir, "smtp.ini") password = conf_wapt.get("smtp", "password")
conf_wapt = ConfigParser() smtpserver = conf_wapt.get("smtp", "smtpserver")
conf_wapt.read(smtp_inifile)
print(from_addr)
from_addr = conf_wapt.get("smtp", "from_addr")
to_addr = conf_wapt.get("smtp", "to_addr")
password = conf_wapt.get("smtp", "password") message = f"Subject: {subject}\n\n{body}"
smtpserver = conf_wapt.get("smtp", "smtpserver") server = smtplib.SMTP(smtpserver, 587)
server.starttls()
print(from_addr) server.login(from_addr, password)
server.sendmail(from_addr, to_addr, message)
server.quit()
message = f"Subject: {subject}\n\n{body}"
server = smtplib.SMTP(smtpserver, 587)
server.starttls()
server.login(from_addr, password)
server.sendmail(from_addr, to_addr, message)
server.quit()
return "OK" return "OK"