Updated both scripts to use AirwatchAPI and GLPIAPI modules

This commit is contained in:
2025-07-05 13:09:28 +02:00
parent a4ee8a1ae8
commit 9f04afb153
8 changed files with 526 additions and 543 deletions

210
scripts/includes/GLPIAPI.py Normal file
View File

@ -0,0 +1,210 @@
import requests
import json
from datetime import datetime
class GLPIAPI:
def __init__(self, settings):
self.Server = settings["GLPI"]["Server"]
self.AppToken = settings["GLPI"]["AppToken"]
self.UserToken = settings["GLPI"]["UserToken"]
self.UserAgent = settings["GLPI"]["UserAgent"]
self.SessionToken = None
self.StatusCode = None
self.Headers = None
self.InitConnection()
def InitConnection(self):
initURI = '/apirest.php/initSession/'
GLPIHeaders = {
'Content-Type': 'application/json',
"Authorization": f"user_token {self.UserToken}",
"App-Token": self.AppToken
}
# Récupération d'un token de session
uri = f"{self.Server}{initURI}"
result = requests.get(uri, headers=GLPIHeaders)
self.StatusCode = result.status_code
if(result.status_code == 200):
self.SessionToken = result.json()["session_token"]
self.Headers = {
'Content-Type': 'application/json',
"Session-Token": self.SessionToken,
"App-Token": self.AppToken
}
def GetDevice(self, device):
if(device.Imei != ''):
# Recherche des appareils en fonction du numéro de série ou de l'imei
# l'imei pouvant être dans le champ numéro de série ou les champs imei custom
search_parameter = f'is_deleted=0&criteria[0][field]=5&withindexes=true&criteria[0][searchtype]=contains&criteria[0][value]=^{device.SerialNumber}$'\
f'&criteria[1][link]=OR&criteria[1][field]=5&criteria[1][searchtype]=contains&criteria[1][value]=^{device.Imei}$'\
f'&criteria[2][link]=OR&criteria[2][field]=76667&criteria[2][searchtype]=contains&criteria[2][value]=^{device.Imei}$'\
f'&criteria[3][link]=OR&criteria[3][field]=76670&criteria[3][searchtype]=contains&criteria[3][value]=^{device.Imei}$'
else:
# Recherche des appareils en fonction du numéro de série seulement
search_parameter = f'is_deleted=0&criteria[0][field]=5&withindexes=true&criteria[0][searchtype]=contains&criteria[0][value]=^{device.SerialNumber}$'
searchUri = f"{self.Server}/apirest.php/search/computer?{search_parameter}"
search = requests.get(searchUri, headers=self.Headers)
if(search.status_code == 200):
search = search.json()
if(search["totalcount"] == 1):
deviceID = list(search["data"].keys())[0]
data = search["data"][deviceID]
return deviceID, data, search["totalcount"]
elif(search["totalcount"] > 1):
return deviceID, search["data"], search["totalcount"]
else:
return None, None, 0
return None, None, None
def UpdateInventory(self, inventory):
headers = {
"Content-Type":"Application/x-compress",
"user-agent":self.UserAgent
}
return requests.post(self.Server, headers=headers, json=inventory)
def UpdateSerialNumber(self, deviceid, serialnumber):
body = {
"input" : {
"id" : deviceid,
"serial" : serialnumber
}
}
uri = f"{self.Server}/apirest.php/Computer/"
return requests.put(uri, headers=self.Headers, json=body)
def CreateInventoryForAirwatchDevice(self, device, deviceName, apps=None):
platforms = {
2:"Apple iOS",
5:"Android",
12:"Windows Desktop"
}
if(device.PlatformId in platforms.keys()):
platformName = platforms[device.PlatformId]
else:
platformName = "Unknown"
processorArchs = {
0:{
"osArch":"arm64",
"softwareArch":"arm64"
},
9:{
"osArch":"64-bit",
"softwareArch":"x86_64"
}
}
if(device.Arch in processorArchs.keys()):
osArch = processorArchs[device.Arch]["osArch"]
softwareArch = processorArchs[device.Arch]["softwareArch"]
else:
osArch = "Unknown"
softwareArch = "Unknown"
logDate = datetime.strptime(device.LastSeen, "%Y-%m-%dT%H:%M:%S.%f").strftime("%Y-%m-%d %H:%M:%S")
inventory = GLPIInventory(logdate=logDate, versionclient=self.UserAgent, tag=device.Group, deviceid=f"{deviceName} - {device.SerialNumber}", itemtype="Computer")
inventory.SetOperatingSystem(platformName, device.OS, osArch)
inventory.SetHardware(deviceName, device.Uuid, device.TotalMemory)
inventory.AddUser(device.User)
if(apps != None):
for app in apps:
if(app.Status != "Installed"):
continue
install_date = datetime.strptime(app.InstallDate, "%Y-%m-%dT%H:%M:%S.%f").strftime("%Y-%m-%d")
if(install_date == "1-01-01"):
inventory.AddSoftware(app.Name, app.Version, app.Size, softwareArch, app.Guid)
else:
inventory.AddSoftware(app.Name, app.Version, app.Size, softwareArch, app.Guid, install_date)
return inventory
class GLPIInventory:
def __init__(self, logdate=None, versionclient=None, tag=None, deviceid=None, itemtype=None):
self.logdate = logdate
self.versionclient = versionclient
self.users = []
self.operatingsystem = {}
self.softwares = []
self.hardware = {}
self.tag = tag
self.deviceId = deviceid
self.itemType = itemtype
def AddUser(self, user):
self.users += [{
"login": user
}]
def DelUser(self, user):
for i in range(0, len(self.users)):
if(self.users[i]["login"] == user):
del self.users[i]
def AddSoftware(self, name, version, filesize, arch, guid, install_date=None):
if(install_date == None):
self.softwares += [{
"name": name,
"guid": guid,
"version": version,
"filesize": filesize,
"arch": arch
}]
else:
self.softwares += [{
"name": name,
"guid": guid,
"version": version,
"install_date": install_date,
"filesize": filesize,
"arch": arch
}]
def DelSoftware(self, guid):
for i in range(0, len(self.softwares)):
if(self.softwares[i]["guid"] == guid):
del self.softwares[i]
def SetHardware(self, name, uuid, memory):
self.hardware = {
"name": name,
"uuid": uuid,
"memory": memory
}
def SetOperatingSystem(self, name, version, arch):
self.operatingsystem = {
"name": name,
"version": version,
"full_name": f"{name} {version}",
"arch": arch
}
def Json(self):
inventory = {
"action": "inventory",
"content":{
"accesslog":{
"logdate": self.logdate
},
"versionclient": self.versionclient,
"users": self.users,
"operatingsystem": self.operatingsystem,
"softwares": self.softwares,
"hardware": self.hardware
},
"tag": self.tag,
"deviceid": self.deviceId,
"itemtype": self.itemType
}
return json.dumps(inventory)

