205 lines
8.7 KiB
Python
205 lines
8.7 KiB
Python
import base64
|
|
import requests
|
|
from cryptography.hazmat.primitives.serialization import pkcs12, pkcs7
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
|
class AirwatchAPI:
|
|
def __init__(self, settings):
|
|
self.Server = settings["AIRWATCH"]["Server"]
|
|
self.APIKey = settings["AIRWATCH"]["APIKey"]
|
|
self.AuthMethod = (settings["AIRWATCH"]["AuthenticationMethod"]).upper()
|
|
|
|
if(self.AuthMethod == "PASSWORD"):
|
|
self.APIUser = settings["AIRWATCH"]["APIUser"]
|
|
self.APIPassword = settings["AIRWATCH"]["APIPassword"]
|
|
|
|
if(self.AuthMethod == "CMSURL"):
|
|
self.CertificatePath = settings["AIRWATCH"]["CertificatePath"]
|
|
self.CertificatePassword = settings["AIRWATCH"]["CertificatePassword"]
|
|
|
|
def GetHeaders(self, uri):
|
|
if(self.AuthMethod == "PASSWORD"):
|
|
airwatchAPIUserToken = base64.b64encode(f"{self.APIUser}:{self.APIPassword}".encode('ascii')).decode("ascii")
|
|
|
|
return {
|
|
"Authorization": f"Basic {airwatchAPIUserToken}",
|
|
"aw-tenant-code": self.APIKey,
|
|
"Accept": "application/json"
|
|
}
|
|
else:
|
|
signing_data = uri.split('?')[0]
|
|
with open(self.CertificatePath, 'rb') as certfile:
|
|
cert = certfile.read()
|
|
key, certificate, additional_certs = pkcs12.load_key_and_certificates(cert, self.CertificatePassword.encode())
|
|
options = [pkcs7.PKCS7Options.DetachedSignature]
|
|
signed_data = pkcs7.PKCS7SignatureBuilder().set_data(signing_data.encode("UTF-8")).add_signer(certificate, key, hashes.SHA256()).sign(serialization.Encoding.DER, options)
|
|
signed_data_b64 = base64.b64encode(signed_data).decode()
|
|
|
|
return {
|
|
"Authorization": f"CMSURL'1 {signed_data_b64}",
|
|
"aw-tenant-code": self.APIKey,
|
|
"Accept": "application/json"
|
|
}
|
|
|
|
def GetUser(self, username):
|
|
cmdURI = f'/API/system/users/search?username={username}'
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
uri = f"{self.Server}{cmdURI}"
|
|
user = requests.get(uri, headers=airwatchHeaders)
|
|
if(user.status_code == 200):
|
|
return AirwatchUser(user.json()["Users"][0])
|
|
|
|
return None
|
|
|
|
def GetDevices(self, user: str = ""):
|
|
if(user == ""):
|
|
cmdURI = f"/API/mdm/devices/search?pagesize=500&page="
|
|
else:
|
|
cmdURI = f"/API/mdm/devices/search?user={user}&pagesize=500&page="
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
pageNum = 0
|
|
devices = []
|
|
uri = f"{self.Server}{cmdURI}{pageNum}"
|
|
result = requests.get(uri, headers=airwatchHeaders)
|
|
if(result.status_code == 200):
|
|
deviceTotalCount = result.json()["Total"]
|
|
|
|
while(len(devices) != deviceTotalCount):
|
|
uri = f"{self.Server}{cmdURI}{pageNum}"
|
|
result = requests.get(uri, headers=airwatchHeaders).json()["Devices"]
|
|
for device in result:
|
|
devices += [AirwatchDevice(device)]
|
|
pageNum += 1
|
|
return devices
|
|
return None
|
|
|
|
def GetDeviceApps(self, device):
|
|
cmdURI = f"/api/mdm/devices/{device.Uuid}/apps/search"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
uri = f"{self.Server}{cmdURI}"
|
|
apps = []
|
|
result = requests.get(uri, headers=airwatchHeaders)
|
|
if(result.status_code == 200):
|
|
for app in result.json()["app_items"]:
|
|
apps += [AirwatchApplication(app)]
|
|
return apps
|
|
return None
|
|
|
|
def ResetDEPProfiles(self, groupUuid):
|
|
cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices"
|
|
uri = f"{self.Server}{cmdURI}"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
result = requests.get(uri, headers=airwatchHeaders)
|
|
if(result.status_code == 200):
|
|
for device in result.json():
|
|
if (device["enrollmentStatus"] != "Unenrolled"):
|
|
continue
|
|
assignDEPProfileURI = f"/API/mdm/dep/profiles/{device['profileUuid']}/devices/{device['deviceSerialNumber']}?action=Assign"
|
|
uri = f"{airwatchServer}{assignDEPProfileURI}"
|
|
airwatchHeaders = self.GetHeaders(assignDEPProfileURI)
|
|
requests.put(uri, headers=airwatchHeaders)
|
|
return True
|
|
return False
|
|
|
|
def GetEnrollmentTokens(self, groupUuid, deviceType=2):
|
|
cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens?device_type={deviceType}&page_size=500"
|
|
uri = f"{self.Server}{cmdURI}"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
result = requests.get(uri, headers=airwatchHeaders)
|
|
return result.json()["tokens"]
|
|
|
|
def UpdateUserOnEnrollmentTokens(self, groupUuid,user, serialnumber):
|
|
body = {
|
|
"registration_type": "REGISTER_DEVICE",
|
|
"device_registration_record": {
|
|
"user_uuid": user.Uuid,
|
|
"friendly_name": serialnumber,
|
|
"ownership_type": "CORPORATE_DEDICATED",
|
|
"serial_number": serialnumber,
|
|
"to_email_address": user.Email,
|
|
"message_type": 0
|
|
}
|
|
}
|
|
cmdURI = f"/API/mdm/groups/{groupUuid}/enrollment-tokens"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
airwatchHeaders["Accept"] = "application/json;version=2"
|
|
uri = f"{self.Server}{cmdURI}"
|
|
print(uri)
|
|
return requests.post(uri, headers=airwatchHeaders, json=body)
|
|
|
|
def SyncDEPDevices(self, groupUuid):
|
|
cmdURI = f"/API/mdm/dep/groups/{groupUuid}/devices?action=sync"
|
|
uri = f"{self.Server}{cmdURI}"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
return requests.put(uri, headers=airwatchHeaders).status_code
|
|
|
|
def SetDeviceFriendlyName(self, device, friendlyName):
|
|
cmdURI = f"/API/mdm/devices/{device.Id}"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
uri = f"{self.Server}{cmdURI}"
|
|
body = {
|
|
"DeviceFriendlyName":friendlyName
|
|
}
|
|
return requests.put(uri, headers=airwatchHeaders, json=body).status_code
|
|
|
|
def SetDeviceUser(self, device, airwatchUser):
|
|
cmdURI = f'/API/mdm/devices/{device.Id}/enrollmentuser/{airwatchUser.Id}'
|
|
uri = f"{self.Server}{cmdURI}"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
return requests.patch(uri, headers=airwatchHeaders).status_code
|
|
|
|
def DeleteDevice(self, device):
|
|
cmdURI = f"/API/mdm/devices/{device.Id}"
|
|
airwatchHeaders = self.GetHeaders(cmdURI)
|
|
uri = f"{self.Server}{cmdURI}"
|
|
return requests.delete(uri, headers=airwatchHeaders).status_code
|
|
|
|
class AirwatchUser:
|
|
|
|
def __init__(self, user):
|
|
self.Id = user["Id"]["Value"]
|
|
self.Uuid = user["Uuid"]
|
|
self.UserName = user["UserName"]
|
|
self.FirstName = user["FirstName"]
|
|
self.LastName = user["LastName"]
|
|
self.Enabled = user["Status"]
|
|
self.Email = user["Email"]
|
|
self.DisplayName = user["DisplayName"]
|
|
self.Group = user["Group"]
|
|
self.GroupId = user["LocationGroupId"]
|
|
self.OrgUuid = user["OrganizationGroupUuid"]
|
|
self.DeviceCount = int(user["EnrolledDevicesCount"])
|
|
|
|
class AirwatchDevice:
|
|
|
|
def __init__(self, device):
|
|
self.Id = device["Id"]["Value"]
|
|
self.Uuid = device["Uuid"]
|
|
self.SerialNumber = device["SerialNumber"]
|
|
self.Imei = device["Imei"]
|
|
self.MacAddress = device["MacAddress"]
|
|
self.FriendlyName = device["DeviceFriendlyName"]
|
|
self.GroupId = device["LocationGroupId"]["Id"]["Value"]
|
|
self.Group = device["LocationGroupName"]
|
|
self.GroupUuid = device["LocationGroupId"]["Uuid"]
|
|
self.UserId = device["UserId"]["Id"]["Value"]
|
|
self.User = device["UserName"]
|
|
self.UserEmail = device["UserEmailAddress"]
|
|
self.PlatformId = device["PlatformId"]["Id"]["Value"]
|
|
self.Platform = device["Platform"]
|
|
self.OS = device["OperatingSystem"]
|
|
self.Arch = device["ProcessorArchitecture"]
|
|
self.TotalMemory = device["TotalPhysicalMemory"]
|
|
self.EnrollmentStatus = device["EnrollmentStatus"]
|
|
self.LastEnrolledOn = device["LastEnrolledOn"]
|
|
self.LastSeen = device["LastSeen"]
|
|
|
|
class AirwatchApplication:
|
|
|
|
def __init__(self, application):
|
|
self.Name = application["name"]
|
|
self.Guid = application["bundle_id"]
|
|
self.Size = application["size"]
|
|
self.Version = application["installed_version"]
|
|
self.InstallDate = application["latest_uem_action_time"]
|
|
self.Status = application["installed_status"] |