본문 바로가기

AWS/PROJECT

DynamoDB 테이블에 디바이스 데이터 저장하기(Lambda, IoT Core)

728x90

AWS IoT Core로 디바이스의 상태 값을 받으면 이를 저장하기 위해 DynamoDB나 RDS를 사용한다.

여기에선 DynamoDB에 저장해보겠다.

AWS IoT Core Rule의 Action을 통해 DynamoDB에 저장하는 방법도 있지만,

열(column)마다 데이터가 저장되지 않으며, json 형식이 통으로 들어간다는 무시무시한 단점이 있다..

 

그래서 Lambda를 이용해 DynamoDB의 column마다 데이터를 저장하려고 한다.

 

DynamoDB 생성

1. AWS DynamoDB 콘솔 접속 > 테이블 > 테이블 생성 선택

2. 사진을 참고해 다음처럼 입력한다.

 

- 파티션 키 : 데이터의 고유한 식별자로 사용

- 정렬 키 : 시간, 상태, 다른 세부 데이터를 기준으로 파티션 내에서 고유하게 구분하거나 정렬하는 데에 사용

 

 

AWS IoT Core Rule 생성

1. AWS IoT 콘솔 > 메시지 라우팅 중 규칙 > 규칙 생성 선택

2. SQL문에 다음과 같이 입력

SELECT temperature, humidity, barometer, 
wind.velocity as wind_velocity, 
wind.bearing as wind_bearing, 
topic(2) as device_id, timestamp 
FROM 'device/+/data'

 

3. 규칙 작업 에선 Lambda 함수에 메시지 보내기를 선택한다. (함수 생성 방법까진 안 알려줄 거임)

4. 사용할 Lambda 함수를 선택한다.

 

 

Lambda 함수 구성

1. AWS Lambda > 함수 생성을 통해 함수를 생성한다.

필자는 python 3.13 버전을 선택했다.

2. AWS Lambda에 연결된 IAM Role에는 다음 권한(Policy)이 필요하다. 꼭 추가해주자.

{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "dynamodb:PutItem",
        "Resource": "arn:aws:dynamodb:ap-northeast-2:ACCOUNTNUMBER:table/iot-ddb-test"
    }
}

3. Lambda 함수는 다음 코드를 입력하면 된다.

import boto3

dynamodb = boto3.resource('dynamodb')
table_name = 'iot-ddb-test'
table = dynamodb.Table(table_name)

def lambda_handler(event, context):
    # JSON 데이터 추출
    device_data = event
    print("디바이스 데이터", device_data)

    # 필요한 데이터 추출
    device_id = device_data['device_id']
    temperature = device_data['temperature']
    humidity = device_data['humidity']
    barometer = device_data['barometer']
    wind_velocity = device_data['wind_velocity']
    wind_bearing = device_data['wind_bearing']
    timestamp = device_data['timestamp']
    
    # DynamoDB에 저장
    table.put_item(
        Item={
            'time_stamp': timestamp,        # Partition Key
            'device_id': int(device_id),        # Sort Key
            'temperature': temperature,
            'humidity': humidity,
            'barometer': barometer,
            'wind_velocity': wind_velocity,
            'wind_bearing': wind_bearing
        }
    )
    return {'statusCode': 200, 'body': 'Data saved successfully'}

 

 

IoT Core 테스트

1. AWS IoT Core MQTT Client 탭에서 다음처럼 메시지를 발행한다.

- Topic : device/22/data

- 메시지 페이로드

{
    "temperature": 28,
    "humidity": 80,
    "barometer": 1013,
    "timestamp": "2024-12-30 00:00:30",
    "wind": {
      "velocity": 22,
      "bearing": 255
    }
}

 

2. IoT Core Rule에 메시지가 Lambda에게 전달된다.

3. Lambda의 Cloudwatch log group을 확인하면 다음과 같이 나타나는 것을 볼 수 있다.

4. DynamoDB에서 테이블에 데이터가 잘 들어갔는지 확인한다.

 

 

대량으로 처리하려면

IoT Core Rule -> Kinesis Data Streams -> Lambda -> DynamoDB 순서로 워크플로우를 구성하는 것이 좋다

Lambda는 동시 실행이 1000개 밖에 되지 않기 때문에 이보다 더 많이 필요하면 배치 처리가 가능한 kinesis 서비스를 추가해주는 게 좋다

 

끗.

728x90