Data

[Flink] Checkpoint, Savepoint

코딩콩 2025. 7. 15. 12:07

Checkpoint, Savepoint가 필요한 이유

  • 장애 복구: 예상치 못한 장애 발생 시 이전 상태로 자동 복구
  • 계획된 재시작: 배포, 스케일링, 유지보수 등으로 작업을 중단했다가 재시작할 때

Checkpoint

Flink가 자동으로 관리하는 상태 스냅샷

장점

  • 자동 생성 및 관리
  • 빠른 생성과 복원
  • 장애 발생 시 자동 복구

단점

  • 최신 몇 개만 보존 (기본값 1개)
  • 작업 그래프 변경 시 호환성 제한
  • Flink 버전 업그레이드 시 제약

 

체크포인트 활성화 방법

1. 코드로 설정

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 30초마다 체크포인트 생성
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);

// 상태 백엔드 설정
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints/");

// 작업 취소 시에도 체크포인트 보존
env.getCheckpointConfig()
   .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2. properties 파일로 관리

# application.properties
execution.checkpointing.interval= 30s
execution.checkpointing.mode= EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention= RETAIN_ON_CANCELLATION
state.backend.type= hashmap
state.checkpoints.dir= s3://my-bucket/checkpoints/

아래 코드로 불러오기

    private static void loadApplicationProperties(Configuration conf) {
        try (InputStream input = UserFeatureJob.class.getClassLoader().getResourceAsStream("application.properties")) {
            Properties properties = new Properties();
            properties.load(input);

            // 모든 properties를 Configuration에 추가
            for (String key : properties.stringPropertyNames()) {
                String value = properties.getProperty(key);
                if (value != null && !value.trim().isEmpty()) {
                    conf.setString(key, value);
                }
            }
        } catch (Exception e) {
            System.err.println("Failed to load application.properties: " + e.getMessage());
        }
    }

 

Savepoint

사용자가 수동으로 생성하는 상태 백업

장점

  • 영구 보존 (사용자가 삭제할 때까지)
  • 높은 호환성 (버전 업그레이드, 그래프 변경 등)
  • 정확한 시점 보장

단점

  • 수동 관리 필요
  • 생성 시간이 오래 걸림
  • 저장 공간 더 많이 사용

Savepoint 저장 방법

# 정규 세이브포인트 생성
bin/flink savepoint --type canonical <job-id> s3://my-bucket/savepoints/

# 네이티브 세이브포인트 생성
bin/flink savepoint --type native <job-id> s3://my-bucket/savepoints/

# 작업 중지와 함께 세이브포인트 생성
bin/flink stop --savepointPath s3://my-bucket/savepoints/ <job-id>
curl -X POST http://localhost:8081/jobs/aa4a736de5c0e9384ccb833fb66f528c/savepoints\
-H "Content-Type: application/json"\
-d '{"target-directory": "/tmp/flink-savepoints-new"}'

curl도 된다.

Savepoint type canonical와 native 차이

canonical(default)

  • Flink 표준 형식으로 저장
  • 모든 상태 백엔드에서 읽기 가능
  • 높은 호환성과 이식성

canonical 언제 사용?

  • 프로덕션 배포 전 안전한 백업
  • Flink 버전 업그레이드
  • 상태 백엔드 변경 (HashMap ↔ RocksDB)
  • 장기 보존이 필요한 경우

native

  • 상태 백엔드의 원본 형식으로 저장
  • 같은 백엔드에서만 읽기 가능
  • 빠른 생성과 복원

native 언제 사용?

  • 개발/테스트 환경에서 빠른 재시작
  • 같은 설정 내에서 임시 백업
  • 리스케일링만 필요한 경우

Savepoint 사용

1. CLI

bin/flink run -s s3://my-bucket/checkpoints/job-abc123/chk-150 my-job.jar

2. Properties 사용

# application.properties
execution.savepoint.path=s3://my-bucket/checkpoints/job-abc123/chk-150

위 loadApplicationProperties()로 적용될 예정

 

시나리오 테스트

이벤트를 발행하고 -> job을 종료한다. -> 다시 들어와서 이벤트 발행

위 테스트는 동일하고 활성화 여부로 결과 확인할 예정

체크포인트를 활성화하지 않았을 때

아래가 checkpoint default 설정인가보다

이벤트를 발행하고 -> job을 종료한다. -> 다시 들어와서 이벤트 발행

윈도우 닫혔지만 결과 발행되지 않음.(10분 세션 테스트, session_features가 없음)

 

체크포인트를 활성화 했을 때

250715 업데이트.

아래 테스트는 실패다. 다시 해보니 체크포인트 복구가 안 된다. 로컬에서는 JOB ID가 재발행 돼서 이전에 생성된 체크포인트를 읽어오지 않는다...

아래 코드가 체크포인트 복구처럼 보인 이유는?

컨슘하고 체크포인트 저장 전(30초)에 종료해버림 -> offset 커밋 안 됨 -> 재시작 시 모든 이벤트 읽음 -> 복구된 거처럼 보임!

execution.checkpointing.interval=30000
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION
state.backend.type=filesystem
state.checkpoints.dir=file:///tmp/flink-checkpoints
execution.savepoint.path=

이벤트를 발행하고 -> job을 종료한다. -> 다시 들어와서 이벤트 발행

결론 : 윈도우 닫혀서 세션 피처 생성된 것 확인 완료

 

참고

https://zjffdu.gitbook.io/flink-on-zeppelin/checkpoint-and-savepoint

 

Checkpoint & Savepoint | Flink on Zeppelin

Checkpoint & Savepoint Checkpoint and savepoint is used for store flink job state, they are indispensable for flink job if you want to use flink in production. There're 2 scenarios that you will use checkpoint and savepoint. In fail over, flink will read t

zjffdu.gitbook.io