본문 바로가기

Big Data

Druid - Design

드루이드는 클라우드 친화적이고 운영하기 쉬운 멀티 프로세스, 분산 아키텍처를 가지고 있다. 각각의 드루이드 프로세스 타입은 독립적으로 유연하게 구성되고 scale 될 수 있다.

이러한 디자인은 fault tolerance 를 증가시켜준다.

Processes and Servers

드루이드는 아래와 같은 몇가지 프로세스 타입들을 가진다.

  • Coordinator : cluster 에서 데이터의 가용성을 관리한다.
  • Overlord : data ingestion workloads 의 할당을 컨트롤한다.
  • Broker : 외부 클라이언트들로부터 유입되는 쿼리들을 처리한다.
  • Router : 이 프로세스는 optional processes 이다. broker, overlord, overlord 들로 가는 쿼리들을 routing 시켜준다.
  • Historical : queryable data 를 저장하는 프로세스이다.
  • MiddleManager : data 를 ingesting 하는데 책임이 있는 프로세스이다.

드루이드 프로세스는 원하는 방식으로 배포가 가능하지만, 일반적으로 Master, Query and Data Processes 로 구성하는것을 추천한다.

  • Master : Coordinator 와 overlord processes 을 동작시킨다. 데이터의 availiability과 ingestion 을 관리한다.
  • Query : Broker 와 Router processes 를 동작시킨다. 외부 클라이언트들로부터 유입되는 쿼리들을 처리한다.
  • Data : Historical and MiddleManager Processes 을 동작시킨다. ingestion workloads 를 수행하고 모든 queryable data 를 저장한다.

External dependencies

위의 세가지 프로세스 타입에 덧붙여서, 드루이드는 또한 세가지 외부 시스템에 대한 디펜던시가 존재한다.

이것은 기존의 인프라가 존재하는 경우 기존의 인프라를 활용하기 위함이다.

Deep stroage

shared file system 는 클러스터 내의 모든 드루이드 서버들에서 접근이 가능하다. 보통 여기에는 S3 나 HDFS, network mounted filesystem 과 같은 분산 저장소가 사용된다. single-server deployment 에서는 보통 local disk 가 사용된다. druid 는 system 에 ingestion 되는 data 들을 저장하기 위해 deep storage 를 사용한다.

 

즉, 드루이드는 deep storage 를 오직 데이터의 백업, 백그라운드에서 드루이드 프로세스들끼리의 데이터 전달의 용도로만 사용한다는 것이다. 질의에 응답하기 위해서 historical processes 는 deep storage 부터 읽지 않고, 대신 어떤 쿼리를 serving 하기 전에 local disk 로부터 segment 를 prefetch 하게 된다.

 

이것은 드루이드가 최적의 성능을 가진 query 를 처리하기 위해 deep storage 에 접근할 필요가 없음을 의미한다.

 

그리고 또한, deep stroage 와 historical proceeses 가 접근하는 영역의 모든 disk space 의 여유공간이 충분히 있어야 함을 의미한다.

deep storage 는 드루이드의 유연하고 fault-tolerant design 을 위해 매우 중요한 부분이다. 드루이드는 만약 모든 데이터 서버가 데이터를 잃어버렸다 하더라도 deep storage 를 통해 다시 데이터들을 로드(bootstrap) 할 수 있다.

Metadata storage

metadata storage 는 세그먼트 사용 정보나 task 정보등과 같이 다양하게 공유되는 시스템 메타데이터를 저장한다. 클러스터 배포 환경에서 이 db 는 mysql, postgreSQL 과 같은 RDBMS 가 사용된다. 로컬에서는 더비디비가 사용된다.

Zookeeper

내부적인 서비스 discovery, coordination, 그리고 leader election 등과 같은 작업을 하기 위해 사용된다.

Architecture diagram

위의 Master/Query/Data server 구성에 대해 아래 그림을 보면 이해가 더 쉬울 것이다.

Storage design

datasources and segments

드루이드의 데이터는 datasource 에 저장된다. (이 datasource 는 전통적인 RDBMS 에서의 테이블이다). 각각의 datasource 는 기본적으로 time 에 의해 partition 되며, 부가적으로 다른 속성들로 추가적으로 파티션할 수 있다.

 

이 때, 각각의 time range 를 chunk 라고 부른다. chunk 안에서 data 는 하나 또는 그 이상의 segments 들로 partition 된다.

각각의 segment 는 보통 수백만 row 들로 구성되는 single file 이다. segments 가 time chunk 로 구성되기 때문에, 때때로 segment 들을 살아있는 타임라인으로 생각하는것이 도움이 된다.

 

아래 그림을 보면 시간별로 chunk 가 존재하고 그 안에 partition(segments) 가 있음을 알 수 있다.

 

 

 

