[Physical AI W1D2] 5/6 — ROS 2 서비스·액션 구현: 요청/응답과 목표/피드백

2026. 6. 14. 17:15·피지컬AI

[Physical AI W1D2 · 5/6]

즉시 응답이 필요한 서비스(AddTwoInts)와 오래 걸리는 작업의 액션(Fibonacci)을 직접 구현한다. Server/Client 코드를 rclpy로 작성해 빌드·실행하고, CLI로도 호출하며 토픽과 무엇이 다른지 손으로 확인한다.

이 글에서 직접 만드는 것

  • 서비스: 두 정수를 더하는 add_two_ints Server/Client + ros2 service call
  • 액션: 피보나치 수열을 피드백과 함께 계산하는 fibonacci Server/Client + ros2 action send_goal
  • 토픽 vs 서비스 vs 액션을 코드로 체감

(4편의 패키지 ros2_comm_practice를 그대로 확장합니다. 같은 빌드·실행 사이클을 반복합니다.)


들어가며

4편의 토픽은 "계속 흘려보내기"였습니다. 이번엔 요청하면 답하는 서비스와 목표를 주면 진행 상황을 보고하며 끝내는 액션을 만듭니다. 3편 개념을 코드로 굳히는 단계입니다.

💡 Colab 단일 터미널 안내 — 아래 "터미널 1/2"는 개념 구분입니다. Colab(터미널 1개)에선 서버를 백그라운드로 띄우고 클라이언트를 포그라운드로 실행하세요. 예: ros2 run ros2_comm_practice add_two_ints_server > server.log 2>&1 & 로 서버를 띄운 뒤 클라이언트를 실행, 끝나면 pkill -f add_two_ints_server로 정리. (액션도 동일 패턴)


1. 서비스 Server 작성 — 두 정수 더하기

두 정수를 받아 합을 돌려주는 서비스를 만듭니다.

cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano add_two_ints_server.py
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts


class AddTwoIntsServer(Node):
    def __init__(self):
        super().__init__('add_two_ints_server')

        self.srv = self.create_service(
            AddTwoInts,
            'add_two_ints',
            self.add_two_ints_callback
        )

        self.get_logger().info('AddTwoInts service server is ready.')

    def add_two_ints_callback(self, request, response):
        response.sum = request.a + request.b

        self.get_logger().info(
            f'Request: a={request.a}, b={request.b}, sum={response.sum}'
        )

        return response


def main(args=None):
    rclpy.init(args=args)

    node = AddTwoIntsServer()

    rclpy.spin(node)

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • create_service(AddTwoInts, 'add_two_ints', callback) — 서비스 생성(타입·이름·콜백).
  • 콜백은 request(입력 a·b)를 받아 response.sum을 채워 반환합니다. 이게 서비스의 "요청→응답" 본질.
  • AddTwoInts는 example_interfaces가 제공하는 표준 서비스 타입(4편에서 설치).

2. 서비스 Client 작성

cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano add_two_ints_client.py
import sys
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts


class AddTwoIntsClient(Node):
    def __init__(self):
        super().__init__('add_two_ints_client')

        self.client = self.create_client(
            AddTwoInts,
            'add_two_ints'
        )

        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

    def send_request(self, a, b):
        request = AddTwoInts.Request()
        request.a = a
        request.b = b

        future = self.client.call_async(request)

        rclpy.spin_until_future_complete(self, future)

        return future.result()


def main(args=None):
    rclpy.init(args=args)

    if len(sys.argv) != 3:
        print('Usage: ros2 run ros2_comm_practice add_two_ints_client 3 5')
        return

    a = int(sys.argv[1])
    b = int(sys.argv[2])

    node = AddTwoIntsClient()

    response = node.send_request(a, b)

    node.get_logger().info(f'Result: {a} + {b} = {response.sum}')

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • wait_for_service() — 서버가 뜰 때까지 대기(서버 먼저, 클라이언트 나중에 떠도 안전).
  • call_async() + spin_until_future_complete() — 비동기로 요청을 보내고 응답이 올 때까지 기다립니다.

3. 서비스 등록 · 빌드 · 실행

setup.py의 console_scripts에 추가합니다.

