반응형
Recent Posts
Recent Comments
관리 메뉴

개발잡부

[900gle] 900gle producer 본문

900gle shopping/producer

[900gle] 900gle producer

닉의네임 2022. 10. 30. 14:50
반응형

elastic stack과 kafka 를 각자 실행하면 실행 되는데 

docker compose 로  elastic stack 과 kafka 를 띄웠더니.. 둘다 죽는다..

메모리 문제인가..

 

 

 

일단

https://ldh-6019.tistory.com/228?category=1059128 

 

[kafka] MacOs Kafka install / test

kafka를 설치 해보자 https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.13-2.8.0.tgz Apache Download Mirrors Copyright © 2020 The Apache Software Foundation, Licensed under the Apache L..

ldh-6019.tistory.com

 

 

명령어 모음  kafka 실행

#카프카 설치 경로로 이동 
cd /Users/doo/kafka/kafka_2.13-2.8.0
#주키퍼 시작
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#카프카 시작
bin/kafka-server-start.sh -daemon config/server.properties
#토픽생성
bin/kafka-topics.sh --create --topic 900gle --bootstrap-server localhost:9092

#토픽생성까지만 실행

#프로듀서 실행
bin/kafka-console-producer.sh --topic 900gle --bootstrap-server localhost:9092
#컨슈머 실행
bin/kafka-console-consumer.sh --topic 900gle --from-beginning --bootstrap-server localhost:9092

1분에한번씩  실행 

package com.bbongdoo.doo.cron;

import com.bbongdoo.doo.service.ProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class GoodsCron {

    private final ProducerService producerService;

    @Scheduled(cron = "0/1 * * * * *")
    public void indexJob() {
        producerService.dynamicIndex();
    }
}

DB 조회 후 json 으로 변환 그리고 kafka 전송

package com.bbongdoo.doo.service;

import com.bbongdoo.doo.domain.GoodsText;
import com.bbongdoo.doo.domain.GoodsTextRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RequiredArgsConstructor
public class ProducerService {

    private final GoodsTextRepository goodsTextRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final String TOPIC = "900gle";

    public void dynamicIndex() {

        List<GoodsText> goodsList = null;
        goodsList = goodsTextRepository.findAll();
        if (goodsList.size() > 0) {
            goodsList.forEach(
                    x -> {
                        String st = "";
                        ObjectMapper objectMapper = new ObjectMapper();
                        try {
                             st = objectMapper.writeValueAsString(x);
                        } catch (JsonProcessingException e) {
                            e.printStackTrace();
                        }
                        System.out.println(String.format("Produce message : %s", st));
                        this.kafkaTemplate.send(TOPIC, st);
                    }
            );
        } else {
            System.out.println("end");
        }
    }
}

producer message

반응형
Comments