class AgentGroup:
    """A set of agent processes managed as one unit.

    Members are ``subprocess.Popen`` objects registered under a name. Each
    lifecycle method fans the operation out over every member, so callers
    treat the whole group as if it were a single agent.
    """

    def __init__(self):
        # name -> subprocess.Popen, kept in registration order
        self._agents = {}

    def __repr__(self):
        return f"{type(self).__name__}({self.names()!r})"

    # ----- membership -------------------------------------------------------

    def add(self, name, proc):
        """Register an agent process under a name (e.g. 'agentA', 'agentB').

        Registering a second process under an existing name replaces the
        earlier entry; the earlier process itself is not touched.
        """
        self._agents[name] = proc

    def clear(self):
        """Forget all members without touching the processes."""
        self._agents = {}

    def names(self):
        """Return the registered agent names, in registration order."""
        return list(self._agents)

    def __len__(self):
        return len(self._agents)

    def __bool__(self):
        # Lets callers keep writing `if self.application_processes:`
        return bool(self._agents)

    def __iter__(self):
        """Iterate over the registered process objects (not their names)."""
        return iter(self._agents.values())

    # ----- group lifecycle operations (each fans out over all members) ------

    def pause_all(self):
        """Suspend every agent process and its whole child tree (SIGSTOP)."""
        for proc in self._agents.values():
            self._apply_to_tree(proc, lambda child: child.suspend())

    def resume_all(self):
        """Resume every agent process and its whole child tree (SIGCONT)."""
        for proc in self._agents.values():
            self._apply_to_tree(proc, lambda child: child.resume())

    def signal_stop_all(self):
        """SIGSTOP each top-level agent process.

        Used to gate the agents before the simulator unpauses, so no agent
        acts on a still-paused world. Children of the agents are not
        signalled here; use ``pause_all`` for the whole tree.
        """
        self._signal_all(signal.SIGSTOP)

    def signal_cont_all(self):
        """SIGCONT each top-level agent process (release after unpause)."""
        self._signal_all(signal.SIGCONT)

    def kill_all(self):
        """Terminate every agent process and its children, then empty the group.

        A failure while stopping one agent is logged and does not prevent
        the remaining agents from being stopped.
        """
        for name, proc in self._agents.items():
            try:
                stop_process_and_children(proc)
            except Exception:
                LogManager.logger.exception(f"Error stopping agent '{name}'")
        self.clear()

    # ----- helpers ----------------------------------------------------------

    def _signal_all(self, sig):
        """Send ``sig`` to every top-level agent process that is still alive."""
        for proc in self._agents.values():
            try:
                # Popen.send_signal() does nothing once the child has
                # finished, whereas os.kill(proc.pid, sig) on a reaped child
                # could deliver the signal to an unrelated process that has
                # since been given the same PID.
                proc.send_signal(sig)
            except ProcessLookupError:
                # The process vanished while the signal was being sent.
                pass

    @staticmethod
    def _apply_to_tree(proc, fn):
        """Run ``fn`` on the process and every descendant, tolerating races.

        ``fn`` receives a ``psutil.Process``. Descendants are visited before
        the top-level process. Processes that disappear while the tree is
        being walked are skipped.
        """
        if proc.poll() is not None:
            # Already finished: its PID may have been recycled, so looking it
            # up again could act on a process that is not ours.
            return
        try:
            parent = psutil.Process(proc.pid)
            tree = parent.children(recursive=True)
            tree.append(parent)
            for child in tree:
                try:
                    fn(child)
                except psutil.NoSuchProcess:
                    pass
        except psutil.NoSuchProcess:
            pass
# Matches "/<namespace>/platform/state_machine/_reset" and captures <namespace>.
_DRONE_RESET_SERVICE_RE = re.compile(r"^/([^/]+)/platform/state_machine/_reset$")


def _parse_drone_namespaces(service_list_output):
    """Extract drone namespaces from the text of a ROS 2 service listing.

    Parameters:
        service_list_output: One service name per line.

    Returns:
        The namespaces that expose ``platform/state_machine/_reset``, in
        first-seen order and without duplicates.
    """
    namespaces = []
    for line in service_list_output.splitlines():
        match = _DRONE_RESET_SERVICE_RE.match(line.strip())
        if match and match.group(1) not in namespaces:
            namespaces.append(match.group(1))
    return namespaces


