일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Test
- docker
- API
- Kafka
- token filter test
- Java
- flask
- 파이썬
- licence delete curl
- TensorFlow
- Python
- query
- Mac
- MySQL
- plugin
- analyzer test
- zip 파일 암호화
- 차트
- matplotlib
- zip 암호화
- high level client
- aggregation
- ELASTIC
- license delete
- Elasticsearch
- sort
- 900gle
- springboot
- aggs
- License
Archives
- Today
- Total
개발잡부
[900gle] 900gle producer 본문
반응형
elastic stack과 kafka 를 각자 실행하면 실행 되는데
docker compose 로 elastic stack 과 kafka 를 띄웠더니.. 둘다 죽는다..
메모리 문제인가..
일단
https://ldh-6019.tistory.com/228?category=1059128
명령어 모음 kafka 실행
#카프카 설치 경로로 이동
cd /Users/doo/kafka/kafka_2.13-2.8.0
#주키퍼 시작
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#카프카 시작
bin/kafka-server-start.sh -daemon config/server.properties
#토픽생성
bin/kafka-topics.sh --create --topic 900gle --bootstrap-server localhost:9092
#토픽생성까지만 실행
#프로듀서 실행
bin/kafka-console-producer.sh --topic 900gle --bootstrap-server localhost:9092
#컨슈머 실행
bin/kafka-console-consumer.sh --topic 900gle --from-beginning --bootstrap-server localhost:9092
1분에한번씩 실행
package com.bbongdoo.doo.cron;
import com.bbongdoo.doo.service.ProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class GoodsCron {
private final ProducerService producerService;
@Scheduled(cron = "0/1 * * * * *")
public void indexJob() {
producerService.dynamicIndex();
}
}
DB 조회 후 json 으로 변환 그리고 kafka 전송
package com.bbongdoo.doo.service;
import com.bbongdoo.doo.domain.GoodsText;
import com.bbongdoo.doo.domain.GoodsTextRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@RequiredArgsConstructor
public class ProducerService {
private final GoodsTextRepository goodsTextRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "900gle";
public void dynamicIndex() {
List<GoodsText> goodsList = null;
goodsList = goodsTextRepository.findAll();
if (goodsList.size() > 0) {
goodsList.forEach(
x -> {
String st = "";
ObjectMapper objectMapper = new ObjectMapper();
try {
st = objectMapper.writeValueAsString(x);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
System.out.println(String.format("Produce message : %s", st));
this.kafkaTemplate.send(TOPIC, st);
}
);
} else {
System.out.println("end");
}
}
}
반응형
Comments