"""Thin client for a Workspace ONE UEM (AirWatch) REST API plus result wrappers."""

import base64

import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7


class AirwatchAPI:
    """Issue authenticated requests against an AirWatch server.

    The constructor reads everything from ``settings["AIRWATCH"]``:
    ``Server``, ``APIKey`` and ``AuthenticationMethod`` are always required;
    ``APIUser``/``APIPassword`` are read for the ``PASSWORD`` method and
    ``CertificatePath``/``CertificatePassword`` for the ``CMSURL`` method.
    """

    def __init__(self, settings):
        """Copy the connection and credential settings onto the instance."""
        airwatch = settings["AIRWATCH"]
        self.Server = airwatch["Server"]
        self.APIKey = airwatch["APIKey"]
        # Normalised to upper case so the comparisons below are case-insensitive.
        self.AuthMethod = airwatch["AuthenticationMethod"].upper()
        if self.AuthMethod == "PASSWORD":
            self.APIUser = airwatch["APIUser"]
            self.APIPassword = airwatch["APIPassword"]
        if self.AuthMethod == "CMSURL":
            self.CertificatePath = airwatch["CertificatePath"]
            self.CertificatePassword = airwatch["CertificatePassword"]

    def GetHeaders(self, uri):
        """Build the request headers for ``uri``.

        For the ``PASSWORD`` method this is HTTP Basic auth. For every other
        method the path part of ``uri`` (everything before ``?``) is signed
        with the PKCS#12 certificate and sent as a detached PKCS#7 signature.

        Args:
            uri: Server-relative request path, optionally with a query string.

        Returns:
            dict with ``Authorization``, ``aw-tenant-code`` and ``Accept``.
        """
        if self.AuthMethod == "PASSWORD":
            credentials = f"{self.APIUser}:{self.APIPassword}".encode('ascii')
            token = base64.b64encode(credentials).decode("ascii")
            return {
                "Authorization": f"Basic {token}",
                "aw-tenant-code": self.APIKey,
                "Accept": "application/json",
            }

        # Only the path is signed; the query string is deliberately excluded.
        signing_data = uri.split('?')[0]
        with open(self.CertificatePath, 'rb') as certfile:
            cert = certfile.read()
        key, certificate, _additional_certs = pkcs12.load_key_and_certificates(
            cert, self.CertificatePassword.encode()
        )
        options = [pkcs7.PKCS7Options.DetachedSignature]
        signed_data = (
            pkcs7.PKCS7SignatureBuilder()
            .set_data(signing_data.encode("UTF-8"))
            .add_signer(certificate, key, hashes.SHA256())
            .sign(serialization.Encoding.DER, options)
        )
        signed_data_b64 = base64.b64encode(signed_data).decode()
        # NOTE(review): the scheme token below is reproduced exactly as found.
        # Vendor samples appear to spell it with a backtick rather than an
        # apostrophe - confirm against the API documentation.
        return {
            "Authorization": f"CMSURL'1 {signed_data_b64}",
            "aw-tenant-code": self.APIKey,
            "Accept": "application/json",
        }

    def GetUser(self, username):
        """Look up a user by username.

        Returns:
            An ``AirwatchUser`` built from the first search hit, or ``None``
            when the request does not return 200 or the hit list is empty.
        """
        cmdURI = f'/API/system/users/search?username={username}'
        airwatchHeaders = self.GetHeaders(cmdURI)
        response = requests.get(f"{self.Server}{cmdURI}", headers=airwatchHeaders)
        if response.status_code != 200:
            return None
        # A 200 with no matches used to raise IndexError; treat it as not found.
        matches = response.json().get("Users") or []
        return AirwatchUser(matches[0]) if matches else None

    def GetDevices(self, user: str = ""):
        """Fetch devices, optionally restricted to one user, page by page.

        Pages of 500 are requested starting at page 0 until the number of
        collected devices reaches the ``Total`` reported by the first page,
        a page comes back empty, or a follow-up request is not a 200 (in
        which case the devices collected so far are returned).

        Args:
            user: Value for the ``user`` query filter; empty means no filter.

        Returns:
            list of ``AirwatchDevice``, or ``None`` when the first request
            does not return 200.
        """
        if user == "":
            cmdURI = "/API/mdm/devices/search?pagesize=500&page="
        else:
            cmdURI = f"/API/mdm/devices/search?user={user}&pagesize=500&page="
        # Headers are computed once: the signature covers only the path,
        # which does not change between pages.
        airwatchHeaders = self.GetHeaders(cmdURI)

        pageNum = 0
        response = requests.get(
            f"{self.Server}{cmdURI}{pageNum}", headers=airwatchHeaders
        )
        if response.status_code != 200:
            return None

        payload = response.json()
        deviceTotalCount = payload["Total"]
        devices = []
        while True:
            page_items = payload.get("Devices") or []
            devices.extend(AirwatchDevice(item) for item in page_items)
            # An empty page means the server has nothing more to give, even
            # if the advertised total was never reached; stop instead of
            # requesting ever-higher page numbers forever.
            if not page_items or len(devices) >= deviceTotalCount:
                break
            pageNum += 1
            response = requests.get(
                f"{self.Server}{cmdURI}{pageNum}", headers=airwatchHeaders
            )
            if response.status_code != 200:
                break
            payload = response.json()
        return devices

    def GetDeviceApps(self, device):
        """Return the applications reported for ``device``.

        Args:
            device: Object exposing a ``Uuid`` attribute.

        Returns:
            list of ``AirwatchApplication``, or ``None`` on a non-200 response.
        """
        cmdURI = f"/api/mdm/devices/{device.Uuid}/apps/search"
        airwatchHeaders = self.GetHeaders(cmdURI)
        result = requests.get(f"{self.Server}{cmdURI}", headers=airwatchHeaders)
        if result.status_code != 200:
            return None
        return [AirwatchApplication(app) for app in result.json()["app_items"]]

    def ResetDEPProfiles(self, groupUuid):
        """Re-assign the DEP profile of every unenrolled device in a group.

        Lists the group's DEP devices and, for each one whose
        ``enrollmentStatus`` is ``"Unenrolled"``, sends an ``action=Assign``
        PUT for its profile and serial number. The responses of those PUTs
        are not inspected.

        Returns:
            ``True`` when the device listing returned 200, otherwise ``False``.
        """
        cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices"
        airwatchHeaders = self.GetHeaders(cmdURI)
        result = requests.get(f"{self.Server}{cmdURI}", headers=airwatchHeaders)
        if result.status_code != 200:
            return False

        for device in result.json():
            if device["enrollmentStatus"] != "Unenrolled":
                continue
            assignDEPProfileURI = (
                f"/API/mdm/dep/profiles/{device['profileUuid']}"
                f"/devices/{device['deviceSerialNumber']}?action=Assign"
            )
            # Previously referenced an undefined ``airwatchServer`` name,
            # which raised NameError on the first unenrolled device.
            assignURI = f"{self.Server}{assignDEPProfileURI}"
            requests.put(assignURI, headers=self.GetHeaders(assignDEPProfileURI))
        return True

    def GetEnrollmentTokens(self, groupUuid, deviceType=2):
        """Return the ``tokens`` entry of the group's enrollment-token listing.

        The response status is not checked; the body is parsed as JSON as-is.
        """
        cmdURI = (
            f"/API/mdm/groups/{groupUuid}/enrollment-tokens"
            f"?device_type={deviceType}&page_size=500"
        )
        airwatchHeaders = self.GetHeaders(cmdURI)
        result = requests.get(f"{self.Server}{cmdURI}", headers=airwatchHeaders)
        return result.json()["tokens"]

    def UpdateUserOnEnrollmentTokens(self, groupUuid, user, serialnumber):
        """POST a device registration record for ``serialnumber`` and ``user``.

        Args:
            groupUuid: Group whose enrollment-token endpoint is addressed.
            user: Object exposing ``Uuid`` and ``Email`` attributes.
            serialnumber: Used as both serial number and friendly name.

        Returns:
            The ``requests`` response object of the POST.
        """
        body = {
            "registration_type": "REGISTER_DEVICE",
            "device_registration_record": {
                "user_uuid": user.Uuid,
                "friendly_name": serialnumber,
                "ownership_type": "CORPORATE_DEDICATED",
                "serial_number": serialnumber,
                "to_email_address": user.Email,
                "message_type": 0,
            },
        }
        cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens"
        airwatchHeaders = self.GetHeaders(cmdURI)
        # This endpoint is called with an explicit version in the Accept header.
        airwatchHeaders["Accept"] = "application/json;version=2"
        uri = f"{self.Server}{cmdURI}"
        # Echoes the target URL to stdout; kept to preserve existing output.
        print(uri)
        return requests.post(uri, headers=airwatchHeaders, json=body)

    def SyncDEPDevices(self, groupUuid):
        """Send the DEP ``action=sync`` PUT for a group; return the status code."""
        cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices?action=sync"
        airwatchHeaders = self.GetHeaders(cmdURI)
        return requests.put(
            f"{self.Server}{cmdURI}", headers=airwatchHeaders
        ).status_code

    def SetDeviceFriendlyName(self, device, friendlyName):
        """PUT a new ``DeviceFriendlyName`` for ``device.Id``; return the status code."""
        cmdURI = f"/API/mdm/devices/{device.Id}"
        airwatchHeaders = self.GetHeaders(cmdURI)
        body = {"DeviceFriendlyName": friendlyName}
        return requests.put(
            f"{self.Server}{cmdURI}", headers=airwatchHeaders, json=body
        ).status_code

    def SetDeviceUser(self, device, airwatchUser):
        """PATCH the enrollment user of ``device.Id`` to ``airwatchUser.Id``.

        Returns:
            The HTTP status code of the PATCH.
        """
        cmdURI = f'/API/mdm/devices/{device.Id}/enrollmentuser/{airwatchUser.Id}'
        airwatchHeaders = self.GetHeaders(cmdURI)
        return requests.patch(
            f"{self.Server}{cmdURI}", headers=airwatchHeaders
        ).status_code

    def DeleteDevice(self, device):
        """Send a DELETE for ``device.Id``; return the HTTP status code."""
        cmdURI = f"/API/mdm/devices/{device.Id}"
        airwatchHeaders = self.GetHeaders(cmdURI)
        return requests.delete(
            f"{self.Server}{cmdURI}", headers=airwatchHeaders
        ).status_code


