Example Data
다음과 같은 데이터가 있다고 가정해보자.
srcIP
: IP address of sendersrcPort
: Port of senderdstIP
: IP address of receiverdstPort
: Port of receiverprotocol
: IP protocol numberpackets
: number of packets transmittedbytes
: number of bytes transmittedcost
: the cost of sending the traffic
{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4}
{"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1}
{"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4}
{"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9}
{"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2}
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
{"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4}
{"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5}
{"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}
위의 Json data 를 quickstart 에 있는 ingestion_tutorial_data.json 에 저장하자.
우리는 이 데이터를 로드하는 ingestion spec 을 작성할 것이다.
이 튜토리얼에서, 우리는 native batch indexing task 를 사용할 것이다. 다른 task 를 사용하게 된다면, 몇가지 영역을 다르게 작성해야 하는데, 이 튜토리얼에서 그러한 포인트들을 짚어줄 것이다.
1. Defining the scheme
druid ingestion spec 의 가장 핵심적인 요소는 dataSchema
이다. dataSchema
는 input data 를 druid 에 저장할 컬럼 set 으로 어떻게 파싱할것인지를 정의한다.
가장 먼저 empty dataSchema
를 생성하고, 튜토리얼을 진행하면서 하나씩 채워나가보자.
quickstart 디렉토리 안에 다음과 같은 컨텐츠를 포함시킨 ingestion-tutorial-index.json 파일을 생성한다.
"dataSchema" : {}
1-1. Datasource name
dataSchema
안의 dataSource
parameter 를 사용하여, datasource name 을 구체화시킨다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
}
1-2. Time column
dataSchema
는 input data 로부터 main timestamp field 를 어떻게 추출할 것인지를 아는 것이 필요하다.
위의 예제 데이터에서 우리의 input data 에서 timestamp column 은 ISO 8601 timestamps 포맷인 "ts" 컬럼 이다. 그러니 이 정보를 표현하는 timestampSpec
을 아래와 같이 작성해주자.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
}
}
1-3. Column types
자, 이제 우리는 time column 을 정의했으니, 이제 다른 컬럼들을 정의해보자.
druid 는 String, Long, Float, Double column 을 지원한다. 우리는 이 컬럼들이 어떻게 사용되는지 다음 섹션에서 살펴볼 것이다. 우선 roll-up 먼저 살펴보자.
1-4 Rollup
data 를 ingestion 할때, 우리는 rollup 을 사용할지 말지를 결정해야 한다.
- 만약 roll-up 을 사용하게 된다면, 우리는 input column 을 두개로 분리해야 한다. 하나는 dimensions 이고 나머지 하나는 metrics 가 된다. dimenstion 은 rollup 을 위해 column 들을 grouping 하는 컬럼이고, "metrics" 는 aggregated 되는 컬럼이다.
- 만약 roll-up 을 사용하지 않는다면, 모든 컬럼은 dimensions 로 다뤄지게 된다. 그리고 pre-aggregation 을 하지 않게 된다.
이 튜토리얼에서는 rollup 을 설정해보자. 이 설정은 granularitySpec
에 의해 결정된다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"granularitySpec" : {
"rollup" : true
}
}
1-4-1. choosing dimensions and metrics
이 예제에서는 "dimension" 와 "metrics" 에 대해 다음과 같이 나누는 것이 합리적인 분할이다.
- Dimensions : srcIP, srcPort, dstIP, dstPort, protocol
- Metrics : packets, bytes, cost
그러면, 위와 같이 분할한 Dimensions 와 Metrics 를 각각 어떻게 정의하는지 살펴보자.
1-4-2. Dimensions
Dimensions 는 dataSchema
안의 dimensionsSpec
에 의해 구체화 된다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"granularitySpec" : {
"rollup" : true
}
}
각각의 dimension 은 name 과 type 을 가지며, type 은 long, float, double, string 이 될 수 있다.
위의 dimension 중에 srcIP 를 보면 type 을 생략했는데, type 을 생략하게 되면 default type 인 string 으로 설정된다.
또한, protocol 은 input data 에서는 numeric type 이지만, ingestion 시에는 string type 으로 ingestion 하도록 지정했다는 점을 확인 할 수 있다.
- String VS Numerics
druid 에서 string type 과 numberics type 의 장단점 비교.
numberic type 은 string type 에 비해서 다음과 같은 장/단점이 있다.
- Pros : numeric 은 disk 에서 좀 더 작은 사이즈로 저장되며, 컬럼에서 데이터를 읽어올때도 낮은 오버헤드를 가지고 처리할 수 있다.
- Cons : numeric dimension 은 인덱스들을 가질 수 없다. 그래서 numeric dimension 을 filtering 하는 것은 bitmap index 를 가진 string type dimension 에 비해 더 느릴 수 있다. → 만약 해당 dimension 에 필터링 기능이 필요하다면 numeric 은 자제하고 string type 으로 지정하자.
1-4-3. Metrics
Metrics 는 dataSchema
의 metricsSpec
에 의해 지정할 수 있다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"rollup" : true
}
}
metric 을 정의할때, aggregation 될 타입이 어떠한 타입인지를 결정할 필요가 있다.
이 튜토리얼에서는 "packets" 와 "bytes" 컬럼에 대해서는 long sum aggregation 을 수행하고, cost 컬럼에 대해서는 double sum aggregation 을 수행한다.
또한, 위의 metricsSpec 에서 count
는 "roll up" 된 original input data 가 얼마나 많은 row 가 있는지를 tracking 해준다.
1-5. No rollup
만약 rollup 을 사용하지 않는다면, 모든 컬럼은 dimensionsSpec
안에 모든 컬럼이 지정되어야 한다.
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" },
{ "name" : "packets", "type" : "long" },
{ "name" : "bytes", "type" : "long" },
{ "name" : "srcPort", "type" : "double" }
]
},
1-6. Define granularities
이제까지 우리는 dataSchema
안에 parser
와 metricsSpec
을 정의했다.
이제 granularituSpec
안에 몇가지 부수적인 properties 를 설정해보자.
Type of granularitySpec
:uniform
와arbitrary
두가지 타입이 존재한다. 이 튜토리얼에서는uniform
type 을 사용한다.uniform
type 에서는 모든 세그먼트가 일관된 interval szie 를 가진다. (예를 들어, 모든 세그먼트는 1시간 분량의 데이터를 포함하는 것)The segment granularity
: single segment 가 data 를 포함해야 하는 시간 간격의 크기를 지정한다. (예를 들어,DAY
,WEEK
- The bucketing granularity of the timestamps in the time column (referred to as queryGranularity)
1-6-1. Segment granularity
SegmentGranularity 는 granularitySpec
안에 있는 segmentGranularity
프로퍼티에 의해 구성된다. 이 튜토리얼에서 우리는 hourly segments 를 생성한다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"rollup" : true
}
}
우리가 예제로 생성한 데이터는 두개의 시간 간격을 가지기 때문에, 위처럼 설정하게 되면 두개의 세그먼트가 생성된다.
1-6-2. Query granularity
query granularity 는 granularitySpec
안의 queryGranularity
property 에 의해 구성된다. 이 튜토리얼에서는 minute granularity 를 사용한다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"rollup" : true
}
}
query granularity d의 효과를 확인하기 위해, 아래 raw data input 을 살펴보자.
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
이 row 가 minute queryGranularity 의 설정을 가지고 ingestion 됐을때, druid 는 row 의 timestamp 를 minute buckets 에 맞게 맞춰서 초단위를 내림처리 할 것이다.
- Define an interval (batch only)
batch task 에 대해서는 time interval 을 정의하는것이 필요하다. 지정된 timestamp interval 의 범위에 벗어나는 row 들은 ingestion 되지 않는다.
interval 또한, granualritySpec
에서 지정이 가능하다.
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"],
"rollup" : true
}
}
2. Define the task type
이제 dataSchema
를 지정하는 것은 완료되었다. 남은 작업은 작성한 dataSchema
를 ingestion task spec 안에 위치시키는 것이다. 그리고 input source 를 지정해주는 것이다.
dataSchema
는 모든 task types 끼리 공유된다. 그러나 각각의 task type 은 각각 고유의 스펙 또한 가지고 있다. 이 튜토리얼에서는 우리는 native batch ingestion task 를 사용할 것이다.
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"],
"rollup" : true
}
}
}
}
3. Define the input source
자, 이제 input source 를 정의해보자. input source 는 ioConfig
를 통해 정의된다.
각각의 task type 에는 자신의 ioConfig
를 가진다. input data 를 읽기 위해, 우리는 inputSource
를 지정해야 한다.
우리의 예제에서 사용한 netflow data 는 local file system 에서부터 읽어오기 때문에 다음과 같은 구성이 필요하다.
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/",
"filter" : "ingestion-tutorial-data.json"
}
}
3-1. Define the format of the data
우리의 input data 는 json string 으로 표현되기 때문에, 우리는 inputFormat 을 json foramt 으로 지정할 것이다.
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/",
"filter" : "ingestion-tutorial-data.json"
},
"inputFormat" : {
"type" : "json"
}
}
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"],
"rollup" : true
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/",
"filter" : "ingestion-tutorial-data.json"
},
"inputFormat" : {
"type" : "json"
}
}
}
}
4. Additional tuning
각각의 ingestion task 는 추가적인 튜닝을 지원하는 tuningConfig
section 을 가진다. 예로써, tuningConfig
에 native batch ingestion 을 위해 target segment size 를 지정해보자.
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 5000000
}
각각의 ingestion task 는 tuningConfig
의 type field 에 자신의 type 을 가진다는 것을 기억하자.
5. Final Spec
이제 우리는 ingestion spec 을 정의하는것을 끝마쳤다. 최종본은 다음과 같다.
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "ingestion-tutorial",
"timestampSpec" : {
"format" : "iso",
"column" : "ts"
},
"dimensionsSpec" : {
"dimensions": [
"srcIP",
{ "name" : "srcPort", "type" : "long" },
{ "name" : "dstIP", "type" : "string" },
{ "name" : "dstPort", "type" : "long" },
{ "name" : "protocol", "type" : "string" }
]
},
"metricsSpec" : [
{ "type" : "count", "name" : "count" },
{ "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
{ "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
{ "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "HOUR",
"queryGranularity" : "MINUTE",
"intervals" : ["2018-01-01/2018-01-02"],
"rollup" : true
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/",
"filter" : "ingestion-tutorial-data.json"
},
"inputFormat" : {
"type" : "json"
}
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 5000000
}
}
}
Submit the task and query the data
이제 다음 커맨드를 실행해보자.
bin/post-index-task --file quickstart/ingestion-tutorial-index.json --url http://localhost:8081
스크립트 실행이 완료되면, 우리는 데이터를 질의할 수 있다.
select * from "ingestion-tutorial"; 를 수행하면 아래와 같은 결과가 나올 것이다.
┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐
│ __time │ bytes │ cost │ count │ dstIP │ dstPort │ packets │ protocol │ srcIP │ srcPort │
├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │ 6000 │ 4.9 │ 3 │ 2.2.2.2 │ 3000 │ 60 │ 6 │ 1.1.1.1 │ 2000 │
│ 2018-01-01T01:02:00.000Z │ 9000 │ 18.1 │ 2 │ 2.2.2.2 │ 7000 │ 90 │ 6 │ 1.1.1.1 │ 5000 │
│ 2018-01-01T01:03:00.000Z │ 6000 │ 4.3 │ 1 │ 2.2.2.2 │ 7000 │ 60 │ 6 │ 1.1.1.1 │ 5000 │
│ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │ 2 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │
│ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │ 1 │ 8.8.8.8 │ 5000 │ 300 │ 17 │ 7.7.7.7 │ 4000 │
└──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘
'Big Data' 카테고리의 다른 글
Druid - Design (0) | 2020.11.10 |
---|---|
Druid - Technology Overview (0) | 2020.10.19 |
Druid - Use Cases (0) | 2020.10.19 |