Leader Election in High-Availability Mode

Why Leader Election Is Needed

To keep components such as kube-scheduler and kube-controller-manager highly available, Kubernetes runs each of them as multiple replicas. If every replica watched and processed resources through its own informers at the same time, this would inevitably cause consistency problems or duplicated work, so a distributed lock (leader election) mechanism is used to avoid it.

Kubernetes implements the election with the client-go leaderelection package: only one instance can hold the lock, only the lock holder actually does the work, and the remaining instances keep retrying (polling) to acquire the lock.

Election Configuration

  • LeaseDuration: how long the lock is considered valid; candidates use it to decide whether the current leader has timed out and lost the lock
  • RenewDeadline: how long the acting leader keeps retrying to renew (refresh) the lease before it gives up leadership
  • RetryPeriod: the interval between retries when acquiring or renewing the lock
  • Callbacks: user-supplied hooks
    • OnStartedLeading func(context.Context): called when this instance starts leading
    • OnStoppedLeading func(): called when this instance stops leading
    • OnNewLeader func(identity string): called when the client observes a new leader (including when another instance wins the election)

A complete example:
package main

import (
	"context"
	"flag"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

var client *clientset.Clientset

// newLeaseLock builds a Lease-based resource lock; Identity must be unique
// per instance (here the pod name).
func newLeaseLock(lockname, podname, namespace string) *resourcelock.LeaseLock {
	return &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      lockname,
			Namespace: namespace,
		},
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: podname,
		},
	}
}

func run(lock *resourcelock.LeaseLock, ctx context.Context, id string) {
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(c context.Context) {
				klog.Infof("became leader, lock: %s", lock.LeaseMeta.Name)
				// A real controller would start its work here; block to keep leading.
				select {}
			},
			OnStoppedLeading: func() {
				klog.Info("no longer the leader, staying inactive.")
			},
			OnNewLeader: func(currentID string) {
				if currentID == id {
					klog.Info("still the leader!")
					return
				}
				klog.Infof("new leader is %s", currentID)
			},
		},
	})
}

func main() {
	var (
		leaseLockName      string
		leaseLockNamespace string
		podName            = os.Getenv("POD_NAME")
	)
	flag.StringVar(&leaseLockName, "lease-name", "", "Name of lease lock")
	flag.StringVar(&leaseLockNamespace, "lease-namespace", "default", "Name of lease lock namespace")
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("missing lease-name flag")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("missing lease-namespace flag")
	}

	config, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("failed to get kubeconfig: %v", err)
	}
	client = clientset.NewForConfigOrDie(config)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	lock := newLeaseLock(leaseLockName, podName, leaseLockNamespace)
	run(lock, ctx, podName)
}
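
To try the example, it is typically deployed as two or more replicas of the same Deployment, with the POD_NAME environment variable injected through the Downward API (fieldRef: metadata.name) so that every replica has a distinct Identity. Only the replica holding the Lease runs OnStartedLeading; deleting that pod lets another replica acquire the lock once the lease expires.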

Election Logic

Starting the Election

Resource Lock Types

  • leases (recommended, and the only type still accepted by current client-go)
  • endpointsleases (removed)
  • configmapsleases (removed)
  • configmaps (removed)
  • endpoints (removed)

Creating the Resource Lock

  • Create it with resourcelock.New:

    // Manufacture will create a lock of a given type according to the input parameters
    func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
        leaseLock := &LeaseLock{
            LeaseMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     coordinationClient,
            LockConfig: rlc,
        }
        switch lockType {
        case endpointsResourceLock:
            return nil, fmt.Errorf("endpoints lock is removed, migrate to %s", LeasesResourceLock)
        case configMapsResourceLock:
            return nil, fmt.Errorf("configmaps lock is removed, migrate to %s", LeasesResourceLock)
        case LeasesResourceLock:
            return leaseLock, nil
        case endpointsLeasesResourceLock:
            return nil, fmt.Errorf("endpointsleases lock is removed, migrate to %s", LeasesResourceLock)
        case configMapsLeasesResourceLock:
            return nil, fmt.Errorf("configmapsleases lock is removed, migrated to %s", LeasesResourceLock)
        default:
            return nil, fmt.Errorf("Invalid lock-type %s", lockType)
        }
    }

  • Create it with resourcelock.NewFromKubeconfig (a usage sketch follows this list):

    // NewFromKubeconfig will create a lock of a given type according to the input parameters.
    // Timeout set for a client used to contact to Kubernetes should be lower than
    // RenewDeadline to keep a single hung request from forcing a leader loss.
    // Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic.
    func NewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) {
        // shallow copy, do not modify the kubeconfig
        config := *kubeconfig
        timeout := renewDeadline / 2
        if timeout < time.Second {
            timeout = time.Second
        }
        config.Timeout = timeout
        leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))
        return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc)
    }
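
Either constructor can replace the hand-written LeaseLock literal in the example program above. A minimal sketch, assuming the same imports as that example; the helper name buildLock and the 10-second renew deadline are illustrative, not part of client-go:

// buildLock is a hypothetical helper that wires the example program's flags into
// resourcelock.NewFromKubeconfig instead of constructing a LeaseLock by hand.
func buildLock(lockName, podName, namespace string, cfg *rest.Config) (resourcelock.Interface, error) {
	return resourcelock.NewFromKubeconfig(
		resourcelock.LeasesResourceLock, // "leases", the only lock type still supported
		namespace,
		lockName,
		resourcelock.ResourceLockConfig{Identity: podName},
		cfg,
		10*time.Second, // should match RenewDeadline in LeaderElectionConfig
	)
}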

Running the Election

Once the resource lock is ready, call leaderelection.RunOrDie to start the election.

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock:            lock,
	ReleaseOnCancel: true,
	LeaseDuration:   15 * time.Second,
	RenewDeadline:   10 * time.Second,
	RetryPeriod:     2 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(c context.Context) {
			klog.Infof("became leader, lock: %s", lock.LeaseMeta.Name)
			select {}
		},
		OnStoppedLeading: func() {
			klog.Info("no longer the leader, staying inactive.")
		},
		OnNewLeader: func(currentID string) {
			if currentID == id {
				klog.Info("still the leader!")
				return
			}
			klog.Infof("new leader is %s", currentID)
		},
	},
})

Main Election Flow

After the election is started, RunOrDie calls le.Run(ctx), which runs the actual election loop. Run returns only when one of the following happens:

  • ctx is cancelled (the caller asks the election loop to stop)
  • the instance was elected leader and its term ends (renewal fails, e.g. because of a network issue)

Until the instance wins the election, it stays blocked inside le.acquire, continuously campaigning for the lock.

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer le.config.Callbacks.OnStoppedLeading()

	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}

Campaigning for the Lock

Campaign Logic

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		if !le.config.Coordinated {
			succeeded = le.tryAcquireOrRenew(ctx)
		} else {
			succeeded = le.tryCoordinatedRenew(ctx)
		}
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

Return values:

  • succeeded defaults to false; while the campaign loop runs, cancelling ctx from outside stops the wait.JitterUntil loop and acquire returns false
  • if the campaign succeeds, succeeded is set to true and cancel() is called explicitly to break out of the wait.JitterUntil loop

So acquire returns in exactly two cases:

  • true: this instance was elected leader
  • false: the campaign was aborted from the outside

Candidates that have not been elected keep retrying inside the wait.JitterUntil loop, as the sketch below illustrates.
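
The retry loop itself is plain wait.JitterUntil from k8s.io/apimachinery: the callback runs immediately and then about every RetryPeriod, each wait lengthened by a random jitter of up to JitterFactor (1.2 in leaderelection) times the period, until the stop channel is closed. A minimal standalone sketch of that behavior; the durations and messages are illustrative:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// Close the stop channel after ~7s; acquire does the equivalent by calling
	// cancel() once the lock is obtained, or when the outer ctx is cancelled.
	go func() {
		time.Sleep(7 * time.Second)
		close(stop)
	}()

	attempts := 0
	// Run the callback immediately, then roughly every 2s with jitter,
	// until stop is closed.
	wait.JitterUntil(func() {
		attempts++
		fmt.Println("trying to acquire the lock, attempt", attempts)
	}, 2*time.Second, 1.2, true, stop)

	fmt.Println("loop stopped after", attempts, "attempts")
}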

Acquiring and Renewing the Lock

Both lock acquisition and lease renewal are handled by tryAcquireOrRenew:

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.NewTime(le.clock.Now())
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. fast path for the leader to update optimistically assuming that the record observed
	// last time is the current version.
	if le.IsLeader() && le.isLeaseValid(now.Time) {
		oldObservedRecord := le.getObservedRecord()
		leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions

		err := le.config.Lock.Update(ctx, leaderElectionRecord)
		if err == nil {
			le.setObservedRecord(&leaderElectionRecord)
			return true
		}
		klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err)
	}

	// 2. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}

		le.setObservedRecord(&leaderElectionRecord)

		return true
	}

	// 3. Record obtained, check the Identity & Time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)

		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 4. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
		le.metrics.slowpathExercised(le.config.Name)
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.setObservedRecord(&leaderElectionRecord)
	return true
}

How It Works

Lease Term

Whether the current leader is still valid is judged from le.observedTime, which is updated by le.setObservedRecord whenever a leadership change is observed. It is updated when:

  • the lock does not exist yet and this instance creates it successfully
  • the lock record fetched from the apiserver differs from the cached one, meaning the leader changed since the previous attempt, so the cache is refreshed
  • the leader failed to renew before the lease expired and this instance grabbed the lock, so the cache is refreshed

So le.observedTime always marks the moment a leadership change was observed, which means le.observedTime + LeaseDuration is when the current leader's term expires.
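
The expiry check therefore reduces to comparing the current time against le.observedTime + LeaseDuration. A minimal sketch of that rule, with standalone names rather than the exact client-go helper:

package leasecheck

import "time"

// leaseStillValid mirrors the expiry rule described above: the observed leader
// is trusted until observedTime + leaseDuration has passed.
func leaseStillValid(observedTime time.Time, leaseDuration time.Duration, now time.Time) bool {
	return observedTime.Add(leaseDuration).After(now)
}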

Lock Acquisition

Lock acquisition is built on Kubernetes' optimistic concurrency control over the lock resource (normally it is the leader itself that updates the lock):

  • le.config.Lock.Get(ctx) fetches the lock and records its latest resourceVersion
  • when updating the lock, the saved resourceVersion is sent along with the update
  • the apiserver compares that resourceVersion with the current one: if they match, the update is accepted; otherwise it fails
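
The same pattern can be reproduced directly against a Lease object with client-go: Get returns the object together with its resourceVersion, and an Update carrying a stale resourceVersion is rejected with a Conflict error. A hedged sketch; the lease name "example-lock", namespace "default" and identity "my-pod" are placeholders:

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	ctx := context.Background()

	// Get returns the Lease together with its current resourceVersion.
	lease, err := client.CoordinationV1().Leases("default").Get(ctx, "example-lock", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	holder := "my-pod" // placeholder identity
	lease.Spec.HolderIdentity = &holder

	// Update sends the object back with the resourceVersion obtained by Get.
	// If another instance modified the Lease in between, the apiserver
	// rejects the write and we land in the Conflict branch below.
	_, err = client.CoordinationV1().Leases("default").Update(ctx, lease, metav1.UpdateOptions{})
	switch {
	case err == nil:
		fmt.Println("update accepted: our resourceVersion was still current")
	case errors.IsConflict(err):
		fmt.Println("update rejected: the lock changed since we read it")
	default:
		panic(err)
	}
}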