zoreide/orchestrator.go

144 lines
3.1 KiB
Go
Raw Normal View History

2023-08-09 09:08:05 -05:00
package main
import (
"fmt"
"log"
2023-08-09 12:38:57 -05:00
"net"
"regexp"
2023-08-09 09:08:05 -05:00
"slices"
"strconv"
"time"
)
func (mip *AbstractMulticast) AddUniqueAndSort(hier int64) {
if slices.Contains(mip.HierarchyArray, hier) {
2023-08-12 17:03:49 -05:00
log.Println("Element already in the array: ", hier)
2023-08-09 09:08:05 -05:00
} else {
mip.HierarchyArray = append(mip.HierarchyArray, hier)
log.Println("Here the numbers known:", mip.HierarchyArray)
2023-08-09 09:08:05 -05:00
}
}
func (mip *AbstractMulticast) IsAlpha(hier int64) bool {
2023-08-12 16:56:25 -05:00
alpha := slices.Max(mip.HierarchyArray)
log.Println("Maximum element is :", alpha)
2023-08-12 16:56:25 -05:00
return alpha == hier
2023-08-09 09:08:05 -05:00
}
func (mip *AbstractMulticast) WriteNumberToMulticast(br AbstractBridge) {
defer func() {
if r := recover(); r != nil {
fmt.Println("An error happened in <WriteNumberToMulticast()>, but Zoreide recovered. ")
fmt.Println("Error was: ", r)
}
}()
log.Println("Initiating ticker")
bstNumber := fmt.Sprintf("%d", br.hIerarchyNumber)
for range time.Tick(1 * time.Second) {
_, err := mip.Wconn.Write([]byte(bstNumber))
if err != nil {
log.Println("Cannot write to multicast:" + err.Error())
}
}
}
func (mip *AbstractMulticast) ReadNumberFromMulticast() {
log.Println("Initiating reader")
buffer := make([]byte, mip.MaxDatagramSize)
// Loop forever reading from the socket
for {
_, _, err := mip.Rconn.ReadFromUDP(buffer)
if err != nil {
log.Println("ReadFromUDP failed:", err)
}
BstChannel <- string(buffer)
}
}
func (b *AbstractBridge) HierarchyReLocator(entity AbstractMulticast) {
log.Println("Inizializing Descalator")
entity.AddUniqueAndSort(b.hIerarchyNumber)
re := regexp.MustCompile("[0-9]+")
2023-08-09 09:08:05 -05:00
for bstNumber := range BstChannel {
bstNumber = re.FindAllString(bstNumber, -1)[0]
2023-08-09 09:08:05 -05:00
brdNumber, err := strconv.ParseInt(bstNumber, 10, 64)
if err != nil {
log.Println("Garbage received on multicast: ", bstNumber)
log.Println("Cannot convert to int64:", err.Error())
continue
} else {
log.Println("Adding received:", brdNumber)
entity.AddUniqueAndSort(brdNumber)
}
2023-08-09 12:38:57 -05:00
brIp, _, err := net.ParseCIDR(b.BridgeIpCIDR)
if err != nil {
log.Println(err.Error())
}
2023-08-09 09:08:05 -05:00
// finished feeding the new number
// if Alpha:
if entity.IsAlpha(b.hIerarchyNumber) {
2023-08-09 12:38:57 -05:00
if isActive(brIp.String()) {
2023-08-09 09:08:05 -05:00
log.Println("Still ALPHA. This is ok.")
} else {
log.Println("I'm the new ALPHA! Get out my path, losers!")
b.configureIpAndBridgeUp()
2023-08-09 12:38:57 -05:00
log.Println("Ip is active: ", isActive(brIp.String()))
2023-08-09 09:08:05 -05:00
}
} else {
log.Println("GULP! There is a bigger one, better descalating")
2023-08-09 12:38:57 -05:00
if isActive(brIp.String()) {
2023-08-09 09:08:05 -05:00
b.removeIPandBridgeInt()
2023-08-09 12:38:57 -05:00
2023-08-09 09:08:05 -05:00
}
}
}
}
func (b *AbstractBridge) WaitAndClean(entity AbstractMulticast) {
log.Println("Inizializing Escalator")
2023-08-10 13:39:26 -05:00
brIp, _, err := net.ParseCIDR(b.BridgeIpCIDR)
if err != nil {
log.Println(err.Error())
}
2023-08-09 09:08:05 -05:00
for {
2023-08-10 13:39:26 -05:00
2023-08-09 09:08:05 -05:00
pollTime := len(entity.HierarchyArray) + 1
time.Sleep(time.Duration(pollTime) * time.Second)
2023-08-10 13:39:26 -05:00
// svuotare l'array e rifare le elezioni
if !isActive(string(brIp.String())) {
2023-08-10 13:39:26 -05:00
entity.HierarchyArray = entity.HierarchyArray[:0]
2023-08-12 17:03:49 -05:00
entity.HierarchyArray = slices.Clip(entity.HierarchyArray)
entity.AddUniqueAndSort(b.hIerarchyNumber)
2023-08-10 13:39:26 -05:00
}
2023-08-09 09:08:05 -05:00
}
}