데이터소스는 수백만 개의 세그먼트를 가질 수 있다. 각각의 세그먼트는 MiddleManager 에 의해 만들어지며, 이 시점에는 변경가능하고 uncommitted 된 상태이다.

세그먼트 building process 는 다음을 따른다. (이 단계들은 데이터 파일들을 컴팩트 하고 fast queries 가 가능하도록 디자인된 단계이다)

  • 컬럼형 포맷으로 변경
  • bitmap indexes 와 함게 인덱싱
  • 다양한 알고리즘으로 압축
    • Dictionary encoding with id storage minimization for String columns
    • Bitmap compression for bitmap indexes
    • Type-aware compression for all columns

정기적으로, 세그먼트들은 커밋되고 publish 된다. 그리고 이 시점에 deep storage 에 쓰여지게 되며 immutable 한 형태로 변경된다. 그 후 MiddleManager 에서 Historical Processes 로 이동된다.

그리고 segment 에 관련된 항목도 메타데이터 저장소에 쓰여진다.

이 항목은 세그먼트의 스키마, 크기 및 딥 스토리지에서의 위치 등을 포함한 세그먼트에 대한 자체 설명 메타 데이터 비트이다. 이러한 항목은 코디네이터가 클러스터에서 사용할 수있는 데이터를 알기 위해 사용하는 것이다.

Indexing and handoff

Indexing 은 새로운 세그먼트가 만들어지는 것에 의한 메커니즘이다. 그리고 handoff 는 세그먼트가 publish 되고 Historical process 에 의해 serve 되는것에 의한 메커니즘이다. 이와 같이 이 매커니즘은 indexing side 에서 동작한다.

  1. Indexing task 가 새로운 세그먼트를 빌딩하고 시작한다. 그것을 빌딩하기 전에 반드시 세그먼트의 identifier 를 결정하는 것이 필요하다.
    1. appending 하는 task 의 경우, (kafka task 라던지, append mode 에서의 index task) 기존에 존재하는 segments 에 새로운 partition 을 add 하기위해 Overload 의 allocate API 를 호출함으로써 설정될것이다.
    2. overwriting 하는 task 의 경우 (Hadoop task or append mode 가 아닌 index task), 이것은 interval 에 locking 을 걸고, 새로운 버전과 새로운 segments 를 생성함으로써 수행될 것이다.
  2. 만약 indexing task 가 kafka 와 같이 realtime task 라면, 이 시점에 segment 는 즉각 queryable 한 상태가 된다. (available 하지만 unpublished 된 상태이다.)
  3. Indexing task 가 segment 로부터 데이터를 읽는것을 끝마치고 나서, deep storage 에 segment 를 push 하고, metadata store 안에 record 를 기록하는 것이 의해서 publish 된다.
  4. 만약 indexing task 가 realtime task 라면, 이 시점에서 Historical Process 가 segment 를 load 하기 까지 기다린다. 반대로 indexing task 가 realtime task 가 아니라면, 그것은 즉시 exit 된다.

Coordinator , Historical side 에서는 다음과 같다.

  1. Cooridinator 는 새로운 published segements 를 찾기 위해 metadatastore 를 주기적으로 확인한다 (디폴트 1분)
  2. Coordinator 가 published 되고 used 된 segments 를 찾았을때, Coordinator 는 segments 를 load 할 Historical process 를 찾아내고, 로드할것을 지시한다.
  3. Historical 은 segments 를 load 하고 serving 하기 시작한다.
  4. 이 시점에서 만약 indexing task 가 handoff 를 기다리는 중이라면, 그것은 종료될 것이다.

Segment identifiers

모든 segments 는 4가지 부분으로 이루어진 identifier 를 가진다.

  1. Datasource name
  2. Time interval (for the time chunk containing the segment; this corresponds to the )
  3. Version number (일반적으로 segment 가 처음시작될때 설정된 ISO8601 형식의 timestamp 값)
  4. Partition number (an integer, unique within a datasource+interval+version; may not necessarily be contiguous.)

예를 들어, 아래 text 는 datasource 가 clarity-cloud0 이고, time chunk 가 2018-05-21T16:00:00.000Z/2018-05-21T17:00:00.000Z 이며 version 이 2018-05-21T15:56:09.909Z , partition number 가 1 인 text 이다.

clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z_1

추가로 아래 예제와 같이, 0 번째 partition number 를 가진 segment (chunk 에서 첫번째 파티션)는 partition number 를 생략한다. (위의 예제와 동일하지만 partition 1 이 0 으로 바뀌었다.)

clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z

Segment lifecycle

