[Flink] Checkpoint, Savepoint
Checkpoint, Savepoint가 필요한 이유
- 장애 복구: 예상치 못한 장애 발생 시 이전 상태로 자동 복구
- 계획된 재시작: 배포, 스케일링, 유지보수 등으로 작업을 중단했다가 재시작할 때
Checkpoint
Flink가 자동으로 관리하는 상태 스냅샷
장점
- 자동 생성 및 관리
- 빠른 생성과 복원
- 장애 발생 시 자동 복구
단점
- 최신 몇 개만 보존 (기본값 1개)
- 작업 그래프 변경 시 호환성 제한
- Flink 버전 업그레이드 시 제약
체크포인트 활성화 방법
1. 코드로 설정
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 30초마다 체크포인트 생성
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
// 상태 백엔드 설정
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints/");
// 작업 취소 시에도 체크포인트 보존
env.getCheckpointConfig()
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
2. properties 파일로 관리
# application.properties
execution.checkpointing.interval= 30s
execution.checkpointing.mode= EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION
state.backend.type= hashmap
state.checkpoints.dir= s3://my-bucket/checkpoints/
아래 코드로 불러오기
private static void loadApplicationProperties(Configuration conf) {
try (InputStream input = UserFeatureJob.class.getClassLoader().getResourceAsStream("application.properties")) {
Properties properties = new Properties();
properties.load(input);
// 모든 properties를 Configuration에 추가
for (String key : properties.stringPropertyNames()) {
String value = properties.getProperty(key);
if (value != null && !value.trim().isEmpty()) {
conf.setString(key, value);
}
}
} catch (Exception e) {
System.err.println("Failed to load application.properties: " + e.getMessage());
}
}
Savepoint
사용자가 수동으로 생성하는 상태 백업
장점
- 영구 보존 (사용자가 삭제할 때까지)
- 높은 호환성 (버전 업그레이드, 그래프 변경 등)
- 정확한 시점 보장
단점
- 수동 관리 필요
- 생성 시간이 오래 걸림
- 저장 공간 더 많이 사용
Savepoint 저장 방법
# 정규 세이브포인트 생성
bin/flink savepoint --type canonical <job-id> s3://my-bucket/savepoints/
# 네이티브 세이브포인트 생성
bin/flink savepoint --type native <job-id> s3://my-bucket/savepoints/
# 작업 중지와 함께 세이브포인트 생성
bin/flink stop --savepointPath s3://my-bucket/savepoints/ <job-id>
curl -X POST http://localhost:8081/jobs/aa4a736de5c0e9384ccb833fb66f528c/savepoints\
-H "Content-Type: application/json"\
-d '{"target-directory": "/tmp/flink-savepoints-new"}'
curl도 된다.
Savepoint type canonical와 native 차이
canonical(default)
- Flink 표준 형식으로 저장
- 모든 상태 백엔드에서 읽기 가능
- 높은 호환성과 이식성
canonical 언제 사용?
- 프로덕션 배포 전 안전한 백업
- Flink 버전 업그레이드
- 상태 백엔드 변경 (HashMap ↔ RocksDB)
- 장기 보존이 필요한 경우
native
- 상태 백엔드의 원본 형식으로 저장
- 같은 백엔드에서만 읽기 가능
- 빠른 생성과 복원
native 언제 사용?
- 개발/테스트 환경에서 빠른 재시작
- 같은 설정 내에서 임시 백업
- 리스케일링만 필요한 경우
Savepoint 사용
1. CLI
bin/flink run -s s3://my-bucket/checkpoints/job-abc123/chk-150 my-job.jar
2. Properties 사용
# application.properties
execution.savepoint.path=s3://my-bucket/checkpoints/job-abc123/chk-150
위 loadApplicationProperties()로 적용될 예정
시나리오 테스트
이벤트를 발행하고 -> job을 종료한다. -> 다시 들어와서 이벤트 발행
위 테스트는 동일하고 활성화 여부로 결과 확인할 예정
체크포인트를 활성화하지 않았을 때
아래가 checkpoint default 설정인가보다
이벤트를 발행하고 -> job을 종료한다. -> 다시 들어와서 이벤트 발행
윈도우 닫혔지만 결과 발행되지 않음.(10분 세션 테스트, session_features가 없음)
체크포인트를 활성화 했을 때
250715 업데이트.
아래 테스트는 실패다. 다시 해보니 체크포인트 복구가 안 된다. 로컬에서는 JOB ID가 재발행 돼서 이전에 생성된 체크포인트를 읽어오지 않는다...
아래 코드가 체크포인트 복구처럼 보인 이유는?
컨슘하고 체크포인트 저장 전(30초)에 종료해버림 -> offset 커밋 안 됨 -> 재시작 시 모든 이벤트 읽음 -> 복구된 거처럼 보임!
execution.checkpointing.interval=30000
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
state.backend.type=filesystem
state.checkpoints.dir=file:///tmp/flink-checkpoints
execution.savepoint.path=
이벤트를 발행하고 -> job을 종료한다. -> 다시 들어와서 이벤트 발행
결론 : 윈도우 닫혀서 세션 피처 생성된 것 확인 완료
참고
https://zjffdu.gitbook.io/flink-on-zeppelin/checkpoint-and-savepoint
Checkpoint & Savepoint | Flink on Zeppelin
Checkpoint & Savepoint Checkpoint and savepoint is used for store flink job state, they are indispensable for flink job if you want to use flink in production. There're 2 scenarios that you will use checkpoint and savepoint. In fail over, flink will read t
zjffdu.gitbook.io