entry_points={
    'console_scripts': [
        'simple_publisher = ros2_comm_practice.simple_publisher:main',
        'simple_subscriber = ros2_comm_practice.simple_subscriber:main',
        'add_two_ints_server = ros2_comm_practice.add_two_ints_server:main',
        'add_two_ints_client = ros2_comm_practice.add_two_ints_client:main',
    ],
},
cd ~/ros2_comm_ws
colcon build
source install/setup.bash

# 터미널 1 — 서버
ros2 run ros2_comm_practice add_two_ints_server

# 터미널 2 — 클라이언트 (3 + 5)
source /opt/ros/humble/setup.bash
cd ~/ros2_comm_ws && source install/setup.bash
ros2 run ros2_comm_practice add_two_ints_client 3 5
# [INFO] [add_two_ints_client]: Result: 3 + 5 = 8

CLI로도 서비스를 직접 호출할 수 있습니다.

ros2 service list
ros2 service type /add_two_ints          # example_interfaces/srv/AddTwoInts
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 10, b: 20}"
# response:
# example_interfaces.srv.AddTwoInts_Response(sum=30)

💡 ros2 service call은 코드 없이도 서버를 두드려 볼 수 있어, "서버가 살아 있나" 확인에 유용합니다.


4. 액션 Server 작성 — 피보나치

액션은 목표(Goal) 를 받아 피드백(Feedback) 을 보내며 결과(Result) 를 반환합니다. 목표 숫자만큼 피보나치 수열을 계산하는 fibonacci 액션을 만듭니다.

cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano fibonacci_action_server.py
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from example_interfaces.action import Fibonacci


class FibonacciActionServer(Node):
    def __init__(self):
        super().__init__('fibonacci_action_server')

        self.action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback
        )

        self.get_logger().info('Fibonacci action server is ready.')

    def execute_callback(self, goal_handle):
        self.get_logger().info(
            f'Received goal: order={goal_handle.request.order}'
        )

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.sequence = [0, 1]

        for i in range(2, goal_handle.request.order):
            next_value = feedback_msg.sequence[i - 1] + feedback_msg.sequence[i - 2]
            feedback_msg.sequence.append(next_value)

            self.get_logger().info(f'Feedback: {feedback_msg.sequence}')

            goal_handle.publish_feedback(feedback_msg)

            time.sleep(1)

        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = feedback_msg.sequence

        self.get_logger().info(f'Result: {result.sequence}')

        return result


def main(args=None):
    rclpy.init(args=args)

    node = FibonacciActionServer()

    rclpy.spin(node)

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  • ActionServer(self, Fibonacci, 'fibonacci', execute_callback) — 액션 서버 생성.
  • 계산 중간마다 goal_handle.publish_feedback()으로 진행 상황을 보냅니다(time.sleep(1)로 "오래 걸리는 작업"을 흉내).
  • goal_handle.succeed() 후 Result를 반환 — 서비스와 달리 중간 피드백 + 최종 결과가 분리됩니다.

💡 참고 — 3편에서 액션의 기능으로 Cancel(취소) 을 소개했지만, 이 예제 서버는 취소를 구현하지 않습니다(rclpy 기본값은 취소 요청 거부). 실제로 취소를 지원하려면 ActionServer(..., cancel_callback=...)로 취소 콜백을 따로 등록하고 execute_callback에서 goal_handle.is_cancel_requested를 확인해야 합니다. 여기서는 Goal·Feedback·Result 흐름에 집중합니다.


5. 액션 Client 작성

cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano fibonacci_action_client.py
import sys
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from example_interfaces.action import Fibonacci


class FibonacciActionClient(Node):
    def __init__(self):
        super().__init__('fibonacci_action_client')

        self.action_client = ActionClient(
            self,
            Fibonacci,
            'fibonacci'
        )

    def send_goal(self, order):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self.action_client.wait_for_server()

        self.get_logger().info(f'Sending goal: order={order}')

        send_goal_future = self.action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )

        send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()

        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected.')
            return

        self.get_logger().info('Goal accepted.')

        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.get_result_callback)

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Received feedback: {feedback.sequence}')

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info(f'Received result: {result.sequence}')
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    if len(sys.argv) != 2:
        print('Usage: ros2 run ros2_comm_practice fibonacci_action_client 8')
        return

    order = int(sys.argv[1])

    node = FibonacciActionClient()
    node.send_goal(order)

    rclpy.spin(node)


