본문 바로가기

Big Data

[Druid][Tutorial] Writing an ingestion spec

Example Data

다음과 같은 데이터가 있다고 가정해보자.

  • srcIP: IP address of sender
  • srcPort: Port of sender
  • dstIP: IP address of receiver
  • dstPort: Port of receiver
  • protocol: IP protocol number
  • packets: number of packets transmitted
  • bytes: number of bytes transmitted
  • cost: the cost of sending the traffic
{"ts":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":10, "bytes":1000, "cost": 1.4}
{"ts":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":20, "bytes":2000, "cost": 3.1}
{"ts":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":2000, "dstPort":3000, "protocol": 6, "packets":30, "bytes":3000, "cost": 0.4}
{"ts":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":40, "bytes":4000, "cost": 7.9}
{"ts":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":50, "bytes":5000, "cost": 10.2}
{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}
{"ts":"2018-01-01T02:33:14Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":100, "bytes":10000, "cost": 22.4}
{"ts":"2018-01-01T02:33:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":200, "bytes":20000, "cost": 34.5}
{"ts":"2018-01-01T02:35:45Z","srcIP":"7.7.7.7", "dstIP":"8.8.8.8", "srcPort":4000, "dstPort":5000, "protocol": 17, "packets":300, "bytes":30000, "cost": 46.3}

위의 Json data 를 quickstart 에 있는 ingestion_tutorial_data.json 에 저장하자.

우리는 이 데이터를 로드하는 ingestion spec 을 작성할 것이다.

이 튜토리얼에서, 우리는 native batch indexing task 를 사용할 것이다. 다른 task 를 사용하게 된다면, 몇가지 영역을 다르게 작성해야 하는데, 이 튜토리얼에서 그러한 포인트들을 짚어줄 것이다.

1. Defining the scheme

druid ingestion spec 의 가장 핵심적인 요소는 dataSchema 이다. dataSchema 는 input data 를 druid 에 저장할 컬럼 set 으로 어떻게 파싱할것인지를 정의한다.

가장 먼저 empty dataSchema 를 생성하고, 튜토리얼을 진행하면서 하나씩 채워나가보자.

quickstart 디렉토리 안에 다음과 같은 컨텐츠를 포함시킨 ingestion-tutorial-index.json 파일을 생성한다.

"dataSchema" : {}

1-1. Datasource name

dataSchema 안의 dataSource parameter 를 사용하여, datasource name 을 구체화시킨다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
}

1-2. Time column

dataSchema 는 input data 로부터 main timestamp field 를 어떻게 추출할 것인지를 아는 것이 필요하다.

위의 예제 데이터에서 우리의 input data 에서 timestamp column 은 ISO 8601 timestamps 포맷인 "ts" 컬럼 이다. 그러니 이 정보를 표현하는 timestampSpec 을 아래와 같이 작성해주자.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  }
}

1-3. Column types

자, 이제 우리는 time column 을 정의했으니, 이제 다른 컬럼들을 정의해보자.

druid 는 String, Long, Float, Double column 을 지원한다. 우리는 이 컬럼들이 어떻게 사용되는지 다음 섹션에서 살펴볼 것이다. 우선 roll-up 먼저 살펴보자.

1-4 Rollup

data 를 ingestion 할때, 우리는 rollup 을 사용할지 말지를 결정해야 한다.

  • 만약 roll-up 을 사용하게 된다면, 우리는 input column 을 두개로 분리해야 한다. 하나는 dimensions 이고 나머지 하나는 metrics 가 된다. dimenstion 은 rollup 을 위해 column 들을 grouping 하는 컬럼이고, "metrics" 는 aggregated 되는 컬럼이다.
  • 만약 roll-up 을 사용하지 않는다면, 모든 컬럼은 dimensions 로 다뤄지게 된다. 그리고 pre-aggregation 을 하지 않게 된다.

이 튜토리얼에서는 rollup 을 설정해보자. 이 설정은 granularitySpec 에 의해 결정된다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "granularitySpec" : {
    "rollup" : true
  }
}

1-4-1. choosing dimensions and metrics

이 예제에서는 "dimension" 와 "metrics" 에 대해 다음과 같이 나누는 것이 합리적인 분할이다.

  • Dimensions : srcIP, srcPort, dstIP, dstPort, protocol
  • Metrics : packets, bytes, cost

그러면, 위와 같이 분할한 Dimensions 와 Metrics 를 각각 어떻게 정의하는지 살펴보자.

1-4-2. Dimensions

Dimensions 는 dataSchema 안의 dimensionsSpec 에 의해 구체화 된다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "granularitySpec" : {
    "rollup" : true
  }
}

각각의 dimension 은 name 과 type 을 가지며, type 은 long, float, double, string 이 될 수 있다.

