You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I have a controller that uses reflectors to avoid hammering the life out of the kubernetes api. However i'm having issues with it sometimes not updating those reflectors and just getting stuck.
I'm using default_backoff and am expecting it to automatically retry and restart the reflector stream if something fails, but maybe that is a wrong assumption?
Slightly cleaned up, this is the code i'm using:
asyncfnmain() -> anyhow::Result<()>{let kubernetes_client = Client::try_default().await?;letmut tasks:JoinSet<()> = JoinSet::new();let namespace_api:Api<Namespace> = Api::all(kubernetes_client.clone());let(namespace_store, namespace_writer) = reflector::store_shared(128);let namespace_subscriber = namespace_writer
.subscribe().expect("subscribers can only be created from shared stores");
tasks.spawn(watcher(namespace_api,Default::default()).default_backoff().reflect_shared(namespace_writer).for_each(|_| futures::future::ready(())),);
namespace_store.wait_until_ready().await?;let context = Arc::new(ContextData{
kubernetes_client,namespaces: namespace_store.clone(),});
tasks.spawn(create_fanout_controller::<Secret>(
context.clone(),
namespace_subscriber.clone(),));info!("Operator started.");whileletSome(res) = tasks.join_next().await{
res?;}Ok(())}asyncfncreate_fanout_controller<R:Fanoutable>(context:Arc<ContextData>,namespace_subscriber:ReflectHandle<Namespace>,) -> (){let api:Api<R> = Api::all(context.kubernetes_client.clone());let(store, writer) = reflector::store();let main_stream = watcher(
api.clone(),
watcher::Config::default().labels("fanout.digizuite.com/enable_fanout=true"),).default_backoff().reflect(writer).touched_objects();let ns_store = store.clone();let owned_by_store = store.clone();Controller::for_stream(main_stream, store.clone()).with_config(controller::Config::default()).watches_shared_stream(namespace_subscriber,move |ns| {let name = Candidate::new(
ns.metadata.name.as_ref().expect("Namespace objects are from the api and should always have a name"),);
ns_store
.state().iter().filter(|c| {
c.get_target_namespace_matcher().is_some_and(|m| m.is_match_candidate(&name))}).map(|c| c.to_object_ref(())).collect::<Vec<_>>()}).run(fanout_resource, error_policy, context.clone()).for_each(|e| asyncmove{match e {Err(e) => {error!("Error in reconciler: {:?}", e);}Ok(r) => {info!("Reconciled resource: {:?}", r.0.name);}}}).await}
And then things are then stuck believing an invalid state of the world:
2025-06-10T08:18:22.207Z ERROR kube_fanout::fanout > Failed to apply resource to namespace digizuite-5f39be27d2fb4a1787b081e0a9e35a84: ApiError: secrets "keyshot-studio-license" is forbidden: unable to create new content in namespace digizuite-5f39be27d2fb4a1787b081e0a9e35a84 because it is being terminated: Forbidden (ErrorResponse { status: "Failure", message: "secrets \"keyshot-studio-license\" is forbidden: unable to create new content in namespace digizuite-5f39be27d2fb4a1787b081e0a9e35a84 because it is being terminated", reason: "Forbidden", code: 403 })
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
-
I have a controller that uses reflectors to avoid hammering the life out of the kubernetes api. However i'm having issues with it sometimes not updating those reflectors and just getting stuck.
I'm using
default_backoff
and am expecting it to automatically retry and restart the reflector stream if something fails, but maybe that is a wrong assumption?Slightly cleaned up, this is the code i'm using:
Looking at the logs i see this error occur:
And then things are then stuck believing an invalid state of the world:
Slighly more logs are available here for more context: https://gist.github.com/zlepper/54e060f5f8ce8cd875e7fb53e0d86f97
Beta Was this translation helpful? Give feedback.
All reactions