반응형
Recent Posts
Recent Comments
관리 메뉴

개발잡부

[900gle] 900gle Consumer 본문

900gle shopping/consumer

[900gle] 900gle Consumer

닉의네임 2022. 11. 1. 23:21
반응형

https://ldh-6019.tistory.com/418

 

[900gle] 900gle producer

elastic stack과 kafka를 각자 실행하면 잘 실행되는데, docker compose로 elastic stack과 kafka를 함께 띄웠더니 둘 다 죽는다. 메모리 문제인가.. 일단 https://ldh-6019.tistory.com/228?category=1059128 [kafka] MacOs Kafka install /

ldh-6019.tistory.com

producer에서 1분에 3개씩 데이터를 가져와서 전송한다.

 

Local kafka 실행

# Move to the Kafka installation directory (the relative bin/ and config/ paths below depend on it)
cd /Users/doo/kafka/kafka_2.13-2.8.0
# Start ZooKeeper first, using the bundled zookeeper.properties
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start the Kafka broker, using the bundled server.properties
bin/kafka-server-start.sh -daemon config/server.properties
# Create the "900gle" topic on the broker at localhost:9092
bin/kafka-topics.sh --create --topic 900gle --bootstrap-server localhost:9092

 

Consumer Service -  KafkaListener 

package com.bbongdoo.doo.service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Kafka consumer for the {@code 900gle} topic.
 *
 * <p>Every record received by the listener is handed to {@link GoodsService#staticIndex(List)}
 * as a one-element list.
 */
@Service
@RequiredArgsConstructor
public class ConsumerService {

    private static final String TOPIC = "900gle";
    private static final String GROUP_ID = "doo";

    private final GoodsService goodsService;

    /**
     * Receives one record from the topic and forwards it to the indexer.
     *
     * @param message the record value as delivered by the listener container
     * @throws IOException declared for callers; kept as part of the method's signature
     */
    @KafkaListener(topics = TOPIC, groupId = GROUP_ID)
    public void consume(String message) throws IOException {
        // The indexer works on lists, so wrap the single record.
        List<String> singleRecordBatch = new ArrayList<>();
        singleRecordBatch.add(message);

        goodsService.staticIndex(singleRecordBatch);

        singleRecordBatch.clear();
    }
}

 

Elasticsearch indexer 

package com.bbongdoo.doo.service;


import com.bbongdoo.doo.apis.IndexGoodsApi;
import com.bbongdoo.doo.domain.Goods;
import com.bbongdoo.doo.domain.GoodsRepository;
import com.bbongdoo.doo.utils.ParseVector;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;

@Slf4j
@Service
@RequiredArgsConstructor
public class GoodsService {


    private final RestHighLevelClient client;

    private LocalDateTime updatedTime;

    private String PREFIX = "goods-kafka";

    public void staticIndex(List<String> data) {

        String indexName = PREFIX + "-" + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE).toString();

        try {

            GetIndexRequest requestGetIndex = new GetIndexRequest(indexName);
            boolean existsIndex = client.indices().exists(requestGetIndex, RequestOptions.DEFAULT);

            GetAliasesRequest aliasesRequest = new GetAliasesRequest(PREFIX);
            GetAliasesResponse getAliasesResponse = client.indices().getAlias(aliasesRequest, RequestOptions.DEFAULT);

            String oldIndexName = "";
            if (getAliasesResponse.getAliases().size() > 0) {
                oldIndexName = Optional.ofNullable(getAliasesResponse.getAliases().keySet().iterator().next()).orElse("");
            }

            if (existsIndex == false) {
                CreateIndexRequest request = IndexGoodsApi.createIndex(indexName);
                CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
            } else {

                if (!oldIndexName.equals(indexName)) {
                    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
                    AcknowledgedResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
                    CreateIndexRequest request = IndexGoodsApi.createIndex(indexName);
                    CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
                }
            }

            BulkRequest bulkRequest = new BulkRequest();
            data.forEach(
                    x -> {
                        try {

                            JSONParser jsonParser = new JSONParser();
                            Object obj = jsonParser.parse(x);
                            JSONObject jsonObj = (JSONObject) obj;

                            XContentBuilder builder = XContentFactory.jsonBuilder();
                            builder.startObject();
                            {
                                builder.field("id", jsonObj.get("id"));
                                builder.field("keyword", jsonObj.get("keyword"));
                                builder.field("name", jsonObj.get("name"));
                                builder.field("brand", jsonObj.get("brand"));
                                builder.field("price", jsonObj.get("price"));
                                builder.field("category1", jsonObj.get("category1"));
                                builder.field("category2", jsonObj.get("category2"));
                                builder.field("category3", jsonObj.get("category3"));
                                builder.field("category4", jsonObj.get("category4"));
                                builder.field("category5", jsonObj.get("category5"));
                                builder.field("image", jsonObj.get("image"));
//                                builder.field("feature_vector", ParseVector.parse(jsonObj.get("keyword")));
                                builder.field("weight", jsonObj.get("weight"));
                                builder.field("popular", jsonObj.get("popular"));
                                builder.field("type", jsonObj.get("type"));
                                builder.field("created_time", jsonObj.get("created_time"));
                                builder.field("updated_time", jsonObj.get("updated_time"));
                            }
                            builder.endObject();
                            IndexRequest indexRequest = new IndexRequest(indexName)
                                    .type("_doc")
                                    .source(builder);
                            bulkRequest.add(indexRequest);

                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    }
            );

            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

            System.out.println(bulkResponse.buildFailureMessage());

            FlushRequest flushRequest = new FlushRequest(indexName);
            FlushResponse flushResponse = client.indices().flush(flushRequest, RequestOptions.DEFAULT);
            ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
            ForceMergeResponse forceMergeResponse = client.indices().forcemerge(forceMergeRequest, RequestOptions.DEFAULT);
            IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();

            if (!StringUtils.isEmpty(oldIndexName) && !indexName.equals(oldIndexName)) {

                IndicesAliasesRequest.AliasActions aliasActionsAdd = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                        .index(indexName)
                        .alias(PREFIX);
                indicesAliasesRequest.addAliasAction(aliasActionsAdd);

                IndicesAliasesRequest.AliasActions aliasActionsRemove = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.REMOVE)
                        .index(oldIndexName)
                        .alias(PREFIX);
                indicesAliasesRequest.addAliasAction(aliasActionsRemove);

            } else {


                IndicesAliasesRequest.AliasActions aliasActionsAdd = new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
                        .index(indexName)
                        .alias(PREFIX);

                indicesAliasesRequest.addAliasAction(aliasActionsAdd);


            }

            AcknowledgedResponse indicesAliasesResponse =
                    client.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }




}

 

결과 

일단 죽지 않는 것에 만족하고, 이걸 어떻게 사용할 건지는 다시 생각해 봐야겠음

 

 

반응형
Comments