if __name__ == '__main__':
    main()
  • 액션 클라이언트는 콜백 체인으로 동작합니다: goal 전송 → goal_response_callback(수락 여부) → feedback_callback(진행 상황 반복) → get_result_callback(최종 결과). 토픽·서비스보다 구조가 복잡한 건 "장시간 + 피드백 + 취소"를 다루기 때문입니다.

6. 액션 등록 · 빌드 · 실행

setup.py에 두 줄 추가:

entry_points={
    'console_scripts': [
        'simple_publisher = ros2_comm_practice.simple_publisher:main',
        'simple_subscriber = ros2_comm_practice.simple_subscriber:main',
        'add_two_ints_server = ros2_comm_practice.add_two_ints_server:main',
        'add_two_ints_client = ros2_comm_practice.add_two_ints_client:main',
        'fibonacci_action_server = ros2_comm_practice.fibonacci_action_server:main',
        'fibonacci_action_client = ros2_comm_practice.fibonacci_action_client:main',
    ],
},
cd ~/ros2_comm_ws
colcon build
source install/setup.bash

# 터미널 1 — 액션 서버
ros2 run ros2_comm_practice fibonacci_action_server

# 터미널 2 — 액션 클라이언트 (order=8)
source /opt/ros/humble/setup.bash
cd ~/ros2_comm_ws && source install/setup.bash
ros2 run ros2_comm_practice fibonacci_action_client 8

클라이언트 출력 — 피드백이 한 줄씩 쌓이다가 결과로 마무리됩니다.

[INFO] Sending goal: order=8
[INFO] Goal accepted.
[INFO] Received feedback: [0, 1, 1]
[INFO] Received feedback: [0, 1, 1, 2]
[INFO] Received feedback: [0, 1, 1, 2, 3]
[INFO] Received feedback: [0, 1, 1, 2, 3, 5]
[INFO] Received feedback: [0, 1, 1, 2, 3, 5, 8]
[INFO] Received feedback: [0, 1, 1, 2, 3, 5, 8, 13]
[INFO] Received result: [0, 1, 1, 2, 3, 5, 8, 13]

CLI로도 목표를 보낼 수 있습니다.

ros2 action list                # /fibonacci
ros2 action info /fibonacci
ros2 action send_goal /fibonacci example_interfaces/action/Fibonacci "{order: 8}" --feedback

7. 흔한 오류와 해결

오류 원인 / 해결
ros2: command not found ROS 2 환경 미적용 → source /opt/ros/humble/setup.bash
Package 'ros2_comm_practice' not found 빌드/소스 누락 → colcon build → source install/setup.bash
실행 파일 못 찾음 setup.py의 console_scripts 미등록 또는 재빌드 안 함
서비스가 응답 없음 서버 미실행/이름 불일치 → ros2 service list, ros2 service type /add_two_ints
액션 서버 못 찾음 ros2 action list, ros2 action info /fibonacci 로 확인

⚠️ 거의 모든 오류는 "빌드 후 source install/setup.bash를 안 했거나, setup.py 등록을 빠뜨렸거나" 입니다. 막히면 이 둘부터 점검하세요.


8. 직접 해볼 과제

  • 토픽: simple_publisher.py가 battery_status 토픽으로 Robot battery level is N%를 100에서 1씩 줄여가며 1초마다 발행하도록 변경.
  • 서비스: add_two_ints_server.py를 두 수의 곱을 반환하는 multiply_two_ints로 변경(타입은 AddTwoInts 재사용).
  • 액션: fibonacci_action_server.py의 피드백에 진행률을 함께 출력 — Progress: 3/8, sequence: [0, 1, 1].
  • 설계: 이동 로봇이 카메라·LiDAR로 장애물을 인식하고 목표까지 이동 후 로봇 팔로 물체를 집는 시스템을 노드 5개+/토픽 4개+/서비스 1개+/액션 1개+ 로 설계하고 통신 흐름 설명.

5편 정리

  • 서비스 = create_service/create_client, 콜백이 request→response. 즉시 응답.
  • 액션 = ActionServer/ActionClient, publish_feedback + succeed + Result. 장시간·피드백·취소.
  • 등록·빌드·실행 사이클은 토픽과 동일(setup.py → colcon build → source → ros2 run).
  • CLI: ros2 service call, ros2 action send_goal --feedback.

이로써 노드·토픽·서비스·액션 4종을 모두 손으로 구현했습니다. 로봇 시스템은 이 넷을 조합해 만들어집니다.

