From feb213f0c0614a9b2ea56e3bb0925b77fa6ddc65 Mon Sep 17 00:00:00 2001 From: Jason Secula Date: Wed, 11 Feb 2026 19:42:12 +0100 Subject: [PATCH] Initial Commit --- AirwatchAPI.py | 224 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 AirwatchAPI.py diff --git a/AirwatchAPI.py b/AirwatchAPI.py new file mode 100644 index 0000000..46b546d --- /dev/null +++ b/AirwatchAPI.py @@ -0,0 +1,224 @@ +import base64 +import requests +from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7 +from cryptography.hazmat.primitives import hashes, serialization + +class AirwatchAPI: + def __init__(self, Server, APIKey, AuthMethod, APIUser=None, APIPassword=None, CertPath=None, CertPass=""): + self.Server = Server + self.APIKey = APIKey + self.AuthMethod = (AuthMethod).upper() + + if(self.AuthMethod == "PASSWORD"): + self.APIUser = APIUser + self.APIPassword = APIPassword + + if(self.AuthMethod == "CMSURL"): + self.CertificatePath = CertPath + self.CertificatePassword = CertPass + + def GetHeaders(self, uri): + if(self.AuthMethod == "PASSWORD"): + airwatchAPIUserToken = base64.b64encode(f"{self.APIUser}:{self.APIPassword}".encode('ascii')).decode("ascii") + + return { + "Authorization": f"Basic {airwatchAPIUserToken}", + "aw-tenant-code": self.APIKey, + "Accept": "application/json" + } + else: + signing_data = uri.split('?')[0] + with open(self.CertificatePath, 'rb') as certfile: + cert = certfile.read() + key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, self.CertificatePassword.encode()) + options = [pkcs7.PKCS7Options.DetachedSignature] + signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options) + signed_data_b64 = base64.b64encode(signed_data).decode() + + return { + "Authorization": f"CMSURL'1 {signed_data_b64}", + "aw-tenant-code": self.APIKey, + "Accept": "application/json" + } + + def GetUser(self, username): + cmdURI = f'/API/system/users/search?username={username}' + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + user = requests.get(uri, headers=airwatchHeaders) + if(user.status_code == 200): + return AirwatchUser(user.json()["Users"][0]) + + return None + + def GetDevices(self, user: str = ""): + if(user == ""): + cmdURI = f"/API/mdm/devices/search?pagesize=500&page=" + else: + cmdURI = f"/API/mdm/devices/search?user={user}&pagesize=500&page=" + airwatchHeaders = self.GetHeaders(cmdURI) + pageNum = 0 + devices = [] + uri = f"{self.Server}{cmdURI}{pageNum}" + result = requests.get(uri, headers=airwatchHeaders) + if(result.status_code == 200): + deviceTotalCount = result.json()["Total"] + + while(len(devices) != deviceTotalCount): + uri = f"{self.Server}{cmdURI}{pageNum}" + result = requests.get(uri, headers=airwatchHeaders).json()["Devices"] + for device in result: + devices += [AirwatchDevice(device)] + pageNum += 1 + return devices + return None + + def GetDeviceApps(self, device): + cmdURI = f"/api/mdm/devices/{device.Uuid}/apps/search" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + apps = [] + result = requests.get(uri, headers=airwatchHeaders) + if(result.status_code == 200): + for app in result.json()["app_items"]: + apps += [AirwatchApplication(app)] + return apps + return None + + def ResetDEPProfiles(self, groupUuid): + cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices" + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + result = requests.get(uri, headers=airwatchHeaders) + if(result.status_code == 200): + for device in result.json(): + if (device["enrollmentStatus"] != "Unenrolled"): + continue + assignDEPProfileURI = f"/API/mdm/dep/profiles/{device['profileUuid']}/devices/{device['deviceSerialNumber']}?action=Assign" + uri = f"{airwatchServer}{assignDEPProfileURI}" + airwatchHeaders = self.GetHeaders(assignDEPProfileURI) + requests.put(uri, headers=airwatchHeaders) + return True + return False + + def GetEnrollmentTokens(self, groupUuid, deviceType=2): + cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens?device_type={deviceType}&page_size=500" + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + result = requests.get(uri, headers=airwatchHeaders) + return result.json()["tokens"] + + def UpdateUserOnEnrollmentTokens(self, groupUuid,user, serialnumber): + body = { + "registration_type": "REGISTER_DEVICE", + "device_registration_record": { + "user_uuid": user.Uuid, + "friendly_name": serialnumber, + "ownership_type": "CORPORATE_DEDICATED", + "serial_number": serialnumber, + "to_email_address": user.Email, + "message_type": 0 + } + } + cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens" + airwatchHeaders = self.GetHeaders(cmdURI) + airwatchHeaders["Accept"] = "application/json;version=2" + uri = f"{self.Server}{cmdURI}" + print(uri) + return requests.post(uri, headers=airwatchHeaders, json=body) + + def SyncDEPDevices(self, groupUuid): + cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices?action=sync" + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + return requests.put(uri, headers=airwatchHeaders).status_code + + def AddDevicesToTag(self, tagID, devices): + cmdURI = f"/API/mdm/tags/{tagID}/adddevices" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + body = { + "BulkValues":{ + "Value": devices + } + } + return requests.post(uri, headers=airwatchHeaders, json=body).status_code + + def SetDeviceFriendlyName(self, device, friendlyName): + cmdURI = f"/API/mdm/devices/{device.Id}" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + body = { + "DeviceFriendlyName":friendlyName + } + return requests.put(uri, headers=airwatchHeaders, json=body).status_code + + def SetDeviceUser(self, device, airwatchUser): + cmdURI = f'/API/mdm/devices/{device.Id}/enrollmentuser/{airwatchUser.Id}' + uri = f"{self.Server}{cmdURI}" + airwatchHeaders = self.GetHeaders(cmdURI) + return requests.patch(uri, headers=airwatchHeaders).status_code + + def DeleteDevice(self, device): + cmdURI = f"/API/mdm/devices/{device.Id}" + airwatchHeaders = self.GetHeaders(cmdURI) + uri = f"{self.Server}{cmdURI}" + return requests.delete(uri, headers=airwatchHeaders).status_code + +class AirwatchUser: + + def __init__(self, user): + self.Id = user["Id"]["Value"] + self.Uuid = user["Uuid"] + self.UserName = user["UserName"] + self.FirstName = user["FirstName"] + self.LastName = user["LastName"] + self.Enabled = user["Status"] + self.Email = user["Email"] + self.DisplayName = user["DisplayName"] + self.Group = user["Group"] + self.GroupId = user["LocationGroupId"] + self.OrgUuid = user["OrganizationGroupUuid"] + if(user["EnrolledDevicesCount"] != ''): + self.DeviceCount = int(user["EnrolledDevicesCount"]) + else: + self.DeviceCount = 0 + +class AirwatchDevice: + + def __init__(self, device): + self.Id = device["Id"]["Value"] + self.Uuid = device["Uuid"] + self.SerialNumber = device["SerialNumber"] + self.Imei = device["Imei"] + self.MacAddress = device["MacAddress"] + self.FriendlyName = device["DeviceFriendlyName"] + self.ReportedName = device["DeviceReportedName"] + self.GroupId = device["LocationGroupId"]["Id"]["Value"] + self.Group = device["LocationGroupName"] + self.GroupUuid = device["LocationGroupId"]["Uuid"] + if(device["UserId"].get("Id") != None): + self.UserId = device["UserId"]["Id"]["Value"] + self.User = device["UserName"] + else: + self.UserId = None + self.User = None + self.UserEmail = device["UserEmailAddress"] + self.PlatformId = device["PlatformId"]["Id"]["Value"] + self.Platform = device["Platform"] + self.OS = device["OperatingSystem"] + self.Arch = device["ProcessorArchitecture"] + self.TotalMemory = device["TotalPhysicalMemory"] + self.EnrollmentStatus = device["EnrollmentStatus"] + self.LastEnrolledOn = device["LastEnrolledOn"] + self.LastSeen = device["LastSeen"] + +class AirwatchApplication: + + def __init__(self, application): + self.Name = application["name"] + self.Guid = application["bundle_id"] + self.Size = application["size"] + self.Version = application["installed_version"] + self.InstallDate = application["latest_uem_action_time"] + self.Status = application["installed_status"] \ No newline at end of file