package com.ssi.model; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ssi.constant.enums.VmsTosOrderStatusEnum; import com.ssi.entity.ScrollTrackLocationEntity; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.ssi.entity.vo.TaskStatusHistory; import com.ssi.response.SSIResponse; import com.ssi.utils.ElasticSearchUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.SortOrder; import org.springframework.stereotype.Component; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; /* * 默认index 名称 和type 名称一致。 * */ @Component @Slf4j public class VehicleElasticSearchModel extends ElasticSearchModel { public SSIResponse searchAllTrackLocations(String vin, String indexName, String indexRule, Set columns, String timeField, Long startTime, Long stopTime, String posType) { SearchRequestBuilder searchRequestBuilder = elasticSearchMetaModel.newSearchRequestBuilderInDayIndex( startTime, stopTime, indexName, 1, true, indexRule); Map res = Maps.newHashMap(); if (searchRequestBuilder != null) { int size = 10000; SearchResponse searchResponse; searchResponse = getSearchResponse(columns, searchRequestBuilder, timeField, vin, startTime, stopTime, size); long total = 0l; List> trips = Lists.newArrayList(); SearchHits hits = searchResponse.getHits(); SearchHit[] searchHits = hits.getHits(); //movingAveragePoints(searchHits, 10); filterJumpPoint(searchHits); while (searchHits != null && searchHits.length > 0) { long hitsTotal = hits.getTotalHits(); List> hitsTrip = Arrays.stream(searchHits).parallel().filter(hit -> { Map sourceAsMap = hit.getSourceAsMap(); if (sourceAsMap.get("abnormal") != null && (int) sourceAsMap.get("abnormal") == 1) { return false; } return true; }).map(hit -> { Map sourceAsMap = hit.getSourceAsMap(); if (posType != null) { locationTypeChange(sourceAsMap, posType, "longitude", "latitude"); } return sourceAsMap; }).collect(Collectors.toList()); trips.addAll(hitsTrip); if (searchHits.length == size) { searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId()) .setScroll(TimeValue.timeValueMinutes(5)).get(); searchHits = searchResponse.getHits().getHits(); } else { break; } } res.put("total", total); res.put("records", trips); } else { res.put("total", 0L); res.put("records", Lists.newArrayList()); } return SSIResponse.ok(res.get("records")); // return res; } /** * 移动平均轨迹平滑方法 */ public void movingAveragePoints(SearchHit[] searchHits, int length) { for (int i = length - 1; i < searchHits.length; i++) { Double sumLon = 0d; Double sumLat = 0d; for (int j = 0; j < length; j++) { SearchHit hit = searchHits[i - j]; Map sourceAsMap = hit.getSourceAsMap(); Double longitude = Double.parseDouble(sourceAsMap.get("longitude").toString()); Double latitude = Double.parseDouble(sourceAsMap.get("latitude").toString()); sumLon += longitude; sumLat += latitude; } Double avgLon = sumLon / length; Double avgLat = sumLat / length; SearchHit searchHit = searchHits[i - length + 1]; Map sourceAsMap = searchHit.getSourceAsMap(); sourceAsMap.put("longitude", avgLon); sourceAsMap.put("latitude", avgLat); double[] newLocation = {avgLon, avgLat}; sourceAsMap.put("location", newLocation); } } /** * 轨迹跳点过滤 */ public void filterJumpPoint(SearchHit[] searchHits) { SearchHit last1 = null; for (int i = 1; i < searchHits.length; i++) { SearchHit hit = searchHits[i]; Map sourceAsMap = hit.getSourceAsMap(); Long collectTime = Long.parseLong(sourceAsMap.get("collectTime").toString()); Double longitude = Double.parseDouble(sourceAsMap.get("longitude").toString()); Double latitude = Double.parseDouble(sourceAsMap.get("latitude").toString()); if (last1 == null) { last1 = hit; } else { Long lastCollectTime1 = Long.parseLong( last1.getSourceAsMap().get("collectTime").toString()); long diffTime = collectTime - lastCollectTime1; if (diffTime == 0) { diffTime = 1000L; } Double lastlongitude1 = Double.parseDouble( last1.getSourceAsMap().get("longitude").toString()); Double lastLatitude1 = Double.parseDouble( last1.getSourceAsMap().get("latitude").toString()); double diffLon = longitude - lastlongitude1; double diffLat = latitude - lastLatitude1; if (Math.abs(diffLat * 1000000) > 15 * diffTime / 1000) { sourceAsMap.put("abnormal", 1); continue; } if (Math.abs(diffLon * 100000) > 1.7 * diffTime / 1000) { sourceAsMap.put("abnormal", 1); continue; } last1 = hit; } } } private SearchResponse getSearchResponse(Set columns, SearchRequestBuilder searchRequestBuilder, String timeField, String vin, Long startTime, Long stopTime, int size) { if (columns != null && columns.size() > 0) { //确保行程关键字段不缺失 columns.add("id"); columns.add("vin"); columns.add(timeField); columns.add("longitude"); columns.add("latitude"); searchRequestBuilder.setFetchSource(columns.toArray(new String[columns.size()]), null); } BoolQueryBuilder queryBuilder = createBooleanQueryBuilderWithChinaLocation(); queryBuilder.mustNot(QueryBuilders.termQuery("keyOnStatus", -1)); if (vin != null) { queryBuilder.must(QueryBuilders.termQuery("vin", vin)); } RangeQueryBuilder rangeQueryBuilder = ElasticSearchUtil.createRangeQueryBuilder(timeField, startTime, stopTime); if (rangeQueryBuilder != null) { queryBuilder.must(rangeQueryBuilder); } return searchRequestBuilder.setQuery(queryBuilder) .addSort(timeField, SortOrder.ASC) .setScroll(TimeValue.timeValueMinutes(5)) .setSize(size) .get(); } /** * 深分页轨迹查询 */ public SSIResponse searchScrollTrackLocations(String indexName, String indexRule, String timeField, ScrollTrackLocationEntity scrollTrackLocationEntity) { Map res = Maps.newHashMap(); long t = System.currentTimeMillis(); String[] indexs = elasticSearchMetaModel.getIndexs(scrollTrackLocationEntity.getStartTime(), scrollTrackLocationEntity.getStopTime(), indexName, 1, true, indexRule); if (indexs.length > 0) { SearchResponse searchResponse; if (scrollTrackLocationEntity.getScrollId() == null) { SearchRequestBuilder requestBuilder = transportClient.prepareSearch(indexs) .setTypes(indexName); //设置筛选条件 BoolQueryBuilder queryBuilder = addQueryCondition(createBooleanQueryBuilderWithChinaLocation(), scrollTrackLocationEntity.getVin(), timeField, scrollTrackLocationEntity.getStartTime(), scrollTrackLocationEntity.getStopTime()); queryBuilder.mustNot(QueryBuilders.termQuery("keyOnStatus", -1)); requestBuilder.setQuery(queryBuilder); //设置抓取字段 String[] columns = scrollTrackLocationEntity.getColumns(); Set returnColumns = Sets.newHashSet("vin", timeField, "longitude", "latitude", "collectTime"); if (columns != null) { Arrays.stream(columns).forEach(returnColumns::add); } requestBuilder.setFetchSource(returnColumns.toArray(new String[returnColumns.size()]), null); //检查是否需要排序 if (scrollTrackLocationEntity.isSortByTime()) { requestBuilder.addSort(timeField, SortOrder.ASC); } else { requestBuilder.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); } requestBuilder.setScroll(TimeValue.timeValueMinutes(5)); requestBuilder.setSize(scrollTrackLocationEntity.getMaxSizePartition()); searchResponse = requestBuilder.get(); } else { searchResponse = transportClient.prepareSearchScroll( scrollTrackLocationEntity.getScrollId()) .setScroll(TimeValue.timeValueMinutes(5)).get(); } List> maps = Lists.newLinkedList(); //遍历结果 SearchHits hits = searchResponse.getHits(); SearchHit[] hitArray = hits.getHits(); Map sourceAsMap; //跳点步长 int stepLength = scrollTrackLocationEntity.getStepLength(); for (int i = 0; i < hitArray.length; i = i + stepLength) { sourceAsMap = hitArray[i].getSourceAsMap(); if (scrollTrackLocationEntity.getPosType() != null) { locationTypeChange(sourceAsMap, scrollTrackLocationEntity.getPosType(), "longitude", "latitude"); } maps.add(sourceAsMap); } if (maps.size() > 0) { res.put("total", hits.totalHits); res.put("records", maps); res.put("size", maps.size()); } else { res = emptySearchHitResult(); } res.put("over", (hitArray.length == 0) || (hitArray.length == hits.totalHits)); res.put("scrollId", searchResponse.getScrollId()); } return SSIResponse.ok(res); // return res; } private BoolQueryBuilder addQueryCondition(BoolQueryBuilder builder, String vin, String timeField, Long startTime, Long stopTime) { if (vin != null) { builder.must(QueryBuilders.termsQuery("vin", vin)); } if (startTime != null || stopTime != null) { RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(timeField); if (startTime != null) { rangeQueryBuilder.gte(startTime); } if (stopTime != null) { rangeQueryBuilder.lte(stopTime); } builder.must(rangeQueryBuilder); } return builder; } public Map getOfflineVehicleList(String indexName, String timeField, String vehicleCompany, String vehicleType, Integer offlineDays, int page, int size) { SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch( String.format("%s_latest", indexName)); RangeQueryBuilder rangeQueryBuilder = null; BoolQueryBuilder queryBuilder = createBooleanQueryBuilderWithChinaLocation(); if (offlineDays != null) { rangeQueryBuilder = QueryBuilders.rangeQuery(timeField); Long endTime = System.currentTimeMillis() - offlineDays * 24 * 60 * 60 * 1000; rangeQueryBuilder.lte(endTime); } if (rangeQueryBuilder != null) { queryBuilder.must(rangeQueryBuilder); } if (StringUtils.isNotBlank(vehicleType)) { queryBuilder.must(QueryBuilders.termQuery("vehicleType", vehicleType)); } if (StringUtils.isNotBlank(vehicleCompany)) { queryBuilder.must(QueryBuilders.termQuery("vehicleCompany", vehicleCompany)); } int start = (page - 1) * 10; SearchResponse searchResponse = searchRequestBuilder.setQuery(queryBuilder) .addSort(timeField, SortOrder.ASC) .setSize(size) .setFrom(start) .get(); return extractSearchHits(searchResponse.getHits()); } /** * 获取任务历史数据 */ public List searchTaskHistory(String taskNo, String year) { SearchRequestBuilder requestBuilder = transportClient.prepareSearch( "harbor_order_status_" + year).setTypes("harbor_order_status"); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.termQuery("sn", taskNo)); boolQueryBuilder.must(QueryBuilders.existsQuery("status")); Set returnColumns = Sets.newHashSet("vin", "sn", "status", "collectTime", "errorMessage"); SearchResponse response = requestBuilder.setQuery(boolQueryBuilder) .setFetchSource(returnColumns.toArray(new String[returnColumns.size()]), null) .addSort("collectTime", SortOrder.ASC) .get(); SearchHit[] hits = response.getHits().getHits(); List list = Stream.of(hits).map(hit -> { String sourceAsString = hit.getSourceAsString(); TaskStatusHistory taskStatusHistory = JSON.parseObject(sourceAsString, TaskStatusHistory.class); VmsTosOrderStatusEnum anEnum = VmsTosOrderStatusEnum.find(taskStatusHistory.getStatus()); if (anEnum != null) { taskStatusHistory.setStatusDescription(anEnum.getDescription()); } return taskStatusHistory; }).collect(Collectors.toList()); return list; } /** * 批量获取任务历史数据 */ public Map> searchTaskHistory(List taskNoList, String year) { SearchRequestBuilder requestBuilder = transportClient.prepareSearch( "harbor_order_status_" + year).setTypes("harbor_order_status"); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.termsQuery("sn", taskNoList)); boolQueryBuilder.must(QueryBuilders.existsQuery("status")); Set returnColumns = Sets.newHashSet("vin", "sn", "status", "collectTime", "errorMessage"); Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10L)); SearchResponse response = requestBuilder.setQuery(boolQueryBuilder) .setFetchSource(returnColumns.toArray(new String[returnColumns.size()]), null) .addSort("collectTime", SortOrder.ASC) .setScroll(scroll) .setSize(2000) .get(); //结果集为升序排序 SearchHit[] hits = response.getHits().getHits(); Map> statusMap = new HashMap<>(); boolean needContinue = true; while (hits != null && hits.length > 0 && needContinue) { for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); TaskStatusHistory taskStatusHistory = JSON.parseObject(sourceAsString, TaskStatusHistory.class); VmsTosOrderStatusEnum anEnum = VmsTosOrderStatusEnum.find(taskStatusHistory.getStatus()); if (anEnum != null) { taskStatusHistory.setStatusDescription(anEnum.getDescription()); }else{ //移除未知状态 continue; } if (statusMap.get(taskStatusHistory.getSn()) == null) { List list = new ArrayList<>(); list.add(taskStatusHistory); statusMap.put(taskStatusHistory.getSn(), list); } else { List list = statusMap.get(taskStatusHistory.getSn()); list.add(taskStatusHistory); statusMap.put(taskStatusHistory.getSn(), list); } } if (hits.length == 2000) { response = transportClient.prepareSearchScroll(response.getScrollId()) .setScroll(TimeValue.timeValueMinutes(5)).get(); hits = response.getHits().getHits(); } else { needContinue = false; } } transportClient.prepareClearScroll(); return statusMap; } /** * 获取车辆最近车况数据 */ public Map searchLatestVehicleData(String vin) { SearchRequestBuilder requestBuilder = transportClient.prepareSearch("vms_igv_vehicle_latest") .setTypes("vms_igv_vehicle"); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.termQuery("vin", vin)); SearchResponse response = requestBuilder.setQuery(boolQueryBuilder) .get(); SearchHit[] hits = response.getHits().getHits(); if (hits != null && hits.length > 0) { SearchHit hit = hits[0]; String jsonStr = hit.getSourceAsString(); JSONObject object = JSON.parseObject(jsonStr); return object; } return null; } }