AWS DMS 기반 CDC 구축 도중 발견한 데이터 정합성 오류를 DMS의 메타데이터 컬럼인 AR_H_CHANGE_SEQ 활용하여 해결한 경험을 기록
사전지식
AWS DMS Target Endpoint의 설정이 아래와 같다고 가정했을 때
- CdcMaxBatchInverval: 300
최대 300초(5분) 동안 대기하다가 누적된 변경사항을 전송
- TimestampColumnName: dms_timestamp
변경사항이 target endpoint로 전송 되는 시간은 dms_timestamp 컬럼에 저장한다.
=> 동일한 배치 내에 있는 레코드들은 dms_timestamp가 같다는 의미이다.
예시
id | view_count | updated_at | dms_timestamp |
1 | 101 | 2025-05-17 13:03:00 | 2025-05-17 13:05:00 |
1 | 102 | 2025-05-17 13:03:09 | 2025-05-17 13:05:00 |
id 1의 view_count, updated_at 둘 다 다르지만 dms_timestamp는 같을 수 있다.
문제상황
cdc 파이프라인에서 최신 데이터 판별 로직에 dms_timestamp 컬럼을 사용했다.
다 확인해보지는 않았지만 먼저 생성된 2025-05-17 13:03:00 요 친구로 반영되는 것 같았다.
이렇게 데이터 정합성이 깨지고 있었다.
미봉책
최신 데이터 판별 로직에서 dms_timestamp보다 updated_at 컬럼을 우선하여 보도록 변경했다.
updated_at 컬럼이 없는 테이블도 분명 있으니 불안정한 로직이기는 하지만, 이때까지는 별다른 방법이 없는 줄 알았다.
AWS 전문가 선샌님을 모신 후
사내에서 CDC 프로젝트를 좀 활성화하기 위해 DMS 전문가를 모셔서 세션을 가졌고 해당 문제에 대해서 문의해보았는데,
아주 명쾌한 해결 방법이 있었다. DMS에 기본 메타데이터 컬럼인 AR_H_CHANGE_SEQ를 활용하기.
AR_H_CHANGE_SEQ 이 값은 작업 수준에서 35자리 고유 숫자를 나타냅니다. 처음 16자리는 타임스탬프의 일부이고, 마지막 19자리는 DBMS에서 증분한 record_id 번호입니다.
아래와 같이 dms table_mappings에 추가하면 된다.
{
"rule-type": "transformation",
"rule-id": "2",
"rule-name": "2",
"rule-target": "column",
"object-locator": {
"schema-name": "%",
"table-name": "%"
},
"rule-action": "add-column",
"value": "transact_id",
"expression": "$AR_H_CHANGE_SEQ",
"data-type": {
"type": "string",
"length": 50
}
}
id | view_count | updated_at | dms_timestamp | transact_id |
1 | 101 | 2025-05-17 13:03:00 | 2025-05-17 13:05:00 | 202505171305000000000001 |
1 | 102 | 2025-05-17 13:03:09 | 2025-05-17 13:05:00 | 202505171305000000000002 |
참고
변환 규칙 표현식을 사용하여 열 내용 정의 - AWS 데이터베이스 마이그레이션 서비스
예를 들어, 다음 변환 규칙은 먼저 대상 테이블 employee에 새 문자열 열 emp_seniority를 추가합니다. 급여 열에 SQLite round 함수를 사용하고 이 때 급여가 20,000과 같거나 이를 초과하는지 확인하는 CASE
docs.aws.amazon.com
'Data' 카테고리의 다른 글
[Spark] Delta Merge ConcurrentAppendException 탈출기 (0) | 2025.04.26 |
---|---|
[Spark] Kafka startingTimestamp 옵션 활용 (0) | 2025.04.17 |
[Spark] Delta Lack Change Data Feed (0) | 2025.04.06 |
[Spark] DataFrame API 테스트 하기 (0) | 2025.04.05 |
[Spark] 배치 조회 구현하기 (0) | 2025.03.25 |