Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions driver/connect.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,60 @@
import logging
import multiprocessing

from pymobiledevice3.lockdown import create_using_usbmux, LockdownClient

from pymobiledevice3.cli.remote import install_driver_if_required
from pymobiledevice3.cli.remote import select_device, RemoteServiceDiscoveryService
from pymobiledevice3.cli.remote import start_tunnel
from pymobiledevice3.cli.remote import verify_tunnel_imports
from pymobiledevice3.remote.common import TunnelProtocol
from pymobiledevice3.remote.module_imports import start_tunnel, verify_tunnel_imports
from pymobiledevice3.remote.tunnel_service import get_core_device_tunnel_services

from pymobiledevice3.services.amfi import AmfiService

from pymobiledevice3.exceptions import NoDeviceConnectedError

def get_usbmux_lockdownclient():
async def get_usbmux_lockdownclient():
while True:
try:
lockdown = create_using_usbmux()
lockdown = await create_using_usbmux()
except NoDeviceConnectedError:
print("请连接设备后按回车...")
input()
else:
break
while True:
lockdown = create_using_usbmux()
lockdown = await create_using_usbmux()
if lockdown.all_values.get("PasswordProtected"):
print("请解锁设备后按回车...")
input()
else:
break
return create_using_usbmux()
return await create_using_usbmux()

def get_version(lockdown: LockdownClient):
return lockdown.all_values.get("ProductVersion")

def get_developer_mode_status(lockdown: LockdownClient):
return lockdown.developer_mode_status
async def get_developer_mode_status(lockdown: LockdownClient):
return await lockdown.get_developer_mode_status()

def reveal_developer_mode(lockdown: LockdownClient):
AmfiService(lockdown).create_amfi_show_override_path_file()
async def reveal_developer_mode(lockdown: LockdownClient):
await AmfiService(lockdown).reveal_developer_mode_option_in_ui()

def enable_developer_mode(lockdown: LockdownClient):
AmfiService(lockdown).enable_developer_mode()
async def enable_developer_mode(lockdown: LockdownClient):
await AmfiService(lockdown).enable_developer_mode()

def get_serverrsd():
install_driver_if_required()

async def tunnel(queue: multiprocessing.Queue):
if not verify_tunnel_imports():
exit(1)
return select_device(None)


async def tunnel(rsd: RemoteServiceDiscoveryService, queue: multiprocessing.Queue):
async with start_tunnel(rsd, None) as tunnel_result:
queue.put((tunnel_result.address, tunnel_result.port))
await tunnel_result.client.wait_closed()
services = await get_core_device_tunnel_services()
if not services:
raise NoDeviceConnectedError()
service = services[0]

try:
# iOS 18.2+ removed QUIC tunnel support, so TCP must be used. On
# Python < 3.13 this relies on the sslpsk_pmd3 PSK-TLS fallback.
async with start_tunnel(service, protocol=TunnelProtocol.TCP) as tunnel_result:
queue.put((tunnel_result.address, tunnel_result.port))
await tunnel_result.client.wait_closed()
finally:
await service.close()
10 changes: 4 additions & 6 deletions driver/location.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from pymobiledevice3.cli.developer import LocationSimulation
async def set_location(location_simulation, lat: float, lng: float):
await location_simulation.set(lat, lng)

def set_location(dvt, lat: float, lng: float):
LocationSimulation(dvt).set(lat, lng)

def clear_location(dvt):
LocationSimulation(dvt).clear()
async def clear_location(location_simulation):
await location_simulation.clear()
8 changes: 4 additions & 4 deletions init/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from driver import connect

def init():
async def init():
# check if root on mac or Administrator on windows
if sys.platform == "win32":
if not ctypes.windll.shell32.IsUserAnAdmin():
Expand All @@ -19,7 +19,7 @@ def init():
sys.exit(1)

# get lockdown client
lockdown = connect.get_usbmux_lockdownclient()
lockdown = await connect.get_usbmux_lockdownclient()

# check version
version = connect.get_version(lockdown)
Expand All @@ -29,8 +29,8 @@ def init():
sys.exit(1)

# check developer mode status
developer_mode_status = connect.get_developer_mode_status(lockdown)
developer_mode_status = await connect.get_developer_mode_status(lockdown)
if not developer_mode_status:
connect.reveal_developer_mode(lockdown)
await connect.reveal_developer_mode(lockdown)
print("您未开启开发者模式,请打开设备的 设置-隐私与安全性-开发者模式 来开启,开启后需要重启并输入密码,完成后再次运行此程序")
sys.exit(1)
3 changes: 1 addition & 2 deletions init/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from driver import connect

