From a4ee8a1ae8d8f3ee46fdda7d40f0c733e37e7536 Mon Sep 17 00:00:00 2001 From: Jason SECULA Date: Thu, 26 Jun 2025 11:22:47 +0200 Subject: [PATCH] =?UTF-8?q?Ajout=20d'un=20module=20pour=20g=C3=A9rer=20les?= =?UTF-8?q?=20requ=C3=AAtes=20=C3=A0=20l'API=20Airwatch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/airwatchAPI.py | 179 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 scripts/airwatchAPI.py diff --git a/scripts/airwatchAPI.py b/scripts/airwatchAPI.py new file mode 100644 index 0000000..03e7a8d --- /dev/null +++ b/scripts/airwatchAPI.py @@ -0,0 +1,179 @@ +import base64 +import requests +from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7 +from cryptography.hazmat.primitives import hashes, serialization + +class AirwatchAPI: + def __init__(self, settings): + self.Server = settings["AIRWATCH"]["Server"] + self.APIKey = settings["AIRWATCH"]["APIKey"] + self.AuthMethod = (settings["AIRWATCH"]["AuthenticationMethod"]).upper() + + if(self.AuthMethod == "PASSWORD"): + self.APIUser = settings["AIRWATCH"]["APIUser"] + self.APIPassword = settings["AIRWATCH"]["APIPassword"] + + if(self.AuthMethod == "CMSURL"): + self.CertificatePath = settings["AIRWATCH"]["CertificatePath"] + self.CertificatePassword = settings["AIRWATCH"]["CertificatePassword"] + + def GetHeaders(self, uri): + if(self.AuthMethod == "PASSWORD"): + airwatchAPIUserToken = base64.b64encode(f"{self.APIUser}:{self.APIPassword}".encode('ascii')).decode("ascii") + + return { + "Authorization": f"Basic {airwatchAPIUserToken}", + "aw-tenant-code": self.APIKey, + "Accept": "application/json" + } + else: + signing_data = uri.split('?')[0] + with open(self.CertificatePath, 'rb') as certfile: + cert = certfile.read() + key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, self.CertificatePassword.encode()) + options = [pkcs7.PKCS7Options.DetachedSignature] + signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options) + signed_data_b64 = base64.b64encode(signed_data).decode() + + return { + "Authorization": f"CMSURL'1 {signed_data_b64}", + "aw-tenant-code": self.APIKey, + "Accept": "application/json" + } + + def GetUser(self, username): + cmdURI = f'/API/system/users/search?username={username}' + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + user = requests.get(uri, headers=airwatchHeaders) + if(user.status_code == 200): + return AirwatchUser(user.json()["Users"][0]) + + return None + + def GetDevices(self, user: str = ""): + if(user == ""): + cmdURI = f"/API/mdm/devices/search?pagesize=500&page=" + else: + cmdURI = f"/API/mdm/devices/search?user={user}&pagesize=500&page=" + airwatchHeaders = self.GetHeaders(cmdURI) + pageNum = 0 + devices = [] + uri = f"{self.Server}{cmdURI}{pageNum}" + result = requests.get(uri, headers=airwatchHeaders) + if(result.status_code == 200): + deviceTotalCount = result.json()["Total"] + + while(len(devices) != deviceTotalCount): + uri = f"{self.Server}{cmdURI}{pageNum}" + result = requests.get(uri, headers=airwatchHeaders).json()["Devices"] + for device in result: + devices += [AirwatchDevice(device)] + pageNum += 1 + return devices + return None + + def GetDeviceApps(self, device): + cmdURI = f"/api/mdm/devices/{device.Uuid}/apps/search" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + apps = [] + result = requests.get(uri, headers=airwatchHeaders) + if(result.status_code == 200): + for app in result.json()["app_items"]: + apps += [AirwatchApplication(app)] + return apps + return None + + def ResetDEPProfiles(self, groupUuid): + cmdURI = f"/API/mdm/dep/groups/156147ba-3086-4ddc-8326-a6b9b8bf335a/devices" + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + result = requests.get(uri, headers=airwatchHeaders) + if(result.status_code == 200): + for device in result.json(): + if (device["enrollmentStatus"] != "Unenrolled"): + continue + assignDEPProfileURI = f"/API/mdm/dep/profiles/{device['profileUuid']}/devices/{device['deviceSerialNumber']}?action=Assign" + uri = f"{airwatchServer}{assignDEPProfileURI}" + airwatchHeaders = self.GetHeaders(assignDEPProfileURI) + requests.put(uri, headers=airwatchHeaders) + return True + return False + + def SyncDEPDevices(self, groupUuid): + cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices?action=sync" + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + return requests.put(uri, headers=airwatchHeaders).status_code + + def SetDeviceFriendlyName(self, device, friendlyName): + cmdURI = f"/API/mdm/devices/{device.Id}" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + body = { + "DeviceFriendlyName":friendlyName + } + return requests.put(uri, headers=airwatchHeaders, json=body).status_code + + def SetDeviceUser(self, device, airwatchUser): + cmdURI = f'/API/mdm/devices/{device.Id}/enrollmentuser/{airwatchUser.Id}' + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + return requests.patch(uri, headers=airwatchHeaders).status_code + + def DeleteDevice(self, device): + cmdURI = f"/API/mdm/devices/{device.Id}" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + return requests.delete(uri, headers=airwatchHeaders).status_code + +class AirwatchUser: + + def __init__(self, user): + self.Id = user["Id"]["Value"] + self.Uuid = user["Uuid"] + self.UserName = user["UserName"] + self.FirstName = user["FirstName"] + self.LastName = user["LastName"] + self.Enabled = user["Status"] + self.Email = user["Email"] + self.DisplayName = user["DisplayName"] + self.Group = user["Group"] + self.GroupId = user["LocationGroupId"] + self.OrgUuid = user["OrganizationGroupUuid"] + self.DeviceCount = int(user["EnrolledDevicesCount"]) + +class AirwatchDevice: + + def __init__(self, device): + self.Id = device["Id"]["Value"] + self.Uuid = device["Uuid"] + self.SerialNumber = device["SerialNumber"] + self.Imei = device["Imei"] + self.MacAddress = device["MacAddress"] + self.FriendlyName = device["DeviceFriendlyName"] + self.GroupId = device["LocationGroupId"]["Id"]["Value"] + self.Group = device["LocationGroupName"] + self.GroupUuid = device["LocationGroupId"]["Uuid"] + self.UserId = device["UserId"]["Id"]["Value"] + self.User = device["UserName"] + self.UserEmail = device["UserEmailAddress"] + self.PlatformId = device["PlatformId"]["Id"]["Value"] + self.Platform = device["Platform"] + self.OS = device["OperatingSystem"] + self.Arch = device["ProcessorArchitecture"] + self.TotalMemory = device["TotalPhysicalMemory"] + self.EnrollmentStatus = device["EnrollmentStatus"] + self.LastEnrolledOn = device["LastEnrolledOn"] + self.LastSeen = device["LastSeen"] + +class AirwatchApplication: + + def __init__(self, application): + self.Name = application["name"] + self.Guid = application["bundle_id"] + self.Size = application["size"] + self.Version = application["installed_version"] + self.InstallDate = application["latest_uem_action_time"] + self.Status = application["installed_status"] \ No newline at end of file