# 安卓

# 获取进程 cpu 使用时间

def getprocessCpuStat(self):
    """Return the CPU time consumed so far by the monitored process.

    Reads ``/proc/<pid>/stat`` on the device through ``adb.shell`` and adds
    up utime, stime, cutime and cstime (fields 14-17 in proc(5)): the user
    and kernel time of the process plus that of its waited-for children.

    Returns:
        float: accumulated CPU time, in clock ticks as reported by the
        kernel.

    Raises:
        ValueError: if the output does not look like a stat line, or one of
            the four counters is not an integer.
        IndexError: if the stat line has fewer fields than expected.
    """
    cmd = f'cat /proc/{self.pid}/stat'
    result = adb.shell(cmd=cmd, deviceId=self.deviceId)
    # Field 2 is the executable name wrapped in parentheses and may itself
    # contain spaces, so a plain whitespace split of the whole line can shift
    # every later index. Only the text after the LAST ')' is split.
    _, closing_paren, tail = result.rpartition(')')
    if not closing_paren:
        raise ValueError(f'unexpected /proc/{self.pid}/stat output: {result!r}')
    toks = tail.split()
    # toks[0] is field 3 (state), so field N lives at index N - 3:
    # utime -> 11, stime -> 12, cutime -> 13, cstime -> 14.
    processCpu = float(sum(int(toks[i]) for i in range(11, 15)))
    return processCpu

# 获取总 cpu 使用时间

def getTotalCpuStat(self):
    """Return the total CPU time of the device summed over all CPU states.

    Reads the ``cpu`` lines of ``/proc/stat`` through ``adb.shell`` and adds
    the first eight counters of the aggregate line: user, nice, system,
    idle, iowait, irq, softirq and steal (order per proc(5)).

    Guest time is deliberately left out: proc(5) documents it as already
    included in the user counter, so adding it would count it twice.

    Returns:
        float: total CPU time, in the kernel's clock-tick unit.

    Raises:
        IndexError: if fewer than eight counters are found in the output.
    """
    # d.filterType() presumably yields the host's filter command
    # (grep / findstr) -- verify against its definition.
    cmd = f'cat /proc/stat |{d.filterType()} ^cpu'
    result = adb.shell(cmd=cmd, deviceId=self.deviceId)
    # The look-behind rejects a digit glued directly to "cpu" (the core
    # number of per-core lines), so the first matches are the counters of
    # the aggregate "cpu" line, starting with `user` at index 0.
    toks = re.findall(r'(?<!cpu)\d+', result)
    totalCpu = sum(float(toks[i]) for i in range(8))
    return float(totalCpu)

# 获取 CPU 核数

def getCpuCores(self):
    """Return the number of CPUs currently online on the Android device.

    Reads ``/sys/devices/system/cpu/online`` through ``adb.shell``. The file
    holds a CPU list such as ``0-7``, ``0`` or ``0-3,6-7``; every
    comma-separated entry is either a single CPU index or an inclusive
    ``first-last`` range, and all entries are counted.

    Returns:
        int: number of online CPUs, or 1 when the output cannot be parsed
        (best-effort fallback).
    """
    cmd = 'cat /sys/devices/system/cpu/online'
    result = adb.shell(cmd=cmd, deviceId=self.deviceId)
    try:
        nums = 0
        for entry in result.strip().split(','):
            first_text, dash, last_text = entry.partition('-')
            first = int(first_text)
            # A lone index ("0") is a range of exactly one CPU.
            last = int(last_text) if dash else first
            nums += last - first + 1
    except (AttributeError, ValueError):
        # Non-string or malformed output: keep the single-core fallback.
        nums = 1
    # Guard against nonsensical (e.g. reversed) ranges.
    if nums < 1:
        nums = 1
    return nums

# 获取系统 CPU 使用时间

def getSysCpuStat(self):
    """Return the busy (non-idle) CPU time of the whole device.

    Reads the ``cpu`` lines of ``/proc/stat`` through ``adb.shell``, takes
    the idle counter of the aggregate line and subtracts it from the value
    returned by ``self.getTotalCpuStat()``.

    Note that the idle counter and the total come from two separate reads
    of ``/proc/stat``, so they are not sampled at exactly the same instant.

    Returns:
        float: total CPU time minus idle time, in the kernel's clock-tick
        unit.

    Raises:
        IndexError: if the idle counter is missing from the output.
    """
    # d.filterType() presumably yields the host's filter command
    # (grep / findstr) -- verify against its definition.
    cmd = f'cat /proc/stat |{d.filterType()} ^cpu'
    result = adb.shell(cmd=cmd, deviceId=self.deviceId)
    # The look-behind rejects a digit glued directly to "cpu" (the core
    # number of per-core lines); the first matches are the aggregate line.
    toks = re.findall(r'(?<!cpu)\d+', result)
    # Aggregate line order per proc(5): user, nice, system, idle, iowait, ...
    # so idle is the 4th counter, i.e. index 3 (index 4 would be iowait).
    idleCpu = int(toks[3])
    sysCpu = self.getTotalCpuStat() - idleCpu
    return sysCpu

# 采集结果

间隔 0.5s 连续获取两次采集结果,然后计算得到 CPU 使用率。参考:https://yestermorrow.github.io/2021/03/17/CPU使用率/

