Files
glpi-airwatch-sync/scripts/syncGLPI.py
2025-06-03 17:55:48 +02:00

356 lines
13 KiB
Python

#!/usr/bin/python3
import os
import base64
import requests
import json
import argparse
from datetime import datetime
parser = argparse.ArgumentParser()
parser.add_argument("-debug", action=argparse.BooleanOptionalAction)
parser.add_argument("-searchFilter", type=str, choices=["Id", "SerialNumber", "Imei", "UserName"])
parser.add_argument("-searchValue", type=str)
parser.add_argument("-airwatchServer", type=str, default="https://apimagenta.phm.education.gouv.fr")
parser.add_argument("-airwatchAPIKey", type=str, default="")
parser.add_argument("-airwatchAPIUser", type=str, default="")
parser.add_argument("-airwatchAPIPassword", type=str, default="")
parser.add_argument("-stagingUser", type=str, default="staging-pr")
parser.add_argument("-glpiServer", type=str, default="http://127.0.0.1/glpi")
parser.add_argument("-glpiAppToken", type=str, default="")
parser.add_argument("-glpiUserToken", type=str, default="")
parser.add_argument("-force", action=argparse.BooleanOptionalAction)
args = parser.parse_args()
#======== Paramètres du script ========#
# Emplacement du verrou
lockFile = './magentaGLPIUpdate.lock'
debug=args.debug
# Informations du serveur Airwatch
#airwatchServer = 'https://pp-apimagenta.phm.education.gouv.fr'
airwatchServer = args.airwatchServer
airwatchAPIKey = args.airwatchAPIKey
airwatchAPIUser = args.airwatchAPIUser
airwatchAPIPassword = args.airwatchAPIPassword
stagingUser = args.stagingUser
# Informations du serveur GLPI
GLPIServer = args.glpiServer
GLPIAppToken = args.glpiAppToken
GLPIUserToken = args.glpiUserToken
# Filtres
searchFilter = args.searchFilter
searchValue = args.searchValue
# Platform exclusion (12 = computer)
platformFilterEnabled = True
platformFilterOut = [12]
# ====================================== #
# Vérification de la présence du verrou avant de continuer
if(os.path.isfile(lockFile) and not args.force):
if(debug):
print('Lock file is present, exiting...')
exit(0)
else:
open(lockFile, "w").close()
# Adresse de recherche des appareils
# avec limite de 500 appareils par page (limite max de l'API)
airwatchAPIDevicesSearchURI = f"/API/mdm/devices/search?pagesize=500&page="
# User token formé par la concaténation du nom de l'utilisateur API et de son mot de passe converti en base 64
airwatchAPIUserToken = base64.b64encode(f"{airwatchAPIUser}:{airwatchAPIPassword}".encode('ascii')).decode("ascii")
airwatchHeaders = {
"Authorization": f"Basic {airwatchAPIUserToken}",
"aw-tenant-code": airwatchAPIKey,
"Accept": "application/json"
}
# Page de départ pour la recherche
pageNumber = 0
# Initialisation de la variable devices qui va stocker l'ensemble des appareils trouvés
devices = []
uri = f"{airwatchServer}{airwatchAPIDevicesSearchURI}{pageNumber}"
if(debug):
print(f"Uri for device search on airwatch : {uri}")
result = requests.get(uri, headers=airwatchHeaders)
if(debug):
print(f"Result of request : {result}")
# On vérifie qu'on a bien un retour OK pour la requête API
if(result.status_code != 200):
print(result.json())
# Suppression du verrou
os.remove(lockFile)
exit(0)
result = result.json()
# On fait une requête pour chaque page tant qu'on a pas atteint le nombre
# d'appareils trouvé dans la première requête
while(len(devices) != result["Total"]):
uri = f"{airwatchServer}{airwatchAPIDevicesSearchURI}{pageNumber}"
result = requests.get(uri, headers=airwatchHeaders).json()
devices += result["Devices"]
pageNumber += 1
# ====================== Début suppression des doublons ================================= #
# On récupére les numéros de série
serials = [device["SerialNumber"] for device in devices]
# On garde ceux qui sont présent plus d'une fois et qui n'ont pas HUBNOSERIAL (BYOD) comme numéro de série
serials = {serial for serial in serials if serials.count(serial) > 1 and serial != 'HUBNOSERIAL'}
# Récupération des id et de la dernière date d'enrolement
devicesDouble = {}
for serial in serials:
# on fait une liste des appareils avec le même numéro de série
# que l'on stocke dans un dictionnaire avec le numéro de série en clé
devicesDouble[serial] = [[device["Id"]["Value"],datetime.strptime(device["LastEnrolledOn"], "%Y-%m-%dT%H:%M:%S.%f")] for device in devices if serial == device["SerialNumber"]]
if(debug):
print(f"Doublons détectés: {len(devicesDouble)}")
# On supprime les doublons qui ne se sont pas enrôlés en dernier
devicesToDelete = []
for k,v in devicesDouble.items():
latest = None
for d in v:
if(latest == None):
latest = d
else:
if(latest[1] < d[1]):
devicesToDelete += [latest[0]]
latest = d
else:
devicesToDelete += [d[0]]
# On retire ces appareils de la liste
devices = [d for d in devices if d["Id"]["Value"] not in devicesToDelete]
# envoi de la requête de suppression des appareils sur magenta
airwatchAPIDeleteURI = '/API/mdm/devices/'
for device in devicesToDelete:
uri = f"{airwatchServer}{airwatchAPIDeleteURI}{device}"
if(debug):
print(f"Suppression de {uri}")
requests.delete(uri, headers=airwatchHeaders)
# ====================== Fin suppression des doublons ================================= #
if(searchFilter != None):
if(debug):
print(f"SearchFilter set to {searchFilter}")
print(f"SearchValue set to {searchValue}")
if(searchFilter == 'Id'):
devices = [device for device in devices if device["Id"]["Value"] == searchValue]
else:
devices = [device for device in devices if device[searchFilter] == searchValue]
# Adresse d'initalisation de l'api GLPI
GLPIAPIInitUri = '/apirest.php/initSession/'
GLPIHeaders = {
'Content-Type': 'application/json',
"Authorization": f"user_token {GLPIUserToken}",
"App-Token": GLPIAppToken
}
# Récupération d'un token de session
uri = f"{GLPIServer}{GLPIAPIInitUri}"
result = requests.get(uri, headers=GLPIHeaders)
if(debug):
print(f"GLPI api access : {result}")
if(result.status_code != 200):
# Suppression du verrou
os.remove(lockFile)
exit(1)
GLPISessionToken = result.json()["session_token"]
if(debug):
print(f"GLPI session Token: {GLPISessionToken}")
# Changement des headers pour remplacer l'user token par le token de session
GLPIHeaders = {
'Content-Type': 'application/json',
"Session-Token": GLPISessionToken,
"App-Token": GLPIAppToken
}
# Adresse de recherche des appareils présents dans ordinateurs sur GLPI
GLPIAPISearchComputer = 'apirest.php/search/computer?'
platforms = {
2:"Apple iOS",
5:"Android",
12:"Windows Desktop"
}
processorArchs = {
0:{
"osArch":"arm64",
"softwareArch":"arm64"
},
9:{
"osArch":"64-bit",
"softwareArch":"x86_64"
}
}
for device in devices:
if(device["EnrollmentStatus"] != 'Enrolled'):
continue
if(device["Imei"] != ''):
if(debug):
print(f"Imei = {device['Imei']}")
# Recherche des appareils en fonction du numéro de série ou de l'imei
# l'imei pouvant être dans le champ numéro de série ou les champs imei custom
search_parameter = f'is_deleted=0&criteria[0][field]=5&withindexes=true&criteria[0][searchtype]=contains&criteria[0][value]=^{device["SerialNumber"]}$'\
f'&criteria[1][link]=OR&criteria[1][field]=5&criteria[1][searchtype]=contains&criteria[1][value]=^{device["Imei"]}$'\
f'&criteria[2][link]=OR&criteria[2][field]=76667&criteria[2][searchtype]=contains&criteria[2][value]=^{device["Imei"]}$'\
f'&criteria[3][link]=OR&criteria[3][field]=76670&criteria[3][searchtype]=contains&criteria[3][value]=^{device["Imei"]}$'
else:
# Recherche des appareils en fonction du numéro de série seulement
search_parameter = f'is_deleted=0&criteria[0][field]=5&withindexes=true&criteria[0][searchtype]=contains&criteria[0][value]=^{device["SerialNumber"]}$'
if(debug):
print(f"Serial Number = {device['SerialNumber']}")
searchUri = f"{GLPIServer}{GLPIAPISearchComputer}{search_parameter}"
if(debug):
print(f"searchURI = {searchUri}")
search = requests.get(searchUri, headers=GLPIHeaders)
# On ne gère pas pour l'instant d'autres code que le code 200
# voir en fonction des codes erreurs retournés par le serveur
if(search.status_code != 200):
break
search = search.json()
if(search["totalcount"] == 1):
# Récupération de l'utilisateur de l'appareil dans la fiche GLPI de l'appareil
for device_id, data in search["data"].items():
platformId = device["PlatformId"]["Id"]["Value"]
if(platformId in platforms.keys()):
platformName = platforms[platformId]
else:
platformName = "Unknown"
processorArch = device["ProcessorArchitecture"]
if(processorArch in processorArchs.keys()):
osArch = processorArchs[processorArch]["osArch"]
softwareArch = processorArchs[processorArch]["softwareArch"]
else:
osArch = "Unknown"
softwareArch = "Unknown"
inventory = {
"action":"inventory",
"content":{
"accesslog":{
"logdate": datetime.strptime(device["LastSeen"], "%Y-%m-%dT%H:%M:%S.%f").strftime("%Y-%m-%d %H:%M:%S")
},
"versionclient":"Airwatch Synchronizer",
"users":[
{
"login": device["UserName"]
}
],
"operatingsystem":{
"name": platformName,
"version": device["OperatingSystem"],
"full_name": f"{platformName} {device['OperatingSystem']}",
"arch": osArch
},
"softwares":[
],
"hardware":{
"name":data["1"],
"uuid":device["Uuid"],
"memory":device["TotalPhysicalMemory"]
}
},
"tag":device["LocationGroupName"],
"deviceid":f"{data['1']} - {device['SerialNumber']}",
"itemtype":"Computer"
}
# Récupération des applications présents sur les appareils
airwatchAPIAppsSearchURI = f"/api/mdm/devices/{device['Uuid']}/apps/search"
uri = f"{airwatchServer}{airwatchAPIAppsSearchURI}"
apps = requests.get(uri, headers=airwatchHeaders).json()
for app in apps["app_items"]:
if(app["installed_status"] != "Installed"):
continue
install_date = datetime.strptime(app["latest_uem_action_time"], "%Y-%m-%dT%H:%M:%S.%f").strftime("%Y-%m-%d")
if(install_date == "0001-01-01"):
inventory["content"]["softwares"] += [{
"name": app["name"],
"guid": app["bundle_id"],
"version": app["installed_version"],
"filesize": app["size"],
"arch": softwareArch
}]
else:
inventory["content"]["softwares"] += [{
"name": app["name"],
"guid": app["bundle_id"],
"version": app["installed_version"],
"install_date": install_date,
"filesize": app["size"],
"arch": softwareArch
}]
# Mise à jour du friendly name sur Airwatch
airwatchAPIURI = f"/API/mdm/devices/{device['Id']['Value']}"
uri = f"{airwatchServer}{airwatchAPIURI}"
updateDeviceDetails = {
"DeviceFriendlyName":f"{data['1']} {platformName} {device['OperatingSystem']} - {device['UserName']}"
}
requests.put(uri, headers=airwatchHeaders, json=updateDeviceDetails)
headers = {
"Content-Type":"Application/x-compress",
"user-agent":"Airwatch Synchronizer"
}
if(debug):
print(f"Updating {device_id} on GLPI")
# filtre des plateformes
if(platformFilterEnabled):
if device["PlatformId"]["Id"]["Value"] in platformFilterOut:
continue
result = requests.post(GLPIServer, headers=headers, json=inventory)
if(debug):
print(result.json())
if(debug):
print('Removing lock')
os.remove(lockFile)