Skip to content
Snippets Groups Projects
Select Git revision
  • 07caebe0a9f8a0f483021aa88043e19cd5749d44
  • main default protected
  • v1.298.0
  • v1.297.0
  • v1.296.0
  • v1.295.0
  • v1.294.0
  • v1.293.0
  • v1.292.0
  • v1.291.0
  • v1.290.0
  • v1.289.0
  • v1.288.0
  • v1.287.0
  • v1.286.0
  • v1.285.0
  • v1.284.0
  • v1.283.0
  • v1.282.0
  • v1.281.0
  • v1.280.0
  • v1.279.0
22 results

redis.go

Blame
  • redis.go 7.96 KiB
    package redis
    
    import (
    	"context"
    	"encoding/json"
    	"fmt"
    	"math"
    	"os"
    	"strings"
    	"time"
    
    	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/errors"
    	"gitlab.bob.co.za/bob-public-utils/bobgroup-go-utils/logs"
    
    	"github.com/go-redis/redis_rate/v9"
    
    	"github.com/go-redis/redis/v8"
    )
    
    var (
    	ctx         = context.Background()
    	redisClient *ClientWithHelpers
    )
    
    type ClientWithHelpers struct {
    	Client    *redis.Client
    	Available bool
    }
    
    type SetLockKeyOptions struct {
    	Value          *string
    	MaxRetries     *int
    	RetryInterval  *time.Duration
    	FallbackResult *bool
    }
    
    func GetRedisClient(isDebug bool) *ClientWithHelpers {
    	if redisClient != nil && redisClient.IsConnected() {
    		return redisClient
    	}
    	redisClient = connectToRedis(isDebug)
    	return redisClient
    }
    
    func connectToRedis(isDebug bool) *ClientWithHelpers {
    	redisHost := os.Getenv("REDIS_HOST")
    	if redisHost == "false" || redisHost == "" {
    		return &ClientWithHelpers{
    			Client:    nil,
    			Available: false,
    		}
    	}
    
    	host := redisHost
    	port := os.Getenv("REDIS_PORT")
    
    	if isDebug {
    		env := os.Getenv("ENVIRONMENT")
    		switch env {
    		case "dev":
    			port = "6380"
    		case "stage":
    			port = "6381"
    		case "sandbox", "qa":
    			port = "6382"
    		case "prod":
    			port = "6383"
    		}
    	}
    	return NewClient(host + ":" + port)
    }
    
    func NewClient(addr string) *ClientWithHelpers {
    	return &ClientWithHelpers{
    		Client: redis.NewClient(&redis.Options{
    			MaxRetries:  1,
    			DialTimeout: time.Duration(1) * time.Second, // So max 2 second wait
    			Addr:        addr,
    			Password:    "", // no password set
    			DB:          0,  // use default Db
    		}),
    		Available: true,
    	}
    }
    
    func (r ClientWithHelpers) IsConnected() bool {
    	return r.Client != nil && r.Available == true
    }
    
    func (r ClientWithHelpers) DeleteByKey(key string) error {
    	if !r.IsConnected() {
    		return errors.Errorf("REDIS disabled: cannot del key(%s)", key)
    	}
    	_, err := r.Client.Del(ctx, key).Result()
    	if err != nil {
    		return errors.Wrapf(err, "failed to del key(%s)", key)
    	}
    	return nil
    }
    
    func (r ClientWithHelpers) DeleteByKeyPattern(pattern string) {
    	if !r.IsConnected() {
    		return
    	}
    
    	iter := r.Client.Scan(ctx, 0, pattern, math.MaxInt64).Iterator()
    	for iter.Next(ctx) {
    		val := iter.Val()
    		err := r.Client.Del(ctx, val).Err()
    		if err != nil {
    			panic(err)
    		}
    	}
    	if err := iter.Err(); err != nil {
    		panic(err)
    	}
    }
    
    func (r ClientWithHelpers) SetObjectByKey(key string, object interface{}) {
    	if !r.IsConnected() {
    		return
    	}
    
    	jsonBytes, err := json.Marshal(object)
    	if err != nil {
    		logs.ErrorWithMsg(err, "Error marshalling object to Redis: %s")
    		return
    	}
    
    	_, err = r.Client.Set(ctx, key, string(jsonBytes), 24*time.Hour).Result()
    	if err != nil {
    		logs.ErrorWithMsg(err, "Error setting value to Redis for key: %s", key)
    
    		/* Prevent further calls in this execution from trying to connect and also timeout */
    		if strings.HasSuffix(err.Error(), "i/o timeout") {
    			r.Available = false
    		}
    	}
    }
    
    func (r ClientWithHelpers) SetObjectByKeyWithExpiry(key string, object interface{}, expiration time.Duration) {
    	if !r.IsConnected() {
    		return
    	}
    
    	jsonBytes, err := json.Marshal(object)
    	if err != nil {
    		logs.ErrorWithMsg(err, "Error marshalling object to Redis: %s")
    		return
    	}
    
    	_, err = r.Client.Set(ctx, key, string(jsonBytes), expiration).Result()
    	if err != nil {
    		logs.ErrorWithMsg(err, "Error setting value to Redis for key: %s", key)
    
    		/* Prevent further calls in this execution from trying to connect and also timeout */
    		if strings.HasSuffix(err.Error(), "i/o timeout") {
    			r.Available = false
    		}
    	}
    }
    
    func (r ClientWithHelpers) SetObjectByKeyIndefinitely(key string, object interface{}) {
    	if !r.IsConnected() {
    		return
    	}
    
    	jsonBytes, err := json.Marshal(object)
    	if err != nil {
    		logs.ErrorWithMsg(err, "Error marshalling object to Redis")
    		return
    	}
    
    	_, err = r.Client.Set(ctx, key, string(jsonBytes), 0).Result()
    	if err != nil {
    		logs.ErrorWithMsg(err, "Error setting value to Redis for key: %s", key)
    
    		/* Prevent further calls in this execution from trying to connect and also timeout */
    		if strings.HasSuffix(err.Error(), "i/o timeout") {
    			r.Available = false
    		}
    	}
    
    }
    
    // GetObjectByKey fetches an object from Redis by key. If the key does not exist or there is an error, it returns nil.
    // If an expiry is provided, it will both fetch the key and update its expiry.
    func GetObjectByKey[T any](redisClient *ClientWithHelpers, key string, object T, expiry ...time.Duration) *T {
    	// Make sure we have a Redis client, and it is connected
    	if redisClient == nil || !redisClient.IsConnected() {
    		return nil
    	}
    
    	// Get the object from Redis
    	jsonString := redisClient.GetValueByKey(key, expiry...)
    	if jsonString == "" {
    		return nil
    	}
    
    	// Now unmarshal
    	err := json.Unmarshal([]byte(jsonString), &object)
    	if err != nil {
    		return nil
    	}
    
    	return &object
    }
    
    // GetValueByKey fetches a value from Redis by key. If the key does not exist or there is an error, it returns an empty
    // string. If an expiry is provided, it will both fetch the key and update its expiry.
    func (r ClientWithHelpers) GetValueByKey(key string, expiry ...time.Duration) string {
    	if !r.IsConnected() {
    		return ""
    	}
    
    	var jsonString string
    	var err error
    	if len(expiry) > 0 {
    		jsonString, err = r.Client.GetEx(ctx, key, expiry[0]).Result()
    	} else {
    		jsonString, err = r.Client.Get(ctx, key).Result()
    	}
    	if err == redis.Nil { /* Key does not exist */
    		return ""
    	} else if err != nil { /* Actual error */
    		logs.Warn(fmt.Sprintf("Error fetching object from Redis for key: %s", key), err)
    		/* Prevent further calls in this execution from trying to connect and also timeout */
    		if strings.HasSuffix(err.Error(), "i/o timeout") {
    			r.Available = false
    		}
    	}
    	return jsonString
    }
    
    func (r ClientWithHelpers) RateLimit(key string, limitFn func(int) redis_rate.Limit, limit int) (bool, error) {
    	limiter := redis_rate.NewLimiter(r.Client)
    	res, err := limiter.Allow(ctx, key, limitFn(limit))
    	if err != nil {
    		logs.ErrorWithMsg(err, "Redis Error rate limiting - %s", key)
    
    		/* Prevent further calls in this execution from trying to connect and also timeout */
    		if strings.HasSuffix(err.Error(), "i/o timeout") {
    			r.Available = false
    		}
    
    		return false, err
    	} else {
    		return res.Allowed == 1, nil
    	}
    }
    
    // SetLockKey attempts to set a lock on the specified key.
    func (r ClientWithHelpers) SetLockKey(key string, expiration time.Duration, lockOptions ...SetLockKeyOptions) (success bool, err error) {
    	fallbackResult := false
    	if len(lockOptions) > 0 && lockOptions[0].FallbackResult != nil {
    		fallbackResult = *lockOptions[0].FallbackResult
    	}
    
    	if !r.IsConnected() {
    		return fallbackResult, errors.Error("Redis is not connected")
    	}
    
    	value := "1"
    	retries := 0
    	retryInterval := 100 * time.Millisecond
    	if len(lockOptions) > 0 {
    		// Only use the values that were set
    		if lockOptions[0].Value != nil {
    			value = *lockOptions[0].Value
    		}
    		if lockOptions[0].MaxRetries != nil {
    			retries = *lockOptions[0].MaxRetries
    		}
    		if lockOptions[0].RetryInterval != nil {
    			retryInterval = *lockOptions[0].RetryInterval
    		}
    	}
    
    	for retries >= 0 {
    		success, err = r.Client.SetNX(ctx, key, value, expiration).Result()
    		if err != nil {
    			logs.ErrorWithMsg(err, "Error setting lock key %s", key)
    
    			// Prevent further calls in this execution from trying to connect and also timeout
    			if strings.HasSuffix(err.Error(), "i/o timeout") {
    				r.Available = false
    			}
    
    			return fallbackResult, err
    		}
    
    		if success || retries == 0 {
    			break
    		}
    
    		retries--
    		time.Sleep(retryInterval)
    	}
    
    	return success, nil
    }
    
    func (r ClientWithHelpers) KeepLockKeyAlive(key string, expiration time.Duration) {
    	if !r.IsConnected() {
    		return
    	}
    
    	_ = r.Client.Expire(ctx, key, expiration)
    }
    
    func (r ClientWithHelpers) IncrementCounter(key string) *int64 {
    	if !r.IsConnected() {
    		return nil
    	}
    
    	val, err := r.Client.Incr(ctx, key).Result()
    	if err != nil {
    		return nil
    	}
    
    	return &val
    }
    
    func (r ClientWithHelpers) DecrementCounter(key string) *int64 {
    	if !r.IsConnected() {
    		return nil
    	}
    
    	val, err := r.Client.Decr(ctx, key).Result()
    	if err != nil {
    		return nil
    	}
    
    	return &val
    }