개발환경
Desktop Ubuntu20.04 LTS
ROS2 foxy
설명
지난번 만들었던 온습도센서 퍼블리시 노드를 기반으로 한다.
하지만 매번 라즈베리의 GPIO를 통해 센서데이터를 받아오기 불편하다는 단점이 있다.
따라서 임의값을 지정하여 온습도 데이터를 퍼블리시하였다.
온습도 데이터중, 습도 데이터에 따라 가습기에 연결된 릴레이 스위치를 작동시킨다.
사람이 가장 쾌적할 수 있는 습도 범위는 40% ~ 60%라고 한다.
따라서 그 중간값인 50%를 기준으로 작동을 지시한다.
과정
1. DHT_FAKE_PUB 노드를 통해 임의의 온습도 데이터를 퍼블리시한다.
2. RELAY_PUB 노드가 이를 서브스크라이브 한다.
3. 서브스크라이브한 습도를 가지고 IF문을 거친다.
4. 정수형태의 1과 0을 퍼블리시 한다.
5. 1은 릴레이에 HIGH 신호를 보내 가습기를 작동하고, 0은 릴레이에 LOW신호를 보내 가습기를 끈다.
소스코드
DHT_FAKE_PUBLISHER.PY
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from std_msgs.msg import Float32MultiArray
from hanium_interface.msg import GetDht //사용자 정의 인터페이스를 사용한다.
//GetDht.msg 의 형태는 다음과 같다.
//Float32 hum
//Float32 temp
class dht_fake(Node):
def __init__(self):
super().__init__('dht_fake')
self.declare_parameter('qos_depth', 10)
qos_depth = self.get_parameter('qos_depth').value
//QOS 인수 설정
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
//퍼블리셔 선언
self.dht_publisher = self.create_publisher(
GetDht,//사용자 정의 인터페이스
'dht_sensor_fake',//토픽 이름
QOS_RKL10V)
//1초마다 매서드 반복
self.timer = self.create_timer(1.0, self.publish_dht_sensor)
def publish_dht_sensor(self)://반복하여 퍼블리시할 내용
msg = GetDht() //사용자 정의 인터페이스를 msg로 선언
msg.hum = 53.5 //임의의 습도값을 지정
msg.temp = 23.5 //임의의 온도값을 지정
self.dht_publisher.publish(msg) //메세지를 퍼블리싱 한다
self.get_logger().info('습도: {0} 온도: {1}'.format(msg.hum,msg.temp))//로그로 확인
def main(args=None):
rclpy.init(args=args)
try:
dhtfake = dht_fake() //dhtfake인스턴스로 클래스 사용
try:
rclpy.spin(dhtfake) //spin으로 인터럽트때까지 반복
except KeyboardInterrupt:
dhtfake.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
dhtfake.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
RELAY_PUB.PY
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.callback_groups import ReentrantCallbackGroup
from std_msgs.msg import Int16
from std_msgs.msg import Float32
from hanium_interface.msg import GetDht
class HumSubscriber(Node):
command = 0
def __init__(self):
super().__init__('HumsubScriber')
self.hum = 0.0
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
durability=QoSDurabilityPolicy.VOLATILE)
self.subscribe_hum = self.create_subscription(
GetDht,
'dht_sensor_fake',
self.get_hum_value,
QOS_RKL10V
)
self.publisher_relay = self.create_publisher(
Int16,
'relay_state',
QOS_RKL10V,
)
def get_hum_value(self, msg):
//값을 서브스크라이브 한다.
self.hum = msg.hum
self.command = self.commander(self.hum)
self.get_logger().info('습도 값은 : {0}, 릴레이 조작값은 : [{1}]'.format(self.hum,self.command))
//값을 퍼블리싱 한다.
msg2 = Int16()
msg2.data = int(self.command)
self.publisher_relay.publish(msg2)
self.get_logger().info("릴레이 조작값을 보냅니다. {0}".format(msg2.data))
def commander(self, hum)://받아들인 습도값을 처리하여 0과 1로 반환
if 50.0 <= hum:
self.command = 0
else:
self.command = 1
command = self.command
return command
def main(args=None):
rclpy.init(args=args)
try:
sub_hum = HumSubscriber()
try:
rclpy.spin(sub_hum)
except KeyboardInterrupt:
sub_hum.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
sub_hum.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
결과
습도값이 50% 이하일 때 릴레이 신호 퍼블리시

습도값이 50% 이상일 때 릴레이 신호 퍼블리시

'개인프로젝트 > 한이음 멘토링' 카테고리의 다른 글
[한이음_ROS2] 수집한 센서 데이터를 CSV 파일로 추출하기 (0) | 2022.08.06 |
---|---|
[한이음_임베디드]우분투에서 라즈베리파이 UART 활성화 (0) | 2022.07.09 |
[한이음_ROS2]오염도에 따른 Fan PWM 제어 노드 (0) | 2022.06.28 |
[한이음_ROS2]온습도 센서 Publisher (0) | 2022.06.19 |