Skip to content

Commit 7a32901

Browse files
committed
store/postgres: Make subgraph creation not block writers
Creating a subgraph can block all other subgraph writers if the creation itself has to wait for its lock, e.g., because of an ongoing autovacuum of event_meta_data. That wait can be long (an hour or more in the case of autovacuum) We now do subgraph creation in such a way that we wait for a lock only a short time (2s) and then sleep for a bit so that other write activity can proceed.
1 parent c3d7bb4 commit 7a32901

File tree

2 files changed

+108
-5
lines changed

2 files changed

+108
-5
lines changed

store/postgres/src/store.rs

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,13 +1026,69 @@ impl StoreTrait for Store {
10261026
subgraph_id: &SubgraphDeploymentId,
10271027
ops: Vec<EntityOperation>,
10281028
) -> Result<(), StoreError> {
1029+
// Various timing parameters, all in seconds
1030+
const INITIAL_DELAY: u64 = 2;
1031+
const MAX_DELAY: u64 = 64;
1032+
const LOCK_TIMEOUT: u64 = 2;
1033+
10291034
let conn = self.get_conn().map_err(Error::from)?;
10301035
let econn = e::Connection::new(&conn);
1031-
1032-
conn.transaction(|| {
1033-
self.apply_entity_operations_with_conn(&econn, ops, None)?;
1034-
crate::entities::create_schema(&conn, subgraph_id)
1035-
})
1036+
let mut delay = Duration::from_secs(INITIAL_DELAY);
1037+
1038+
// Creating a subgraph creates a table that references
1039+
// `event_meta_data`. To validate that reference, Postgres takes a
1040+
// `share update exclusive` lock; for this lock, Postgres has to
1041+
// wait for all write activity that started before the lock was
1042+
// requested to finish, and it also has to hold all write activity
1043+
// that starts after the lock request. Usually, this is not a
1044+
// problem as the lock is only held for a very short amount of
1045+
// time.
1046+
//
1047+
// If there is other activity, like an autovacuum, happening
1048+
// already when the lock is requested though, we have to wait until
1049+
// that activity finishes, which in the case of an autovacuum can
1050+
// be an hour or longer. The autovacuum by itself is not a problem,
1051+
// as it still allows writes to happen, but once the lock request
1052+
// from this code gets into the lock queue, write activity also has
1053+
// to wait for the autovacuum to finish, effectively blocking all
1054+
// subgraph indexing until the autovacuum has finished.
1055+
//
1056+
// To avoid this, we set a lock timeout of 2s, which should be long
1057+
// enough to get the lock under normal conditions, but not so long
1058+
// that it materially impedes indexing in the above situation. If
1059+
// we can not get the lock within 2s, the subgraph creation fails,
1060+
// and we sleep an increasing amount of time (up to about a minute)
1061+
// and then retry the subgraph creation.
1062+
loop {
1063+
let start = Instant::now();
1064+
let result = conn.transaction(|| -> Result<(), StoreError> {
1065+
self.apply_entity_operations_with_conn(&econn, ops.clone(), None)?;
1066+
conn.batch_execute(&format!("set local lock_timeout to '{}s'", LOCK_TIMEOUT))?;
1067+
crate::entities::create_schema(&conn, subgraph_id)
1068+
});
1069+
if let Err(StoreError::Unknown(_)) = &result {
1070+
// There is no robust way to actually find out that we timed
1071+
// out on the lock from the error message; diesel shields us
1072+
// from these details too much. Rather than grep the error
1073+
// message, which would be very fragile, we assume that if a
1074+
// failure occurred after more than LOCK_TIMEOUT seconds that
1075+
// it was because we timed out on the lock and try again.
1076+
if start.elapsed() >= Duration::from_secs(LOCK_TIMEOUT) {
1077+
debug!(
1078+
self.logger,
1079+
"could not acquire lock for creation of subgraph {}, trying again in {}s",
1080+
&subgraph_id,
1081+
delay.as_secs()
1082+
);
1083+
std::thread::sleep(delay);
1084+
if delay.as_secs() < MAX_DELAY {
1085+
delay *= 2;
1086+
}
1087+
continue;
1088+
}
1089+
}
1090+
break result;
1091+
}
10361092
}
10371093

10381094
fn start_subgraph_deployment(

store/postgres/tests/store.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2145,3 +2145,50 @@ fn subgraph_schema_types_have_subgraph_id_directive() {
21452145
Ok(())
21462146
})
21472147
}
2148+
2149+
#[test]
2150+
fn create_subgraph_deployment_tolerates_locks() {
2151+
run_test(|store| -> Result<(), ()> {
2152+
use diesel::connection::SimpleConnection;
2153+
use diesel::connection::TransactionManager;
2154+
use std::sync::Barrier;
2155+
2156+
const BLOCK_TIME: u64 = 3;
2157+
2158+
let url = postgres_test_url();
2159+
let barrier = Arc::new(Barrier::new(2));
2160+
let blocker_barrier = barrier.clone();
2161+
// Start a thread that will take a lock for BLOCK_TIME seconds and
2162+
// therefore block subgraph creation during that time
2163+
std::thread::spawn(move || {
2164+
let blocker =
2165+
PgConnection::establish(url.as_str()).expect("Failed to connect to Postgres");
2166+
blocker
2167+
.transaction_manager()
2168+
.begin_transaction(&blocker)
2169+
.expect("Failed to set up transaction for blocker");
2170+
blocker
2171+
.batch_execute("lock table event_meta_data in share update exclusive mode")
2172+
.expect("Failed to lock event_meta_data");
2173+
blocker_barrier.wait();
2174+
std::thread::sleep(Duration::from_secs(BLOCK_TIME));
2175+
blocker
2176+
.transaction_manager()
2177+
.rollback_transaction(&blocker)
2178+
.expect("Failed to roll blocker transaction back");
2179+
});
2180+
2181+
// While we are blocking, try to create a subgraph. We don't really have
2182+
// a way to check from the outside that this does not block other write
2183+
// activity, but it is visible in the logs if this test is run with
2184+
// GRAPH_LOG=debug
2185+
let subgraph_id = SubgraphDeploymentId::new("DeploymentLocking").unwrap();
2186+
barrier.wait();
2187+
let start = std::time::Instant::now();
2188+
store
2189+
.create_subgraph_deployment(&subgraph_id, vec![])
2190+
.expect("Subgraph creation failed");
2191+
assert!(start.elapsed() >= Duration::from_secs(BLOCK_TIME));
2192+
Ok(())
2193+
})
2194+
}

0 commit comments

Comments
 (0)