위의 dimension 중에 srcIP 를 보면 type 을 생략했는데, type 을 생략하게 되면 default type 인 string 으로 설정된다.

또한, protocol 은 input data 에서는 numeric type 이지만, ingestion 시에는 string type 으로 ingestion 하도록 지정했다는 점을 확인 할 수 있다.

  • String VS Numerics

druid 에서 string type 과 numberics type 의 장단점 비교.

numberic type 은 string type 에 비해서 다음과 같은 장/단점이 있다.

  • Pros : numeric 은 disk 에서 좀 더 작은 사이즈로 저장되며, 컬럼에서 데이터를 읽어올때도 낮은 오버헤드를 가지고 처리할 수 있다.
  • Cons : numeric dimension 은 인덱스들을 가질 수 없다. 그래서 numeric dimension 을 filtering 하는 것은 bitmap index 를 가진 string type dimension 에 비해 더 느릴 수 있다. → 만약 해당 dimension 에 필터링 기능이 필요하다면 numeric 은 자제하고 string type 으로 지정하자.

1-4-3. Metrics

Metrics 는 dataSchemametricsSpec 에 의해 지정할 수 있다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "rollup" : true
  }
}

metric 을 정의할때, aggregation 될 타입이 어떠한 타입인지를 결정할 필요가 있다.

이 튜토리얼에서는 "packets" 와 "bytes" 컬럼에 대해서는 long sum aggregation 을 수행하고, cost 컬럼에 대해서는 double sum aggregation 을 수행한다.

또한, 위의 metricsSpec 에서 count 는 "roll up" 된 original input data 가 얼마나 많은 row 가 있는지를 tracking 해준다.

1-5. No rollup

만약 rollup 을 사용하지 않는다면, 모든 컬럼은 dimensionsSpec 안에 모든 컬럼이 지정되어야 한다.

"dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" },
          { "name" : "packets", "type" : "long" },
          { "name" : "bytes", "type" : "long" },
          { "name" : "srcPort", "type" : "double" }
        ]
      },

1-6. Define granularities

이제까지 우리는 dataSchema 안에 parsermetricsSpec 을 정의했다.

이제 granularituSpec 안에 몇가지 부수적인 properties 를 설정해보자.

  • Type of granularitySpec : uniformarbitrary 두가지 타입이 존재한다. 이 튜토리얼에서는 uniform type 을 사용한다. uniform type 에서는 모든 세그먼트가 일관된 interval szie 를 가진다. (예를 들어, 모든 세그먼트는 1시간 분량의 데이터를 포함하는 것)
  • The segment granularity: single segment 가 data 를 포함해야 하는 시간 간격의 크기를 지정한다. (예를 들어, DAY, WEEK
  • The bucketing granularity of the timestamps in the time column (referred to as queryGranularity)

1-6-1. Segment granularity

SegmentGranularity 는 granularitySpec 안에 있는 segmentGranularity 프로퍼티에 의해 구성된다. 이 튜토리얼에서 우리는 hourly segments 를 생성한다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "rollup" : true
  }
}

우리가 예제로 생성한 데이터는 두개의 시간 간격을 가지기 때문에, 위처럼 설정하게 되면 두개의 세그먼트가 생성된다.

1-6-2. Query granularity

query granularity 는 granularitySpec 안의 queryGranularity property 에 의해 구성된다. 이 튜토리얼에서는 minute granularity 를 사용한다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "rollup" : true
  }
}

query granularity d의 효과를 확인하기 위해, 아래 raw data input 을 살펴보자.

{"ts":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1", "dstIP":"2.2.2.2", "srcPort":5000, "dstPort":7000, "protocol": 6, "packets":60, "bytes":6000, "cost": 4.3}

이 row 가 minute queryGranularity 의 설정을 가지고 ingestion 됐을때, druid 는 row 의 timestamp 를 minute buckets 에 맞게 맞춰서 초단위를 내림처리 할 것이다.

  • Define an interval (batch only)

batch task 에 대해서는 time interval 을 정의하는것이 필요하다. 지정된 timestamp interval 의 범위에 벗어나는 row 들은 ingestion 되지 않는다.

interval 또한, granualritySpec 에서 지정이 가능하다.

"dataSchema" : {
  "dataSource" : "ingestion-tutorial",
  "timestampSpec" : {
    "format" : "iso",
    "column" : "ts"
  },
  "dimensionsSpec" : {
    "dimensions": [
      "srcIP",
      { "name" : "srcPort", "type" : "long" },
      { "name" : "dstIP", "type" : "string" },
      { "name" : "dstPort", "type" : "long" },
      { "name" : "protocol", "type" : "string" }
    ]
  },
  "metricsSpec" : [
    { "type" : "count", "name" : "count" },
    { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
    { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
    { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
  ],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "HOUR",
    "queryGranularity" : "MINUTE",
    "intervals" : ["2018-01-01/2018-01-02"],
    "rollup" : true
  }
}

2. Define the task type

이제 dataSchema 를 지정하는 것은 완료되었다. 남은 작업은 작성한 dataSchema 를 ingestion task spec 안에 위치시키는 것이다. 그리고 input source 를 지정해주는 것이다.

dataSchema 는 모든 task types 끼리 공유된다. 그러나 각각의 task type 은 각각 고유의 스펙 또한 가지고 있다. 이 튜토리얼에서는 우리는 native batch ingestion task 를 사용할 것이다.

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    }
  }
}

