5 분 소요

빅데이터의 분산 처리

쿼리 엔진

SQL-on-Hadoop에 의한 데이터 처리의 구체적인 예로서, ‘Hive’에 의한 구조화 데이터의 생성과 ‘Presto’에 의한 대화식 쿼리에 대해 설명한다.

데이터 마트 구축의 파이프라인

Hadoop에 의한 구조화 데이터의 작성과 이를 이용한 쿼리의 실행이 어떤 것인지를 알기 위해, 하나의 예로서 Hive와 Presto를 결합한 데이터 파이프라인을 예상해보겠다.

우선 처음에 분산 스토리지에 저장된 데이터를 구조화하고 열 지향 스토리지 형식으로 저장한다.
이것은 다수의 텍스트 파일을 읽어 들여 가공하는 부하가 큰 처리가 되기 때문에 Hive를 이용한다.

그리고 완성된 구조화 데이터를 결합, 집계하고 비정규화 테이블로 데이터 마트에 써서 내보낸다.
열 지향 스토리지를 이용한 쿼리의 실행에는 Presto를 사용함으로써 실행 시간을 단축할 수 있다.

Hive에서 만든 각 테이블의 정보는 ‘Hive 메타 스토어(Hive Metastore)’라고 불리는 특별한 데이터베이스에 저장된다.
이것은 Hive뿐만 아니라 다른 SQL-on-Hadoop의 쿼리 엔진에서도 공통의 테이블 정보로 참고된다.

Hive에 의한 구조화 데이터 작성

우선 Hive를 사용하여 구조화 데이터를 작성한다.
다음과 같이 단말 터미널에서 Hive를 시작하고 CREATE EXTERNAL TABLE로 ‘외부 테이블(external table)’을 정의한다.

$ Hive

CREATE EXTERNAL TABLE access_log_csv(
    time string, request string, status int, bytes int
)

ROW FORMAT SETDS 'org.apache.hadoop.hive.serde2.OpenCSVSerde'

STORED AS TEXTFILE LOCATION '/var/log/access_log/'

TBLPROPERTIES ('skip.header.line.count'='1');

‘외부 테이블’이란 Hive의 외부에 있는 특정 파일을 참고해 마치 거기에 테이블이 존재하는 것처럼 읽어 들이기 위해 지정한다.
위의 예에서는 ‘access_log_csv’라는 테이블명을 참고해 데이터를 추출함으로써 텍스트 파일이 로드되고 구조화 데이터로의 변환이 이루어진다.

Hive를 비롯한 대부분의 SQL-on-Hadoop의 쿼리 엔진은 MPP 데이터베이스처럼 데이터를 내부로 가져오지 않아도 텍스트 파일을 그대로 집계할 수 있다.
예를 들어, 다음과 같이 쿼리를 실행하면 외부 테이블로 지정한 경로에 포함된 모든 CSV 파일이 로드되고 집계된다.

SELECT status, count(*) cnt
FROM acess_log_csv GROUP BY status LIMIT 2;
OK
200    1701534
302    46573
Time taken: 8.664 seconds, Fetched: 2 row(s)

이처럼 데이터를 그 자리에서 바로 집계할 수 있는 성질은 특히 애드 혹 데이터를 분석하기에 유용하며, 시간을 들여 데이터를 전송하지 않고도 원하는 정보를 얻을 수 있다.

그렇지만 CSV 파일을 그대로 집계하는 것은 비효율적이다.
쿼리를 실행시킬 때마다 매번 텍스트를 읽어 들이기 때문에 확실히 빠르다고는 말할 수 없다.
이렇게 된다면 너무 느리기 때문에 열 지향 스토리지로 변환한다.

열 지향 스토리지로의 변환 (데이터 집계의 고속화(배치형 쿼리 엔진용))

테이블을 열 지향 스토리지 형식은 ORC 형식으로 변환한다.
Hive의 경우, 테이블마다 스토리지 형식을 지정할 수 있다.

:: ORC 형식의 테이블 'access_log_orc'로 변환

CREATE TABLE access_log_orc STORED AS ORC AS
SELECT cast(time AS timestamp) time,
       request,
       status,
       cast(bytes AS bigint) bytes
FROM access_log_csv;
:: ORC 형식의 테이블 집계

SELECT status, count(*) cnt
FROM access_log_orc GROUP BY status LIMIT 2;
OK
200    1701534
302    46573
Time taken: 1.567 seconds, Fetched: 2 row(s)

이처럼 텍스트 데이터를 열 지향 스토리지로 변환함으로써 데이터의 집계가 크게 고속화된다.
그러나 그것의 작성은 시간이 걸리는 프로세스이므로, Hive와 같은 배치형의 쿼리 엔진에서 실행하는 데 적합하다.

원래 데이터가 텍스트이든 스키마리스 데이터이든 간에 그것이 Hive에서 읽을 수 있는 형식이라면 무엇이든지 쿼리를 조금 고쳐 쓰는 것만으로 어떤 테이블이라도 만들 수 있다.
이것이 Hive를 이용한 데이터 구조화 프로세스다.

Hive로 비정규화 테이블을 작성하기

데이터의 구조화가 완료되면 다음은 데이터 마트의 구축이다.
즉, 테이블을 결합 및 집약해서 ‘비정규화 테이블’을 만든다.

시간이 걸리는 배치 처리는 원칙적으로 Hive를 사용해야 한다.

서브 쿼리 안에서 레코드 수 줄이기 (초기 단계에서 팩트 테이블을 작게 하기)

Hive 쿼리는 SQL과 매우 유사하지만, 그 특성은 일반적인 RDB와는 전혀 다르다.

