개발잡부
[kafka] 5amsung ELK + kafka
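I had set up ELK with one docker compose project and Kafka with another, but the two could not connect,
so I gave up. After some searching, it turns out that
when Kafka and ELK are each set up with their own Docker compose file, linking Kafka and Logstash can fail.
Each compose project gets its own default bridge network, and by default only containers on the same network can talk to each other.
This can also be solved with docker network connect or by creating a shared external network, but here everything goes into a single compose file.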
- git clone https://github.com/900gle/docker-elk
- cd docker-elk
- Edit docker-compose.yml: add kafka to the es8.8.1 ELK stack I was already using
version: '3.7'

services:

  # The 'setup' service runs a one-off script which initializes the
  # 'logstash_internal' and 'kibana_system' users inside Elasticsearch with the
  # values of the passwords defined in the '.env' file.
  #
  # This task is only performed during the *initial* startup of the stack. On all
  # subsequent runs, the service simply returns immediately, without performing
  # any modification to existing users.
  setup:
    build:
      context: setup/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    init: true
    volumes:
      - setup:/state:Z
    environment:
      ELASTIC_PASSWORD: ${ELASTIC_PASSWORD:-}
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
      KIBANA_SYSTEM_PASSWORD: ${KIBANA_SYSTEM_PASSWORD:-}
    networks:
      - elk
    depends_on:
      - elasticsearch

  elasticsearch:
    build:
      context: elasticsearch/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro,z
      - elasticsearch:/usr/share/elasticsearch/data:z
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      ES_JAVA_OPTS: -Xms512m -Xmx512m
      # Bootstrap password.
      # Used to initialize the keystore during the initial startup of
      # Elasticsearch. Ignored on subsequent runs.
      ELASTIC_PASSWORD: ${ELASTIC_PASSWORD:-}
      # Use single node discovery in order to disable production mode and avoid bootstrap checks.
      # see: https://www.elastic.co/guide/en/elasticsearch/reference/current/bootstrap-checks.html
      discovery.type: single-node
    networks:
      - elk

  logstash:
    build:
      context: logstash/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
    ports:
      - "5044:5044"
      - "50000:50000/tcp"
      - "50000:50000/udp"
      - "9600:9600"
    environment:
      LS_JAVA_OPTS: -Xms512m -Xmx512m
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
    networks:
      - elk
    depends_on:
      - elasticsearch

  kibana:
    build:
      context: kibana/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml:ro,Z
    ports:
      - "5601:5601"
    environment:
      KIBANA_SYSTEM_PASSWORD: ${KIBANA_SYSTEM_PASSWORD:-}
    networks:
      - elk
    depends_on:
      - elasticsearch

  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "9900:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - elk

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # KAFKA_CREATE_TOPICS is a wurstmeister/kafka convention. As far as I know the
      # confluentinc/cp-kafka image does not act on it, so the topic may only appear
      # once a client first uses it (broker-side auto-creation).
      KAFKA_CREATE_TOPICS: "5amsung:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - elk

networks:
  elk:
    driver: bridge

volumes:
  setup:
  elasticsearch:
Run
(base) ➜ es8.8.1 docker compose up -d --build
kafka and elasticsearch turned out to be dead..
docker logs es881-elasticsearch-1
ERROR: Elasticsearch exited unexpectedly
# List docker images
docker images
# Remove a docker image
docker rmi {IMAGE_ID}
- Check connectivity
- Elasticsearch : localhost:9200
- Logstash : localhost:50000/9600
- Kibana : localhost:5601
- Run docker network ls to list the networks
- Run docker network inspect {network name} to confirm that every container is attached to that network
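The connectivity check above can also be scripted. This is a minimal sketch, assuming the host ports published in docker-compose.yml (9200, 9600, 5601, 9092). The class name PortCheck and the one-second timeout are my own illustrative choices. It only confirms that something accepts TCP connections on the port, not that the service behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host ports taken from the compose file: Elasticsearch, Logstash API, Kibana, Kafka.
        int[] ports = {9200, 9600, 5601, 9092};
        for (int port : ports) {
            String state = isOpen("localhost", port, 1000) ? "open" : "closed";
            System.out.println("localhost:" + port + " -> " + state);
        }
    }
}
```

A closed port for kafka or elasticsearch here is a cue to look at docker logs for that container before debugging the pipeline.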
In the kafka settings of docker-compose.yml,
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 is set so that
clients inside the container network connect to kafka:29092 and clients outside it connect to localhost:9092.
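To make the two-listener setup concrete, here is a small sketch that parses the same KAFKA_ADVERTISED_LISTENERS value and picks the bootstrap address for a client depending on where it runs. The class and method names are hypothetical and exist only for illustration. Kafka itself does this selection through the listener a client first connects to, not through code like this.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AdvertisedListeners {

    /** Parses "NAME://host:port,NAME2://host2:port2" into an ordered name -> "host:port" map. */
    public static Map<String, String> parse(String advertised) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : advertised.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    /** Containers on the compose network use PLAINTEXT; processes on the host use PLAINTEXT_HOST. */
    public static String bootstrapFor(Map<String, String> listeners, boolean insideComposeNetwork) {
        return listeners.get(insideComposeNetwork ? "PLAINTEXT" : "PLAINTEXT_HOST");
    }

    public static void main(String[] args) {
        Map<String, String> listeners =
            parse("PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092");
        System.out.println("Logstash (container): " + bootstrapFor(listeners, true));
        System.out.println("Spring Boot (host):   " + bootstrapFor(listeners, false));
    }
}
```

This is why the Logstash pipeline uses kafka:29092 while an application running on the host uses localhost:9092.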
Add the logstash pipeline
- cd /Users/doo/docker/es8.8.1/logstash/pipeline
- vi logstash.conf
- The input reads messages from the kafka topic 5amsung, and the output sends them to elasticsearch and sets the index
input {
  kafka {
    bootstrap_servers => "kafka:29092"
    #group_id => "logstash"
    topics => ["5amsung"]
    consumer_threads => 1
    decorate_events => true
  }
}

## Add your filters / logstash plugins configuration here

output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}
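The index setting in the output uses a date pattern, so each event lands in a daily index. Logstash resolves %{+YYYY.MM.dd} against the event's @timestamp in UTC, which means an event logged in the morning in Korea (UTC+9) can still go into the previous day's index. A minimal sketch of that naming rule, with hypothetical class and method names:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class IndexNameSketch {

    // Mirrors the YYYY.MM.dd pattern in the pipeline, evaluated in UTC as Logstash does.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("uuuu.MM.dd").withZone(ZoneOffset.UTC);

    /** Builds the daily index name for an event timestamp, e.g. prefix "logstash". */
    public static String dailyIndex(String prefix, Instant timestamp) {
        return prefix + "-" + DAY.format(timestamp);
    }

    public static void main(String[] args) {
        // 2023-06-26 08:30 in Seoul is still 2023-06-25 23:30 in UTC.
        Instant event = Instant.parse("2023-06-25T23:30:00Z");
        System.out.println(dailyIndex("logstash", event)); // logstash-2023.06.25
    }
}
```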
- Restart the Logstash container
The 5amsung producer API sends data to the kafka broker at localhost:9092
https://ldh-6019.tistory.com/508
Check the data in Kibana Dev Tools
Check the index with head
"reason"=>"action [indices:admin/auto_create] is unauthorized for user [logstash_internal] with effective roles [logstash_admin,logstash_writer] on indices [race-2023.06.25], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}}
The proper fix is to grant the privilege on a matching index pattern, but that was too much hassle,
so I gave the role the create_index privilege for every index name instead.
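For reference, the narrower fix could look roughly like the sketch below, which builds a request for the Elasticsearch create-or-update role API (PUT /_security/role/{name}). Several things here are assumptions on my part: the index pattern race-* is inferred from the error message, the cluster privileges are a guess at what the role already carries, and the elastic user with password changeme is a placeholder. Note that this API replaces the whole role definition, so anything the existing role already grants has to be repeated in the body.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RoleRequestSketch {

    /** JSON body granting index creation and writes on one index pattern (assumed privileges). */
    public static String roleBody(String indexPattern) {
        return "{"
            + "\"cluster\":[\"monitor\",\"manage_index_templates\"],"
            + "\"indices\":[{"
            + "\"names\":[\"" + indexPattern + "\"],"
            + "\"privileges\":[\"create_index\",\"create\",\"write\"]"
            + "}]}";
    }

    /** Builds (but does not send) the PUT request with basic authentication. */
    public static HttpRequest buildRequest(String baseUrl, String roleName,
                                           String user, String password, String indexPattern) {
        String basic = Base64.getEncoder()
            .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_security/role/" + roleName))
            .header("Authorization", "Basic " + basic)
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(roleBody(indexPattern)))
            .build();
    }

    public static void main(String[] args) {
        // Placeholder credentials; the role name comes from the error message above.
        HttpRequest request = buildRequest(
            "http://localhost:9200", "logstash_writer", "elastic", "changeme", "race-*");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(roleBody("race-*"));
        // To apply it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

The sketch only prints the request so it is safe to run. Sending it is left as the commented last step.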
Collecting Spring Boot Logback logs
On the Spring Boot side, only the Logback configuration is covered here.
1. Add the libraries to build.gradle
// Logback does not support a kafka appender out of the box, so a separate library is needed.
implementation 'com.github.danielwegener:logback-kafka-appender:0.1.0'
implementation 'net.logstash.logback:logstash-logback-encoder:6.2'
2. Configure logback.xml
<configuration>
  <appender name="LOG-KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
      <layout class="ch.qos.logback.classic.PatternLayout">
        <pattern>%date - %-5p %t %-25logger{5} %F:%L %m%n</pattern>
      </layout>
    </encoder>
    <!-- Logstash only consumes the topics listed in its kafka input (5amsung in the pipeline above),
         so this topic has to be one of them for these logs to reach Elasticsearch. -->
    <topic>test-topic</topic>
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    <producerConfig>retries=1</producerConfig>
    <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
    <producerConfig>compression.type=snappy</producerConfig>
    <producerConfig>max.block.ms=1000</producerConfig>
  </appender>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date - %-5p %t %-25logger{5} %F:%L %m%n</pattern>
    </encoder>
  </appender>
  <logger name="kafka-logger" level="INFO" additivity="false">
    <appender-ref ref="LOG-KAFKA"/>
    <appender-ref ref="STDOUT"/>
  </logger>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
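3. Write a log
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// The topic option of @Slf4j selects which logger to use.
@Slf4j(topic = "kafka-logger")
@RestController
@RequiredArgsConstructor
public class HelloController {

    @GetMapping({"", "/hello"})
    public String hello() throws IOException {
        // Write a log entry through the kafka logger
        log.info("hello~!@");
        return "hello";
    }
}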
Create a Kibana index pattern
- Go to http://localhost:5601/app/management/kibana/indexPatterns
- Check that the index set in the Logstash pipeline can be found (kafka-app-log-%{+YYYY.MM.dd})
- Create the kafka-app-log-* pattern, set the Time field to @timestamp, and create it
- Check the data at http://localhost:5601/app/discover