[Physical AI W1D2 · 5/6]
즉시 응답이 필요한 서비스(AddTwoInts)와 오래 걸리는 작업의 액션(Fibonacci)을 직접 구현한다. Server/Client 코드를 rclpy로 작성해 빌드·실행하고, CLI로도 호출하며 토픽과 무엇이 다른지 손으로 확인한다.
이 글에서 직접 만드는 것
- 서비스: 두 정수를 더하는
add_two_intsServer/Client +ros2 service call- 액션: 피보나치 수열을 피드백과 함께 계산하는
fibonacciServer/Client +ros2 action send_goal- 토픽 vs 서비스 vs 액션을 코드로 체감
(4편의 패키지 ros2_comm_practice를 그대로 확장합니다. 같은 빌드·실행 사이클을 반복합니다.)
들어가며
4편의 토픽은 "계속 흘려보내기"였습니다. 이번엔 요청하면 답하는 서비스와 목표를 주면 진행 상황을 보고하며 끝내는 액션을 만듭니다. 3편 개념을 코드로 굳히는 단계입니다.
💡 Colab 단일 터미널 안내 — 아래 "터미널 1/2"는 개념 구분입니다. Colab(터미널 1개)에선 서버를 백그라운드로 띄우고 클라이언트를 포그라운드로 실행하세요. 예:
ros2 run ros2_comm_practice add_two_ints_server > server.log 2>&1 &로 서버를 띄운 뒤 클라이언트를 실행, 끝나면pkill -f add_two_ints_server로 정리. (액션도 동일 패턴)
1. 서비스 Server 작성 — 두 정수 더하기
두 정수를 받아 합을 돌려주는 서비스를 만듭니다.
cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano add_two_ints_server.py
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsServer(Node):
def __init__(self):
super().__init__('add_two_ints_server')
self.srv = self.create_service(
AddTwoInts,
'add_two_ints',
self.add_two_ints_callback
)
self.get_logger().info('AddTwoInts service server is ready.')
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(
f'Request: a={request.a}, b={request.b}, sum={response.sum}'
)
return response
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
create_service(AddTwoInts, 'add_two_ints', callback)— 서비스 생성(타입·이름·콜백).- 콜백은
request(입력 a·b)를 받아response.sum을 채워 반환합니다. 이게 서비스의 "요청→응답" 본질. AddTwoInts는example_interfaces가 제공하는 표준 서비스 타입(4편에서 설치).
2. 서비스 Client 작성
cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano add_two_ints_client.py
import sys
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsClient(Node):
def __init__(self):
super().__init__('add_two_ints_client')
self.client = self.create_client(
AddTwoInts,
'add_two_ints'
)
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
def send_request(self, a, b):
request = AddTwoInts.Request()
request.a = a
request.b = b
future = self.client.call_async(request)
rclpy.spin_until_future_complete(self, future)
return future.result()
def main(args=None):
rclpy.init(args=args)
if len(sys.argv) != 3:
print('Usage: ros2 run ros2_comm_practice add_two_ints_client 3 5')
return
a = int(sys.argv[1])
b = int(sys.argv[2])
node = AddTwoIntsClient()
response = node.send_request(a, b)
node.get_logger().info(f'Result: {a} + {b} = {response.sum}')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
wait_for_service()— 서버가 뜰 때까지 대기(서버 먼저, 클라이언트 나중에 떠도 안전).call_async()+spin_until_future_complete()— 비동기로 요청을 보내고 응답이 올 때까지 기다립니다.
3. 서비스 등록 · 빌드 · 실행
setup.py의 console_scripts에 추가합니다.
entry_points={
'console_scripts': [
'simple_publisher = ros2_comm_practice.simple_publisher:main',
'simple_subscriber = ros2_comm_practice.simple_subscriber:main',
'add_two_ints_server = ros2_comm_practice.add_two_ints_server:main',
'add_two_ints_client = ros2_comm_practice.add_two_ints_client:main',
],
},
cd ~/ros2_comm_ws
colcon build
source install/setup.bash
# 터미널 1 — 서버
ros2 run ros2_comm_practice add_two_ints_server
# 터미널 2 — 클라이언트 (3 + 5)
source /opt/ros/humble/setup.bash
cd ~/ros2_comm_ws && source install/setup.bash
ros2 run ros2_comm_practice add_two_ints_client 3 5
# [INFO] [add_two_ints_client]: Result: 3 + 5 = 8
CLI로도 서비스를 직접 호출할 수 있습니다.
ros2 service list
ros2 service type /add_two_ints # example_interfaces/srv/AddTwoInts
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 10, b: 20}"
# response:
# example_interfaces.srv.AddTwoInts_Response(sum=30)
💡
ros2 service call은 코드 없이도 서버를 두드려 볼 수 있어, "서버가 살아 있나" 확인에 유용합니다.
4. 액션 Server 작성 — 피보나치
액션은 목표(Goal) 를 받아 피드백(Feedback) 을 보내며 결과(Result) 를 반환합니다. 목표 숫자만큼 피보나치 수열을 계산하는 fibonacci 액션을 만듭니다.
cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano fibonacci_action_server.py
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from example_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self.action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback
)
self.get_logger().info('Fibonacci action server is ready.')
def execute_callback(self, goal_handle):
self.get_logger().info(
f'Received goal: order={goal_handle.request.order}'
)
feedback_msg = Fibonacci.Feedback()
feedback_msg.sequence = [0, 1]
for i in range(2, goal_handle.request.order):
next_value = feedback_msg.sequence[i - 1] + feedback_msg.sequence[i - 2]
feedback_msg.sequence.append(next_value)
self.get_logger().info(f'Feedback: {feedback_msg.sequence}')
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.sequence
self.get_logger().info(f'Result: {result.sequence}')
return result
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
ActionServer(self, Fibonacci, 'fibonacci', execute_callback)— 액션 서버 생성.- 계산 중간마다
goal_handle.publish_feedback()으로 진행 상황을 보냅니다(time.sleep(1)로 "오래 걸리는 작업"을 흉내). goal_handle.succeed()후Result를 반환 — 서비스와 달리 중간 피드백 + 최종 결과가 분리됩니다.
💡 참고 — 3편에서 액션의 기능으로 Cancel(취소) 을 소개했지만, 이 예제 서버는 취소를 구현하지 않습니다(rclpy 기본값은 취소 요청 거부). 실제로 취소를 지원하려면
ActionServer(..., cancel_callback=...)로 취소 콜백을 따로 등록하고execute_callback에서goal_handle.is_cancel_requested를 확인해야 합니다. 여기서는 Goal·Feedback·Result 흐름에 집중합니다.
5. 액션 Client 작성
cd ~/ros2_comm_ws/src/ros2_comm_practice/ros2_comm_practice
nano fibonacci_action_client.py
import sys
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from example_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self.action_client = ActionClient(
self,
Fibonacci,
'fibonacci'
)
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self.action_client.wait_for_server()
self.get_logger().info(f'Sending goal: order={order}')
send_goal_future = self.action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected.')
return
self.get_logger().info('Goal accepted.')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.sequence}')
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Received result: {result.sequence}')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
if len(sys.argv) != 2:
print('Usage: ros2 run ros2_comm_practice fibonacci_action_client 8')
return
order = int(sys.argv[1])
node = FibonacciActionClient()
node.send_goal(order)
rclpy.spin(node)
if __name__ == '__main__':
main()
- 액션 클라이언트는 콜백 체인으로 동작합니다: goal 전송 →
goal_response_callback(수락 여부) →feedback_callback(진행 상황 반복) →get_result_callback(최종 결과). 토픽·서비스보다 구조가 복잡한 건 "장시간 + 피드백 + 취소"를 다루기 때문입니다.
6. 액션 등록 · 빌드 · 실행
setup.py에 두 줄 추가:
entry_points={
'console_scripts': [
'simple_publisher = ros2_comm_practice.simple_publisher:main',
'simple_subscriber = ros2_comm_practice.simple_subscriber:main',
'add_two_ints_server = ros2_comm_practice.add_two_ints_server:main',
'add_two_ints_client = ros2_comm_practice.add_two_ints_client:main',
'fibonacci_action_server = ros2_comm_practice.fibonacci_action_server:main',
'fibonacci_action_client = ros2_comm_practice.fibonacci_action_client:main',
],
},
cd ~/ros2_comm_ws
colcon build
source install/setup.bash
# 터미널 1 — 액션 서버
ros2 run ros2_comm_practice fibonacci_action_server
# 터미널 2 — 액션 클라이언트 (order=8)
source /opt/ros/humble/setup.bash
cd ~/ros2_comm_ws && source install/setup.bash
ros2 run ros2_comm_practice fibonacci_action_client 8
클라이언트 출력 — 피드백이 한 줄씩 쌓이다가 결과로 마무리됩니다.
[INFO] Sending goal: order=8
[INFO] Goal accepted.
[INFO] Received feedback: [0, 1, 1]
[INFO] Received feedback: [0, 1, 1, 2]
[INFO] Received feedback: [0, 1, 1, 2, 3]
[INFO] Received feedback: [0, 1, 1, 2, 3, 5]
[INFO] Received feedback: [0, 1, 1, 2, 3, 5, 8]
[INFO] Received feedback: [0, 1, 1, 2, 3, 5, 8, 13]
[INFO] Received result: [0, 1, 1, 2, 3, 5, 8, 13]
CLI로도 목표를 보낼 수 있습니다.
ros2 action list # /fibonacci
ros2 action info /fibonacci
ros2 action send_goal /fibonacci example_interfaces/action/Fibonacci "{order: 8}" --feedback
7. 흔한 오류와 해결
| 오류 | 원인 / 해결 |
|---|---|
ros2: command not found |
ROS 2 환경 미적용 → source /opt/ros/humble/setup.bash |
Package 'ros2_comm_practice' not found |
빌드/소스 누락 → colcon build → source install/setup.bash |
| 실행 파일 못 찾음 | setup.py의 console_scripts 미등록 또는 재빌드 안 함 |
| 서비스가 응답 없음 | 서버 미실행/이름 불일치 → ros2 service list, ros2 service type /add_two_ints |
| 액션 서버 못 찾음 | ros2 action list, ros2 action info /fibonacci 로 확인 |
⚠️ 거의 모든 오류는 "빌드 후
source install/setup.bash를 안 했거나, setup.py 등록을 빠뜨렸거나" 입니다. 막히면 이 둘부터 점검하세요.
8. 직접 해볼 과제
- 토픽:
simple_publisher.py가battery_status토픽으로Robot battery level is N%를 100에서 1씩 줄여가며 1초마다 발행하도록 변경. - 서비스:
add_two_ints_server.py를 두 수의 곱을 반환하는multiply_two_ints로 변경(타입은 AddTwoInts 재사용). - 액션:
fibonacci_action_server.py의 피드백에 진행률을 함께 출력 —Progress: 3/8, sequence: [0, 1, 1]. - 설계: 이동 로봇이 카메라·LiDAR로 장애물을 인식하고 목표까지 이동 후 로봇 팔로 물체를 집는 시스템을 노드 5개+/토픽 4개+/서비스 1개+/액션 1개+ 로 설계하고 통신 흐름 설명.
5편 정리
- 서비스 =
create_service/create_client, 콜백이request→response. 즉시 응답. - 액션 =
ActionServer/ActionClient,publish_feedback+succeed+Result. 장시간·피드백·취소. - 등록·빌드·실행 사이클은 토픽과 동일(
setup.py→colcon build→source→ros2 run). - CLI:
ros2 service call,ros2 action send_goal --feedback.
이로써 노드·토픽·서비스·액션 4종을 모두 손으로 구현했습니다. 로봇 시스템은 이 넷을 조합해 만들어집니다.
다음 편 예고
지금까지는 패키지 하나에 코드를 쌓았습니다. 6편에서는 한 걸음 물러나 패키지 자체를 제대로 다룹니다 — Catkin과 Colcon의 차이, Python(ament_python)·C++(ament_cmake) 패키지를 각각 생성·빌드하고, setup.py vs CMakeLists.txt, 빌드 옵션, 실무 패키지 설계까지. ROS 2 개발의 기본기를 마무리합니다.
📚 Week1 Day2 전체 목차 (총 6편)
- 1/6 리눅스 기초 — 쉘·파일시스템·핵심 명령어
- 2/6 리눅스 실전 8 시나리오
- 3/6 ROS 2 통신 4종 개념 — 노드·토픽·서비스·액션
- 4/6 ROS 2 토픽 Pub/Sub 직접 만들기
- 5/6 ROS 2 서비스·액션 구현 — 이번 글
- 6/6 ROS 2 패키지 빌드 — Python·C++·colcon
'피지컬AI' 카테고리의 다른 글
| [Physical AI W2D1] 1/6 — RViz로 로봇을 눈으로 보다: 시각화 흐름·Fixed Frame·Display (0) | 2026.06.20 |
|---|---|
| [Physical AI W1D2] 6/6 — ROS 2 패키지 빌드 마스터: Python·C++·colcon (0) | 2026.06.14 |
| [Physical AI W1D2] 4/6 — ROS 2 토픽 직접 만들기: Publisher·Subscriber (0) | 2026.06.14 |
| [Physical AI W1D2] 3/6 — ROS 2 통신의 4가지 길: 노드·토픽·서비스·액션 (0) | 2026.06.14 |
| [Physical AI W1D2] 2/6 — 리눅스 실전 8 시나리오: 명령어를 손에 익히기 (0) | 2026.06.14 |