Compare commits

...

9 Commits

13 changed files with 346 additions and 63 deletions

View File

@ -28,7 +28,6 @@ Configuration contains:
"MulticastConfig": {
"MIpAddr": "239.0.0.19",
"MPort": "9898",
"MaxDatagramSize": 18
},
"InterfaceConfig": {
"ExistingInterface": "eth0",
@ -42,7 +41,7 @@ Where:
- MIpAddr is the multicast address all the nodes will use to align.Must be the same on all nodes.
- Mport: Multicast port to subscribe. Must be the same on all nodes.
- MaxDataGramSize : obsolete, will be removed soon.
- ~MaxDataGramSize : obsolete, will be removed soon.~ DONE.
- ExistingInterface: the name of your ingress interface (eth0, eno0 , enps18, whatever your system is using. This may be different node by node)
- BridgeIpCidr: the IP address and mask you want to share among servers.

View File

@ -28,7 +28,7 @@ func init() {
MulticastEntity.MIpAddr = a.MulticastConfig.MIPAddr
MulticastEntity.MPort = a.MulticastConfig.MPort
MulticastEntity.MaxDatagramSize = a.MulticastConfig.MaxDatagramSize
MulticastEntity.MaxDatagramSize = len("75568770-cee4-4506-a790-036e232fb1b3")
ZoreideBridge.BridgeIpCIDR = a.InterfaceConfig.BridgeIPCIDR
ZoreideBridge.ExistingInterface = a.InterfaceConfig.ExistingInterface

2
go.mod
View File

@ -4,12 +4,12 @@ go 1.21
require (
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534
github.com/google/uuid v1.3.0
github.com/milosgajdos/tenus v0.0.3
)
require (
github.com/docker/libcontainer v2.2.1+incompatible // indirect
github.com/google/uuid v1.2.0 // indirect
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect

2
go.sum
View File

@ -4,6 +4,8 @@ github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwb
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/milosgajdos/tenus v0.0.3 h1:jmaJzwaY1DUyYVD0lM4U+uvP2kkEg1VahDqRFxIkVBE=
github.com/milosgajdos/tenus v0.0.3/go.mod h1:eIjx29vNeDOYWJuCnaHY2r4fq5egetV26ry3on7p8qY=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 h1:b0LrWgu8+q7z4J+0Y3Umo5q1dL7NXBkKBWkaVkAq17E=

View File

@ -3,12 +3,11 @@ package main
import (
"fmt"
"log"
"math/rand"
"net"
"time"
"github.com/go-ping/ping"
"github.com/google/uuid"
"github.com/milosgajdos/tenus"
)
@ -45,8 +44,18 @@ func (b *AbstractBridge) RefreshArp() {
func (b *AbstractBridge) initializeHierarchy() {
b.hIerarchyNumber = rand.Int63()
log.Println("Initialized host number: ", b.hIerarchyNumber)
var err error
var tmpuuid uuid.UUID
tmpuuid, err = uuid.NewRandom()
b.hIerarchyNumber = tmpuuid.String()
if err == nil {
log.Println("Initialized secure host ID: ", b.hIerarchyNumber)
} else {
log.Println("Error happened with random UUID: ", err.Error())
log.Println("Initialized to less secure host ID: ", b.hIerarchyNumber)
b.hIerarchyNumber = uuid.NewString()
}
}
@ -111,12 +120,29 @@ func (b *AbstractBridge) removeIPandBridgeInt() {
}
func isActive(bridgeip string) bool {
func (b *AbstractBridge) IsActive() bool {
defer func() {
if r := recover(); r != nil {
fmt.Println("An error happened in <Isactive()>, but Zoreide recovered. ")
fmt.Println("Error was: ", r)
}
}()
var bridgeip string
brIp, _, err := net.ParseCIDR(b.BridgeIpCIDR)
if err != nil {
log.Println("IsActive : problem parsing the IP/CIDR: ", err.Error())
} else {
bridgeip = brIp.String()
}
log.Println("Check for active IP: ", bridgeip)
pinger, err := ping.NewPinger(bridgeip)
if err != nil {
log.Println("Unable to ping address: ", bridgeip)
log.Println("Ping error: " + err.Error())
}
// just in case it doesn't stops alone
@ -133,3 +159,50 @@ func isActive(bridgeip string) bool {
return stats.PacketsRecv == pinger.Count
}
func (b *AbstractBridge) IsAssigned() bool {
// we want the program to recover in case of issues
defer func() {
if r := recover(); r != nil {
fmt.Println("An error happened in <IsAssigned()>, but Zoreide recovered. ")
fmt.Println("Error was: ", r)
}
}()
var (
ief *net.Interface
addrs []net.Addr
err error
)
interfaceName := b.ExistingInterface
if ief, err = net.InterfaceByName(interfaceName); err != nil { // get interface
log.Printf("Interface %s does not exist or not manageable\n", interfaceName)
log.Printf("Error is: %s\n", err.Error())
return false
}
if addrs, err = ief.Addrs(); err != nil { // get addresses
log.Printf("Cannot read IPs of interface %s\n", interfaceName)
log.Printf("Error is: %s\n", err.Error())
return false
}
if len(addrs) < 1 {
log.Printf("Interface has NO ip addresses")
return false
}
for _, addr := range addrs {
log.Println("Address found is: ", addr.String())
if addr.String() == b.BridgeIpCIDR {
log.Printf("Ip %s is assigned to interface %s", b.BridgeIpCIDR, interfaceName)
return true
}
}
log.Printf("Ip %s is NOT assigned to interface %s", b.BridgeIpCIDR, interfaceName)
return false
}

View File

@ -1,14 +1,14 @@
package main
import (
"fmt"
"log"
"runtime"
"time"
)
func init() {
fmt.Println("Garbage Collector Thread Starting")
log.Println("Garbage Collector Thread Starting")
go memoryCleanerThread()
@ -18,9 +18,9 @@ func memoryCleanerThread() {
for {
time.Sleep(10 * time.Minute)
fmt.Println("Time to clean memory...")
log.Println("Time to clean memory...")
runtime.GC()
fmt.Println("Garbage Collection done.")
log.Println("Garbage Collection done.")
}
}

View File

@ -3,14 +3,12 @@ package main
import (
"fmt"
"log"
"net"
"regexp"
"slices"
"strconv"
"strings"
"time"
)
func (mip *AbstractMulticast) AddUniqueAndSort(hier int64) {
func (mip *AbstractMulticast) AddUniqueAndSort(hier string) {
if slices.Contains(mip.HierarchyArray, hier) {
log.Println("Element already in the array: ", hier)
@ -21,12 +19,19 @@ func (mip *AbstractMulticast) AddUniqueAndSort(hier int64) {
}
func (mip *AbstractMulticast) IsAlpha(hier int64) bool {
func (mip *AbstractMulticast) IsAlpha(hier string) bool {
alpha := slices.Max(mip.HierarchyArray)
slices.Sort(mip.HierarchyArray)
indexMax := len(mip.HierarchyArray) - 1
if indexMax < 0 {
log.Println("Empty array , no elements")
return false
}
alpha := mip.HierarchyArray[indexMax]
log.Println("Maximum element is :", alpha)
return alpha == hier
log.Println("Array is :", mip.HierarchyArray)
return strings.Compare(alpha, hier) == 0
}
@ -41,7 +46,7 @@ func (mip *AbstractMulticast) WriteNumberToMulticast(br AbstractBridge) {
log.Println("Initiating ticker")
bstNumber := fmt.Sprintf("%d", br.hIerarchyNumber)
bstNumber := br.hIerarchyNumber
for range time.Tick(1 * time.Second) {
@ -55,6 +60,13 @@ func (mip *AbstractMulticast) WriteNumberToMulticast(br AbstractBridge) {
func (mip *AbstractMulticast) ReadNumberFromMulticast() {
defer func() {
if r := recover(); r != nil {
fmt.Println("An error happened in <ReadNumberToMulticast()>, but Zoreide recovered. ")
fmt.Println("Error was: ", r)
}
}()
log.Println("Initiating reader")
buffer := make([]byte, mip.MaxDatagramSize)
@ -74,53 +86,47 @@ func (mip *AbstractMulticast) ReadNumberFromMulticast() {
func (b *AbstractBridge) HierarchyReLocator(entity AbstractMulticast) {
defer func() {
if r := recover(); r != nil {
fmt.Println("An error happened in <HierarchyLocator()>, but Zoreide recovered. ")
fmt.Println("Error was: ", r)
}
}()
log.Println("Inizializing HierarchyManager")
entity.AddUniqueAndSort(b.hIerarchyNumber)
re := regexp.MustCompile("[0-9]+")
for bstNumber := range BstChannel {
cleanStr := re.FindAllString(bstNumber, -1)
if cleanStr == nil {
log.Println("No numbers in multicast: ", bstNumber)
continue
} else {
log.Println("Extracted numbers in multicast: ", cleanStr)
bstNumber = cleanStr[0]
}
brdNumber, err := strconv.ParseInt(bstNumber, 10, 64)
if err != nil {
if len(bstNumber) < 36 {
log.Println("Garbage received on multicast: ", bstNumber)
log.Println("Cannot convert to int64:", err.Error())
continue
} else {
log.Println("Adding received:", brdNumber)
entity.AddUniqueAndSort(brdNumber)
}
brIp, _, err := net.ParseCIDR(b.BridgeIpCIDR)
if err != nil {
log.Println(err.Error())
log.Println("Adding received:", bstNumber)
entity.AddUniqueAndSort(bstNumber)
}
// finished feeding the new number
// if Alpha:
if entity.IsAlpha(b.hIerarchyNumber) {
if isActive(brIp.String()) {
if b.IsAssigned() {
log.Println("Still ALPHA. This is ok.")
} else {
log.Println("I'm the new ALPHA! Get out my path, losers!")
b.configureIpAndBridgeUp()
log.Println("Ip is active: ", isActive(brIp.String()))
log.Println("Ip is active: ", b.IsActive())
}
// here we manage the case when we are not alpha
} else {
log.Println("GULP! There is a bigger one, better descalating")
if isActive(brIp.String()) {
if b.IsAssigned() {
log.Println("Start removing the IP from the interface:")
b.removeIPandBridgeInt()
log.Println("Removed.Ip is reachable: ", b.IsActive())
} else {
log.Println("Nothing to do, since IP is not assigned to the interface: ", b.ExistingInterface)
log.Println("Ip is reachable: ", b.IsActive())
}
}
@ -130,22 +136,40 @@ func (b *AbstractBridge) HierarchyReLocator(entity AbstractMulticast) {
func (b *AbstractBridge) WaitAndClean(entity AbstractMulticast) {
log.Println("Inizializing HAManager")
defer func() {
if r := recover(); r != nil {
fmt.Println("An error happened in <WaitAndClean()>, but Zoreide recovered. ")
fmt.Println("Error was: ", r)
}
}()
brIp, _, err := net.ParseCIDR(b.BridgeIpCIDR)
if err != nil {
log.Println(err.Error())
}
log.Println("Inizializing HA-Manager")
for {
pollTime := len(entity.HierarchyArray) + 1
time.Sleep(time.Duration(pollTime) * time.Second)
// svuotare l'array e rifare le elezioni
if !isActive(string(brIp.String())) {
// Evitiamo di avere l'IP senza essere alpha
if b.IsAssigned() {
if entity.IsAlpha(b.hIerarchyNumber) {
log.Println("We are alpha and IP is assigned. All ok")
} else {
log.Println("Cannot have IP assigned without being alpha")
log.Println("Inconsistent situation: start removing the IP from the interface:")
b.removeIPandBridgeInt()
log.Println("Removed.Ip is still reachable: ", b.IsActive())
}
}
if b.IsActive() {
log.Println("Ip reachable, cluster OK")
continue
} else {
log.Println("Ip absent, restart from green field.")
entity.HierarchyArray = entity.HierarchyArray[:0]
entity.HierarchyArray = slices.Clip(entity.HierarchyArray)
entity.AddUniqueAndSort(b.hIerarchyNumber)
log.Println("Restarted from green field: ", entity.HierarchyArray)
}
}

View File

@ -5,7 +5,7 @@ import "net"
type AbstractBridge struct {
ExistingInterface string
BridgeIpCIDR string
hIerarchyNumber int64
hIerarchyNumber string
GArp *Gratuitous
}
@ -17,14 +17,13 @@ type AbstractMulticast struct {
MRaddr *net.UDPAddr
Wconn *net.UDPConn
Rconn *net.UDPConn
HierarchyArray []int64
HierarchyArray []string
}
type AbstractConfig struct {
MulticastConfig struct {
MIPAddr string `json:"MIpAddr"`
MPort string `json:"MPort"`
MaxDatagramSize int `json:"MaxDatagramSize"`
MIPAddr string `json:"MIpAddr"`
MPort string `json:"MPort"`
} `json:"MulticastConfig"`
InterfaceConfig struct {
ExistingInterface string `json:"ExistingInterface"`

118
vendor/github.com/google/uuid/null.go generated vendored Normal file
View File

@ -0,0 +1,118 @@
// Copyright 2021 Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package uuid
import (
"bytes"
"database/sql/driver"
"encoding/json"
"fmt"
)
var jsonNull = []byte("null")
// NullUUID represents a UUID that may be null.
// NullUUID implements the SQL driver.Scanner interface so
// it can be used as a scan destination:
//
// var u uuid.NullUUID
// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&u)
// ...
// if u.Valid {
// // use u.UUID
// } else {
// // NULL value
// }
//
type NullUUID struct {
UUID UUID
Valid bool // Valid is true if UUID is not NULL
}
// Scan implements the SQL driver.Scanner interface.
func (nu *NullUUID) Scan(value interface{}) error {
if value == nil {
nu.UUID, nu.Valid = Nil, false
return nil
}
err := nu.UUID.Scan(value)
if err != nil {
nu.Valid = false
return err
}
nu.Valid = true
return nil
}
// Value implements the driver Valuer interface.
func (nu NullUUID) Value() (driver.Value, error) {
if !nu.Valid {
return nil, nil
}
// Delegate to UUID Value function
return nu.UUID.Value()
}
// MarshalBinary implements encoding.BinaryMarshaler.
func (nu NullUUID) MarshalBinary() ([]byte, error) {
if nu.Valid {
return nu.UUID[:], nil
}
return []byte(nil), nil
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (nu *NullUUID) UnmarshalBinary(data []byte) error {
if len(data) != 16 {
return fmt.Errorf("invalid UUID (got %d bytes)", len(data))
}
copy(nu.UUID[:], data)
nu.Valid = true
return nil
}
// MarshalText implements encoding.TextMarshaler.
func (nu NullUUID) MarshalText() ([]byte, error) {
if nu.Valid {
return nu.UUID.MarshalText()
}
return jsonNull, nil
}
// UnmarshalText implements encoding.TextUnmarshaler.
func (nu *NullUUID) UnmarshalText(data []byte) error {
id, err := ParseBytes(data)
if err != nil {
nu.Valid = false
return err
}
nu.UUID = id
nu.Valid = true
return nil
}
// MarshalJSON implements json.Marshaler.
func (nu NullUUID) MarshalJSON() ([]byte, error) {
if nu.Valid {
return json.Marshal(nu.UUID)
}
return jsonNull, nil
}
// UnmarshalJSON implements json.Unmarshaler.
func (nu *NullUUID) UnmarshalJSON(data []byte) error {
if bytes.Equal(data, jsonNull) {
*nu = NullUUID{}
return nil // valid null UUID
}
err := json.Unmarshal(data, &nu.UUID)
nu.Valid = err == nil
return err
}

View File

@ -12,6 +12,7 @@ import (
"fmt"
"io"
"strings"
"sync"
)
// A UUID is a 128 bit (16 byte) Universal Unique IDentifier as defined in RFC
@ -33,7 +34,15 @@ const (
Future // Reserved for future definition.
)
var rander = rand.Reader // random function
const randPoolSize = 16 * 16
var (
rander = rand.Reader // random function
poolEnabled = false
poolMu sync.Mutex
poolPos = randPoolSize // protected with poolMu
pool [randPoolSize]byte // protected with poolMu
)
type invalidLengthError struct{ len int }
@ -41,6 +50,12 @@ func (err invalidLengthError) Error() string {
return fmt.Sprintf("invalid UUID length: %d", err.len)
}
// IsInvalidLengthError is matcher function for custom error invalidLengthError
func IsInvalidLengthError(err error) bool {
_, ok := err.(invalidLengthError)
return ok
}
// Parse decodes s into a UUID or returns an error. Both the standard UUID
// forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and
// urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the
@ -249,3 +264,31 @@ func SetRand(r io.Reader) {
}
rander = r
}
// EnableRandPool enables internal randomness pool used for Random
// (Version 4) UUID generation. The pool contains random bytes read from
// the random number generator on demand in batches. Enabling the pool
// may improve the UUID generation throughput significantly.
//
// Since the pool is stored on the Go heap, this feature may be a bad fit
// for security sensitive applications.
//
// Both EnableRandPool and DisableRandPool are not thread-safe and should
// only be called when there is no possibility that New or any other
// UUID Version 4 generation function will be called concurrently.
func EnableRandPool() {
poolEnabled = true
}
// DisableRandPool disables the randomness pool if it was previously
// enabled with EnableRandPool.
//
// Both EnableRandPool and DisableRandPool are not thread-safe and should
// only be called when there is no possibility that New or any other
// UUID Version 4 generation function will be called concurrently.
func DisableRandPool() {
poolEnabled = false
defer poolMu.Unlock()
poolMu.Lock()
poolPos = randPoolSize
}

View File

@ -27,6 +27,8 @@ func NewString() string {
// The strength of the UUIDs is based on the strength of the crypto/rand
// package.
//
// Uses the randomness pool if it was enabled with EnableRandPool.
//
// A note about uniqueness derived from the UUID Wikipedia entry:
//
// Randomly generated UUIDs have 122 random bits. One's annual risk of being
@ -35,7 +37,10 @@ func NewString() string {
// equivalent to the odds of creating a few tens of trillions of UUIDs in a
// year and having one duplicate.
func NewRandom() (UUID, error) {
return NewRandomFromReader(rander)
if !poolEnabled {
return NewRandomFromReader(rander)
}
return newRandomFromPool()
}
// NewRandomFromReader returns a UUID based on bytes read from a given io.Reader.
@ -49,3 +54,23 @@ func NewRandomFromReader(r io.Reader) (UUID, error) {
uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10
return uuid, nil
}
func newRandomFromPool() (UUID, error) {
var uuid UUID
poolMu.Lock()
if poolPos == randPoolSize {
_, err := io.ReadFull(rander, pool[:])
if err != nil {
poolMu.Unlock()
return Nil, err
}
poolPos = 0
}
copy(uuid[:], pool[poolPos:(poolPos+16)])
poolPos += 16
poolMu.Unlock()
uuid[6] = (uuid[6] & 0x0f) | 0x40 // Version 4
uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10
return uuid, nil
}

2
vendor/modules.txt vendored
View File

@ -5,7 +5,7 @@ github.com/docker/libcontainer/system
# github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534
## explicit; go 1.14
github.com/go-ping/ping
# github.com/google/uuid v1.2.0
# github.com/google/uuid v1.3.0
## explicit
github.com/google/uuid
# github.com/milosgajdos/tenus v0.0.3

View File

@ -2,7 +2,7 @@
"MulticastConfig": {
"MIpAddr": "239.0.0.19",
"MPort": "9898",
"MaxDatagramSize": 32
"MaxDatagramSize": 36
},
"InterfaceConfig": {
"ExistingInterface": "eth0",