def _find_drone_namespaces(timeout=10):
    """Return list of drone namespaces that have a state_machine/_reset service.

    Runs ``ros2 service list --include-hidden-services`` and parses its
    output. Discovery is best-effort: if the command cannot be run or does
    not finish in time, the failure is logged and an empty list is returned,
    so callers can treat "no drones" and "could not tell" the same way.

    Parameters:
        timeout: Seconds to wait for the ``ros2`` command before giving up.

    Returns:
        A list of namespace strings (e.g. ``["drone0", "drone1"]``); empty
        when no matching service exists or the listing failed.
    """
    try:
        result = subprocess.run(
            ["ros2", "service", "list", "--include-hidden-services"],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except Exception:
        # Keep the never-raises contract, but leave a trace: a silent []
        # here means the per-drone resets are skipped with no explanation.
        import logging

        logging.getLogger(__name__).exception(
            "Could not list ROS 2 services while looking for drone namespaces"
        )
        return []
    return _parse_drone_namespaces(result.stdout)
+ for ns in _find_drone_namespaces(): call_service( - "/drone0/platform/state_machine/_reset", + f"/{ns}/platform/state_machine/_reset", "std_srvs/srv/Trigger", "{}", ) @@ -190,8 +211,10 @@ def reset(self, robot_entity=None): 10000, ) - if is_ros_service_available("/drone0/controller/_reset"): - call_service("/drone0/controller/_reset", "std_srvs/srv/Trigger", "{}") + # reset each drone's controller too (not just drone0) + for ns in _find_drone_namespaces(): + if is_ros_service_available(f"/{ns}/controller/_reset"): + call_service(f"/{ns}/controller/_reset", "std_srvs/srv/Trigger", "{}") def get_dri_path(self): directory_path = "/dev/dri" diff --git a/robotics_application_manager/manager/manager.py b/robotics_application_manager/manager/manager.py index d438046..d0a1334 100644 --- a/robotics_application_manager/manager/manager.py +++ b/robotics_application_manager/manager/manager.py @@ -16,7 +16,6 @@ import signal import subprocess import re -import psutil import shutil import time import base64 @@ -34,7 +33,6 @@ from robotics_application_manager.libs import ( check_gpu_acceleration, get_class_from_file, - stop_process_and_children, ConfigurationManager, ) from robotics_application_manager.ram_logging import LogManager @@ -45,6 +43,7 @@ ) from robotics_application_manager.manager.lint import Lint from robotics_application_manager.manager.editor import serialize_completions +from robotics_application_manager.manager.agent_group import AgentGroup class Manager: @@ -227,7 +226,10 @@ def __init__(self, host: str, port: int): self.robot_launcher = None self.robot_config = None self.tools_launcher = None - self.application_process = None + # Group of agent processes (1 for single-agent exercises, 2+ for + # multi-agent ones like drone cat-mouse). Lifecycle ops fan out over + # all members; the FSM stays single-target. See AgentGroup. 
+ self.application_processes = AgentGroup() self.running = True self.linter = Lint() @@ -724,6 +726,10 @@ def on_change_style(self, event): except Exception as e: LogManager.logger.exception(f"Error refreshing GTK applications: {e}") + def _kill_all_applications(self): + # Fan kill out over every agent in the group, then empty it. + self.application_processes.kill_all() + def on_run_application(self, event): """ Handle the 'run_application' event. @@ -736,12 +742,7 @@ def on_run_application(self, event): event: The event object containing application configuration and code data. """ # Kill already running code - try: - proc = psutil.Process(self.application_process.pid) - proc.suspend() - proc.kill() - except Exception: - pass + self._kill_all_applications() # Delete old files if os.path.exists("/workspace/code"): @@ -753,11 +754,14 @@ def on_run_application(self, event): entrypoint = app_cfg["entrypoint"] to_lint = app_cfg["linter"] - # Unzip the app + # the code comes as a base64 data-uri from the browser, strip the header part if app_cfg["code"].startswith("data:"): _, _, code = app_cfg["code"].partition("base64,") with open("/workspace/code/app.zip", "wb") as result: result.write(base64.b64decode(code)) + + # just extract evrything to /workspace/code — if theres a processB/ folder + # inside the zip it'll end up at /workspace/code/processB/ on its own zip_ref = zipfile.ZipFile("/workspace/code/app.zip", "r") zip_ref.extractall("/workspace/code") zip_ref.close() @@ -788,9 +792,8 @@ def on_run_application(self, event): if returncode != 0: raise Exception("Failed to compile") - self.unpause_sim() if entrypoint.endswith(".launch.py"): - self.application_process = subprocess.Popen( + proc = subprocess.Popen( [ f"source /workspace/code/install/setup.bash && ros2 launch {entrypoint}" ], @@ -804,8 +807,7 @@ def on_run_application(self, event): start_new_session=True, ) else: - - self.application_process = subprocess.Popen( + proc = subprocess.Popen( [ "source 
/workspace/code/install/setup.bash && ros2 run academy academyCode" ], @@ -818,6 +820,8 @@ def on_run_application(self, event): executable="/bin/bash", start_new_session=True, ) + self.application_processes.add("agentA", proc) + self.unpause_sim() return # Pass the linter @@ -835,8 +839,7 @@ def on_run_application(self, event): fds = os.listdir("/dev/pts/") console_fd = str(max(map(int, fds[:-1]))) - self.unpause_sim() - self.application_process = subprocess.Popen( + proc = subprocess.Popen( ["python3", entrypoint], stdin=open("/dev/pts/" + console_fd, "r"), stdout=open("/dev/pts/" + console_fd, "w"), @@ -844,6 +847,39 @@ def on_run_application(self, event): bufsize=1024, universal_newlines=True, ) + self.application_processes.add("agentA", proc) + + # check if theres a second agent in processB/ subdir — this one is + # pre-programmed from the server side so no need to lint it + processb_entrypoint = os.path.join( + "/workspace/code/processB", os.path.basename(entrypoint) + ) + if os.path.isfile(processb_entrypoint): + # PYTHONPATH must include /workspace/code so processB can import + # commons (hal_interfaces, gui_interfaces, etc.) which extract there. + # Python only adds the script's own directory to sys.path, not parent. + proc_b_env = os.environ.copy() + proc_b_env["PYTHONPATH"] = "/workspace/code:" + proc_b_env.get("PYTHONPATH", "") + proc_b = subprocess.Popen( + ["python3", processb_entrypoint], + env=proc_b_env, + stdin=open("/dev/pts/" + console_fd, "r"), + stdout=open("/dev/pts/" + console_fd, "w"), + stderr=sys.stdout, + bufsize=1024, + universal_newlines=True, + ) + self.application_processes.add("agentB", proc_b) + + # SIGSTOP every agent first, then unpause gazebo, then SIGCONT them all + # together — so no agent acts on a still-paused world and they start in + # lockstep. using finally so even if unpause fails we dont leave frozen + # processes behind. Fans out over the whole group (1 agent or N). 
+ self.application_processes.signal_stop_all() + try: + self.unpause_sim() + finally: + self.application_processes.signal_cont_all() LogManager.logger.info("Run application transition finished") @@ -857,11 +893,16 @@ def on_terminate_application(self, event): Parameters: event: The event object associated with the termination request. """ - if self.application_process: + if self.application_processes: try: - stop_process_and_children(self.application_process) - self.application_process = None + # Pause sim first — freezes Gazebo physics immediately so every + # agent in the group (drones, robots, etc.) stops mid-motion + # before its process is killed. Then kill_all() fans the kill + # out over the whole group and reset_sim() resets every agent. + # General: works for N agents across all exercises with no + # exercise-specific zero-velocity code. self.pause_sim() + self._kill_all_applications() self.reset_sim() except Exception: LogManager.logger.exception("No application running") @@ -898,10 +939,9 @@ def on_disconnect(self, event): terminates launchers, and restarts the script. """ - if self.application_process: + if self.application_processes: try: - stop_process_and_children(self.application_process) - self.application_process = None + self._kill_all_applications() except Exception as e: LogManager.logger.exception("Exception stopping application process") @@ -933,15 +973,9 @@ def process_message(self, message): self.consumer.send_message(message.response(response)) def on_pause(self, msg): - if self.application_process is not None: - proc = psutil.Process(self.application_process.pid) - children = proc.children(recursive=True) - children.append(proc) - for p in children: - try: - p.suspend() - except psutil.NoSuchProcess: - pass + if self.application_processes: + # Suspend every agent in the group, then freeze the shared world. 
+ self.application_processes.pause_all() self.pause_sim() else: LogManager.logger.warning( @@ -957,15 +991,9 @@ def on_resume(self, msg): Parameters: msg: The event or message triggering the resume action. """ - if self.application_process is not None: - proc = psutil.Process(self.application_process.pid) - children = proc.children(recursive=True) - children.append(proc) - for p in children: - try: - p.resume() - except psutil.NoSuchProcess: - pass + if self.application_processes: + # Unfreeze the shared world, then resume every agent in the group. + self.application_processes.resume_all() self.unpause_sim() else: LogManager.logger.warning( @@ -1039,10 +1067,9 @@ def signal_handler(sign, frame): except Exception as e: LogManager.logger.exception("Exception stopping consumer") - if self.application_process: + if self.application_processes: try: - stop_process_and_children(self.application_process) - self.application_process = None + self._kill_all_applications() except Exception as e: LogManager.logger.exception( "Exception stopping application process"