diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index ece4f53381c1..0fc38962fe27 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -194,6 +194,7 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
         // zk
         ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
 
+        boolean issuedDistribCommit = false;
         List<SolrCmdDistributor.Node> useNodes = null;
         if (req.getParams().get(COMMIT_END_POINT) == null) {
           useNodes = nodes;
@@ -203,11 +204,16 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
           params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
               zkController.getBaseUrl(), req.getCore().getName()));
           cmdDistrib.distribCommit(cmd, useNodes, params);
-          cmdDistrib.blockAndDoRetries();
+          issuedDistribCommit = true;
         }
       }
 
       if (isLeader) {
+        if (issuedDistribCommit) {
+          // defensive copy of params, which was passed into distribCommit(...) above; will unconditionally replace
+          // DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the new `params` val will actually be used
+          params = new ModifiableSolrParams(params);
+        }
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
 
         params.set(COMMIT_END_POINT, "replicas");
@@ -218,14 +224,15 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
           params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
               zkController.getBaseUrl(), req.getCore().getName()));
 
+          // NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER distrib commits
           cmdDistrib.distribCommit(cmd, useNodes, params);
+          issuedDistribCommit = true;
         }
 
         doLocalCommit(cmd);
-
-        if (useNodes != null) {
-          cmdDistrib.blockAndDoRetries();
-        }
+      }
+      if (issuedDistribCommit) {
+        cmdDistrib.blockAndDoRetries();
       }
     }
   }
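
The processor change above replaces the two eager `blockAndDoRetries()` calls (one immediately after the TOLEADER fan-out, one after the local commit) with a single blocking call at the very end, guarded by the new `issuedDistribCommit` flag. As a result, the commits sent to the other shard leaders, the commits sent to this shard's replicas, and the local commit are all in flight concurrently, and the processor waits only once. Below is a minimal sketch of the resulting control flow; `sendCommitAsync`, `doLocalCommit`, and `awaitAllDistribRequests` are hypothetical stand-ins for `cmdDistrib.distribCommit(...)`, `doLocalCommit(...)`, and `cmdDistrib.blockAndDoRetries()`, not the real API.

    // Sketch only -- NOT the actual Solr implementation.
    public class CommitFlowSketch {

      void sendCommitAsync(String endPoint) {
        System.out.println("fan out commit to " + endPoint + " (non-blocking)");
      }

      void doLocalCommit() {
        System.out.println("local commit (overlaps with in-flight remote commits)");
      }

      void awaitAllDistribRequests() {
        System.out.println("block until all distributed commits complete");
      }

      void processCommit(boolean receivedFromClient, boolean isLeader) {
        boolean issuedDistribCommit = false;
        if (receivedFromClient) {
          // TOLEADER phase: forward to the other shard leaders.
          // Before this patch, the code blocked right here via blockAndDoRetries().
          sendCommitAsync("leaders");
          issuedDistribCommit = true;
        }
        if (isLeader) {
          // FROMLEADER phase: forward to this shard's replicas.
          sendCommitAsync("replicas");
          issuedDistribCommit = true;
          doLocalCommit();
        }
        if (issuedDistribCommit) {
          awaitAllDistribRequests(); // the single blocking point, moved to the end
        }
      }

      public static void main(String[] args) {
        new CommitFlowSketch().processCommit(true, true);
      }
    }

Note the in-diff comment on the second `distribCommit(...)` call: it internally flushes any pending TOLEADER requests, so those are already on the wire before the local commit starts.
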
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
new file mode 100644
index 000000000000..3e619948e189
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<config>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog>
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+  </updateHandler>
+
+  <updateRequestProcessorChain name="ensure-parallel-commit">
+    <processor class="solr.DistributedUpdateProcessorFactory"/>
+    <processor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory"/>
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+
+  <initParams path="/update/**">
+    <lst name="defaults">
+      <str name="update.chain">ensure-parallel-commit</str>
+    </lst>
+  </initParams>
+
+  <requestHandler name="/select" class="solr.SearchHandler"/>
+
+</config>
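
The `ensure-parallel-commit` chain wires the test's `CheckFactory` (defined in the test below) in as a post-distribution processor, so every core hosting the collection runs the `Check` processor once per commit, during the local-commit leg. Triggering it from a client is just an ordinary commit; here is a hedged SolrJ sketch of that client-side action, where the ZooKeeper address and collection name are placeholders rather than values from this patch:

    // Placeholder values: "localhost:9983" and "some_collection" are examples only.
    import java.util.Collections;
    import java.util.Optional;

    import org.apache.solr.client.solrj.impl.CloudSolrClient;

    public class CommitThroughChain {
      public static void main(String[] args) throws Exception {
        try (CloudSolrClient client = new CloudSolrClient.Builder(
            Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
          client.setDefaultCollection("some_collection");
          // waitFlush=true, waitSearcher=true -- the same call the test issues via
          // CLOUD_CLIENT.commit(true, true); each core's commit passes through Check.
          client.commit(true, true);
        }
      }
    }
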
diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
new file mode 100644
index 000000000000..205baf4a16bf
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelCommitExecutionTest extends SolrCloudTestCase {
+
+  private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
+  private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
+
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+  private static int expectCount;
+  private static volatile int tooHigh;
+
+  private static volatile CountDownLatch countdown;
+  private static final AtomicInteger countup = new AtomicInteger();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // multiple replicas matter, but for these initial parallel commit execution tests, only consider repFactor=1
+    final int repFactor = 1; // random().nextBoolean() ? 1 : 2;
+    final int numShards = TestUtil.nextInt(random(), 1, 4);
+    final int numNodes = (numShards * repFactor);
+    expectCount = numNodes;
+
+    final String configName = DEBUG_LABEL + "_config-set";
+    final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
+
+    configureCluster(numNodes).addConfig(configName, configDir).configure();
+
+    Map<String, String> collectionProperties = new LinkedHashMap<>();
+    collectionProperties.put("config", "solrconfig-parallel-commit.xml");
+    collectionProperties.put("schema", "schema_latest.xml");
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .setProperties(collectionProperties)
+        .process(cluster.getSolrClient());
+
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+    waitForRecoveriesToFinish(CLOUD_CLIENT);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    if (null != CLOUD_CLIENT) {
+      CLOUD_CLIENT.close();
+      CLOUD_CLIENT = null;
+    }
+  }
+
+  private static void initSyncVars(boolean tooHigh) {
+    final int ct;
+    if (tooHigh) {
+      ParallelCommitExecutionTest.tooHigh = TOO_HIGH_INCREMENT;
+      ct = expectCount + TOO_HIGH_INCREMENT;
+    } else {
+      ParallelCommitExecutionTest.tooHigh = 0;
+      ct = expectCount;
+    }
+    countdown = new CountDownLatch(ct);
+    countup.set(0);
+  }
+
+  @Test
+  public void testParallelOk() throws Exception {
+    initSyncVars(false);
+    UpdateResponse rsp = CLOUD_CLIENT.commit(true, true);
+    // nocommit: why are we getting status:0 in a response that clearly failed? It looks like no matter
+    // how many distrib commits fail, as long as the local commit succeeds, it reports success. (if the
+    // local commit fails, it apparently does report failure).
+    // that's why we're doing this "countup" thing -- because there's apparently no other way to know
+    // whether the commit succeeded or not?
+    // we expect `countup` to be incremented by exactly `expectCount` parallel commits
+    assertEquals(expectCount, countup.get());
+  }
+
+  private static final int TOO_HIGH_INCREMENT = 1;
+
+  @Test
+  public void testParallelFail() throws Exception {
+    // artificially set countdown too high; we're really testing the test code, since there's no way
+    // to mock the "incorrect" behavior (i.e., "serial" requests)
+    initSyncVars(true);
+    UpdateResponse rsp = CLOUD_CLIENT.commit(true, true);
+
+    // If we could mock a "real" scenario, we'd expect the distrib requests to time out waiting
+    // on the countdown latch (and thus not increment `countup`), but the "local" shard would
+    // then succeed, resulting in `countup` being incremented exactly once. Here we've set `countdown`
+    // artificially high, but to approximately mock this, we get the "local" shard to recognize
+    // the "tooHigh" scenario, and succeed anyway, incrementing `countup`.
+    assertEquals(1, countup.get());
+    // sanity check: because we actually expect things to be working properly, we expect `countdown`
+    // to be exactly TOO_HIGH_INCREMENT too high.
+    assertEquals(TOO_HIGH_INCREMENT, countdown.getCount());
+  }
+
+  public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
+    assert null != client.getDefaultCollection();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
+        client.getZkStateReader(), true, true, 330);
+  }
+
+  public static class CheckFactory extends UpdateRequestProcessorFactory {
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return new Check(next);
+    }
+  }
+
+  public static class Check extends UpdateRequestProcessor {
+
+    public Check(UpdateRequestProcessor next) {
+      super(next);
+    }
+
+    @Override
+    public void processCommit(CommitUpdateCommand cmd) throws IOException {
+      super.processCommit(cmd);
+      countdown.countDown();
+      // In the "tooHigh" scenario, for the one-and-only local (non-distrib) shard, force success (after waiting) to
+      // mock the "serial blocking" behavior.
+      final boolean mustSucceed = tooHigh == TOO_HIGH_INCREMENT && cmd.getReq().getParams().get("update.distrib") == null;
+      try {
+        if (!countdown.await(5, TimeUnit.SECONDS) && !mustSucceed) {
+          throw new RuntimeException("done waiting");
+        }
+        countup.incrementAndGet();
+      } catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+}
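
A note on the synchronization scheme used by `Check`: `countdown` acts as a rendezvous barrier. Each of the `expectCount` commit executions counts the latch down and then waits for it to open, so all of them can succeed only if they are in flight simultaneously; under the old serial behavior the first execution would block until its `await` timed out. A self-contained illustration of the same idiom with plain threads (no Solr involved):

    // Rendezvous idiom: all n workers finish only if they overlap in time.
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RendezvousDemo {
      public static void main(String[] args) throws Exception {
        final int n = 4; // stands in for expectCount (one commit per core)
        final CountDownLatch countdown = new CountDownLatch(n);
        final AtomicInteger countup = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(n);
        for (int i = 0; i < n; i++) {
          pool.execute(() -> {
            countdown.countDown(); // "I have started"
            try {
              // Opens only once all n workers have started, i.e. they ran in parallel;
              // run serially, the first worker would time out here instead.
              if (countdown.await(5, TimeUnit.SECONDS)) {
                countup.incrementAndGet();
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("countup=" + countup.get() + " (expect " + n + ")");
      }
    }

This is also why the test relies on `countup` rather than the `UpdateResponse` status: as the nocommit comment notes, the response reports success whenever the local commit succeeds, regardless of how many distributed commits failed.
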