|
@@ -0,0 +1,348 @@
|
|
|
+package com.chelvc.framework.elasticsearch;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.function.Function;
|
|
|
+
|
|
|
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
|
|
|
+import co.elastic.clients.elasticsearch._types.Conflicts;
|
|
|
+import co.elastic.clients.elasticsearch._types.Script;
|
|
|
+import co.elastic.clients.elasticsearch._types.ScriptLanguage;
|
|
|
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
|
|
|
+import co.elastic.clients.elasticsearch.core.BulkRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.BulkResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.CountRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.CountResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.CreateRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.CreateResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.DeleteResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.ExistsRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.GetRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.GetResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.IndexRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.IndexResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.MgetRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.MgetResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.SearchRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.SearchResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.UpdateByQueryRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.UpdateRequest;
|
|
|
+import co.elastic.clients.elasticsearch.core.UpdateResponse;
|
|
|
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
|
|
|
+import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
|
|
|
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
|
|
|
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
|
|
|
+import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
|
|
|
+import co.elastic.clients.json.JsonData;
|
|
|
+import co.elastic.clients.transport.ElasticsearchTransport;
|
|
|
+import co.elastic.clients.transport.endpoints.BooleanResponse;
|
|
|
+import co.elastic.clients.util.ObjectBuilder;
|
|
|
+import com.chelvc.framework.common.util.ObjectUtils;
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.google.common.collect.Maps;
|
|
|
+import lombok.NonNull;
|
|
|
+
|
|
|
+/**
|
|
|
+ * ES操作接口默认实现
|
|
|
+ *
|
|
|
+ * @author Woody
|
|
|
+ * @date 2024/10/17
|
|
|
+ */
|
|
|
+public class DefaultElasticsearchHandler implements ElasticsearchHandler {
|
|
|
+ private final ElasticsearchClient client;
|
|
|
+
|
|
|
+ public DefaultElasticsearchHandler(@NonNull ElasticsearchTransport transport) {
|
|
|
+ this.client = new ElasticsearchClient(transport);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ElasticsearchClient getClient() {
|
|
|
+ return this.client;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CreateResponse create(@NonNull Object document) {
|
|
|
+ ModelContext context = ModelContext.of(document.getClass());
|
|
|
+ CreateRequest<?> request = new CreateRequest.Builder<>().index(context.getIndex())
|
|
|
+ .id(context.getIdentity(document)).document(document).build();
|
|
|
+ try {
|
|
|
+ return this.client.create(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> BulkResponse create(@NonNull Collection<T> documents) {
|
|
|
+ List<BulkOperation> operations = Lists.newArrayListWithCapacity(documents.size());
|
|
|
+ documents.forEach(document -> {
|
|
|
+ ModelContext context = ModelContext.of(document.getClass());
|
|
|
+ CreateOperation<?> create = new CreateOperation.Builder<>().index(context.getIndex())
|
|
|
+ .id(context.getIdentity(document)).document(document).build();
|
|
|
+ BulkOperation operation = new BulkOperation.Builder().create(create).build();
|
|
|
+ operations.add(operation);
|
|
|
+ });
|
|
|
+ BulkRequest request = new BulkRequest.Builder().operations(operations).build();
|
|
|
+ try {
|
|
|
+ return this.client.bulk(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public DeleteResponse delete(@NonNull Class<?> model, @NonNull String id) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ DeleteRequest request = new DeleteRequest.Builder().index(context.getIndex()).id(id).build();
|
|
|
+ try {
|
|
|
+ return this.client.delete(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public BulkResponse delete(@NonNull Class<?> model, @NonNull Collection<String> ids) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ List<BulkOperation> operations = Lists.newArrayListWithCapacity(ids.size());
|
|
|
+ ids.forEach(id -> {
|
|
|
+ DeleteOperation delete = new DeleteOperation.Builder().index(context.getIndex()).id(id).build();
|
|
|
+ BulkOperation operation = new BulkOperation.Builder().delete(delete).build();
|
|
|
+ operations.add(operation);
|
|
|
+ });
|
|
|
+ BulkRequest request = new BulkRequest.Builder().operations(operations).build();
|
|
|
+ try {
|
|
|
+ return this.client.bulk(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> UpdateResponse<T> update(@NonNull T document) {
|
|
|
+ return this.update(document, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> BulkResponse update(@NonNull Collection<T> documents) {
|
|
|
+ List<BulkOperation> operations = Lists.newArrayListWithCapacity(documents.size());
|
|
|
+ documents.forEach(document -> {
|
|
|
+ ModelContext context = ModelContext.of(document.getClass());
|
|
|
+ UpdateOperation<T, Object> update = new UpdateOperation.Builder<T, Object>().index(context.getIndex())
|
|
|
+ .id(context.getIdentity(document)).action(a -> a.doc(document)).retryOnConflict(3).build();
|
|
|
+ BulkOperation operation = new BulkOperation.Builder().update(update).build();
|
|
|
+ operations.add(operation);
|
|
|
+ });
|
|
|
+ BulkRequest request = new BulkRequest.Builder().operations(operations).build();
|
|
|
+ try {
|
|
|
+ return this.client.bulk(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public <T> UpdateResponse<T> update(@NonNull T document, @NonNull Long primary, @NonNull Long sequence) {
|
|
|
+ Class<T> model = (Class<T>) document.getClass();
|
|
|
+ String id = ModelContext.of(model).getIdentity(document);
|
|
|
+ return this.update(model, id, document, primary, sequence);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> UpdateResponse<T> update(@NonNull Class<T> model, @NonNull String id, @NonNull Object document) {
|
|
|
+ return this.update(model, id, document, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> UpdateByQueryResponse update(@NonNull Class<T> model, @NonNull Query query, @NonNull T document) {
|
|
|
+ return this.update(model, query, ObjectUtils.object2map(document));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public UpdateByQueryResponse update(@NonNull Class<?> model, @NonNull Query query, @NonNull String field,
|
|
|
+ Object value) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ String source = "ctx._source." + field + (value == null ? "=null" : ("=params." + field));
|
|
|
+ Map<String, JsonData> params = value == null ? Collections.emptyMap() :
|
|
|
+ ImmutableMap.of(field, JsonData.of(value));
|
|
|
+ Script script = new Script.Builder().lang(ScriptLanguage.Painless).source(source).params(params).build();
|
|
|
+ UpdateByQueryRequest request = new UpdateByQueryRequest.Builder().index(context.getIndex()).script(script)
|
|
|
+ .query(query).conflicts(Conflicts.Proceed).build();
|
|
|
+ try {
|
|
|
+ return this.client.updateByQuery(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public UpdateByQueryResponse update(@NonNull Class<?> model, @NonNull Query query,
|
|
|
+ @NonNull Map<String, ?> parameters) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ StringBuilder source = new StringBuilder();
|
|
|
+ Map<String, JsonData> params = Maps.newHashMapWithExpectedSize(parameters.size());
|
|
|
+ parameters.forEach((key, value) -> {
|
|
|
+ if (source.length() > 0) {
|
|
|
+ source.append(";");
|
|
|
+ }
|
|
|
+ if (value == null) {
|
|
|
+ source.append("ctx._source.").append(key).append("=null");
|
|
|
+ } else {
|
|
|
+ source.append("ctx._source.").append(key).append("=params.").append(key);
|
|
|
+ params.put(key, JsonData.of(value));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ Script script = new Script.Builder().lang(ScriptLanguage.Painless).source(source.toString())
|
|
|
+ .params(params).build();
|
|
|
+ UpdateByQueryRequest request = new UpdateByQueryRequest.Builder().index(context.getIndex()).script(script)
|
|
|
+ .query(query).conflicts(Conflicts.Proceed).build();
|
|
|
+ try {
|
|
|
+ return this.client.updateByQuery(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> UpdateResponse<T> update(@NonNull Class<T> model, @NonNull String id, @NonNull Object document,
|
|
|
+ Long primary, Long sequence) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ UpdateRequest<T, ?> request = new UpdateRequest.Builder<T, Object>().index(context.getIndex()).id(id)
|
|
|
+ .doc(document).ifPrimaryTerm(primary).ifSeqNo(sequence).retryOnConflict(3).build();
|
|
|
+ try {
|
|
|
+ return this.client.update(request, model);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public IndexResponse index(@NonNull Object document) {
|
|
|
+ return this.index(document, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public IndexResponse index(@NonNull Class<?> model, @NonNull String id, @NonNull Object document) {
|
|
|
+ return this.index(model, id, document, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public IndexResponse index(@NonNull Object document, Long primary, Long sequence) {
|
|
|
+ Class<?> model = document.getClass();
|
|
|
+ String id = ModelContext.of(model).getIdentity(document);
|
|
|
+ return this.index(model, id, document, primary, sequence);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public IndexResponse index(@NonNull Class<?> model, @NonNull String id, @NonNull Object document, Long primary,
|
|
|
+ Long sequence) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ IndexRequest<?> request = new IndexRequest.Builder<>().index(context.getIndex())
|
|
|
+ .id(id).document(document).ifPrimaryTerm(primary).ifSeqNo(sequence).build();
|
|
|
+ try {
|
|
|
+ return this.client.index(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> BulkResponse index(@NonNull Collection<T> documents) {
|
|
|
+ return this.index(documents, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> BulkResponse index(@NonNull Collection<T> documents, Long primary, Long sequence) {
|
|
|
+ List<BulkOperation> operations = Lists.newArrayListWithCapacity(documents.size());
|
|
|
+ documents.forEach(document -> {
|
|
|
+ ModelContext context = ModelContext.of(document.getClass());
|
|
|
+ IndexOperation<?> index = new IndexOperation.Builder<>().index(context.getIndex())
|
|
|
+ .id(context.getIdentity(document)).document(document).ifPrimaryTerm(primary)
|
|
|
+ .ifSeqNo(sequence).build();
|
|
|
+ BulkOperation operation = new BulkOperation.Builder().index(index).build();
|
|
|
+ operations.add(operation);
|
|
|
+ });
|
|
|
+ BulkRequest request = new BulkRequest.Builder().operations(operations).build();
|
|
|
+ try {
|
|
|
+ return this.client.bulk(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public CountResponse count(@NonNull Class<?> model, @NonNull Query query) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ CountRequest request = new CountRequest.Builder().index(context.getIndex()).query(query).build();
|
|
|
+ try {
|
|
|
+ return this.client.count(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public BooleanResponse exists(@NonNull Class<?> model, @NonNull String id) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ ExistsRequest request = new ExistsRequest.Builder().index(context.getIndex()).id(id).build();
|
|
|
+ try {
|
|
|
+ return this.client.exists(request);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> GetResponse<T> get(@NonNull Class<T> model, @NonNull String id) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ GetRequest request = new GetRequest.Builder().index(context.getIndex()).id(id).build();
|
|
|
+ try {
|
|
|
+ return this.client.get(request, model);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> MgetResponse<T> get(@NonNull Class<T> model, @NonNull List<String> ids) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ MgetRequest request = new MgetRequest.Builder().index(context.getIndex()).ids(ids).build();
|
|
|
+ try {
|
|
|
+ return this.client.mget(request, model);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> SearchResponse<T> search(@NonNull Class<T> model, @NonNull Query query) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ SearchRequest request = new SearchRequest.Builder().index(context.getIndex()).query(query).build();
|
|
|
+ return this.search(model, request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> SearchResponse<T> search(@NonNull Class<T> model, @NonNull Query query,
|
|
|
+ @NonNull Function<SearchRequest.Builder, ObjectBuilder<SearchRequest>> function) {
|
|
|
+ ModelContext context = ModelContext.of(model);
|
|
|
+ SearchRequest.Builder builder = new SearchRequest.Builder().index(context.getIndex()).query(query);
|
|
|
+ SearchRequest request = function.apply(builder).build();
|
|
|
+ return this.search(model, request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public <T> SearchResponse<T> search(@NonNull Class<T> model, @NonNull SearchRequest request) {
|
|
|
+ try {
|
|
|
+ return this.client.search(request, model);
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|