3. Define the input source

자, 이제 input source 를 정의해보자. input source 는 ioConfig 를 통해 정의된다.

각각의 task type 에는 자신의 ioConfig 를 가진다. input data 를 읽기 위해, 우리는 inputSource 를 지정해야 한다.

우리의 예제에서 사용한 netflow data 는 local file system 에서부터 읽어오기 때문에 다음과 같은 구성이 필요하다.

"ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      }
    }

3-1. Define the format of the data

우리의 input data 는 json string 으로 표현되기 때문에, 우리는 inputFormat 을 json foramt 으로 지정할 것이다.

"ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    }
{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    }
  }
}

4. Additional tuning

각각의 ingestion task 는 추가적인 튜닝을 지원하는 tuningConfig section 을 가진다. 예로써, tuningConfig 에 native batch ingestion 을 위해 target segment size 를 지정해보자.

"tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000
    }

각각의 ingestion task 는 tuningConfig 의 type field 에 자신의 type 을 가진다는 것을 기억하자.

5. Final Spec

이제 우리는 ingestion spec 을 정의하는것을 끝마쳤다. 최종본은 다음과 같다.

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "ingestion-tutorial",
      "timestampSpec" : {
        "format" : "iso",
        "column" : "ts"
      },
      "dimensionsSpec" : {
        "dimensions": [
          "srcIP",
          { "name" : "srcPort", "type" : "long" },
          { "name" : "dstIP", "type" : "string" },
          { "name" : "dstPort", "type" : "long" },
          { "name" : "protocol", "type" : "string" }
        ]
      },
      "metricsSpec" : [
        { "type" : "count", "name" : "count" },
        { "type" : "longSum", "name" : "packets", "fieldName" : "packets" },
        { "type" : "longSum", "name" : "bytes", "fieldName" : "bytes" },
        { "type" : "doubleSum", "name" : "cost", "fieldName" : "cost" }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "MINUTE",
        "intervals" : ["2018-01-01/2018-01-02"],
        "rollup" : true
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "ingestion-tutorial-data.json"
      },
      "inputFormat" : {
        "type" : "json"
      }
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000
    }
  }
}

Submit the task and query the data

이제 다음 커맨드를 실행해보자.

bin/post-index-task --file quickstart/ingestion-tutorial-index.json --url http://localhost:8081

스크립트 실행이 완료되면, 우리는 데이터를 질의할 수 있다.

select * from "ingestion-tutorial"; 를 수행하면 아래와 같은 결과가 나올 것이다.


┌──────────────────────────┬───────┬──────┬───────┬─────────┬─────────┬─────────┬──────────┬─────────┬─────────┐
│ __time                   │ bytes │ cost │ count │ dstIP   │ dstPort │ packets │ protocol │ srcIP   │ srcPort │
├──────────────────────────┼───────┼──────┼───────┼─────────┼─────────┼─────────┼──────────┼─────────┼─────────┤
│ 2018-01-01T01:01:00.000Z │  6000 │  4.9 │     3 │ 2.2.2.2 │    3000 │      60 │ 6        │ 1.1.1.1 │    2000 │
│ 2018-01-01T01:02:00.000Z │  9000 │ 18.1 │     2 │ 2.2.2.2 │    7000 │      90 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T01:03:00.000Z │  6000 │  4.3 │     1 │ 2.2.2.2 │    7000 │      60 │ 6        │ 1.1.1.1 │    5000 │
│ 2018-01-01T02:33:00.000Z │ 30000 │ 56.9 │     2 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
│ 2018-01-01T02:35:00.000Z │ 30000 │ 46.3 │     1 │ 8.8.8.8 │    5000 │     300 │ 17       │ 7.7.7.7 │    4000 │
└──────────────────────────┴───────┴──────┴───────┴─────────┴─────────┴─────────┴──────────┴─────────┴─────────┘

'Big Data' 카테고리의 다른 글

Druid - Design  (0) 2020.11.10
Druid - Technology Overview  (0) 2020.10.19
Druid - Use Cases  (0) 2020.10.19