class AirwatchUser:
    """Attribute view over one entry of a user search result."""

    def __init__(self, user):
        """Copy the fields of the ``user`` mapping onto the instance."""
        self.Id = user["Id"]["Value"]
        self.Uuid = user["Uuid"]
        self.UserName = user["UserName"]
        self.FirstName = user["FirstName"]
        self.LastName = user["LastName"]
        # Stored verbatim from the "Status" field.
        self.Enabled = user["Status"]
        self.Email = user["Email"]
        self.DisplayName = user["DisplayName"]
        self.Group = user["Group"]
        self.GroupId = user["LocationGroupId"]
        self.OrgUuid = user["OrganizationGroupUuid"]
        self.DeviceCount = int(user["EnrolledDevicesCount"])


class AirwatchDevice:
    """Attribute view over one entry of a device search result."""

    def __init__(self, device):
        """Copy the fields of the ``device`` mapping onto the instance.

        ``UserId`` and ``User`` are ``None`` when the ``UserId`` entry has no
        ``Id`` member.
        """
        self.Id = device["Id"]["Value"]
        self.Uuid = device["Uuid"]
        self.SerialNumber = device["SerialNumber"]
        self.Imei = device["Imei"]
        self.MacAddress = device["MacAddress"]
        self.FriendlyName = device["DeviceFriendlyName"]
        self.GroupId = device["LocationGroupId"]["Id"]["Value"]
        self.Group = device["LocationGroupName"]
        self.GroupUuid = device["LocationGroupId"]["Uuid"]
        if device["UserId"].get("Id") is not None:
            self.UserId = device["UserId"]["Id"]["Value"]
            self.User = device["UserName"]
        else:
            self.UserId = None
            self.User = None
        self.UserEmail = device["UserEmailAddress"]
        self.PlatformId = device["PlatformId"]["Id"]["Value"]
        self.Platform = device["Platform"]
        self.OS = device["OperatingSystem"]
        self.Arch = device["ProcessorArchitecture"]
        self.TotalMemory = device["TotalPhysicalMemory"]
        self.EnrollmentStatus = device["EnrollmentStatus"]
        self.LastEnrolledOn = device["LastEnrolledOn"]
        self.LastSeen = device["LastSeen"]


class AirwatchApplication:
    """Attribute view over one entry of a device's ``app_items`` list."""

    def __init__(self, application):
        """Copy the fields of the ``application`` mapping onto the instance."""
        self.Name = application["name"]
        # The bundle identifier is exposed under the name ``Guid``.
        self.Guid = application["bundle_id"]
        self.Size = application["size"]
        self.Version = application["installed_version"]
        self.InstallDate = application["latest_uem_action_time"]
        self.Status = application["installed_status"]