:: 1) 비효율적인 쿼리의 예
:: 테이블을 결합한 후에 WHERE 로 검색
SELECT
FROM access_log a
JOIN users b ON b.id = a.user_id
WHERE b.created_at = '2017-01-01'

:: 2) 보다 효율적인 쿼리의 예
SELECT ...
FROM (
:: 처음에 시간으로 팩트 테이블 검색
  SELECT * access_log
  WHERE time >= TIMESTAMP '2017-01-01 00:00:00'
) a
JOIN users b ON b.id = a.user_id
WHERE b.created_at = '2017-01-01'

‘1)’과 같은 쿼리의 경우, 팩트 테이블(“access_log”)과 디멘전 테이블(“user”)을 결합하고 WHERE로 조건을 부여하는 간단한 쿼리이지만 비효율적이다.
팩트 테이블을 필터링할 조건이 아무것도 없기 때문에, 이대로는 모든 데이터를 읽어 들인 후에 결합하고 이후에 나오는 WHERE에 의한 검색을 하게 된다.

그 결과 대량의 중간 데이터가 생성되고, 그 대부분을 그냥 버려 낭비가 큰 처리가 된다.

‘2)’처럼 서브 쿼리 안에서 팩트 테이블을 작게 하는 것이 확실하다.
‘초기에 팩트 테이블을 작게 하는 것’이 빅데이터의 집계에서 중요하다.

데이터 편향 피하기 (분산 시스템의 성능 발휘를 위해)

고속화를 방해하는 다른 하나의 문제는 ‘데이터의 편차(data skew, 데이터 스큐)’다.

액세스 로그를 집계함으로써 일별 고유 유저 수의 추리를 알고 싶다고 하자.

:: 1) 비효율적인 쿼리의 예
:: distinct count는 분산되지 않는다.
SELECT date, count(distinct user_id) users
FROM access_log GROUP BY date

:: 2) 보다 효율적인 쿼리의 예
SELECT date, count(*) users
FROM (
:: 최초에 중복을 없앤다.
  SELECT DISTINCT date, user_id FROM access_log
) t
GROUP BY date

‘1)’과 같은 쿼리의 경우, distinct count는 분산되지 않아도 GROUP BY에 의한 그룹화는 분산 처리된다.
만약 하루 데이터양에 편차가 있다면 문제가 표면화된다.

날짜가 아니라 웹페이지당 고유 방문자 수를 알고 싶다고 하자.
웹페이지의 조회 수에는 큰 편차가 있기 때문에 distinct count가 극단적으로 늦어지고, 전체적으로 쿼리 실행 시간이 늘어나게 된다.
이것이 데이터의 편향 문제다.

분산 시스템의 성능을 발휘하기 위해서는 이러한 데이터의 편차를 최대한 없애고, 모든 노드에 데이터가 균등하게 분산되도록 해야 한다.
이 예라면 ‘2)’처럼 SELECT DISTINCT로 중복을 제거함으로써 부하를 잘 분산하면서 데이터의 양을 줄일 수 있다.

마찬가지로 테이블의 결합과 ORDER BY에 의한 정렬 등의 구문도 일부 노드에 데이터가 집중되는 것에 의해 편향이 발생한다.

대화형 쿼리 엔진 Presto의 구조 - Presto로 구조화 데이터 집계하기

Hive와 같은 배치형 쿼리 엔진은 작은 쿼리를 여러 번 실행하는 대화형 데이터처리에는 적합하지 않다.
쿼리 실행의 지연을 감소시키는 것을 목적으로 개발된 것이 ‘대화형 쿼리 엔진’이다.

시기 이벤트
2010년 Dremel 논문 발표
2010년 Google BigQuery 발표
2013년 Cloudera Impala 1.0 배포(현 Apache Impala)
2013년 Presto의 오픈 소스화
2015년 Apache Drill 1.0 배포

플러그인 가능한 스토리지 (하나의 쿼리 안에서 여러 데이터 소스에 연결 가능)

Presto의 하나의 특징이 ‘플러그인 가능한 스토리지 설계’다.
Presto는 전용 스토리지를 갖고 있지 않으므로 Hive와 마찬가지로 다양한 데이터 소스에서 직접 데이터를 읽어 들인다.

Presto는 Hive 메타 스토어에 등록된 테이블을 가져올 수 있다.
따라서 Hive에서 만든 구조화 데이터를 좀 더 집계하는 등의 목적에 적합하다.
Presto가 성능을 최대한 발휘하려면 원래 스토리지가 열 지향 데이터 구조로 되어 있어야 한다.

Presto는 특히 ORC 형식의 로드에 최적화되어 있으며, 그것을 확장성이 높은 분산 스토리지에 배치하여 최대의 성능을 발휘한다.

Presto는 Hive 메타 스토어 이외에도 다양한 데이터 소스를 테이블로 참고할 수 있다.

CPU 처리의 최적화 (일기와 코드 실행 병렬 처리)

Presto는 SQL의 실행에 특화된 시스템으로, 쿼리를 분석하여 최적의 실행 계획을 생성하고, 그것을 자바의 바이트 코드로 변환한다.
바이트 코드는 Presto의 워커 노드에 배포되고, 그것은 런타임 시스템에 의해 기계 코드로 컴파일된다.

코드의 실행은 멀티 스레드화되어 단일 머신에서 수백 태스크나 병렬로 실행된다.
열 지향 스토리지에서의 읽기도 병렬화되어 데이터가 도달할 때마다 처리가 진행된다.

인 메모리 처리에 의한 고속화

분산 결합과 브로드캐스트 결합

열 지향 스토리지 집계

데이터 분석의 프레임워크 선택하기 - MPP 데이터베이스, Hive, Presto, Spark

MPP 데이터베이스

Hive

Presto

Spark