ericpuwang

Go Redis使用可重入锁支持选主

分析

对于资源共享需求较低的场景,可以使用有时效性的互斥锁,可以实现选主功能。可以参考K8S选主机制

方案

定义选主对象

选主锁使用带有时效的Redis互斥锁, 只有得到该互斥锁,才能执行程序

type LeaderElector struct {
	config   LeaderElectionConfig
	lock     *lock.RedisLock
	isLeader bool
}

执行选主逻辑,并启动服务

func (le *LeaderElector) Run(ctx context.Context) {
	defer lock.HandleCrash()
	defer func() {
		if le.config.Callbacks.OnStoppedLeading != nil {
			le.config.Callbacks.OnStoppedLeading(ctx)
		}
	}()

	if !le.acquire(ctx) {
		// 取消上下文
		return
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

示例

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"time"

	"github.com/CodeNinja917/leaderelection"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

func main() {
	var (
		redisAddr string
		lockName  string
		id        string
	)
	flag.StringVar(&redisAddr, "redis-addr", "localhost:6379", "redis addr")
	flag.StringVar(&lockName, "lock-name", "", "the lease lock resource name")
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identify name")
	flag.Parse()

	cfg := leaderelection.LeaderElectionConfig{
		RedisConfig: redis.Options{Addr: redisAddr},
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				for {
					select {
					case <-ctx.Done():
						return
					default:
						time.Sleep(10 * time.Second)
						fmt.Println("Controller loop...")
					}
				}
			},
			OnStoppedLeading: func(ctx context.Context) {
				fmt.Printf("leader lost: %s\n", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					return
				}
				fmt.Printf("new leader elected: %s\n", identity)
			},
		},
		ReleaseOnCancel: true,
		Identity:        id,
		Key:             lockName,
	}
	ctx := context.Background()
	le, err := leaderelection.NewLeaderElector(ctx, cfg)
	if err != nil {
		fmt.Printf("ERROR: %v\n", err)
		return
	}
	le.Run(ctx)
}