From d8658bd987ec06d1336427cd8e4471a2236046ac Mon Sep 17 00:00:00 2001 From: anish Date: Wed, 3 Jun 2026 13:09:23 +0530 Subject: [PATCH] manager.py: support multiple application processes --- .../manager/agent_group.py | 116 ++++++++++++++++++ .../manager/launcher/launcher_gzsim.py | 28 ++++- .../manager/manager.py | 113 ++++++++++------- 3 files changed, 210 insertions(+), 47 deletions(-) create mode 100644 robotics_application_manager/manager/agent_group.py diff --git a/robotics_application_manager/manager/agent_group.py b/robotics_application_manager/manager/agent_group.py new file mode 100644 index 0000000..88f9590 --- /dev/null +++ b/robotics_application_manager/manager/agent_group.py @@ -0,0 +1,116 @@ +"""Agent group abstraction for the Robotics Application Manager. + +RAM's lifecycle operations (play / pause / resume / reset / stop) used to act on +a single application process. Multi-agent exercises (e.g. drone cat-and-mouse, +which runs a "cat" and a "mouse") need them to act on *all* the agents at once. + +Rather than teaching the RAM FiniteStateMachine about multiple agents, the FSM +stays single-target: it fires one transition, and that transition fans the +operation out over every member of this group. So a single ``pause`` trigger +becomes "suspend agent0, suspend agent1, ... ", a single ``reset`` becomes a +sequence over each agent, and so on. The FSM definition never changes; only the +execution layer behind each handler iterates the group. + +This holds N agents (1 for most exercises, 2 for cat-mouse, more in future +multi-robot exercises) and exposes the per-agent *process* operations. World / +simulator level calls (pause_sim, reset_sim) remain single calls in the manager, +because a shared Gazebo world has one physics clock and cannot be paused +per-agent. +""" + +import os +import signal + +import psutil + +from robotics_application_manager.libs import stop_process_and_children +from robotics_application_manager.ram_logging import LogManager + + +class AgentGroup: + """A set of agent processes managed as one unit. + + Each lifecycle method fans the operation out over every member, so callers + treat the whole group as if it were a single agent. + """ + + def __init__(self): + # name -> subprocess.Popen + self._agents = {} + + # ----- membership ------------------------------------------------------- + + def add(self, name, proc): + """Register an agent process under a name (e.g. 'agentA', 'agentB').""" + self._agents[name] = proc + + def clear(self): + """Forget all members without touching the processes.""" + self._agents = {} + + def names(self): + return list(self._agents.keys()) + + def __len__(self): + return len(self._agents) + + def __bool__(self): + # Lets callers keep writing `if self.application_processes:` + return bool(self._agents) + + def __iter__(self): + return iter(self._agents.values()) + + # ----- group lifecycle operations (each fans out over all members) ------ + + def pause_all(self): + """Suspend every agent process and its whole child tree (SIGSTOP).""" + for proc in self._agents.values(): + self._apply_to_tree(proc, lambda child: child.suspend()) + + def resume_all(self): + """Resume every agent process and its whole child tree (SIGCONT).""" + for proc in self._agents.values(): + self._apply_to_tree(proc, lambda child: child.resume()) + + def signal_stop_all(self): + """SIGSTOP each top-level agent process (used to gate them before + the simulator unpauses, so no agent acts on a still-paused world).""" + self._signal_all(signal.SIGSTOP) + + def signal_cont_all(self): + """SIGCONT each top-level agent process (release after unpause).""" + self._signal_all(signal.SIGCONT) + + def kill_all(self): + """Terminate every agent process and its children, then empty the group.""" + for name, proc in self._agents.items(): + try: + stop_process_and_children(proc) + except Exception: + LogManager.logger.exception(f"Error stopping agent '{name}'") + self.clear() + + # ----- helpers ---------------------------------------------------------- + + def _signal_all(self, sig): + for proc in self._agents.values(): + try: + os.kill(proc.pid, sig) + except ProcessLookupError: + pass + + @staticmethod + def _apply_to_tree(proc, fn): + """Run ``fn`` on the process and every descendant, tolerating races.""" + try: + parent = psutil.Process(proc.pid) + tree = parent.children(recursive=True) + tree.append(parent) + for child in tree: + try: + fn(child) + except psutil.NoSuchProcess: + pass + except psutil.NoSuchProcess: + pass diff --git a/robotics_application_manager/manager/launcher/launcher_gzsim.py b/robotics_application_manager/manager/launcher/launcher_gzsim.py index 261b19a..2335aee 100644 --- a/robotics_application_manager/manager/launcher/launcher_gzsim.py +++ b/robotics_application_manager/manager/launcher/launcher_gzsim.py @@ -1,3 +1,4 @@ +import re import sys from .launcher_interface import ILauncher from robotics_application_manager.manager.docker_thread import DockerThread @@ -14,6 +15,23 @@ from robotics_application_manager import LogManager +def _find_drone_namespaces(): + """Return list of drone namespaces that have a state_machine/_reset service.""" + try: + result = subprocess.run( + ["ros2", "service", "list", "--include-hidden-services"], + capture_output=True, text=True, timeout=10, + ) + namespaces = [] + for line in result.stdout.splitlines(): + m = re.match(r"^/([^/]+)/platform/state_machine/_reset$", line.strip()) + if m: + namespaces.append(m.group(1)) + return namespaces + except Exception: + return [] + + def call_gzservice(service, reqtype, reptype, timeout, req): command = f"gz service -s {service} --reqtype {reqtype} --reptype {reptype} --timeout {timeout} --req '{req}'" try: @@ -151,9 +169,10 @@ def unpause(self): ) def reset(self): - if is_ros_service_available("/drone0/platform/state_machine/_reset"): + # Reset state machine for every drone namespace currently running. + for ns in _find_drone_namespaces(): call_service( - "/drone0/platform/state_machine/_reset", + f"/{ns}/platform/state_machine/_reset", "std_srvs/srv/Trigger", "{}", ) @@ -164,8 +183,9 @@ def reset(self): "3000", "reset: {all: true}", ) - if is_ros_service_available("/drone0/controller/_reset"): - call_service("/drone0/controller/_reset", "std_srvs/srv/Trigger", "{}") + for ns in _find_drone_namespaces(): + if is_ros_service_available(f"/{ns}/controller/_reset"): + call_service(f"/{ns}/controller/_reset", "std_srvs/srv/Trigger", "{}") def get_dri_path(self): directory_path = "/dev/dri" diff --git a/robotics_application_manager/manager/manager.py b/robotics_application_manager/manager/manager.py index 2e49bde..a0ceb08 100644 --- a/robotics_application_manager/manager/manager.py +++ b/robotics_application_manager/manager/manager.py @@ -16,7 +16,6 @@ import signal import subprocess import re -import psutil import shutil import time import base64 @@ -34,7 +33,6 @@ from robotics_application_manager.libs import ( check_gpu_acceleration, get_class_from_file, - stop_process_and_children, ConfigurationManager, ) from robotics_application_manager.ram_logging import LogManager @@ -45,6 +43,7 @@ ) from robotics_application_manager.manager.lint import Lint from robotics_application_manager.manager.editor import serialize_completions +from robotics_application_manager.manager.agent_group import AgentGroup class Manager: @@ -226,7 +225,10 @@ def __init__(self, host: str, port: int): self.world_type = None self.robot_launcher = None self.tools_launcher = None - self.application_process = None + # Group of agent processes (1 for single-agent exercises, 2+ for + # multi-agent ones like drone cat-mouse). Lifecycle ops fan out over + # all members; the FSM stays single-target. See AgentGroup. + self.application_processes = AgentGroup() self.running = True self.linter = Lint() @@ -722,6 +724,10 @@ def on_change_style(self, event): except Exception as e: LogManager.logger.exception(f"Error refreshing GTK applications: {e}") + def _kill_all_applications(self): + # Fan kill out over every agent in the group, then empty it. + self.application_processes.kill_all() + def on_run_application(self, event): """ Handle the 'run_application' event. @@ -734,12 +740,7 @@ def on_run_application(self, event): event: The event object containing application configuration and code data. """ # Kill already running code - try: - proc = psutil.Process(self.application_process.pid) - proc.suspend() - proc.kill() - except Exception: - pass + self._kill_all_applications() # Delete old files if os.path.exists("/workspace/code"): @@ -751,11 +752,14 @@ def on_run_application(self, event): entrypoint = app_cfg["entrypoint"] to_lint = app_cfg["linter"] - # Unzip the app + # the code comes as a base64 data-uri from the browser, strip the header part if app_cfg["code"].startswith("data:"): _, _, code = app_cfg["code"].partition("base64,") with open("/workspace/code/app.zip", "wb") as result: result.write(base64.b64decode(code)) + + # just extract evrything to /workspace/code — if theres a processB/ folder + # inside the zip it'll end up at /workspace/code/processB/ on its own zip_ref = zipfile.ZipFile("/workspace/code/app.zip", "r") zip_ref.extractall("/workspace/code") zip_ref.close() @@ -786,9 +790,8 @@ def on_run_application(self, event): if returncode != 0: raise Exception("Failed to compile") - self.unpause_sim() if entrypoint.endswith(".launch.py"): - self.application_process = subprocess.Popen( + proc = subprocess.Popen( [ f"source /workspace/code/install/setup.bash && ros2 launch {entrypoint}" ], @@ -801,8 +804,7 @@ def on_run_application(self, event): executable="/bin/bash", ) else: - - self.application_process = subprocess.Popen( + proc = subprocess.Popen( [ "source /workspace/code/install/setup.bash && ros2 run academy academyCode" ], @@ -814,6 +816,8 @@ def on_run_application(self, event): shell=True, executable="/bin/bash", ) + self.application_processes.add("agentA", proc) + self.unpause_sim() return # Pass the linter @@ -831,8 +835,7 @@ def on_run_application(self, event): fds = os.listdir("/dev/pts/") console_fd = str(max(map(int, fds[:-1]))) - self.unpause_sim() - self.application_process = subprocess.Popen( + proc = subprocess.Popen( ["python3", entrypoint], stdin=open("/dev/pts/" + console_fd, "r"), stdout=open("/dev/pts/" + console_fd, "w"), @@ -840,6 +843,39 @@ def on_run_application(self, event): bufsize=1024, universal_newlines=True, ) + self.application_processes.add("agentA", proc) + + # check if theres a second agent in processB/ subdir — this one is + # pre-programmed from the server side so no need to lint it + processb_entrypoint = os.path.join( + "/workspace/code/processB", os.path.basename(entrypoint) + ) + if os.path.isfile(processb_entrypoint): + # PYTHONPATH must include /workspace/code so processB can import + # commons (hal_interfaces, gui_interfaces, etc.) which extract there. + # Python only adds the script's own directory to sys.path, not parent. + proc_b_env = os.environ.copy() + proc_b_env["PYTHONPATH"] = "/workspace/code:" + proc_b_env.get("PYTHONPATH", "") + proc_b = subprocess.Popen( + ["python3", processb_entrypoint], + env=proc_b_env, + stdin=open("/dev/pts/" + console_fd, "r"), + stdout=open("/dev/pts/" + console_fd, "w"), + stderr=sys.stdout, + bufsize=1024, + universal_newlines=True, + ) + self.application_processes.add("agentB", proc_b) + + # SIGSTOP every agent first, then unpause gazebo, then SIGCONT them all + # together — so no agent acts on a still-paused world and they start in + # lockstep. using finally so even if unpause fails we dont leave frozen + # processes behind. Fans out over the whole group (1 agent or N). + self.application_processes.signal_stop_all() + try: + self.unpause_sim() + finally: + self.application_processes.signal_cont_all() LogManager.logger.info("Run application transition finished") @@ -853,11 +889,16 @@ def on_terminate_application(self, event): Parameters: event: The event object associated with the termination request. """ - if self.application_process: + if self.application_processes: try: - stop_process_and_children(self.application_process) - self.application_process = None + # Pause sim first — freezes Gazebo physics immediately so every + # agent in the group (drones, robots, etc.) stops mid-motion + # before its process is killed. Then kill_all() fans the kill + # out over the whole group and reset_sim() resets every agent. + # General: works for N agents across all exercises with no + # exercise-specific zero-velocity code. self.pause_sim() + self._kill_all_applications() self.reset_sim() except Exception: LogManager.logger.exception("No application running") @@ -894,10 +935,9 @@ def on_disconnect(self, event): terminates launchers, and restarts the script. """ - if self.application_process: + if self.application_processes: try: - stop_process_and_children(self.application_process) - self.application_process = None + self._kill_all_applications() except Exception as e: LogManager.logger.exception("Exception stopping application process") @@ -929,15 +969,9 @@ def process_message(self, message): self.consumer.send_message(message.response(response)) def on_pause(self, msg): - if self.application_process is not None: - proc = psutil.Process(self.application_process.pid) - children = proc.children(recursive=True) - children.append(proc) - for p in children: - try: - p.suspend() - except psutil.NoSuchProcess: - pass + if self.application_processes: + # Suspend every agent in the group, then freeze the shared world. + self.application_processes.pause_all() self.pause_sim() else: LogManager.logger.warning( @@ -953,15 +987,9 @@ def on_resume(self, msg): Parameters: msg: The event or message triggering the resume action. """ - if self.application_process is not None: - proc = psutil.Process(self.application_process.pid) - children = proc.children(recursive=True) - children.append(proc) - for p in children: - try: - p.resume() - except psutil.NoSuchProcess: - pass + if self.application_processes: + # Unfreeze the shared world, then resume every agent in the group. + self.application_processes.resume_all() self.unpause_sim() else: LogManager.logger.warning( @@ -1028,10 +1056,9 @@ def signal_handler(sign, frame): except Exception as e: LogManager.logger.exception("Exception stopping consumer") - if self.application_process: + if self.application_processes: try: - stop_process_and_children(self.application_process) - self.application_process = None + self._kill_all_applications() except Exception as e: LogManager.logger.exception( "Exception stopping application process"