// Manufacture will create a lock of a given type according to the input parameters funcNew(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) { leaseLock := &LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Namespace: ns, Name: name, }, Client: coordinationClient, LockConfig: rlc, } switch lockType { case endpointsResourceLock: returnnil, fmt.Errorf("endpoints lock is removed, migrate to %s", LeasesResourceLock) case configMapsResourceLock: returnnil, fmt.Errorf("configmaps lock is removed, migrate to %s", LeasesResourceLock) case LeasesResourceLock: return leaseLock, nil case endpointsLeasesResourceLock: returnnil, fmt.Errorf("endpointsleases lock is removed, migrate to %s", LeasesResourceLock) case configMapsLeasesResourceLock: returnnil, fmt.Errorf("configmapsleases lock is removed, migrated to %s", LeasesResourceLock) default: returnnil, fmt.Errorf("Invalid lock-type %s", lockType) } }
使用 resourcelock.NewFromKubeconfig 创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// NewFromKubeconfig will create a lock of a given type according to the input parameters. // Timeout set for a client used to contact to Kubernetes should be lower than // RenewDeadline to keep a single hung request from forcing a leader loss. // Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic. funcNewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) { // shallow copy, do not modify the kubeconfig config := *kubeconfig timeout := renewDeadline / 2 if timeout < time.Second { timeout = time.Second } config.Timeout = timeout leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election")) return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc) }
// Run starts the leader election loop. Run will not return // before leader election loop is stopped by ctx or it has // stopped holding the leader lease func(le *LeaderElector) Run(ctx context.Context) { defer runtime.HandleCrash() defer le.config.Callbacks.OnStoppedLeading()
if !le.acquire(ctx) { return// ctx signalled done } ctx, cancel := context.WithCancel(ctx) defer cancel() go le.config.Callbacks.OnStartedLeading(ctx) le.renew(ctx) }
// RunOrDie starts a client with the provided config or panics if the config // fails to validate. RunOrDie blocks until leader election loop is // stopped by ctx or it has stopped holding the leader lease funcRunOrDie(ctx context.Context, lec LeaderElectionConfig) { le, err := NewLeaderElector(lec) if err != nil { panic(err) } if lec.WatchDog != nil { lec.WatchDog.SetLeaderElection(le) } le.Run(ctx) }
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, // else it tries to renew the lease if it has already been acquired. Returns true // on success else returns false. func(le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool { now := metav1.NewTime(le.clock.Now()) leaderElectionRecord := rl.LeaderElectionRecord{ HolderIdentity: le.config.Lock.Identity(), LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), RenewTime: now, AcquireTime: now, }
// 1. fast path for the leader to update optimistically assuming that the record observed // last time is the current version. if le.IsLeader() && le.isLeaseValid(now.Time) { oldObservedRecord := le.getObservedRecord() leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
err := le.config.Lock.Update(ctx, leaderElectionRecord) if err == nil { le.setObservedRecord(&leaderElectionRecord) returntrue } klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err) }
// 2. obtain or create the ElectionRecord oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx) if err != nil { if !errors.IsNotFound(err) { klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) returnfalse } if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil { klog.Errorf("error initially creating leader election record: %v", err) returnfalse }
le.setObservedRecord(&leaderElectionRecord)
returntrue }
// 3. Record obtained, check the Identity & Time if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) { le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord } iflen(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() { klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) returnfalse }
// 4. We're going to try to update. The leaderElectionRecord is set to it's default // here. Let's correct it before updating. if le.IsLeader() { leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions le.metrics.slowpathExercised(le.config.Name) } else { leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 }
// update the lock itself if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil { klog.Errorf("Failed to update lock: %v", err) returnfalse }