DAU 파이프라인 설계

이 글에서는 데이터 사이언티스트에게 제공하기 위해서 DAU를 추출하는 파이프라인을 직접 설계해보고자 한다.

파이프라인 설계

데이터 수집

먼저, DAU를 추출하기 위해서는 사용자의 APP 사용 여부를 알아야 한다. APP으로부터 사용자의 접속 여부를 알아오는 방법에 대해 생각해 보자.

Message Queue

APP을 진입하면 무조건 특정 API를 호출한다고 가정한다면, 해당 API를 호출 여부를 통해 DAU를 추출할 수 있다.

먼저 백엔드 개발자로부터 해당 API가 호출되면 해당 로그를 전달받아야 한다. 이때 Kafka 또는 Google Pub/Sub과 같은 메시지 큐를 통해 전달받을 수 있다.

Google Pub/Sub은 메모리에 저장하기 때문에 안정성은 낮지만 속도가 빠르다. Kafka는 파일 형태로 디스크에 저장하기 때문에 안정성은 높지만 속도가 Google Pub/Sub보다는 느릴 수 있다. 해당 파이프라인에서는 속도보다는 안정성이 중요하기 때문에 Kafka를 선택하고자 한다.

전달받을 데이터의 스키마는 다음과 같다.

1
2
3
4
{
  "userId": 0,
  "date": "YYYY-MM-DD"
}

Kafka를 통해 전달받은 데이터는 변환하기 전 먼저 DataLake에 저장하고자 한다. 해당 데이터는 DAU뿐만 아니라 MAU나 유저의 하루 APP 실행 수 등 여러 데이터에도 활용될 수 있을 것 같기 때문에 DataLake에 저장하여 필요에 따라 다른 파이프라인에도 같이 사용하고자 한다.

Kafka와 Connector를 사용하여 BigQuery에 바로 적재하고자 한다.

데이터 처리

Tool 선택

전날 APP에 접속한 사용자의 수를 수집하기 위해, 00시마다 BigQuery로부터 데이터를 추출하여 유저의 중복 데이터를 삭제하고 데이터 사이언티스트에게 해당 데이터를 전달하는 작업이 필요하다. 해당 작업은 다양항 방법으로 실행시킬 수 있다.

00시에 실행되는 Server 구현

00시에 실행되는 Server를 구현하는 방법이다.

Google Cloud Console의 Scheduler를 사용하여 00시마다 HTTP를 통해 해당 Server를 Trigger해도 되고, 혹은 해당 Server가 00시마다 실행될 수 있도록 하는 방법이다.

현재, 서버를 어떤 것을 사용하고 있느냐에 따라 방법은 다양하다!

[장점]

  • 필요에 따라 복잡한 로직이나 데이터 처리를 자유롭게 구현할 수 있다.
  • 서버 환경을 원하는 대로 설정하고 관리할 수 있다.

[단점]

  • 서버 인프라를 추가로 관리해야 한다
  • 복잡한 워크플로우나 대규모 데이터 처리에는 제한적일 수 있습니다.
  • 모니터링과 로깅 기능이 제한적일 수 있다.

Airflow DAG 실행

Airflow DAG로 데이터 처리를 진행하는 방법이다.

[장점]

  • DAG(Directed Acyclic Graph)를 사용하여 복잡한 의존성 및 작업 흐름을 명확하게 정의할 수 있다.
  • Airflow에서 모니터링과 로깅 기능을 제공하기 때문에 빠른 대응이 가능하다.

[단점]

  • 초기 설정과 구성과 복잡할 수 있으며, 운영 중 관리가 필요하다.

만약, Airflow를 이미 사용하고 있는 곳이라면 당연히 확장성이 좋고 모니터링이 용이한 Airflow 를 선택하는 것이 더 좋은 선택일 것이다! 하지만 Airflow를 사용하고 있지 않다면 DAU를 추출하는 작업은 복잡한 의존성 혹은 워크플로우가 아니기 때문에 초기 설정과 구성이 복잡하지 않은 00시에 실행되는 Server 구현 방법이 적절할 것이라고 생각한다. 따라서, 00시에 실행되는 Server 구현 방법을 선택하고자 한다.

이때, 모니터링과 로깅 기능을 위해 Log를 설정하고, 오류가 발생할 경우 알람이 보내질 수 있도록 구현하는 것이 필요할 것이다.

최적화

BigQuery에서 데이터를 추출할 때 중요한 것은 쿼리 성능을 최적화하는 것이다. 해당 BigQuery 테이블에는 매일 매일 사용자의 로그가 담겨있을 것이다. 그렇기 때문에 파티셔닝을 사용하여 불필요한 데이터 스캔을 방지하고 쿼리 성능을 향상시키고 비용을 절감하고자 한다. 테이블은 날짜를 기준으로 분할할 것이다.

이때, DAU를 산출하고자 하는 날짜의 로그를 모두 가져와 중복되는 userId를 제거해야 한다.

또한, 대규모 데이터셋일 경우, Spark를 사용하는 것이 도움이 될 수 있다. 만약, Bigqeury의 테이블이 user_id(int - 4바이트), date(date - 4바이트)의 컬럼을 가지고 있고 100만명이 사용하고 있다고 하면 약 8MB로 대규모 데이터셋이라고 하기는 어렵다!

따라서, Spark를 사용하지 않고 파이프라인을 구현하는 것이 좋을 것 같다고 생각한다.

데이터 저장

CronJob을 사용하여 DAU를 산출한 뒤, 이를 데이터 사이언티스트에게 전달해야 한다. 전달하는 방식은 데이터 사이언티스트의 요청에 맞춰 전달해주고 한다. (ex. DB, xlsx, Bigquery)

image