View File

View File

@ -0,0 +1,205 @@
import base64
import requests
from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7
from cryptography.hazmat.primitives import hashes, serialization
class AirwatchAPI:
def __init__(self, settings):
self.Server = settings["AIRWATCH"]["Server"]
self.APIKey = settings["AIRWATCH"]["APIKey"]
self.AuthMethod = (settings["AIRWATCH"]["AuthenticationMethod"]).upper()
if(self.AuthMethod == "PASSWORD"):
self.APIUser = settings["AIRWATCH"]["APIUser"]
self.APIPassword = settings["AIRWATCH"]["APIPassword"]
if(self.AuthMethod == "CMSURL"):
self.CertificatePath = settings["AIRWATCH"]["CertificatePath"]
self.CertificatePassword = settings["AIRWATCH"]["CertificatePassword"]
def GetHeaders(self, uri):
if(self.AuthMethod == "PASSWORD"):
airwatchAPIUserToken = base64.b64encode(f"{self.APIUser}:{self.APIPassword}".encode('ascii')).decode("ascii")
return {
"Authorization": f"Basic {airwatchAPIUserToken}",
"aw-tenant-code": self.APIKey,
"Accept": "application/json"
}
else:
signing_data = uri.split('?')[0]
with open(self.CertificatePath, 'rb') as certfile:
cert = certfile.read()
key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, self.CertificatePassword.encode())
options = [pkcs7.PKCS7Options.DetachedSignature]
signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options)
signed_data_b64 = base64.b64encode(signed_data).decode()
return {
"Authorization": f"CMSURL'1 {signed_data_b64}",
"aw-tenant-code": self.APIKey,
"Accept": "application/json"
}
def GetUser(self, username):
cmdURI = f'/API/system/users/search?username={username}'
airwatchHeaders = self.GetHeaders(cmdURI)
uri = f"{self.Server}{cmdURI}"
user = requests.get(uri, headers=airwatchHeaders)
if(user.status_code == 200):
return AirwatchUser(user.json()["Users"][0])
return None
def GetDevices(self, user: str = ""):
if(user == ""):
cmdURI = f"/API/mdm/devices/search?pagesize=500&page="
else:
cmdURI = f"/API/mdm/devices/search?user={user}&pagesize=500&page="
airwatchHeaders = self.GetHeaders(cmdURI)
pageNum = 0
devices = []
uri = f"{self.Server}{cmdURI}{pageNum}"
result = requests.get(uri, headers=airwatchHeaders)
if(result.status_code == 200):
deviceTotalCount = result.json()["Total"]
while(len(devices) != deviceTotalCount):
uri = f"{self.Server}{cmdURI}{pageNum}"
result = requests.get(uri, headers=airwatchHeaders).json()["Devices"]
for device in result:
devices += [AirwatchDevice(device)]
pageNum += 1
return devices
return None
def GetDeviceApps(self, device):
cmdURI = f"/api/mdm/devices/{device.Uuid}/apps/search"
airwatchHeaders = self.GetHeaders(cmdURI)
uri = f"{self.Server}{cmdURI}"
apps = []
result = requests.get(uri, headers=airwatchHeaders)
if(result.status_code == 200):
for app in result.json()["app_items"]:
apps += [AirwatchApplication(app)]
return apps
return None
def ResetDEPProfiles(self, groupUuid):
cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices"
uri = f"{self.Server}{cmdURI}"
airwatchHeaders = self.GetHeaders(cmdURI)
result = requests.get(uri, headers=airwatchHeaders)
if(result.status_code == 200):
for device in result.json():
if (device["enrollmentStatus"] != "Unenrolled"):
continue
assignDEPProfileURI = f"/API/mdm/dep/profiles/{device['profileUuid']}/devices/{device['deviceSerialNumber']}?action=Assign"
uri = f"{airwatchServer}{assignDEPProfileURI}"
airwatchHeaders = self.GetHeaders(assignDEPProfileURI)
requests.put(uri, headers=airwatchHeaders)
return True
return False
def GetEnrollmentTokens(self, groupUuid, deviceType=2):
cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens?device_type={deviceType}&page_size=500"
uri = f"{self.Server}{cmdURI}"
airwatchHeaders = self.GetHeaders(cmdURI)
result = requests.get(uri, headers=airwatchHeaders)
return result.json()["tokens"]
def UpdateUserOnEnrollmentTokens(self, groupUuid,user, serialnumber):
body = {
"registration_type": "REGISTER_DEVICE",
"device_registration_record": {
"user_uuid": user.Uuid,
"friendly_name": serialnumber,
"ownership_type": "CORPORATE_DEDICATED",
"serial_number": serialnumber,
"to_email_address": user.Email,
"message_type": 0
}
}
cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens"
airwatchHeaders = self.GetHeaders(cmdURI)
airwatchHeaders["Accept"] = "application/json;version=2"
uri = f"{self.Server}{cmdURI}"
print(uri)
return requests.post(uri, headers=airwatchHeaders, json=body)
def SyncDEPDevices(self, groupUuid):
cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices?action=sync"
uri = f"{self.Server}{cmdURI}"
airwatchHeaders = self.GetHeaders(cmdURI)
return requests.put(uri, headers=airwatchHeaders).status_code
def SetDeviceFriendlyName(self, device, friendlyName):
cmdURI = f"/API/mdm/devices/{device.Id}"
airwatchHeaders = self.GetHeaders(cmdURI)
uri = f"{self.Server}{cmdURI}"
body = {
"DeviceFriendlyName":friendlyName
}
return requests.put(uri, headers=airwatchHeaders, json=body).status_code
def SetDeviceUser(self, device, airwatchUser):
cmdURI = f'/API/mdm/devices/{device.Id}/enrollmentuser/{airwatchUser.Id}'
uri = f"{self.Server}{cmdURI}"
airwatchHeaders = self.GetHeaders(cmdURI)
return requests.patch(uri, headers=airwatchHeaders).status_code
def DeleteDevice(self, device):
cmdURI = f"/API/mdm/devices/{device.Id}"
airwatchHeaders = self.GetHeaders(cmdURI)
uri = f"{self.Server}{cmdURI}"
return requests.delete(uri, headers=airwatchHeaders).status_code
class AirwatchUser:
def __init__(self, user):
self.Id = user["Id"]["Value"]
self.Uuid = user["Uuid"]
self.UserName = user["UserName"]
self.FirstName = user["FirstName"]
self.LastName = user["LastName"]
self.Enabled = user["Status"]
self.Email = user["Email"]
self.DisplayName = user["DisplayName"]
self.Group = user["Group"]
self.GroupId = user["LocationGroupId"]
self.OrgUuid = user["OrganizationGroupUuid"]
self.DeviceCount = int(user["EnrolledDevicesCount"])
class AirwatchDevice:
def __init__(self, device):
self.Id = device["Id"]["Value"]
self.Uuid = device["Uuid"]
self.SerialNumber = device["SerialNumber"]
self.Imei = device["Imei"]
self.MacAddress = device["MacAddress"]
self.FriendlyName = device["DeviceFriendlyName"]
self.GroupId = device["LocationGroupId"]["Id"]["Value"]
self.Group = device["LocationGroupName"]
self.GroupUuid = device["LocationGroupId"]["Uuid"]
self.UserId = device["UserId"]["Id"]["Value"]
self.User = device["UserName"]
self.UserEmail = device["UserEmailAddress"]
self.PlatformId = device["PlatformId"]["Id"]["Value"]
self.Platform = device["Platform"]
self.OS = device["OperatingSystem"]
self.Arch = device["ProcessorArchitecture"]
self.TotalMemory = device["TotalPhysicalMemory"]
self.EnrollmentStatus = device["EnrollmentStatus"]
self.LastEnrolledOn = device["LastEnrolledOn"]
self.LastSeen = device["LastSeen"]
class AirwatchApplication:
def __init__(self, application):
self.Name = application["name"]
self.Guid = application["bundle_id"]
self.Size = application["size"]
self.Version = application["installed_version"]
self.InstallDate = application["latest_uem_action_time"]
self.Status = application["installed_status"]