#!/usr/bin/python3 import os import base64 import requests import json import argparse from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7 from cryptography.hazmat.primitives import hashes, serialization parser = argparse.ArgumentParser() parser.add_argument("-debug", action=argparse.BooleanOptionalAction) parser.add_argument("-force", action=argparse.BooleanOptionalAction) parser.add_argument("-serialnumber") parser.add_argument("-staginguser") args = parser.parse_args() settingsDefault = { "airwatchServer":"https://airwatchServer", "airwatchAPIKey":"APIKEY", "airwatchAuthMethod":"CMSURL", "airwatchCertPath":"/path/to/cert", "airwatchCertPass":"certPassword", "airwatchAPIUser":"UserAPI", "airwatchAPIPassword":"PasswordUserAPI", "glpiServer":"http://127.0.0.1/glpi", "glpiAppToken":"GLPIAppToken", "glpiUserToken":"GLPIUserToken", "stagingUser":"staging-pr", "userAgent":"Airwatch Synchronizer" } settings = None if(not os.path.isfile("./settings.json")): f = open("./settings.json", "w") f.write(json.dumps(settingsDefault, indent=4)) f.close() exit(1) else: with open("./settings.json", "r") as f: settings = json.load(f) #======== Paramètres du script ========# # Emplacement du verrou lockFile = './airwatchStagingUserAssignation.lock' debug=args.debug # Informations du serveur Airwatch airwatchServer = settings["airwatchServer"] airwatchAPIKey = settings["airwatchAPIKey"] airwatchAuthMethod = settings["airwatchAuthMethod"] airwatchAPIUser = None airwatchAPIPassword = None airwatchCertPath = None airwatchCertPass = None if(airwatchAuthMethod == 'password'): airwatchAPIUser = settings["airwatchAPIUser"] airwatchAPIPassword = settings["airwatchAPIPassword"] elif(airwatchAuthMethod == 'CMSURL'): airwatchCertPath = settings["airwatchCertPath"] airwatchCertPass = settings["airwatchCertPass"] stagingUser = settings["stagingUser"] if(args.staginguser != None): stagingUser = args.staginguser # Informations du serveur GLPI GLPIServer = settings["glpiServer"] GLPIAppToken = settings["glpiAppToken"] GLPIUserToken = settings["glpiUserToken"] # ====================================== # def getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri, User=None, password=None, CertPath=None, CertPassword=None): if(airwatchAuthMethod == "password"): airwatchAPIUserToken = base64.b64encode(f"{airwatchAPIUser}:{airwatchAPIPassword}".encode('ascii')).decode("ascii") return { "Authorization": f"Basic {airwatchAPIUserToken}", "aw-tenant-code": airwatchAPIKey, "Accept": "application/json" } else: signing_data = uri.split('?')[0] with open(CertPath, 'rb') as certfile: cert = certfile.read() key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, CertPassword.encode()) options = [pkcs7.PKCS7Options.DetachedSignature] signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options) signed_data_b64 = base64.b64encode(signed_data).decode() return { "Authorization": f"CMSURL'1 {signed_data_b64}", "aw-tenant-code": airwatchAPIKey, "Accept": "application/json" } # Vérification de la présence du verrou avant de continuer if(os.path.isfile(lockFile) and not args.force): if(debug): print('Lock file is present, exiting...') exit(0) else: open(lockFile, "w").close() # Adresse de recherche des appareils filtré sur l'utilisateur de staging # avec limite de 500 appareils par page (limite max de l'API) airwatchAPIDevicesSearchURI = f"/API/mdm/devices/search?user={stagingUser}&pagesize=500&page=" airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=airwatchAPIDevicesSearchURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass) # Page de départ pour la recherche pageNumber = 0 # Initialisation de la variable devices qui va stocker l'ensemble des appareils trouvés devices = [] uri = f"{airwatchServer}{airwatchAPIDevicesSearchURI}{pageNumber}" if(debug): print(f"Uri for device search on airwatch : {uri}") result = requests.get(uri, headers=airwatchHeaders) if(debug): print(f"Result of request : {result}") # On vérifie qu'on a bien un retour OK pour la requête API if(result.status_code != 200): # Suppression du verrou os.remove(lockFile) exit(0) result = result.json() # On fait une requête pour chaque page en fonction tant qu'on a pas atteint le nombre # d'appareils trouvé dans la première requête while(len(devices) != result["Total"]): uri = f"{airwatchServer}{airwatchAPIDevicesSearchURI}{pageNumber}" result = requests.get(uri, headers=airwatchHeaders).json() devices += result["Devices"] pageNumber += 1 # Adresse d'initalisation de l'api GLPI GLPIAPIInitUri = '/apirest.php/initSession/' GLPIHeaders = { 'Content-Type': 'application/json', "Authorization": f"user_token {GLPIUserToken}", "App-Token": GLPIAppToken } # Récupération d'un token de session uri = f"{GLPIServer}{GLPIAPIInitUri}" result = requests.get(uri, headers=GLPIHeaders) if(debug): print(f"GLPI api access : {result}") if(result.status_code != 200): # Suppression du verrou os.remove(lockFile) exit(1) GLPISessionToken = result.json()["session_token"] if(debug): print(f"GLPI session Token: {GLPISessionToken}") # Changement des headers pour remplacer l'user token par le token de session GLPIHeaders = { 'Content-Type': 'application/json', "Session-Token": GLPISessionToken, "App-Token": GLPIAppToken } # Adresse de recherche des appareils présents dans ordinateurs sur GLPI GLPIAPISearchComputer = '/apirest.php/search/computer?' for device in devices: if(device["EnrollmentStatus"] != 'Enrolled'): continue if(args.serialnumber != None and device["SerialNumber"] != args.serialnumber): continue if(device["Imei"] != ''): if(debug): print(f"Imei = {device['Imei']}") # Recherche des appareils en fonction du numéro de série ou de l'imei # l'imei pouvant être dans le champ numéro de série ou les champs imei custom search_parameter = f'is_deleted=0&criteria[0][field]=5&withindexes=true&criteria[0][searchtype]=contains&criteria[0][value]=^{device["SerialNumber"]}$'\ f'&criteria[1][link]=OR&criteria[1][field]=5&criteria[1][searchtype]=contains&criteria[1][value]=^{device["Imei"]}$'\ f'&criteria[2][link]=OR&criteria[2][field]=76667&criteria[2][searchtype]=contains&criteria[2][value]=^{device["Imei"]}$'\ f'&criteria[3][link]=OR&criteria[3][field]=76670&criteria[3][searchtype]=contains&criteria[3][value]=^{device["Imei"]}$' else: # Recherche des appareils en fonction du numéro de série seulement search_parameter = f'is_deleted=0&criteria[0][field]=5&withindexes=true&criteria[0][searchtype]=contains&criteria[0][value]=^{device["SerialNumber"]}$' if(debug): print(f"Serial Number = {device['SerialNumber']}") searchUri = f"{GLPIServer}{GLPIAPISearchComputer}{search_parameter}" if(debug): print(f"searchURI = {searchUri}") search = requests.get(searchUri, headers=GLPIHeaders) # On ne gère pas pour l'instant d'autres code que le code 200 # voir en fonction des codes erreurs retournés par le serveur if(search.status_code != 200): break search = search.json() if(search["totalcount"] == 1): # Récupération de l'utilisateur de l'appareil dans la fiche GLPI de l'appareil for device_id, data in search["data"].items(): device_user = search["data"][device_id]["70"] if(debug): print(f"user on device in GLPI : {device_user}") # Vérification que l'appareil est associé à un utilisateur dans GLPI if(device_user != None): # Récupération de l'utilisateur sur Magenta cmdURI = f'/API/system/users/search?username={device_user}' airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=cmdURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass) uri = f"{airwatchServer}{cmdURI}" if(debug): print(f"Airwatch user search uri : {uri}") user = requests.get(uri, headers=airwatchHeaders) # On ne gère pas pour l'instant d'autres code que le code 200 # voir en fonction des codes erreurs retournés par le serveur if(user.status_code != 200): break user = user.json() # Changement de l'utilisateur assigné sur l'appareil dans Magenta cmdURI = f'/API/mdm/devices/{device["Id"]["Value"]}/enrollmentuser/{user["Users"][0]["Id"]["Value"]}' patchUri = f'{airwatchServer}{cmdURI}' airwatchHeaders = getAirwatchHeaders(airwatchAuthMethod, airwatchAPIKey, uri=cmdURI, User=airwatchAPIUser, password=airwatchAPIPassword, CertPath=airwatchCertPath, CertPassword=airwatchCertPass) if(debug): print(f"patchUri = {patchUri}") requests.patch(patchUri, headers=airwatchHeaders) # Suppression du verrou os.remove(lockFile) exit(0)