def tunnel_proc(queue: multiprocessing.Queue):
server_rsd = connect.get_serverrsd()
asyncio.run(connect.tunnel(server_rsd, queue))
asyncio.run(connect.tunnel(queue))


def tunnel():
Expand Down
35 changes: 23 additions & 12 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import signal
import logging
import asyncio
import coloredlogs
import os

from driver import location

from pymobiledevice3.cli.remote import RemoteServiceDiscoveryService
from pymobiledevice3.cli.developer import DvtSecureSocketProxyService
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
from pymobiledevice3.services.dvt.instruments.dvt_provider import DvtProvider
from pymobiledevice3.services.dvt.instruments.location_simulation import LocationSimulation

from init import init
from init import tunnel
Expand Down Expand Up @@ -34,7 +36,7 @@



def main():
async def amain():
# set level
logger = logging.getLogger(__name__)
coloredlogs.install(level=logging.INFO)
Expand All @@ -43,7 +45,7 @@ def main():
logger.setLevel(logging.DEBUG)
coloredlogs.install(level=logging.DEBUG)

init.init()
await init.init()
logger.info("init done")

# start the tunnel in another process
Expand All @@ -59,22 +61,25 @@ def main():
loc = route.get_route()
logger.info(f"got route from {config.config.routeConfig}")


with RemoteServiceDiscoveryService((address, port)) as rsd:
with DvtSecureSocketProxyService(rsd) as dvt:
rsd = RemoteServiceDiscoveryService((address, port))
await rsd.connect()
try:
async with DvtProvider(rsd) as dvt, LocationSimulation(dvt) as location_simulation:
try:
print(f"已开始模拟跑步,速度大约为 {config.config.v} m/s")
print("会无限循环,按 Ctrl+C 退出")
print("请勿直接关闭窗口,否则无法还原正常定位")
run.run(dvt, loc, config.config.v)
except KeyboardInterrupt:
await run.run(location_simulation, loc, config.config.v)
except (KeyboardInterrupt, asyncio.CancelledError):
logger.debug("get KeyboardInterrupt (inner)")
logger.debug(f"Is process alive? {process.is_alive()}")
finally:
logger.debug(f"Is process alive? {process.is_alive()}")
logger.debug("Start to clear location")
location.clear_location(dvt)
await location.clear_location(location_simulation)
logger.info("Location cleared")
finally:
await rsd.close()


except KeyboardInterrupt:
Expand All @@ -86,8 +91,14 @@ def main():
process.terminate()
logger.info("tunnel process terminated")
print("Bye")




def main():
try:
asyncio.run(amain())
except KeyboardInterrupt:
pass


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pymobiledevice3==2.46.1
pymobiledevice3==9.27.0
PyYAML==6.0.1
geopy==2.4.1
14 changes: 8 additions & 6 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import math
import time
import random
import asyncio

from geopy.distance import geodesic

Expand Down Expand Up @@ -134,22 +135,23 @@ def fixLockT(loc: list, v, dt):
t += dt
return fixedLoc

def run1(dvt, loc: list, v, dt=0.2):
async def run1(location_simulation, loc: list, v, dt=0.2):
fixedLoc = fixLockT(loc, v, dt)
nList = (5, 6, 7, 8, 9)
n = nList[random.randint(0, len(nList)-1)]
fixedLoc = randLoc(fixedLoc, n=n) # a path will be divided into n parts for random route
clock = time.time()
for i in fixedLoc:
# utils.setLoc(bd09Towgs84(i))
location.set_location(dvt, **bd09Towgs84(i))
while time.time()-clock < dt:
pass
await location.set_location(location_simulation, **bd09Towgs84(i))
remaining = dt - (time.time() - clock)
if remaining > 0:
await asyncio.sleep(remaining)
clock = time.time()

def run(dvt, loc: list, v, d=15):
async def run(location_simulation, loc: list, v, d=15):
random.seed(time.time())
while True:
vRand = 1000/(1000/v-(2*random.random()-1)*d)
run1(dvt, loc, vRand)
await run1(location_simulation, loc, vRand)
print("跑完一圈了")