각각의 segment 들은 다음 세가지 영역을 포함한 lifecycle 을 가진다.

  1. Metadata store : segment 가 만들어질 때, metadata store 에 segment metadata (일반적으로 수 KB 정도의 작은 JSON payload 이다. ) 가 저장된다. segment 에 관련된 record 를 metadata store 에 저장하는 행위를 publishing 이라고 부른다. 이러한 metadata record 는 used 로 이름지어진 boolean flag 를 가지는데, segment 를 queryable 한 상태로 둘지 말지 를 결정하는데 이 falg 가 사용된다. realtime task 에 의해 만들어진 segment 들은 publish 되기 전에 바로 사용 가능한데, 이유는 그 segment 들이 오직 완료되고 data 의 additional rows 를 받아들이지 않을
  2. Deep Storage : segment 가 만들어질때, segment data file 들을 deep storage 에 push 한다.
  3. Availability for querying : Druid data server 에서 segment query 가 가능한 상태

sys.segments table 에서 Druid SQL 을 사용함으로써 현재 active 한 segments 의 stats 를 검사할 수 있다. 그것은 다음 flag 들을 포함한다.

  • is_published : segment metadata 가 metadata store 에 push 되고 used falg 가 ture 일때, True
  • is_available : realtime task 또는 Historical process 에서 segment 가 현재 querying 이 가능한 상태이면, True
  • is_realtime : segment 가 오직 realtime tasks 에서만! 이용 가능하다면 , True. realtime ingestion 을 사용하는 datasource 에서 이 값은 일반적으로 true 로 설정되어 시작하며, segment 가 publish 되고 hand off 된 상태가 되면 false 로 값이 바뀌게 된다.
  • is_overshadowed : segment 가 used : true 의 값을 가지고 publish 되며, 몇몇의 다른 segment 들로 인해 완전히 overshadowed 된 상태일 경우에 True. 일반적으로 이것은 transient stats 이다. 그리고, 이 상태에 있는 segment 들은 곧 자동적으로 그들의 used flag 값을 false 로 설정하게 될 것이다.

Availability and consistency

위에 indexing and handoff topic 에서 살펴봤듯이, druid 는 ingestion 과 querying 이 아키텍처적으로 분리되어 있다. 이것은 Druid 의 availability 와 consistency 속성을 이해할때, 우리가 각각 컴포넌트의 기능을 독자적으로 살펴봐야 하는 이유이다.

* Ingestion side

ingestion 의 측면에서, druid 의 주된 ingestion 방식은 all pull-based 와 트랜잭션의 보장을 제공하는 것이다.

즉, all-or-nothing 의 방식으로 publish 가 일어남을 보장한다.

  • supervised 되는 "seekable-stream(검색 가능한 스트림)" ingestion 방법 (카프카 or Kinesis). 이 방식에서 druid 는 strean offset 을 metadata store 에 같은 트랜잭션의 segment metadata 옆에 나란히 commit 한다. 아직 publish 되지 않은 data 의 ingestion 은 ingestion task 가 실패할 경우 rollback 될 수 있다. 이 경우, 부분적으로 ingestion 된 data 들은 버려지게 된다. 그리고 druid 는 stream offset 의 마지막 commit 된 부분으로부터 다시 ingestion 을 시작할 것이다. 이것은 publish 행위가 정확히 한번만! 일어남을 보장해준다.
  • Hadoop-based batch ingestion. 각각의 task 들은 모든 segment metadata 들을 single transaction 에 publish 한다.
  • Native-batch ingestion. parallel mode 에서, subtask 가 끝난후에, supervisor task 는 모든 segment metadata 를 single transaction 으로 publish 한다. 반대로 simple mode 에서는 single task 가 모든 segment metadata 를 single transaction 으로 publish 한다.

streaming ingestion 방식 중 하나인 Tranquility 는 더 이상 추천되지 않는다. (이 녀석은 transactional loading 을 수행하지 않는다.) → 즉 로딩 시점에 트랜잭션 보장을 못한다.

부가적으로, 몇몇의 ingestion 방식은 멱등성(idempotency) 을 보장해준다. 이것은 같은 ingestion 의 반복되는 실행이 중복 데이터를 만들어내지 않음을 의미한다.

  • Supervised "seekable-stream" ingestion methods 는 멱등성을 보장한다. stream offset 과 segment metadata 가 locking 시점에 함께 저장되고 업데이트되기 때문이다.
  • Hadoop-based batch ingestion 은 input source 가 druid datastore 에 이미 ingestion 되어 있는 데이터와 같은 경우를 제외하고 멱등성을 보장한다. 위와 같은 경우에는 이미 존재하는 데이터를 overwrite 하는 대신에 adding 하기 때문에 같은 task 를 두번 실행했을시에 멱등성을 보장하지 않는다.
  • Natvie batch ingestion 은 appendToExisting 값이 true 이거나, 이미 ingestion 된 data 를 또 ingestion 하는 경우를 제외하고는 멱등성을 보장한다. 위의 두 가지 경우에서는 마찬가지로 이미 존재하는 데이터를 overwrite 하는 대신에 adding 하기 때문에 같은 task 를 실행하는 것은 멱등성을 보장하지 않는다.

