package com.ssi.config;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.ToString;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

/**
 * Elasticsearch query configuration.
 *
 * <p>Exposes a {@link TransportClient} bean built from the
 * {@code spring.data.elasticsearch.*} properties, plus a standalone
 * {@link #getClient()} factory and a {@link #main(String[])} smoke test that
 * use hard-coded connection settings.
 *
 * <p>The password field is excluded from the generated {@code toString()} so
 * that credentials do not end up in logs; its getter is still generated.
 *
 * @author LiXiaoCong
 * @version 2018/4/25 15:09
 */
@Configuration
@Getter
@ToString(exclude = "pwd")
public class ElasticsearchConfig {

    /** Cluster name used by the standalone {@link #getClient()} factory. */
    private static final String STANDALONE_CLUSTER_NAME = "my-application";

    /** Comma-separated {@code host:port} list used by the standalone {@link #getClient()} factory. */
    private static final String STANDALONE_SERVERS = "172.16.15.50:9300,172.16.15.51:9300,172.16.15.52:9300";

    private final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);

    @Value("${spring.data.elasticsearch.cluster-name:es}")
    private String clustername;

    /** Comma-separated list of {@code host:port} transport addresses. */
    @Value("${spring.data.elasticsearch.cluster-nodes}")
    private String servers;

    @Value("${spring.data.elasticsearch.properties.pool:6}")
    private String poolSize;

    @Value("${es.client.search.type:road_terminal_02001}")
    private String typeName;

    // Credentials are only read when enableSecurity is true, so they default to
    // empty instead of failing placeholder resolution when they are not configured.
    @Value("${spring.data.elasticsearch.userName:}")
    private String userName;

    @Value("${spring.data.elasticsearch.pwd:}")
    private String pwd;

    // Defaults to false when the property is absent.
    @Value("${spring.data.elasticsearch.enable-security:false}")
    private boolean enableSecurity;

    /**
     * Builds the {@link TransportClient} bean from the injected properties.
     *
     * <p>When security is enabled an X-Pack client is created with
     * {@code userName:pwd} as the {@code xpack.security.user} setting; otherwise a
     * plain pre-built transport client is used. Every valid {@code host:port}
     * entry of {@code servers} is then registered as a transport address.
     *
     * <p>Failures are logged and not rethrown: the method returns whatever client
     * had been constructed when the failure occurred, which is {@code null} if
     * the client itself could not be created.
     *
     * @return the client, possibly without any registered address, or {@code null}
     */
    @Bean
    public TransportClient newTransportClient() {
        logger.info("Elasticsearch客户端初始化开始...");
        TransportClient transportClient = null;
        try {
            // Client settings.
            Settings.Builder builder = Settings.builder();
            if (clustername == null) {
                builder.put("client.transport.ignore_cluster_name", true);
            } else {
                builder.put("cluster.name", clustername);
            }
            builder.put("client.transport.sniff", false);
            builder.put("client.transport.ping_timeout", "180s");
            builder.put("thread_pool.search.size", Integer.parseInt(poolSize.trim()));

            if (enableSecurity) {
                builder.put("xpack.security.user", userName + ":" + pwd);
                transportClient = new PreBuiltXPackTransportClient(builder.build());
            } else {
                transportClient = new PreBuiltTransportClient(builder.build());
            }

            logger.info("es.client.servers = {}", servers);
            addTransportAddresses(transportClient, servers, logger);

            logger.info("Elasticsearch客户端初始化成功。");
        } catch (Exception e) {
            logger.error("elasticsearch TransportClient create error!!!", e);
        }
        return transportClient;
    }

    /**
     * Builds a {@link TransportClient} from hard-coded settings, without Spring.
     *
     * <p>Failures are logged and not rethrown; see {@link #newTransportClient()}
     * for the meaning of the returned value in that case.
     *
     * @return the client, possibly without any registered address, or {@code null}
     */
    public static TransportClient getClient() {
        Logger log = LoggerFactory.getLogger(ElasticsearchConfig.class);
        TransportClient transportClient = null;
        try {
            System.setProperty("es.set.netty.runtime.available.processors", "false");

            // Client settings.
            Settings.Builder builder = Settings.builder();
            builder.put("cluster.name", STANDALONE_CLUSTER_NAME);
            builder.put("client.transport.sniff", false);
            builder.put("client.transport.ping_timeout", "180s");
            builder.put("thread_pool.search.size", 6);

            transportClient = new PreBuiltTransportClient(builder.build());
            addTransportAddresses(transportClient, STANDALONE_SERVERS, log);
        } catch (Exception e) {
            log.error("elasticsearch TransportClient create error!!!", e);
        }
        return transportClient;
    }

    /**
     * Parses a comma-separated {@code host:port} list and registers every valid
     * entry on the given client.
     *
     * <p>Entries, hosts and ports are trimmed, so {@code "a:9300, b:9300"} is
     * accepted. Entries that do not have exactly one {@code ':'} separator are
     * logged and skipped.
     *
     * @param client  client to register the addresses on
     * @param servers comma-separated {@code host:port} list
     * @param log     logger used to report malformed entries
     * @throws IllegalArgumentException if no entry is usable
     * @throws NumberFormatException    if a port is not a valid integer
     * @throws UnknownHostException     if a host cannot be resolved
     */
    private static void addTransportAddresses(TransportClient client, String servers, Logger log)
            throws UnknownHostException {
        HashMultimap<String, Integer> hostAndPort = HashMultimap.create();
        for (String hp : servers.split(",")) {
            String entry = hp.trim();
            String[] kv = entry.split(":");
            if (kv.length == 2) {
                hostAndPort.put(kv[0].trim(), Integer.parseInt(kv[1].trim()));
            } else {
                log.error("es.client.servers: {} 配置有误, 示例: host:port", entry);
            }
        }

        Preconditions.checkArgument(!hostAndPort.isEmpty(), "es.client.servers 中无可用的服务节点配置。");

        for (Map.Entry<String, Integer> hp : hostAndPort.entries()) {
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(hp.getKey()), hp.getValue()));
        }
    }

    /**
     * Manual smoke test: indexes one sample document through {@link #getClient()}.
     *
     * @param args unused
     * @throws Exception if the client could not be created or indexing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources releases the client once the request is done.
        try (TransportClient transportClient = getClient()) {
            if (transportClient == null) {
                throw new IllegalStateException("elasticsearch TransportClient could not be created");
            }

            Map<String, Object> record = Maps.newHashMap();
            record.put("id", "0001");
            record.put("originalMsg", "wrwerer234234");
            record.put("table", "my_template_demo");
            record.put("index", "my_template_demo");
            record.put("location", new GeoPoint(30.505715, 114.19597));
            record.put("speed", 20.5);
            record.put("age", 22);
            record.put("address", "武汉");

            String s = JSON.toJSONString(record);
            System.out.println(s);

            transportClient.prepareIndex("my_template_demo20200110", "my_template_demo", "0001")
                    .setSource(record)
                    .get();
        }
    }
}