To keep kube-scheduler and kube-controller-manager highly available, Kubernetes deploys each of these components as multiple replicas. If every replica watched and acted on resources through its own informer at the same time, consistency problems and duplicate work would be unavoidable, so a distributed lock (leader election) mechanism is used to prevent this.
Kubernetes implements the election with the client-go leaderelection package: only one instance can hold the lock, only the lock holder actively runs, and the other instances keep retrying (polling) to acquire the lock.
The main fields of LeaderElectionConfig:
LeaseDuration: how long the lock (lease) is valid; candidates use it to decide whether the current lease has expired, and an expired lease is treated as lost.
RenewDeadline: how long the acting leader keeps retrying to renew the lease before giving up leadership.
RetryPeriod: the interval between attempts to acquire or renew the lock.
Callbacks: user-defined callbacks:
OnStartedLeading func(context.Context): called when this instance becomes leader and starts leading.
OnStoppedLeading func(): called when this instance stops being leader.
OnNewLeader func(identity string): called when a new leader is observed (the identity may be this instance itself).
A complete example:
package main

import (
	"context"
	"flag"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

var client *clientset.Clientset

func newLeaseLock(lockname, podname, namespace string) *resourcelock.LeaseLock {
	return &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      lockname,
			Namespace: namespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: podname,
		},
	}
}

func run(ctx context.Context, lock *resourcelock.LeaseLock, id string) {
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(c context.Context) {
				klog.Infof("started leading, lock: %s", lock.LeaseMeta.Name)
				// Place the leader-only work here; block so the example keeps running.
				select {}
			},
			OnStoppedLeading: func() {
				klog.Info("no longer the leader, staying inactive.")
			},
			OnNewLeader: func(currentID string) {
				if currentID == id {
					klog.Info("still the leader!")
					return
				}
				klog.Infof("new leader is %s", currentID)
			},
		},
	})
}

func main() {
	var (
		leaseLockName      string
		leaseLockNamespace string
		podName            = os.Getenv("POD_NAME")
	)
	flag.StringVar(&leaseLockName, "lease-name", "", "Name of lease lock")
	flag.StringVar(&leaseLockNamespace, "lease-namespace", "default", "Name of lease lock namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("missing lease-name flag")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("missing lease-namespace flag")
	}

	// Check the error from InClusterConfig before building the client.
	config, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("failed to get kubeconfig: %v", err)
	}
	client = clientset.NewForConfigOrDie(config)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	lock := newLeaseLock(leaseLockName, podName, leaseLockNamespace)
	run(ctx, lock, podName)
}
Resource lock types
leases (recommended)
endpointsleases
configmapsleases
configmaps
endpoints
Creating a resource lock
A lock can be created with resourcelock.New:
// Manufacture will create a lock of a given type according to the input parameters
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
	leaseLock := &LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Namespace: ns,
			Name:      name,
		},
		Client:     coordinationClient,
		LockConfig: rlc,
	}
	switch lockType {
	case endpointsResourceLock:
		return nil, fmt.Errorf("endpoints lock is removed, migrate to %s", LeasesResourceLock)
	case configMapsResourceLock:
		return nil, fmt.Errorf("configmaps lock is removed, migrate to %s", LeasesResourceLock)
	case LeasesResourceLock:
		return leaseLock, nil
	case endpointsLeasesResourceLock:
		return nil, fmt.Errorf("endpointsleases lock is removed, migrate to %s", LeasesResourceLock)
	case configMapsLeasesResourceLock:
		return nil, fmt.Errorf("configmapsleases lock is removed, migrated to %s", LeasesResourceLock)
	default:
		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
	}
}
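For comparison with the LeaseLock built by hand in newLeaseLock above, the same lock can also be obtained through resourcelock.New. A minimal sketch, reusing the client, leaseLockNamespace, leaseLockName and podName variables from the first example:
lock, err := resourcelock.New(
	resourcelock.LeasesResourceLock, // "leases", the recommended lock type
	leaseLockNamespace, leaseLockName,
	client.CoreV1(),
	client.CoordinationV1(),
	resourcelock.ResourceLockConfig{Identity: podName},
)
if err != nil {
	klog.Fatalf("failed to create resource lock: %v", err)
}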
Alternatively, resourcelock.NewFromKubeconfig builds the lock directly from a rest.Config:
// NewFromKubeconfig will create a lock of a given type according to the input parameters.
// Timeout set for a client used to contact to Kubernetes should be lower than
// RenewDeadline to keep a single hung request from forcing a leader loss.
// Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic.
func NewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) {
	// shallow copy, do not modify the kubeconfig
	config := *kubeconfig
	timeout := renewDeadline / 2
	if timeout < time.Second {
		timeout = time.Second
	}
	config.Timeout = timeout
	leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
	return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc)
}
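A minimal usage sketch, reusing the config returned by rest.InClusterConfig() in the first example; the renewDeadline passed here should match the RenewDeadline later given to LeaderElectionConfig:
lock, err := resourcelock.NewFromKubeconfig(
	resourcelock.LeasesResourceLock,
	leaseLockNamespace, leaseLockName,
	resourcelock.ResourceLockConfig{Identity: podName},
	config,         // *rest.Config
	10*time.Second, // renewDeadline; the client timeout becomes max(1s, renewDeadline/2)
)
if err != nil {
	klog.Fatalf("failed to create resource lock: %v", err)
}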
Election
Once the resource lock is ready, call leaderelection.RunOrDie to start the election.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock:            lock,
	ReleaseOnCancel: true,
	LeaseDuration:   15 * time.Second,
	RenewDeadline:   10 * time.Second,
	RetryPeriod:     2 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(c context.Context) {
			klog.Infof("started leading, lock: %s", lock.LeaseMeta.Name)
			select {}
		},
		OnStoppedLeading: func() {
			klog.Info("no longer the leader, staying inactive.")
		},
		OnNewLeader: func(currentID string) {
			if currentID == id {
				klog.Info("still the leader!")
				return
			}
			klog.Infof("new leader is %s", currentID)
		},
	},
})
After the election starts, RunOrDie calls le.Run(ctx), which runs the actual election loop. Run only returns when:
ctx is cancelled (the election is aborted from the outside), or
the instance was the leader and has stopped holding the lease.
A candidate that never becomes leader stays inside le.acquire, continuously campaigning for the lock.
// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer le.config.Callbacks.OnStoppedLeading()

	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}
// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}
Campaign logic
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		if !le.config.Coordinated {
			succeeded = le.tryAcquireOrRenew(ctx)
		} else {
			succeeded = le.tryCoordinatedRenew(ctx)
		}
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}
How the return value is produced:
succeeded defaults to false; if the caller cancels ctx during the campaign, the wait.JitterUntil loop stops and acquire returns false.
When the lock is acquired, succeeded is set to true and cancel() is called explicitly to break out of the wait.JitterUntil loop.
So acquire returns in only two cases:
true: this instance was elected leader.
false: the election was aborted from the outside.
A candidate that does not win keeps retrying inside the wait.JitterUntil loop; the only way to stop it from the outside is to cancel the context, as sketched below.
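A minimal sketch of that external abort path, tying the cancel to SIGTERM/SIGINT so a replica that never wins can still shut down cleanly. The function name runUntilSignalled and its parameters are placeholders; the LeaderElectionConfig values mirror the earlier example:
import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runUntilSignalled cancels the election context on SIGTERM/SIGINT.
// Cancelling ctx stops wait.JitterUntil inside acquire (and renew),
// so RunOrDie returns even if this replica never became leader.
func runUntilSignalled(lock resourcelock.Interface, callbacks leaderelection.LeaderCallbacks) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-sigCh
		cancel() // aborts the campaign: acquire returns false, RunOrDie returns
	}()

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks:       callbacks,
	})
}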
Both the initial lock acquisition and the renewal logic live in tryAcquireOrRenew:
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.NewTime(le.clock.Now())
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. fast path for the leader to update optimistically assuming that the record observed
	// last time is the current version.
	if le.IsLeader() && le.isLeaseValid(now.Time) {
		oldObservedRecord := le.getObservedRecord()
		leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions

		err := le.config.Lock.Update(ctx, leaderElectionRecord)
		if err == nil {
			le.setObservedRecord(&leaderElectionRecord)
			return true
		}
		klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
	}

	// 2. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.setObservedRecord(&leaderElectionRecord)
		return true
	}

	// 3. Record obtained, check the Identity & Time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)
		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 4. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
		le.metrics.slowpathExercised(le.config.Name)
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.setObservedRecord(&leaderElectionRecord)
	return true
}
Whether the current leader is still valid is judged from le.observedTime. le.observedTime is refreshed by le.setObservedRecord whenever a change to the lock record is observed; in tryAcquireOrRenew above that happens when the record is first created, when the leader's optimistic fast-path update succeeds, when the record read from the API server differs from the one observed last time, and when the slow-path update succeeds.
Because le.observedTime is only updated at the moments a change of the lock is observed, le.observedTime + LeaseDuration is the point at which the current leader's lease is considered expired.
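A minimal standalone sketch of that expiry check (the real helper in client-go is isLeaseValid; this version uses only the standard time package, with names following the text above):
// leaseExpired reports whether the lease of the currently observed leader
// has run out: the lease is treated as valid until observedTime + LeaseDuration.
func leaseExpired(observedTime time.Time, leaseDuration time.Duration, now time.Time) bool {
	return !observedTime.Add(leaseDuration).After(now)
}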
The lock itself relies on the optimistic concurrency control Kubernetes applies to every resource (normally it is the leader itself that updates the lock record). le.config.Lock.Get(ctx) fetches the lock object together with its latest resourceVersion and stores it; when the record is written back with Update, the API server compares the resourceVersion carried in the request with the current one and only applies the update if they match, otherwise the update fails and the caller has to re-read and retry.