* Query side

query 의 측면에서, druid broker 는 입력받은 query 에 해당하는 segment 들의 consistency 를 보장하는데 책임이 있다. druid broker 는 쿼리가 시작될때 현재 사용가능한 것을 기준으로 사용할 적당한 segment 들을 선택한다.

 

이것은 atomic replacement 에 의해 지원되는데, atomic replacement 란 유저의 관점에서, 쿼리가 old data 와 newer data 를 consistency 나 performance 를 해치지 않고 순간적으로 바꾸는 것을 보장해주는 featrure 이다. 이것은 appendToExisting 값이 false 일때, Hadoop-based batch ingestion, native batch ingestion 에서 사용되고 압축된다.

 

atomic replacement 는 각각의 time chunk 마다 개별적으로 발생한다 만약, batch ingestion task 나 compaction 이 여러개의 time chunk 를 포함한다면, 그때 각각의 time chunk 는 작업이 끝난 이후에 곧 atomic replacement 를 겪게 될 것이다. 그러나 모든 replacement 가 동시에 일어나지는 않을 것이다.

 

전형적으로, druid 에서 atomic replacement 는 segment version 들과 결합되어 동작하는 core set concept 을 기반으로 한다. time chunk 가 overwritten 되었을 때, 새로운 segment 의 core set 은 더 높은 verision number 로 만들어진다. core set 은 broker 가 older set 대신 그것들을 사용하기 전에, 반드시 모두 available 한 상태여야 한다. 이러한 속성들은 함께 동작해서 druid 의 atomic replacement 를 보장해준다.

 

또한 하나의 version 과 하나의 time chunk 에서 오로지 하나의 core set 만이 있을 수 있다.

druid 는 또한 forceTimeChunkLock 을 통해 설정할 수 있는 실험적인 기능인 segment locking mode 를 제공한다. 이하 실험적 기능의 상세한 설명은 생략.

Query processing

쿼리들은 처음에 broker 로 들어가게 된다. broker 는 쿼리와 관련하여 어느 segment 가 data 를 가지고 있는지를 찾게 된다. segments 의 list 들은 항상 시간에 의해 partition 되며, 그 외 우리가 직접 지정한 속성을 가지고도 partition 되게 된다. 또한, broker 는 어느 HistoricalsMiddleManagers 가 선택된 segment 들을 serving 할 것인지를 확인한다. 그리고 재 작성된 subquery (들어온 쿼리를 파싱해서 알맞은 형태로 재작성 함) 들을 각 process 들로 보내게 된다.

 

그러면, Historical,MiddleManager processes 는 쿼리를 가져와서, 처리하고, 결과를 리턴하게 된다.

Broker 는 결과를 받고 최종 결과를 도출하기 위해 그들을 서로 머지한다. 그리고 최초 요청한 client 에세 결과를 return 하게 된다.

Broker pruning 은 Druid 가 각각의 쿼리에서 스캔하는 데이터의 양을 제한하는데 있어서 아주 중요한 방법이다.

 

(그러나 유일한 방법은 아니다.) Broker 가 pruning 을 위해 사용하는 것보다 더 세분화된 레벨의 필터링을 위해 각각의 segment 안에 있는 indexing 구조는 druid 가 row data 를 찾기 전에 filter set 에 match 되는 rows 가 어느 것인지를 찾아내도록 해준다. Druid 가 일단 특정쿼리에 매치되는 row 들을 알게되면, 오직 query 에서 필요한 특정 컬럼에만 접근하면 된다.

그러한 컬럼들에 접근함으로써, druid 는 query filter 에 매치되지 않는 data 를 읽는 것을 피하기 위해서 row 와 row 사이를 skip 할 수 있다.

 

 

정리해보면, Druid 는 query performance 를 maximize 하기 위해 3가지 테크닉을 사용한다.

  1. 각각의 쿼리에 대해 접근되는 segments 들을 Pruning
  2. 각각의 segment 안에서, index 를 사용해서 어느 row 를 access 해야만 하는지를 확인
  3. 각각의 segment 안에서, 지정된 쿼리에 연관된 오직 특정 row 와 column 만 읽는다.

'Big Data' 카테고리의 다른 글

[Druid][Tutorial] Writing an ingestion spec  (0) 2020.11.24
Druid - Technology Overview  (0) 2020.10.19
Druid - Use Cases  (0) 2020.10.19