Ajout du support des certificats Airwatch pour l'authentification

This commit is contained in:
Jason SECULA
2025-06-05 14:26:26 +02:00
parent 882cf674a2
commit 8e24abb269
2 changed files with 96 additions and 25 deletions

View File

@ -4,6 +4,8 @@ import base64
import requests import requests
import json import json
import argparse import argparse
from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7
from cryptography.hazmat.primitives import hashes, serialization
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-debug", action=argparse.BooleanOptionalAction) parser.add_argument("-debug", action=argparse.BooleanOptionalAction)
@ -14,6 +16,9 @@ args = parser.parse_args()
settingsDefault = { settingsDefault = {
"airwatchServer":"https://apimagenta.phm.education.gouv.fr", "airwatchServer":"https://apimagenta.phm.education.gouv.fr",
"airwatchAPIKey":"", "airwatchAPIKey":"",
"airwatchAuthMethod":"CMSURL",
"airwatchCertPath":"",
"airwatchCertPass":"",
"airwatchAPIUser":"", "airwatchAPIUser":"",
"airwatchAPIPassword":"", "airwatchAPIPassword":"",
"glpiServer":"http://127.0.0.1/glpi/", "glpiServer":"http://127.0.0.1/glpi/",
@ -41,11 +46,19 @@ lockFile = './airwatchStagingUserAssignation.lock'
debug=args.debug debug=args.debug
# Informations du serveur Airwatch # Informations du serveur Airwatch
#airwatchServer = 'https://pp-apimagenta.phm.education.gouv.fr'
airwatchServer = settings["airwatchServer"] airwatchServer = settings["airwatchServer"]
airwatchAPIKey = settings["airwatchAPIKey"] airwatchAPIKey = settings["airwatchAPIKey"]
airwatchAPIUser = settings["airwatchAPIUser"] airwatchAuthMethod = settings["airwatchAuthMethod"]
airwatchAPIPassword = settings["airwatchAPIPassword"] airwatchAPIUser = None
airwatchAPIPassword = None
airwatchCertPath = None
airwatchCertPass = None
if(airwatchAuthMethod == 'password'):
airwatchAPIUser = settings["airwatchAPIUser"]
airwatchAPIPassword = settings["airwatchAPIPassword"]
elif(airwatchAuthMethod == 'CMSURL'):
airwatchCertPath = settings["airwatchCertPath"]
airwatchCertPass = settings["airwatchCertPass"]
stagingUser = settings["stagingUser"] stagingUser = settings["stagingUser"]
# Informations du serveur GLPI # Informations du serveur GLPI
@ -55,6 +68,30 @@ GLPIUserToken = settings["glpiUserToken"]
# ====================================== # # ====================================== #
def getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri, User=None, password=None, CertPath=None, CertPassword=None):
if(airwatchAuthMethod == "password"):
airwatchAPIUserToken = base64.b64encode(f"{airwatchAPIUser}:{airwatchAPIPassword}".encode('ascii')).decode("ascii")
return {
"Authorization": f"Basic {airwatchAPIUserToken}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
else:
signing_data = uri.split('?')[0]
with open(CertPath, 'rb') as certfile:
cert = certfile.read()
key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, CertPassword.encode())
options = [pkcs7.PKCS7Options.DetachedSignature]
signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options)
signed_data_b64 = base64.b64encode(signed_data).decode()
return {
"Authorization": f"CMSURL'1 {signed_data_b64}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
# Vérification de la présence du verrou avant de continuer # Vérification de la présence du verrou avant de continuer
if(os.path.isfile(lockFile) and not args.force): if(os.path.isfile(lockFile) and not args.force):
@ -68,14 +105,7 @@ else:
# avec limite de 500 appareils par page (limite max de l'API) # avec limite de 500 appareils par page (limite max de l'API)
airwatchAPIDevicesSearchURI = f"/API/mdm/devices/search?user={stagingUser}&pagesize=500&page=" airwatchAPIDevicesSearchURI = f"/API/mdm/devices/search?user={stagingUser}&pagesize=500&page="
# User token formé par la concaténation du nom de l'utilisateur API et de son mot de passe converti en base 64 airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=airwatchAPIDevicesSearchURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
airwatchAPIUserToken = base64.b64encode(f"{airwatchAPIUser}:{airwatchAPIPassword}".encode('ascii')).decode("ascii")
airwatchHeaders = {
"Authorization": f"Basic {airwatchAPIUserToken}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
# Page de départ pour la recherche # Page de départ pour la recherche
pageNumber = 0 pageNumber = 0
@ -186,7 +216,9 @@ for device in devices:
# Vérification que l'appareil est associé à un utilisateur dans GLPI # Vérification que l'appareil est associé à un utilisateur dans GLPI
if(device_user != None): if(device_user != None):
# Récupération de l'utilisateur sur Magenta # Récupération de l'utilisateur sur Magenta
uri = f"{airwatchServer}/API/system/users/search?username={device_user}" cmdURI = f'/API/system/users/search?username={device_user}'
airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=cmdURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
uri = f"{airwatchServer}{cmdURI}"
if(debug): if(debug):
print(f"Airwatch user search uri : {uri}") print(f"Airwatch user search uri : {uri}")
user = requests.get(uri, headers=airwatchHeaders) user = requests.get(uri, headers=airwatchHeaders)
@ -198,7 +230,9 @@ for device in devices:
user = user.json() user = user.json()
# Changement de l'utilisateur assigné sur l'appareil dans Magenta # Changement de l'utilisateur assigné sur l'appareil dans Magenta
patchUri = f'{airwatchServer}/API/mdm/devices/{device["Id"]["Value"]}/enrollmentuser/{user["Users"][0]["Id"]["Value"]}' cmdURI = f'/API/mdm/devices/{device["Id"]["Value"]}/enrollmentuser/{user["Users"][0]["Id"]["Value"]}'
patchUri = f'{airwatchServer}{cmdURI}'
airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=cmdURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
if(debug): if(debug):
print(f"patchUri = {patchUri}") print(f"patchUri = {patchUri}")
requests.patch(patchUri, headers=airwatchHeaders) requests.patch(patchUri, headers=airwatchHeaders)

View File

@ -5,6 +5,8 @@ import base64
import requests import requests
import json import json
import argparse import argparse
from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7
from cryptography.hazmat.primitives import hashes, serialization
from datetime import datetime from datetime import datetime
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -18,6 +20,9 @@ args = parser.parse_args()
settingsDefault = { settingsDefault = {
"airwatchServer":"https://apimagenta.phm.education.gouv.fr", "airwatchServer":"https://apimagenta.phm.education.gouv.fr",
"airwatchAPIKey":"", "airwatchAPIKey":"",
"airwatchAuthMethod":"CMSURL",
"airwatchCertPath":"",
"airwatchCertPass":"",
"airwatchAPIUser":"", "airwatchAPIUser":"",
"airwatchAPIPassword":"", "airwatchAPIPassword":"",
"glpiServer":"http://127.0.0.1/glpi/", "glpiServer":"http://127.0.0.1/glpi/",
@ -45,11 +50,19 @@ lockFile = './airwatchSyncGLPI.lock'
debug=args.debug debug=args.debug
# Informations du serveur Airwatch # Informations du serveur Airwatch
#airwatchServer = 'https://pp-apimagenta.phm.education.gouv.fr'
airwatchServer = settings["airwatchServer"] airwatchServer = settings["airwatchServer"]
airwatchAPIKey = settings["airwatchAPIKey"] airwatchAPIKey = settings["airwatchAPIKey"]
airwatchAPIUser = settings["airwatchAPIUser"] airwatchAuthMethod = settings["airwatchAuthMethod"]
airwatchAPIPassword = settings["airwatchAPIPassword"] airwatchAPIUser = None
airwatchAPIPassword = None
airwatchCertPath = None
airwatchCertPass = None
if(airwatchAuthMethod == 'password'):
airwatchAPIUser = settings["airwatchAPIUser"]
airwatchAPIPassword = settings["airwatchAPIPassword"]
else:
airwatchCertPath = settings["airwatchCertPath"]
airwatchCertPass = settings["airwatchCertPass"]
# Informations du serveur GLPI # Informations du serveur GLPI
GLPIServer = settings["glpiServer"] GLPIServer = settings["glpiServer"]
@ -66,6 +79,31 @@ platformFilterOut = [12]
# ====================================== # # ====================================== #
def getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri, User=None, password=None, CertPath=None, CertPassword=None):
if(airwatchAuthMethod == "password"):
airwatchAPIUserToken = base64.b64encode(f"{airwatchAPIUser}:{airwatchAPIPassword}".encode('ascii')).decode("ascii")
return {
"Authorization": f"Basic {airwatchAPIUserToken}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
else:
signing_data = uri.split('?')[0]
with open(CertPath, 'rb') as certfile:
cert = certfile.read()
key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, CertPassword.encode())
options = [pkcs7.PKCS7Options.DetachedSignature]
signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options)
signed_data_b64 = base64.b64encode(signed_data).decode()
return {
"Authorization": f"CMSURL'1 {signed_data_b64}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
# Vérification de la présence du verrou avant de continuer # Vérification de la présence du verrou avant de continuer
if(os.path.isfile(lockFile) and not args.force): if(os.path.isfile(lockFile) and not args.force):
if(debug): if(debug):
@ -78,15 +116,7 @@ else:
# Adresse de recherche des appareils # Adresse de recherche des appareils
# avec limite de 500 appareils par page (limite max de l'API) # avec limite de 500 appareils par page (limite max de l'API)
airwatchAPIDevicesSearchURI = f"/API/mdm/devices/search?pagesize=500&page=" airwatchAPIDevicesSearchURI = f"/API/mdm/devices/search?pagesize=500&page="
airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=airwatchAPIDevicesSearchURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
# User token formé par la concaténation du nom de l'utilisateur API et de son mot de passe converti en base 64
airwatchAPIUserToken = base64.b64encode(f"{airwatchAPIUser}:{airwatchAPIPassword}".encode('ascii')).decode("ascii")
airwatchHeaders = {
"Authorization": f"Basic {airwatchAPIUserToken}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
# Page de départ pour la recherche # Page de départ pour la recherche
pageNumber = 0 pageNumber = 0
@ -95,6 +125,7 @@ pageNumber = 0
devices = [] devices = []
uri = f"{airwatchServer}{airwatchAPIDevicesSearchURI}{pageNumber}" uri = f"{airwatchServer}{airwatchAPIDevicesSearchURI}{pageNumber}"
if(debug): if(debug):
print(f"Uri for device search on airwatch : {uri}") print(f"Uri for device search on airwatch : {uri}")
result = requests.get(uri, headers=airwatchHeaders) result = requests.get(uri, headers=airwatchHeaders)
@ -121,6 +152,9 @@ while(len(devices) != result["Total"]):
pageNumber += 1 pageNumber += 1
if(debug):
print(f"Nombre d'appareils {len(devices)}")
# ====================== Début suppression des doublons ================================= # # ====================== Début suppression des doublons ================================= #
# On récupére les numéros de série # On récupére les numéros de série
@ -159,6 +193,7 @@ devices = [d for d in devices if d["Id"]["Value"] not in devicesToDelete]
# envoi de la requête de suppression des appareils sur magenta # envoi de la requête de suppression des appareils sur magenta
airwatchAPIDeleteURI = '/API/mdm/devices/' airwatchAPIDeleteURI = '/API/mdm/devices/'
airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=airwatchAPIDeleteURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
for device in devicesToDelete: for device in devicesToDelete:
uri = f"{airwatchServer}{airwatchAPIDeleteURI}{device}" uri = f"{airwatchServer}{airwatchAPIDeleteURI}{device}"
if(debug): if(debug):
@ -312,6 +347,7 @@ for device in devices:
} }
# Récupération des applications présents sur les appareils # Récupération des applications présents sur les appareils
airwatchAPIAppsSearchURI = f"/api/mdm/devices/{device['Uuid']}/apps/search" airwatchAPIAppsSearchURI = f"/api/mdm/devices/{device['Uuid']}/apps/search"
airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=airwatchAPIAppsSearchURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
uri = f"{airwatchServer}{airwatchAPIAppsSearchURI}" uri = f"{airwatchServer}{airwatchAPIAppsSearchURI}"
apps = requests.get(uri, headers=airwatchHeaders).json() apps = requests.get(uri, headers=airwatchHeaders).json()
@ -343,6 +379,7 @@ for device in devices:
# Mise à jour du friendly name sur Airwatch # Mise à jour du friendly name sur Airwatch
if(device["DeviceFriendlyName"] != f"{data['1']} {platformName} {device['OperatingSystem']} - {device['UserName']}"): if(device["DeviceFriendlyName"] != f"{data['1']} {platformName} {device['OperatingSystem']} - {device['UserName']}"):
airwatchAPIURI = f"/API/mdm/devices/{device['Id']['Value']}" airwatchAPIURI = f"/API/mdm/devices/{device['Id']['Value']}"
airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=airwatchAPIURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass)
uri = f"{airwatchServer}{airwatchAPIURI}" uri = f"{airwatchServer}{airwatchAPIURI}"
updateDeviceDetails = { updateDeviceDetails = {
"DeviceFriendlyName":f"{data['1']} {platformName} {device['OperatingSystem']} - {device['UserName']}" "DeviceFriendlyName":f"{data['1']} {platformName} {device['OperatingSystem']} - {device['UserName']}"