다음 편 예고

지금까지는 패키지 하나에 코드를 쌓았습니다. 6편에서는 한 걸음 물러나 패키지 자체를 제대로 다룹니다 — Catkin과 Colcon의 차이, Python(ament_python)·C++(ament_cmake) 패키지를 각각 생성·빌드하고, setup.py vs CMakeLists.txt, 빌드 옵션, 실무 패키지 설계까지. ROS 2 개발의 기본기를 마무리합니다.

📚 Week1 Day2 전체 목차 (총 6편)

  • 1/6 리눅스 기초 — 쉘·파일시스템·핵심 명령어
  • 2/6 리눅스 실전 8 시나리오
  • 3/6 ROS 2 통신 4종 개념 — 노드·토픽·서비스·액션
  • 4/6 ROS 2 토픽 Pub/Sub 직접 만들기
  • 5/6 ROS 2 서비스·액션 구현 — 이번 글
  • 6/6 ROS 2 패키지 빌드 — Python·C++·colcon
저작자표시 (새창열림)

'피지컬AI' 카테고리의 다른 글

[Physical AI W2D1] 1/6 — RViz로 로봇을 눈으로 보다: 시각화 흐름·Fixed Frame·Display  (0) 2026.06.20
[Physical AI W1D2] 6/6 — ROS 2 패키지 빌드 마스터: Python·C++·colcon  (0) 2026.06.14
[Physical AI W1D2] 4/6 — ROS 2 토픽 직접 만들기: Publisher·Subscriber  (0) 2026.06.14
[Physical AI W1D2] 3/6 — ROS 2 통신의 4가지 길: 노드·토픽·서비스·액션  (0) 2026.06.14
[Physical AI W1D2] 2/6 — 리눅스 실전 8 시나리오: 명령어를 손에 익히기  (0) 2026.06.14
'피지컬AI' 카테고리의 다른 글
  • [Physical AI W2D1] 1/6 — RViz로 로봇을 눈으로 보다: 시각화 흐름·Fixed Frame·Display
  • [Physical AI W1D2] 6/6 — ROS 2 패키지 빌드 마스터: Python·C++·colcon
  • [Physical AI W1D2] 4/6 — ROS 2 토픽 직접 만들기: Publisher·Subscriber
  • [Physical AI W1D2] 3/6 — ROS 2 통신의 4가지 길: 노드·토픽·서비스·액션
hyeseong-dev
hyeseong-dev
안녕하세요. 백엔드 개발자 이혜성입니다.
  • hyeseong-dev
    어제 오늘 그리고 내일
    hyeseong-dev
  • 전체
    오늘
    어제
    • 분류 전체보기 (342) N
      • 여러가지 (11) N
        • 알고리즘 & 자료구조 (73)
        • 오류 (4)
        • 이것저것 (29)
        • 일기 (1)
      • 프레임워크 (39)
        • 자바 스프링 (39)
        • React Native (0)
      • 프로그래밍 언어 (39)
        • 파이썬 (31)
        • 자바 (3)
        • 스프링부트 (5)
      • 컴퓨터 구조와 운영체제 (3)
      • DB (17)
        • SQL (0)
        • Redis (17)
      • 클라우드 컴퓨팅 (21)
        • 도커 (2)
        • AWS (19)
      • 스케쥴 (65)
        • 세미나 (0)
        • 수료 (0)
        • 스터디 (24)
        • 시험 (41)
      • 트러블슈팅 (1)
      • 자격증 (2) N
        • 정보처리기사 (0)
        • 정보보안기사 (1)
        • 네트워크관리사 (1) N
      • 재태크 (0)
        • 암호화폐 (0)
        • 기타 (0)
      • 피지컬AI (26)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    FastAPI
    완전탐색
    EC2
    피지컬ai
    AWS네트워크계층으로읽기
    취업리부트
    moveit
    Python
    운동학
    항해99
    rclpy
    네트워크
    클라우드
    그리디
    자바
    TF
    celery
    java
    Spring WebFlux
    ROS2
    docker
    SAA
    Redis
    동차변환행렬
    프로그래머스
    로봇팔
    역운동학
    Spring Boot
    WebFlux
    AWS
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
hyeseong-dev
[Physical AI W1D2] 5/6 — ROS 2 서비스·액션 구현: 요청/응답과 목표/피드백
상단으로

티스토리툴바