常熟建设网站,开无货源网店哪个平台好,学院网站建设管理,企业管理系统er图 一、创建索引 1、接收创建索引的RestCreateIndexAction 2、Master执行创建索引的类TransportCreateIndexAction 3、创建一个任务(创建索引的)，放入一个队列 4、执行创建索引时会先搜索模版 5、创建索引的build，更新集群状态 (1) initializeEmpty初始化索引的分… 一、创建索引 1、接收创建索引的RestCreateIndexAction 2、Master执行创建索引的类TransportCreateIndexAction 3、创建一个任务(创建索引的)，放入一个队列 4、执行创建索引时会先搜索模版 5、创建索引的build，更新集群状态 (1) initializeEmpty初始化索引的分片、副本等 (2) 添加索引信息到indicesRouting (3) 创建一个新的集群状态返回 二、查询索引信息 1、接收查询索引信息的RestGetIndicesAction 2、实际查询的TransportGetIndexAction.java的doMasterOperation方法 (1) 查询的索引信息都是从集群状态中的ImmutableOpenMap<String, IndexMetadata>得到的。至于下面创建索引和查询索引信息的入口是如何找到的，可以参考《Elasticsearch 8.9启动时构建接收Rest请求的handler过程源码》。
一、创建索引
1、接收创建索引的RestCreateIndexAction
public class RestCreateIndexAction extends BaseRestHandler {Overridepublic ListRoute routes() {return List.of(new Route(PUT, /{index}));}Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;//下面是区分es7和es8创建索引的结构有一点区别但是不用细究if (request.getRestApiVersion() RestApiVersion.V_7) {createIndexRequest prepareRequestV7(request);} else {createIndexRequest prepareRequest(request);}//这个会通知让master执行创建索引操作return channel - client.admin().indices().create(createIndexRequest, new RestToXContentListener(channel));}
} channel - client.admin().indices().create(createIndexRequest, new RestToXContentListener(channel))至于这个如何通知master执行的可以看一下Elasticsearch 8.9 Master节点处理请求源码
2、Master执行创建索引的类TransportCreateIndexAction
Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListenerCreateIndexResponse listener) {createIndexService.createIndex(updateRequest,listener.map(response - new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}/ *在集群状态下创建索引并等待指定数量的分片副本变为活动状态如 {link CreateIndexClusterStateUpdateRequestwaitForActiveShards} 中指定然后再在侦听器上发送响应。* 如果索引创建已成功应用于集群状态则 {link ShardsAcknowledgedResponseisAcknowledged} 将返回 true* 否则它将返回 false并且不会等待启动的分片{link ShardsAcknowledgedResponseisShardsAcknowledged} 也将为 false。* 如果集群状态下的索引创建成功并且在超时之前启动了必要的分片副本则 {link ShardsAcknowledgedResponseisShardsAcknowledged} 将返回 true* 否则如果操作超时则返回 false。*/public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListenerShardsAcknowledgedResponse listener) {logger.trace(createIndex[{}], request);onlyCreateIndex(request, listener.delegateFailureAndWrap((delegate, response) - {//如果确认创建索引则等待活动分片的数量满足要求。if (response.isAcknowledged()) {//省略代码//等待索引的分片活跃ActiveShardsObserver.waitForActiveShards(clusterService,new String[] { request.index() },request.waitForActiveShards(),request.ackTimeout(),delegate.map(shardsAcknowledged - {//省略代码return ShardsAcknowledgedResponse.of(true, shardsAcknowledged);}));} else {//如果超时等待分片启动记录日志。//最后返回一个ShardsAcknowledgedResponse对象表示索引创建没有成功。logger.trace(index creation not acknowledged for [{}], request);delegate.onResponse(ShardsAcknowledgedResponse.NOT_ACKNOWLEDGED);}}));}3、创建一个任务(创建索引的)放入一个队列 private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListenerAcknowledgedResponse listener) {//省略代码//调用submitUnbatchedTask方法提交一个任务。该任务是一个AckedClusterStateUpdateTask对象该对象继承自ClusterStateUpdateTask类用于执行集群状态更新任务。submitUnbatchedTask(create-index [ request.index() ], cause [ request.cause() ],new AckedClusterStateUpdateTask(Priority.URGENT, request, delegate.clusterStateUpdate()) 
{//在execute方法中会调用applyCreateIndexRequest方法来处理创建索引的请求。Overridepublic ClusterState execute(ClusterState currentState) throws Exception {return applyCreateIndexRequest(currentState, request, false, null, delegate.reroute());}//省略代码});}private void submitUnbatchedTask(SuppressWarnings(SameParameterValue) String source, ClusterStateUpdateTask task) {clusterService.submitUnbatchedStateUpdateTask(source, task);}DeprecatedSuppressForbidden(reason this method is itself forbidden)public void submitUnbatchedStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {masterService.submitUnbatchedStateUpdateTask(source, updateTask);}
Deprecatedpublic void submitUnbatchedStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {createTaskQueue(unbatched, updateTask.priority(), unbatchedExecutor).submitTask(source, updateTask, updateTask.timeout());}createTaskQueue返回的队列再执行submitTask方法 public T extends ClusterStateTaskListener MasterServiceTaskQueueT createTaskQueue(String name,Priority priority,ClusterStateTaskExecutorT executor) {return new BatchingTaskQueue(name,this::executeAndPublishBatch,insertionIndexSupplier,queuesByPriority.get(priority),executor,threadPool);}Overridepublic void submitTask(String source, T task, Nullable TimeValue timeout) {final var executed new AtomicBoolean(false);final Scheduler.Cancellable timeoutCancellable;if (timeout ! null timeout.millis() 0) {timeoutCancellable threadPool.schedule(new TaskTimeoutHandler(timeout, source, executed, task),timeout,ThreadPool.Names.GENERIC);} else {timeoutCancellable null;}queue.add(new Entry(source,task,insertionIndexSupplier.getAsLong(),threadPool.relativeTimeInMillis(),executed,threadPool.getThreadContext().newRestorableContext(true),timeoutCancellable));if (queueSize.getAndIncrement() 0) {perPriorityQueue.execute(processor);}}4、执行创建索引时会先搜索模版
public ClusterState applyCreateIndexRequest(ClusterState currentState,CreateIndexClusterStateUpdateRequest request,boolean silent,BiConsumerMetadata.Builder, IndexMetadata metadataTransformer,ActionListenerVoid rerouteListener) throws Exception {//对请求的设置进行规范化和验证。//省略代码//并尝试匹配v2模板final String v2Template MetadataIndexTemplateService.findV2Template(currentState.metadata(),name,isHiddenFromRequest ! null isHiddenFromRequest);//如果找到了v2模板则使用该模板和请求指定的设置创建索引。if (v2Template ! null) {return applyCreateIndexRequestWithV2Template(currentState,request,silent,v2Template,metadataTransformer,rerouteListener);} else { //没有找到v2模板则检查v1模板如果没有找到任何模板则使用请求指定的索引设置创建索引。final ListIndexTemplateMetadata v1Templates MetadataIndexTemplateService.findV1Templates(currentState.metadata(),request.index(),isHiddenFromRequest);//如果v1Templates不存在则根据请求指定的索引设置创建索引(下面的v1Templates的size为0)return applyCreateIndexRequestWithV1Templates(currentState,request,silent,v1Templates,metadataTransformer,rerouteListener);}}这里是因为模版版本的不同因为模版不一样所以需要把请求中的索引信息和模版中的索引信息合并一下最后调用applyCreateIndexWithTemporaryService创建索引 private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState currentState,final CreateIndexClusterStateUpdateRequest request,final boolean silent,final String templateName,final BiConsumerMetadata.Builder, IndexMetadata metadataTransformer,final ActionListenerVoid rerouteListener) throws Exception {//创建索引return applyCreateIndexWithTemporaryService(currentState,request,silent,null,tmpImd,mappings,indexService - resolveAndValidateAliases(request.index(),// data stream aliases are created separately in MetadataCreateDataStreamService::createDataStreamisDataStream ? Set.of() : request.aliases(),isDataStream ? 
List.of() : MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName),currentState.metadata(),xContentRegistry,// the context is used ony for validation so its fine to pass fake values for the shard id and the current timestampindexService.newSearchExecutionContext(0, 0, null, () - 0L, null, emptyMap()),IndexService.dateMathExpressionResolverAt(request.getNameResolvedAt()),systemIndices::isSystemName),Collections.singletonList(templateName),metadataTransformer,rerouteListener);}private ClusterState applyCreateIndexRequestWithV1Templates(final ClusterState currentState,final CreateIndexClusterStateUpdateRequest request,final boolean silent,final ListIndexTemplateMetadata templates,final BiConsumerMetadata.Builder, IndexMetadata metadataTransformer,final ActionListenerVoid rerouteListener) throws Exception {//应用临时服务创建索引applyCreateIndexWithTemporaryService包括当前状态currentState、请求request、是否静默silent、临时索引元数据tmpImd、索引映射mappings、解析和验证别名等。return applyCreateIndexWithTemporaryService(currentState,request,silent,null,tmpImd,mappings null ? List.of() : List.of(mappings),indexService - resolveAndValidateAliases(request.index(),request.aliases(),MetadataIndexTemplateService.resolveAliases(templates),currentState.metadata(),// the context is only used for validation so its fine to pass fake values for the// shard id and the current timestampxContentRegistry,indexService.newSearchExecutionContext(0, 0, null, () - 0L, null, emptyMap()),IndexService.dateMathExpressionResolverAt(request.getNameResolvedAt()),systemIndices::isSystemName),templates.stream().map(IndexTemplateMetadata::getName).collect(toList()),metadataTransformer,rerouteListener);
}5、创建索引的build更新集群状态 private ClusterState applyCreateIndexWithTemporaryService(final ClusterState currentState,final CreateIndexClusterStateUpdateRequest request,final boolean silent,final IndexMetadata sourceMetadata,final IndexMetadata temporaryIndexMeta,final ListCompressedXContent mappings,final FunctionIndexService, ListAliasMetadata aliasSupplier,final ListString templatesApplied,final BiConsumerMetadata.Builder, IndexMetadata metadataTransformer,final ActionListenerVoid rerouteListener) throws Exception {//省略代码//创建索引并返回更新后的集群状态,在创建索引时会考虑一些参数如阻塞状态、索引元数据、分配策略等ClusterState updated clusterStateCreateIndex(currentState,request.blocks(),indexMetadata,metadataTransformer,allocationService.getShardRoutingRoleStrategy());//省略代码 return updated;
}/*** 应用提供的块将索引创建为群集状态。最终群集状态将包含基于活动节点的更新路由表。*/static ClusterState clusterStateCreateIndex(ClusterState currentState, //集群状态SetClusterBlock clusterBlocks, //集群阻塞IndexMetadata indexMetadata, //索引元数据BiConsumerMetadata.Builder, IndexMetadata metadataTransformer, //元数据转换器ShardRoutingRoleStrategy shardRoutingRoleStrategy //分片路由角色策略) {//是否存在元数据转换器来创建新的元数据对象final Metadata newMetadata;if (metadataTransformer ! null) {//如果存在元数据转换器则使用转换器将索引元数据应用到当前的元数据上生成新的元数据对象Metadata.Builder builder Metadata.builder(currentState.metadata()).put(indexMetadata, false);metadataTransformer.accept(builder, indexMetadata);newMetadata builder.build();} else {//否则直接将索引元数据添加到当前的元数据中newMetadata currentState.metadata().withAddedIndex(indexMetadata);}//索引名称和集群阻塞创建集群阻塞构建器并更新阻塞信息String indexName indexMetadata.getIndex().getName();ClusterBlocks.Builder blocks createClusterBlocksBuilder(currentState, indexName, clusterBlocks);blocks.updateBlocks(indexMetadata);//使用新的元数据对象和更新后的集群阻塞信息构建一个更新后的集群状态对象ClusterState updatedState ClusterState.builder(currentState).blocks(blocks).metadata(newMetadata).build();//根据分片路由角色策略和更新后的集群状态的路由表构建器将索引添加为新的索引RoutingTable.Builder routingTableBuilder RoutingTable.builder(shardRoutingRoleStrategy, updatedState.routingTable()).addAsNew(updatedState.metadata().index(indexName));//并返回更新后的集群状态对象return ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build();}注意上面的.addAsNew(updatedState.metadata().index(indexName))
//向构建器中添加一个新的索引元数据,其中initializeAsNew会初始化索引初始化分片和副本信息public Builder addAsNew(IndexMetadata indexMetadata) {//检查索引的状态是否为OPENif (indexMetadata.getState() IndexMetadata.State.OPEN) {//创建一个新的IndexRoutingTable.Builder对象,并使用给定的索引元数据进行初始化。然后将该对象添加到构建器中,并返回构建器本身org.elasticsearch.cluster.routing.IndexRoutingTable.Builder indexRoutingBuilder new org.elasticsearch.cluster.routing.IndexRoutingTable.Builder(shardRoutingRoleStrategy,indexMetadata.getIndex()).initializeAsNew(indexMetadata);//添加索引add(indexRoutingBuilder);}return this;}其中initializeAsNew初始化这个索引包括索引在集群上的分片节点等
(1) initializeEmpty初始化索引的分片副本等 public Builder initializeAsNew(IndexMetadata indexMetadata) {return initializeEmpty(indexMetadata, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null), null);}private Builder initializeEmpty(IndexMetadata indexMetadata,//索引元数据UnassignedInfo unassignedInfo,//未分配信息Nullable IndexRoutingTable previousIndexRoutingTable //之前的索引路由表作为参数。) {assert indexMetadata.getIndex().equals(index);assert previousIndexRoutingTable null || previousIndexRoutingTable.size() indexMetadata.getNumberOfShards();//如果已经存在分片shards则抛出异常表示无法初始化具有新分片的索引if (shards ! null) {throw new IllegalStateException(trying to initialize an index with fresh shards, but already has shards created);}//根据索引元数据的分片数量创建一个索引分片路由表数组shards new IndexShardRoutingTable.Builder[indexMetadata.getNumberOfShards()];//遍历要分配的分片for (int shardNumber 0; shardNumber indexMetadata.getNumberOfShards(); shardNumber) {//根据索引和分片编号创建一个分片ID并获取之前的节点信息ShardId shardId new ShardId(index, shardNumber);final var previousNodes getPreviousNodes(previousIndexRoutingTable, shardNumber);//确定主分片的恢复源RecoverySource类型final RecoverySource primaryRecoverySource;if (indexMetadata.inSyncAllocationIds(shardNumber).isEmpty() false) {// we have previous valid copies for this shard. use them for recoveryprimaryRecoverySource ExistingStoreRecoverySource.INSTANCE;} else if (indexMetadata.getResizeSourceIndex() ! null) {// this is a new index but the initial shards should merged from another indexprimaryRecoverySource LocalShardsRecoverySource.INSTANCE;} else {// a freshly created index with no restrictionprimaryRecoverySource EmptyStoreRecoverySource.INSTANCE;}//使用索引分片路由表构建器IndexShardRoutingTable.Builder创建主分片和副本分片的路由信息并添加到索引分片路由表中IndexShardRoutingTable.Builder indexShardRoutingBuilder IndexShardRoutingTable.builder(shardId);for (int i 0; i indexMetadata.getNumberOfReplicas(); i) {boolean primary i 0;indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId,primary,primary ? 
primaryRecoverySource : PeerRecoverySource.INSTANCE,withLastAllocatedNodeId(unassignedInfo, previousNodes, i),shardRoutingRoleStrategy.newEmptyRole(i)));}//构建好的索引分片路由表数组赋值给shardsshards[shardNumber] indexShardRoutingBuilder;}return this;}(2) 添加索引信息到indicesRouting public Builder add(IndexRoutingTable.Builder indexRoutingTableBuilder) {add(indexRoutingTableBuilder.build());return this;
}private ImmutableOpenMap.BuilderString, IndexRoutingTable indicesRouting;
public Builder add(IndexRoutingTable indexRoutingTable) {if (indicesRouting null) {throw new IllegalStateException(once build is called the builder cannot be reused);}indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable);return this;
}(3) 创建一个新的集群状态返回
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build()routingTable的实现 public Builder routingTable(RoutingTable routingTable) {this.routingTable routingTable;return this;}routingTableBuilder.build()的实现 public RoutingTable build() {if (indicesRouting null) {throw new IllegalStateException(once build is called the builder cannot be reused);}RoutingTable table new RoutingTable(version, indicesRouting.build());indicesRouting null;return table;}最后一个build() public ClusterState build() {if (UNKNOWN_UUID.equals(uuid)) {uuid UUIDs.randomBase64UUID();}final RoutingNodes routingNodes;//是否可以重用之前状态的路由节点routingNodes,如果条件满足则将之前状态的路由节点赋值给routingNodes否则将routingNodes设为null。if (previous ! null routingTable.indicesRouting() previous.routingTable.indicesRouting() nodes previous.nodes) {// routing table contents and nodes havent changed so we can try to reuse the previous states routing nodes which are// expensive to computeroutingNodes previous.routingNodes;} else {routingNodes null;}return new ClusterState(clusterName,version,uuid,metadata,routingTable,nodes,transportVersions,blocks,customs.build(),fromDiff,routingNodes);}二、查询索引信息
1、接收查询索引信息的RestGetIndicesAction
public class RestGetIndicesAction extends BaseRestHandler {Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {// starting with 7.0 we dont include types by default in the response to GET requestsif (request.getRestApiVersion() RestApiVersion.V_7 request.hasParam(INCLUDE_TYPE_NAME_PARAMETER) request.method().equals(GET)) {deprecationLogger.compatibleCritical(get_indices_with_types, TYPES_DEPRECATION_MESSAGE);}String[] indices Strings.splitStringByCommaToArray(request.param(index));final GetIndexRequest getIndexRequest new GetIndexRequest();getIndexRequest.indices(indices);getIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, getIndexRequest.indicesOptions()));getIndexRequest.local(request.paramAsBoolean(local, getIndexRequest.local()));getIndexRequest.masterNodeTimeout(request.paramAsTime(master_timeout, getIndexRequest.masterNodeTimeout()));getIndexRequest.humanReadable(request.paramAsBoolean(human, false));getIndexRequest.includeDefaults(request.paramAsBoolean(include_defaults, false));getIndexRequest.features(GetIndexRequest.Feature.fromRequest(request));final var httpChannel request.getHttpChannel();//这个是上面创建索引类似return channel - new RestCancellableNodeClient(client, httpChannel).admin().indices().getIndex(getIndexRequest, new RestChunkedToXContentListener(channel));}
}2、实际查询的TransportGetIndexAction.java的doMasterOperation方法 Overrideprotected void doMasterOperation(Task task,final org.elasticsearch.action.admin.indices.get.GetIndexRequest request,String[] concreteIndices,final ClusterState state,final ActionListenerGetIndexResponse listener) {MapString, MappingMetadata mappingsResult ImmutableOpenMap.of();MapString, ListAliasMetadata aliasesResult Map.of();MapString, Settings settings Map.of();MapString, Settings defaultSettings Map.of();MapString, String dataStreams Map.copyOf(state.metadata().findDataStreams(concreteIndices).entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, v - v.getValue().getName())));Feature[] features request.features();boolean doneAliases false;boolean doneMappings false;boolean doneSettings false;for (Feature feature : features) {checkCancellation(task);switch (feature) {case MAPPINGS:if (doneMappings false) {mappingsResult state.metadata().findMappings(concreteIndices, indicesService.getFieldFilter(), () - checkCancellation(task));doneMappings true;}break;case ALIASES:if (doneAliases false) {aliasesResult state.metadata().findAllAliases(concreteIndices);doneAliases true;}break;case SETTINGS:if (doneSettings false) {MapString, Settings settingsMapBuilder new HashMap();MapString, Settings defaultSettingsMapBuilder new HashMap();for (String index : concreteIndices) {checkCancellation(task);Settings indexSettings state.metadata().index(index).getSettings();if (request.humanReadable()) {indexSettings IndexMetadata.addHumanReadableSettings(indexSettings);}settingsMapBuilder.put(index, indexSettings);if (request.includeDefaults()) {Settings defaultIndexSettings settingsFilter.filter(indexScopedSettings.diff(indexSettings, Settings.EMPTY));defaultSettingsMapBuilder.put(index, defaultIndexSettings);}}settings Collections.unmodifiableMap(settingsMapBuilder);defaultSettings Collections.unmodifiableMap(defaultSettingsMapBuilder);doneSettings true;}break;default:throw new 
IllegalStateException(feature [ feature ] is not valid);}}listener.onResponse(new GetIndexResponse(concreteIndices, mappingsResult, aliasesResult, settings, defaultSettings, dataStreams));}(1)查询的索引信息都是从集群状态中ImmutableOpenMapString, IndexMetadata得到的 private final ImmutableOpenMapString, IndexMetadata indices;public MapString, MappingMetadata findMappings(String[] concreteIndices,FunctionString, PredicateString fieldFilter,Runnable onNextIndex) {assert Transports.assertNotTransportThread(decompressing mappings is too expensive for a transport thread);assert concreteIndices ! null;if (concreteIndices.length 0) {return ImmutableOpenMap.of();}ImmutableOpenMap.BuilderString, MappingMetadata indexMapBuilder ImmutableOpenMap.builder();SetString indicesKeys indices.keySet();Stream.of(concreteIndices).filter(indicesKeys::contains).forEach(index - {onNextIndex.run();//这里查询的是上面的ImmutableOpenMapString, IndexMetadata indices;IndexMetadata indexMetadata indices.get(index);PredicateString fieldPredicate fieldFilter.apply(index);indexMapBuilder.put(index, filterFields(indexMetadata.mapping(), fieldPredicate));});return indexMapBuilder.build();}Settings indexSettings state.metadata().index(index).getSettings();中state.metadata().index(index) 也是ImmutableOpenMapString, IndexMetadata indices public IndexMetadata index(String index) {return indices.get(index);}