def getAndroidCpuRate(self, noLog=False, interval=0.5):
    """Sample CPU counters twice and return the app and system CPU rates.

    Takes one snapshot of the process, total and system (busy) CPU times,
    sleeps for ``interval`` seconds, takes a second snapshot, and expresses
    the process and system deltas as a percentage of the total delta.

    Args:
        noLog: when exactly ``False`` (the default) both rates are appended
            to ``cpu_app.log`` / ``cpu_sys.log`` under ``f.report_dir``.
        interval: seconds to wait between the two snapshots.

    Returns:
        tuple[float, float]: ``(appCpuRate, sysCpuRate)`` in percent,
        rounded to two decimals. Both are ``0.0`` when the total counter
        did not advance between the snapshots.
    """
    processCpuTime_1 = self.getprocessCpuStat()
    totalCpuTime_1 = self.getTotalCpuStat()
    sysCpuTime_1 = self.getSysCpuStat()
    time.sleep(interval)
    processCpuTime_2 = self.getprocessCpuStat()
    totalCpuTime_2 = self.getTotalCpuStat()
    sysCpuTime_2 = self.getSysCpuStat()

    totalDelta = totalCpuTime_2 - totalCpuTime_1
    if totalDelta > 0:
        appCpuRate = round(float((processCpuTime_2 - processCpuTime_1) / totalDelta * 100), 2)
        sysCpuRate = round(float((sysCpuTime_2 - sysCpuTime_1) / totalDelta * 100), 2)
    else:
        # No elapsed CPU time to divide by (e.g. identical snapshots):
        # report zero usage instead of raising ZeroDivisionError.
        appCpuRate = 0.0
        sysCpuRate = 0.0

    if noLog is False:
        apm_time = datetime.datetime.now().strftime('%H:%M:%S.%f')
        f.add_log(os.path.join(f.report_dir,'cpu_app.log'), apm_time, appCpuRate)
        f.add_log(os.path.join(f.report_dir,'cpu_sys.log'), apm_time, sysCpuRate)
    return appCpuRate, sysCpuRate

# IOS

ios 基于 pydevice 和 instrument 协议解析

采集数据来自于 tidevice_instruments.py iter_cpu_memory 方法

def iter_cpu_memory(self) -> Iterator[dict]:
    """Stream CPU / memory samples from the Sysmontap instruments channel.

    Opens a Sysmontap channel, sends the sampling configuration, starts the
    service and then yields the ``result`` of every notification message
    that has ``flags == 0x01`` and arrives on channel
    ``(1 << 32) - channel_id``.

    The connection is closed (``self.close()``) when the generator itself
    is closed, i.e. when ``GeneratorExit`` is raised inside it.

    Example of one yielded item (a list holding a system-wide entry and a
    per-process entry)::

        [{'CPUCount': 2,
          'EnabledCPUs': 2,
          'EndMachAbsTime': 2158497307470,
          'PerCPUUsage': [{'CPU_NiceLoad': 0.0,
                           'CPU_SystemLoad': -1.0,
                           'CPU_TotalLoad': 13.0,
                           'CPU_UserLoad': -1.0},
                          {'CPU_NiceLoad': 0.0,
                           'CPU_SystemLoad': -1.0,
                           'CPU_TotalLoad': 31.0,
                           'CPU_UserLoad': -1.0}],
          'StartMachAbsTime': 2158473307786,
          'SystemCPUUsage': {'CPU_NiceLoad': 0.0,
                             'CPU_SystemLoad': -1.0,
                             'CPU_TotalLoad': 44.0,
                             'CPU_UserLoad': -1.0},
          'Type': 33},
         {'EndMachAbsTime': 2158515468993,
          'Processes': {0: [0.20891292720792148,  # cpuUsage
                            335770139,            # ctxSwitch
                            120505483,            # intWakeups
                            7913472,              # physFootprint
                            869646336,            # memResidentSize
                            232210432,            # memAnon
                            0],                   # pid
                        1: [0.0005502246751775457,
                            691065,
                            6038,
                            12353840,
                            4177920,
                            12255232,
                            1]}}]

    NOTE(review): the example lists seven values per process, while the
    configuration below requests eight ``procAttrs`` (it additionally asks
    for ``memVirtualSize`` first) -- presumably each list carries one value
    per requested attribute in request order; confirm against a live
    capture.
    """
    process_attributes = [
        "memVirtualSize", "cpuUsage", "ctxSwitch", "intWakeups",
        "physFootprint", "memResidentSize", "memAnon", "pid"
    ]
    system_attributes = [
        "vmExtPageCount", "vmFreeCount", "vmPurgeableCount",
        "vmSpeculativeCount", "physMemSize"
    ]
    sysmontap_config = {
        "bm": 0,
        "cpuUsage": True,
        "procAttrs": process_attributes,
        "sampleInterval": 1000000000,  # 1e9 ns == 1 s
        "sysAttrs": system_attributes,
        "ur": 1000
    }

    channel_id = self.make_channel(InstrumentsService.Sysmontap)
    self.call_message(channel_id, "setConfig:", [sysmontap_config])
    self.call_message(channel_id, "start", [])

    # Only notifications whose channel id equals this value are forwarded.
    reply_channel_id = (1 << 32) - channel_id
    try:
        for message in self.iter_message(Event.NOTIFICATION):
            is_sample = message.flags == 0x01 and message.channel_id == reply_channel_id
            if is_sample:
                yield message.result
    except GeneratorExit:
        # Closing the whole connection is the only known way to stop the
        # device from sending further samples; sending "stopSampling" or
        # "_channelCanceled:" on the channel was tried and did not work.
        self.close()
  • 该方法与 instrument 建立通信,发送协议请求并获取数据,
    其数据结果包含了:
  • cpu 数量
  • 启动的 cpu
  • 单个 cpu 使用率