You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

4979 lines
165 KiB

package modules
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"github.com/project-nano/framework"
uuid "github.com/satori/go.uuid"
"io/ioutil"
"log"
"math"
"math/rand"
"net"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"time"
)
//config file define
type cellDefine struct {
Enabled bool `json:"enabled,omitempty"`
PurgeAppending bool `json:"purge_appending,omitempty"`
Instances []string `json:"-"`
}
type poolDefine struct {
Name string `json:"name"`
Enabled bool `json:"enabled,omitempty"`
Network string `json:"network,omitempty"`
Storage string `json:"storage,omitempty"`
Failover bool `json:"failover,omitempty"`
Cells map[string]cellDefine `json:"cells,omitempty"`
}
type storageDefine struct {
Name string `json:"name"`
Type string `json:"type"`
Host string `json:"host"`
Target string `json:"target"`
}
type addressPoolDefine struct {
AddressPoolConfig
Ranges []AddressRangeStatus `json:"ranges,omitempty"`
}
type ResourceData struct {
Zone string `json:"zone"`
Pools []poolDefine `json:"pools"`
StoragePools []storageDefine `json:"storage_pools,omitempty"`
AddressPools []addressPoolDefine `json:"address_pools,omitempty"`
SystemTemplates []SystemTemplate `json:"system_templates,omitempty"`
SecurityPolicyGroup []managedSecurityPolicyGroup `json:"security_policy_group,omitempty"`
}
//memory status define
type ManagedZone struct {
Name string
PoolStatistic
CellStatistic
InstanceStatistic
ResourceUsage
}
type ManagedComputePool struct {
ComputePoolInfo
Cells map[string]bool
InstanceNames map[string]string //name => id
CellStatistic
InstanceStatistic
ResourceUsage
}
type ManagedComputeCell struct {
ComputeCellInfo
Pool string
LatestUpdate time.Time
Instances map[string]bool
Pending map[string]bool
InstanceStatistic
ResourceUsage
}
type ManagedIPV4AddressRange struct {
startAddress net.IP
endAddress net.IP
netmask net.IPMask
capacity uint32
allocated map[string]string
}
type ManagedAddressPool struct {
name string
gateway string
dns []string
provider string
mode string
ranges map[string]ManagedIPV4AddressRange
rangeStartAddressed []string
}
type imageServer struct {
Host string
Port int
}
type BatchCreateGuestTask struct {
StartTime time.Time
LatestUpdate time.Time
Finished bool
Guests []CreateGuestStatus
GuestName map[string]int //name => index
}
type BatchDeleteGuestTask struct {
StartTime time.Time
LatestUpdate time.Time
Finished bool
Guests []DeleteGuestStatus
GuestID map[string]int //id => index
}
type managedSecurityPolicyGroup struct {
SecurityPolicyGroupStatus
Rules []SecurityPolicyRule `json:"rules,omitempty"`
}
type BatchStopGuestTask struct {
StartTime time.Time
LatestUpdate time.Time
Finished bool
Guests []StopGuestStatus
GuestID map[string]int //id => index
}
type ResourceManager struct {
reportChan chan CellStatusReport
commands chan resourceCommand
pools map[string]ManagedComputePool
cells map[string]ManagedComputeCell
unallocatedCells map[string]bool
instances map[string]InstanceStatus
imageServers map[string]imageServer //key = server name
pendingError map[string]error //pending create error
storagePools map[string]StoragePoolInfo
addressPools map[string]ManagedAddressPool
migrations map[string]MigrationStatus
batchCreateTasks map[string]BatchCreateGuestTask
batchDeleteTasks map[string]BatchDeleteGuestTask
batchStopTasks map[string]BatchStopGuestTask
templates map[string]SystemTemplate
allTemplateID []string
policyGroups map[string]managedSecurityPolicyGroup
policyGroupNames map[string]bool
sortedPolicyGroupID []string
generator *rand.Rand
zone ManagedZone
startTime time.Time
dataFile string
runner *framework.SimpleRunner
}
type resourceCommand struct {
Type commandType
DiskImage DiskImageConfig
Migration MigrationParameter
Pool string
Cell string
Address string
Range string
Start string
InstanceID string
Hardware string
InstanceList []InstanceStatus
Instance InstanceStatus
InstanceQuery GuestQueryCondition
MonitorPort uint
Name string
Host string
Port int
Group string
Tags []string
Progress uint
Size uint64
Image string
Secret string
Storage string
StorageType string
Target string
MigrationID string
Error error
Failover bool
IDList []string
PortList []uint64
DiskImages []DiskImageStatus
AddressPool AddressPoolConfig
AddressRange AddressRangeConfig
BatchID string
BatchCreating BatchCreateRequest
Priority PriorityEnum
ReadSpeed uint64
WriteSpeed uint64
ReadIOPS uint64
WriteIOPS uint64
ReceiveSpeed uint64
SendSpeed uint64
TemplateID string
TemplateConfig SystemTemplateConfig
PolicyGroup SecurityPolicyGroup
PolicyGroupQuery SecurityPolicyGroupQueryCondition
PolicyRule SecurityPolicyRule
Index int
Flag bool
SearchCondition SearchGuestsCondition
ErrorChan chan error
ResultChan chan ResourceResult
}
type ResourceStatistic struct {
Name string
Error error
Enabled bool
Alive bool
PoolStatistic
CellStatistic
InstanceStatistic
ResourceUsage
}
type commandType int
const (
evaluateCoreFactor = 2
evaluateMemoryFactor = 1.5
evaluateDiskFactor = 0.5
)
const (
cmdQueryAllComputePoolInfo = iota
cmdGetComputePoolInfo
cmdCreateComputePool
cmdDeleteComputePool
cmdModifyComputePool
cmdQueryZoneStatus
cmdQueryComputePoolStatus
cmdGetComputePoolStatus
cmdQueryUnallocatedComputeCell
cmdQueryComputeCells
cmdQueryComputeCellStatus
cmdGetComputeCellStatus
cmdAddComputeCell
cmdRemoveComputeCell
cmdEnableComputeCell
cmdDisableComputeCell
cmdFinishPurgeCell
cmdGetCellStatus
cmdUpdateCellInfo
cmdSetCellDead
cmdBatchUpdateInstanceStatus
cmdUpdateInstanceStatus
cmdAllocateInstance
cmdConfirmInstance
cmdDeallocateInstance
cmdGetInstanceStatus
cmdQueryInstanceStatusInPool
cmdQueryInstanceStatusInCell
cmdUpdateInstanceAddress
cmdUpdateInstancePriority
cmdUpdateInstanceDiskThreshold
cmdUpdateInstanceNetworkThreshold
cmdUpdateInstanceMonitorSecret
cmdRenameInstance
cmdGetInstanceByName
cmdQueryGuestsByCondition
cmdAddImageServer
cmdRemoveImageServer
cmdGetImageServer
cmdCreateStoragePool
cmdDeleteStoragePool
cmdModifyStoragePool
cmdQueryStoragePool
cmdGetStoragePool
cmdQueryAddressPool
cmdGetAddressPool
cmdCreateAddressPool
cmdModifyAddressPool
cmdDeleteAddressPool
cmdQueryAddressRange
cmdGetAddressRange
cmdAddAddressRange
cmdRemoveAddressRange
cmdQueryMigration
cmdGetMigration
cmdCreateMigration
cmdFinishMigration
cmdCancelMigration
cmdUpdateMigration
cmdBuildFailoverPlan
cmdMigrationInstance
cmdPurgeInstance
cmdBeginResetSystem
cmdFinishResetSystem
cmdStartBatchCreateGuest
cmdSetBatchCreateGuestStart
cmdSetBatchCreateGuestFail
cmdGetBatchCreateGuest
cmdStartBatchDeleteGuest
cmdSetBatchDeleteGuestSuccess
cmdSetBatchDeleteGuestFail
cmdGetBatchDeleteGuest
cmdStartBatchStopGuest
cmdSetBatchStopGuestSuccess
cmdSetBatchStopGuestFail
cmdGetBatchStopGuest
cmdQuerySystemTemplates
cmdGetSystemTemplate
cmdCreateSystemTemplate
cmdModifySystemTemplate
cmdDeleteSystemTemplate
cmdQuerySecurityPolicyGroups
cmdGetSecurityPolicyGroup
cmdCreateSecurityPolicyGroup
cmdModifySecurityPolicyGroup
cmdDeleteSecurityPolicyGroup
cmdGetSecurityPolicyRules
cmdAddSecurityPolicyRule
cmdModifySecurityPolicyRule
cmdRemoveSecurityPolicyRule
cmdMoveSecurityPolicyRule
cmdSearchGuests
cmdUpdateAutoStart
cmdInvalid
)
var commandNames = []string{
"QueryAllComputePoolInfo",
"GetComputePoolInfo",
"CreateComputePool",
"DeleteComputePool",
"ModifyComputePool",
"QueryZoneStatus",
"QueryComputePoolStatus",
"GetComputePoolStatus",
"QueryUnallocatedComputeCell",
"QueryComputeCells",
"QueryComputeCellStatus",
"GetComputeCellStatus",
"AddComputeCell",
"RemoveComputeCell",
"EnableComputeCell",
"DisableComputeCell",
"FinishPurgeCell",
"GetCellStatus",
"UpdateCellInfo",
"SetCellDead",
"BatchUpdateInstanceStatus",
"UpdateInstanceStatus",
"AllocateInstance",
"ConfirmInstance",
"DeallocateInstance",
"GetInstanceStatus",
"QueryInstanceStatusInPool",
"QueryInstanceStatusInCell",
"UpdateInstanceAddress",
"UpdateInstancePriority",
"UpdateInstanceDiskThreshold",
"UpdateInstanceNetworkThreshold",
"UpdateInstanceMonitorSecret",
"RenameInstance",
"GetInstanceByName",
"QueryGuestsByCondition",
"AddImageServer",
"RemoveImageServer",
"GetImageServer",
"CreateStoragePool",
"DeleteStoragePool",
"ModifyStoragePool",
"QueryStoragePool",
"GetStoragePool",
"QueryAddressPool",
"GetAddressPool",
"CreateAddressPool",
"ModifyAddressPool",
"DeleteAddressPool",
"QueryAddressRange",
"GetAddressRange",
"AddAddressRange",
"RemoveAddressRange",
"QueryMigration",
"GetMigration",
"CreateMigration",
"FinishMigration",
"CancelMigration",
"UpdateMigration",
"BuildFailoverPlan",
"MigrationInstance",
"PurgeInstance",
"BeginResetSystem",
"FinishResetSystem",
"StartBatchCreateGuest",
"SetBatchCreateGuestStart",
"SetBatchCreateGuestFail",
"GetBatchCreateGuest",
"StartBatchDeleteGuest",
"SetBatchDeleteGuestSuccess",
"SetBatchDeleteGuestFail",
"GetBatchDeleteGuest",
"StartBatchStopGuest",
"SetBatchStopGuestSuccess",
"SetBatchStopGuestFail",
"GetBatchStopGuest",
"QuerySystemTemplates",
"GetSystemTemplate",
"CreateSystemTemplate",
"ModifySystemTemplate",
"DeleteSystemTemplate",
"QuerySecurityPolicyGroups",
"GetSecurityPolicyGroup",
"CreateSecurityPolicyGroup",
"ModifySecurityPolicyGroup",
"DeleteSecurityPolicyGroup",
"GetSecurityPolicyRules",
"AddSecurityPolicyRule",
"ModifySecurityPolicyRule",
"RemoveSecurityPolicyRule",
"MoveSecurityPolicyRule",
"SearchGuests",
"UpdateAutoStart",
}
func (c commandType) toString() string {
if c >= cmdInvalid{
return "invalid"
}
return commandNames[c]
}
const (
TimeFormatLayout = "2006-01-02 15:04:05"
StorageTypeNFS = "nfs"
RangeTypeExternal = "external"
RangeTypeInternal = "internal"
)
func CreateResourceManager(dataPath string) (manager *ResourceManager, err error) {
if cmdInvalid != len(commandNames){
err = fmt.Errorf("insufficient command names %d/%d", len(commandNames), cmdInvalid)
return
}
const (
DefaultQueueLength = 1 << 10
DefaultDataFilename = "resource.data"
)
manager = &ResourceManager{}
manager.runner = framework.CreateSimpleRunner(manager.mainRoutine)
manager.reportChan = make(chan CellStatusReport, DefaultQueueLength)
manager.commands = make(chan resourceCommand, DefaultQueueLength)
manager.dataFile = filepath.Join(dataPath, DefaultDataFilename)
manager.pools = map[string]ManagedComputePool{}
manager.cells = map[string]ManagedComputeCell{}
manager.instances = map[string]InstanceStatus{}
manager.unallocatedCells = map[string]bool{}
manager.imageServers = map[string]imageServer{}
manager.pendingError = map[string]error{}
manager.storagePools = map[string]StoragePoolInfo{}
manager.addressPools = map[string]ManagedAddressPool{}
manager.templates = map[string]SystemTemplate{}
manager.policyGroups = map[string]managedSecurityPolicyGroup{}
manager.policyGroupNames = map[string]bool{}
manager.migrations = map[string]MigrationStatus{}
manager.batchCreateTasks = map[string]BatchCreateGuestTask{}
manager.batchDeleteTasks = map[string]BatchDeleteGuestTask{}
manager.batchStopTasks = map[string]BatchStopGuestTask{}
manager.generator = rand.New(rand.NewSource(time.Now().UnixNano()))
manager.startTime = time.Now()
if err := manager.loadConfig(); err != nil {
return nil, err
}
return manager, nil
}
func (manager *ResourceManager) Start() error {
return manager.runner.Start()
}
func (manager *ResourceManager) Stop() error {
return manager.runner.Stop()
}
func (manager *ResourceManager) UpdateCellStatus(report CellStatusReport) {
manager.reportChan <- report
}
func (manager *ResourceManager) CreatePool(name, storage, address string, failover bool, resultChan chan error) {
req := resourceCommand{Type: cmdCreateComputePool, Pool: name, Storage:storage, Address: address, Failover: failover, ErrorChan: resultChan}
manager.commands <- req
}
func (manager *ResourceManager) ModifyPool(name, storage, address string, failover bool, resultChan chan error){
manager.commands <- resourceCommand{Type:cmdModifyComputePool, Pool:name, Storage:storage, Address: address, Failover: failover, ErrorChan:resultChan}
}
func (manager *ResourceManager) DeletePool(name string, resultChan chan error) {
req := resourceCommand{Type: cmdDeleteComputePool, Pool: name, ErrorChan: resultChan}
manager.commands <- req
}
//storage pools
func (manager *ResourceManager) CreateStoragePool(name, storageType, host, target string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdCreateStoragePool, Storage:name, StorageType:storageType, Host:host, Target: target, ErrorChan:respChan}
}
func (manager *ResourceManager) ModifyStoragePool(name, storageType, host, target string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdModifyStoragePool, Storage:name, StorageType:storageType, Host:host, Target: target, ErrorChan:respChan}
}
func (manager *ResourceManager) DeleteStoragePool(name string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdDeleteStoragePool, Storage:name, ErrorChan:respChan}
}
func (manager *ResourceManager) GetStoragePool(name string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetStoragePool, Storage:name, ResultChan: respChan}
}
func (manager *ResourceManager) QueryStoragePool(respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdQueryStoragePool, ResultChan: respChan}
}
func (manager *ResourceManager) QueryCellsInPool(pool string, resp chan ResourceResult){
cmd := resourceCommand{Type:cmdQueryComputeCells, Pool:pool, ResultChan:resp}
manager.commands <- cmd
}
func (manager *ResourceManager) AddCell(pool, cell string, resultChan chan error) {
req := resourceCommand{Type: cmdAddComputeCell, Pool: pool, Cell: cell, ErrorChan: resultChan}
manager.commands <- req
}
func (manager *ResourceManager) RemoveCell(pool, cell string, resultChan chan error) {
req := resourceCommand{Type: cmdRemoveComputeCell, Pool: pool, Cell: cell, ErrorChan: resultChan}
manager.commands <- req
}
func (manager *ResourceManager) EnableCell(poolName, cellName string, respChan chan error){
manager.commands <- resourceCommand{Type:cmdEnableComputeCell, Pool:poolName, Cell:cellName, ErrorChan:respChan}
}
func (manager *ResourceManager) DisableCell(poolName, cellName string, purge bool, respChan chan error){
manager.commands <- resourceCommand{Type:cmdDisableComputeCell, Pool:poolName, Cell:cellName, ErrorChan:respChan}
}
func (manager *ResourceManager) FinishPurgeCell(cellName string, respChan chan error){
manager.commands <- resourceCommand{Type:cmdFinishPurgeCell, Cell:cellName, ErrorChan:respChan}
}
func (manager *ResourceManager) GetUnallocatedCells(resp chan ResourceResult) {
req := resourceCommand{Type: cmdQueryUnallocatedComputeCell, ResultChan:resp}
manager.commands <- req
}
func (manager *ResourceManager) QueryZoneStatus(resp chan ResourceResult) {
req := resourceCommand{Type: cmdQueryZoneStatus, ResultChan: resp}
manager.commands <- req
}
func (manager *ResourceManager) QueryComputePoolStatus(resp chan ResourceResult) {
req := resourceCommand{Type: cmdQueryComputePoolStatus, ResultChan: resp}
manager.commands <- req
}
func (manager *ResourceManager) GetComputePoolStatus(pool string, resp chan ResourceResult) {
req := resourceCommand{Type: cmdGetComputePoolStatus, Pool: pool, ResultChan: resp}
manager.commands <- req
}
func (manager *ResourceManager) QueryComputeCellStatus(pool string, resp chan ResourceResult) {
req := resourceCommand{Type: cmdQueryComputeCellStatus, Pool: pool, ResultChan: resp}
manager.commands <- req
}
func (manager *ResourceManager) GetComputeCellStatus(pool, cell string, resp chan ResourceResult) {
req := resourceCommand{Type: cmdGetComputeCellStatus, Pool: pool, Cell:cell, ResultChan: resp}
manager.commands <- req
}
func (manager *ResourceManager) GetAllComputePool(resp chan ResourceResult) {
req := resourceCommand{Type: cmdQueryAllComputePoolInfo, ResultChan: resp}
manager.commands <- req
}
func (manager *ResourceManager) GetComputePool(pool string, resp chan ResourceResult){
cmd := resourceCommand{Type: cmdGetComputePoolInfo, Pool:pool, ResultChan:resp}
manager.commands <- cmd
}
func (manager *ResourceManager) UpdateCellInfo(name, address string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdUpdateCellInfo, Cell:name, Address:address, ErrorChan:respChan}
}
func (manager *ResourceManager) GetCellStatus(cell string, respChan chan ResourceResult) {
req := resourceCommand{Type: cmdGetCellStatus, Cell: cell, ResultChan: respChan}
manager.commands <- req
}
func (manager *ResourceManager) SetCellDead(cellName string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetCellDead, Cell:cellName, ErrorChan:respChan}
}
func (manager *ResourceManager) QueryGuestsByCondition(condition GuestQueryCondition, respChan chan ResourceResult) {
cmd := resourceCommand{Type: cmdQueryGuestsByCondition, InstanceQuery: condition, ResultChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) SearchGuests(condition SearchGuestsCondition, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdSearchGuests, SearchCondition: condition, ResultChan: respChan}
}
func (manager *ResourceManager) BatchUpdateInstanceStatus(pool, cell string, instances []InstanceStatus, respChan chan error) {
cmd := resourceCommand{Type: cmdBatchUpdateInstanceStatus, Pool: pool, Cell: cell, InstanceList: instances, ErrorChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) AllocateInstance(pool string, config InstanceStatus, respChan chan ResourceResult) {
cmd := resourceCommand{Type: cmdAllocateInstance, Pool: pool, Instance: config, ResultChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) UpdateInstanceStatus(status InstanceStatus, respChan chan error) {
cmd := resourceCommand{Type: cmdUpdateInstanceStatus, Instance: status, ErrorChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) ConfirmInstance(id string, monitor uint, secret, ethernetAddress string, respChan chan error) {
cmd := resourceCommand{Type: cmdConfirmInstance, InstanceID: id, MonitorPort: monitor, Secret:secret, Hardware: ethernetAddress, ErrorChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) DeallocateInstance(id string, err error, respChan chan error) {
cmd := resourceCommand{Type: cmdDeallocateInstance, InstanceID: id, Error: err, ErrorChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) GetInstanceStatus(id string, respChan chan ResourceResult) {
cmd := resourceCommand{Type: cmdGetInstanceStatus, InstanceID: id, ResultChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) QueryInstanceStatusInPool(poolName string, respChan chan ResourceResult){
cmd := resourceCommand{Type: cmdQueryInstanceStatusInPool, Pool: poolName, ResultChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) QueryInstanceStatusInCell(poolName, cellName string, respChan chan ResourceResult){
cmd := resourceCommand{Type: cmdQueryInstanceStatusInCell, Pool: poolName, Cell:cellName, ResultChan: respChan}
manager.commands <- cmd
}
func (manager *ResourceManager) UpdateInstanceAddress(id, ip string, respChan chan error){
manager.commands <- resourceCommand{Type:cmdUpdateInstanceAddress, InstanceID:id, Address:ip, ErrorChan:respChan}
}
func (manager *ResourceManager) UpdateInstancePriority(id string, priority PriorityEnum, respChan chan error) {
manager.commands <- resourceCommand{Type: cmdUpdateInstancePriority, InstanceID: id, Priority: priority, ErrorChan:respChan}
}
func (manager *ResourceManager) UpdateInstanceMonitorSecret(id, secret string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdUpdateInstanceMonitorSecret, InstanceID: id, Secret: secret, ErrorChan: respChan}
}
func (manager *ResourceManager) UpdateInstanceDiskThreshold(id string, readSpeed, readIOPS, writeSpeed, writeIOPS uint64, respChan chan error) {
manager.commands <- resourceCommand{Type: cmdUpdateInstanceDiskThreshold, InstanceID: id, ReadSpeed: readSpeed, ReadIOPS: readIOPS, WriteSpeed:writeSpeed, WriteIOPS: writeIOPS, ErrorChan: respChan}
}
func (manager *ResourceManager) UpdateInstanceNetworkThreshold(id string, receive, send uint64, respChan chan error) {
manager.commands <- resourceCommand{Type: cmdUpdateInstanceNetworkThreshold, InstanceID: id, ReceiveSpeed:receive, SendSpeed: send, ErrorChan: respChan}
}
func (manager *ResourceManager) UpdateGuestAutoStart(guestID string, enabled bool, respChan chan error){
manager.commands <- resourceCommand{Type: cmdUpdateAutoStart, InstanceID: guestID, Flag: enabled, ErrorChan: respChan}
}
func (manager *ResourceManager) RenameInstance(id, name string, respChan chan error){
manager.commands <- resourceCommand{Type:cmdRenameInstance, InstanceID:id, Name:name, ErrorChan:respChan}
}
func (manager *ResourceManager) GetInstanceByName(poolName, instanceName string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdGetInstanceByName, Pool:poolName, Name:instanceName, ResultChan:respChan}
}
func (manager *ResourceManager) AddImageServer(name, host string, port int) {
cmd := resourceCommand{Type: cmdAddImageServer, Name: name, Host: host, Port: port}
manager.commands <- cmd
}
func (manager *ResourceManager) RemoveImageServer(name string) {
cmd := resourceCommand{Type: cmdRemoveImageServer, Name: name}
manager.commands <- cmd
}
func (manager *ResourceManager) GetImageServer(respChan chan ResourceResult) {
cmd := resourceCommand{Type: cmdGetImageServer, ResultChan: respChan}
manager.commands <- cmd
}
//migration
func (manager *ResourceManager) QueryMigration(respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdQueryMigration, ResultChan:respChan}
}
func (manager *ResourceManager) GetMigration(id string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdGetMigration, MigrationID:id, ResultChan:respChan}
}
func (manager *ResourceManager) CreateMigration(params MigrationParameter, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdCreateMigration, Migration: params, ResultChan:respChan}
}
func (manager *ResourceManager) FinishMigration(migration string, instances []string, ports []uint64, respChan chan error){
manager.commands <- resourceCommand{Type:cmdFinishMigration, MigrationID:migration, IDList:instances, PortList:ports, ErrorChan:respChan}
}
func (manager *ResourceManager) CancelMigration(migration string, err error, respChan chan error){
manager.commands <- resourceCommand{Type:cmdCancelMigration, MigrationID:migration, Error: err, ErrorChan:respChan}
}
func (manager *ResourceManager) UpdateMigration(migration string, progress uint, respChan chan error){
manager.commands <- resourceCommand{Type:cmdUpdateMigration, MigrationID:migration, Progress:progress, ErrorChan:respChan}
}
func (manager *ResourceManager) BuildFailoverPlan(cellName string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdBuildFailoverPlan, Cell:cellName, ResultChan:respChan}
}
func (manager *ResourceManager) MigrateInstance(oldCell, newCell string, instances []string, ports []uint64, respChan chan error){
manager.commands <- resourceCommand{Type: cmdMigrationInstance, Cell:oldCell, Target:newCell, IDList:instances, PortList:ports, ErrorChan:respChan}
}
func (manager *ResourceManager) PurgeInstance(cellName string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdPurgeInstance, Cell:cellName, ErrorChan:respChan}
}
func (manager *ResourceManager) QueryAddressPool(respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdQueryAddressPool, ResultChan:respChan}
}
func (manager *ResourceManager) GetAddressPool(name string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetAddressPool, Address:name, ResultChan:respChan}
}
func (manager *ResourceManager) CreateAddressPool(config AddressPoolConfig, respChan chan error){
manager.commands <- resourceCommand{Type:cmdCreateAddressPool, AddressPool:config, ErrorChan:respChan}
}
func (manager *ResourceManager) ModifyAddressPool(config AddressPoolConfig, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdModifyAddressPool, AddressPool: config, ResultChan:respChan}
}
func (manager *ResourceManager) DeleteAddressPool(name string, respChan chan error){
manager.commands <- resourceCommand{Type:cmdDeleteAddressPool, Address:name, ErrorChan:respChan}
}
func (manager *ResourceManager) QueryAddressRange(poolName, rangeType string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdQueryAddressRange, Address:poolName, Range:rangeType, ResultChan:respChan}
}
func (manager *ResourceManager) GetAddressRange(poolName, rangeType, startAddress string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type:cmdGetAddressRange, Address:poolName, Range:rangeType, Start:startAddress, ResultChan:respChan}
}
func (manager *ResourceManager) AddAddressRange(poolName, rangeType string, config AddressRangeConfig, respChan chan error){
manager.commands <- resourceCommand{Type:cmdAddAddressRange, Address:poolName, Range:rangeType, AddressRange: config, ErrorChan:respChan}
}
func (manager *ResourceManager) RemoveAddressRange(poolName, rangeType, startAddress string, respChan chan error){
manager.commands <- resourceCommand{Type:cmdRemoveAddressRange, Address:poolName, Range:rangeType, Start:startAddress, ErrorChan:respChan}
}
func (manager *ResourceManager) BeginResetSystem(instanceID string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdBeginResetSystem, InstanceID: instanceID, ErrorChan:respChan}
}
func (manager *ResourceManager) FinishResetSystem(instanceID string, err error, respChan chan error){
manager.commands <- resourceCommand{Type: cmdFinishResetSystem, InstanceID: instanceID, Error:err, ErrorChan:respChan}
}
//batch
func (manager *ResourceManager) StartBatchCreateGuest(request BatchCreateRequest, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdStartBatchCreateGuest, BatchCreating: request, ResultChan:respChan}
}
func (manager *ResourceManager) SetBatchCreateGuestStart(batchID, guestName, guestID string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetBatchCreateGuestStart, BatchID: batchID, Name: guestName, InstanceID: guestID, ErrorChan:respChan}
}
func (manager *ResourceManager) SetBatchCreateGuestFail(batchID, guestName string, err error, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetBatchCreateGuestFail, BatchID: batchID, Name: guestName, Error:err, ErrorChan:respChan}
}
func (manager *ResourceManager) GetBatchCreateGuestStatus(batchID string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetBatchCreateGuest, BatchID: batchID, ResultChan:respChan}
}
func (manager *ResourceManager) StartBatchDeleteGuest(id []string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdStartBatchDeleteGuest, IDList: id, ResultChan:respChan}
}
func (manager *ResourceManager) SetBatchDeleteGuestSuccess(batchID, guestID string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetBatchDeleteGuestSuccess, BatchID:batchID, InstanceID:guestID, ErrorChan:respChan}
}
func (manager *ResourceManager) SetBatchDeleteGuestFail(batchID, guestID string, err error, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetBatchDeleteGuestFail, BatchID:batchID, InstanceID: guestID, Error:err, ErrorChan:respChan}
}
func (manager *ResourceManager) GetBatchDeleteGuestStatus(batchID string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetBatchDeleteGuest, BatchID:batchID, ResultChan:respChan}
}
func (manager *ResourceManager) StartBatchStopGuest(id []string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdStartBatchStopGuest, IDList: id, ResultChan:respChan}
}
func (manager *ResourceManager) SetBatchStopGuestSuccess(batchID, guestID string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetBatchStopGuestSuccess, BatchID:batchID, InstanceID:guestID, ErrorChan:respChan}
}
func (manager *ResourceManager) SetBatchStopGuestFail(batchID, guestID string, err error, respChan chan error){
manager.commands <- resourceCommand{Type: cmdSetBatchStopGuestFail, BatchID:batchID, InstanceID: guestID, Error:err, ErrorChan:respChan}
}
func (manager *ResourceManager) GetBatchStopGuestStatus(batchID string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetBatchStopGuest, BatchID:batchID, ResultChan:respChan}
}
func (manager *ResourceManager) QuerySystemTemplates(respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdQuerySystemTemplates, ResultChan: respChan}
}
func (manager *ResourceManager) GetSystemTemplate(id string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetSystemTemplate, TemplateID: id, ResultChan: respChan}
}
func (manager *ResourceManager) CreateSystemTemplate(config SystemTemplateConfig, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdCreateSystemTemplate, TemplateConfig: config, ResultChan: respChan}
}
func (manager *ResourceManager) ModifySystemTemplate(id string, config SystemTemplateConfig, respChan chan error){
manager.commands <- resourceCommand{Type: cmdModifySystemTemplate, TemplateID: id, TemplateConfig: config, ErrorChan: respChan}
}
func (manager *ResourceManager) DeleteSystemTemplate(id string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdDeleteSystemTemplate, TemplateID: id, ErrorChan: respChan}
}
//Security Policy Group
func (manager *ResourceManager) QuerySecurityPolicyGroups(condition SecurityPolicyGroupQueryCondition, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdQuerySecurityPolicyGroups, PolicyGroupQuery: condition, ResultChan: respChan}
}
func (manager *ResourceManager) GetSecurityPolicyGroup(groupID string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetSecurityPolicyGroup, Group: groupID, ResultChan: respChan}
}
func (manager *ResourceManager) CreateSecurityPolicyGroup(config SecurityPolicyGroup, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdCreateSecurityPolicyGroup, PolicyGroup: config, ResultChan: respChan}
}
func (manager *ResourceManager) ModifySecurityPolicyGroup(groupID string, config SecurityPolicyGroup, respChan chan error){
manager.commands <- resourceCommand{Type: cmdModifySecurityPolicyGroup, Group: groupID, PolicyGroup: config, ErrorChan: respChan}
}
func (manager *ResourceManager) DeleteSecurityPolicyGroup(groupID string, respChan chan error){
manager.commands <- resourceCommand{Type: cmdDeleteSecurityPolicyGroup, Group: groupID, ErrorChan: respChan}
}
func (manager *ResourceManager) GetSecurityPolicyRules(groupID string, respChan chan ResourceResult){
manager.commands <- resourceCommand{Type: cmdGetSecurityPolicyRules, Group: groupID, ResultChan: respChan}
}
func (manager *ResourceManager) AddSecurityPolicyRule(groupID string, rule SecurityPolicyRule, respChan chan error){
manager.commands <- resourceCommand{Type: cmdAddSecurityPolicyRule, Group: groupID, PolicyRule: rule, ErrorChan: respChan}
}
func (manager *ResourceManager) ModifySecurityPolicyRule(groupID string, index int, rule SecurityPolicyRule, respChan chan error){
manager.commands <- resourceCommand{Type: cmdModifySecurityPolicyRule, Group: groupID, Index: index, PolicyRule: rule, ErrorChan: respChan}
}
func (manager *ResourceManager) RemoveSecurityPolicyRule(groupID string, index int, respChan chan error){
manager.commands <- resourceCommand{Type: cmdRemoveSecurityPolicyRule, Group: groupID, Index: index, ErrorChan: respChan}
}
func (manager *ResourceManager) MoveSecurityPolicyRule(groupID string, index int, up bool, respChan chan error){
manager.commands <- resourceCommand{Type: cmdMoveSecurityPolicyRule, Group: groupID, Index: index, Flag: up, ErrorChan: respChan}
}
func (manager *ResourceManager) mainRoutine(c framework.RoutineController) {
const (
summaryInterval = time.Second * 5
batchUpdateInterval = time.Second * 2
)
var summaryTicker = time.NewTicker(summaryInterval)
var batchUpdateTicker = time.NewTicker(batchUpdateInterval)
for !c.IsStopping() {
select {
case <- c.GetNotifyChannel():
c.SetStopping()
case report := <-manager.reportChan:
manager.onCellStatusUpdate(report)
case <-summaryTicker.C:
manager.onUpdateSystemStatus()
case <- batchUpdateTicker.C:
manager.updateBatchStatus()
case cmd := <-manager.commands:
manager.handleCommand(cmd)
}
}
c.NotifyExit()
}
func (manager *ResourceManager) updateBatchStatus(){
const (
TaskExpire = time.Second * 30
)
var expireTime = time.Now().Add(-TaskExpire)
if 0 != len(manager.batchCreateTasks){
//create batch
var expired []string
for taskID, task := range manager.batchCreateTasks{
if task.Finished {
if task.LatestUpdate.Before(expireTime){
//expired
expired = append(expired, taskID)
}
}else{
//unfinished
if task.LatestUpdate.Before(expireTime){
//expired
task.Finished = true
manager.batchCreateTasks[taskID] = task
log.Printf("<resource_manager> mark batch create task '%s' finished due to expire", taskID)
continue
}
//check all guest
var unfinishedGuestCount = 0
var taskUpdated = false
for guestIndex, guest := range task.Guests{
if guest.Status == BatchTaskStatusProcess{
if createError, exists := manager.pendingError[guest.ID]; exists{
//create fail
guest.Status = BatchTaskStatusFail
guest.Error = createError.Error()
log.Printf("<resource_manager> batch create guest '%s' fail: %s", guest.Name, createError.Error())
}else if 0 == len(guest.ID){
//not id allocated
unfinishedGuestCount++
continue
}else{
ins, exists := manager.instances[guest.ID]
if !exists{
unfinishedGuestCount++
log.Printf("<resource_manager> warning: invalid guest '%s' in batch '%s'", guest.ID, taskID)
continue
}
if ins.Created{
guest.Status = BatchTaskStatusSuccess
log.Printf("<resource_manager> update guest '%s' as created in batch '%s'", guest.Name, taskID)
}else{
guest.Progress = ins.Progress
unfinishedGuestCount++
}
}
task.Guests[guestIndex] = guest
taskUpdated = true
}
}
if taskUpdated{
task.LatestUpdate = time.Now()
manager.batchCreateTasks[taskID] = task
}
if 0 == unfinishedGuestCount{
//all guest processed
task.Finished = true
manager.batchCreateTasks[taskID] = task
log.Printf("<resource_manager> batch create task '%s' finished", taskID)
}
}
}
if 0 != len(expired){
for _, taskID := range expired{
delete(manager.batchCreateTasks, taskID)
log.Printf("<resource_manager> release expired batch create task '%s'", taskID)
}
}
}
if 0 != len(manager.batchDeleteTasks){
//delete batch
var expired []string
for taskID, task := range manager.batchDeleteTasks{
if task.Finished {
if task.LatestUpdate.Before(expireTime){
//expired
expired = append(expired, taskID)
}
}else{
//unfinished
if task.LatestUpdate.Before(expireTime){
//expired
task.Finished = true
manager.batchDeleteTasks[taskID] = task
log.Printf("<resource_manager> mark batch delete task '%s' finished due to expire", taskID)
continue
}
//check all guest
var unfinishedGuestCount = 0
for guestIndex, guest := range task.Guests{
if guest.Status == BatchTaskStatusProcess{
if _, exists := manager.instances[guest.ID];!exists{
//already deleted
guest.Status = BatchTaskStatusSuccess
task.Guests[guestIndex] = guest
log.Printf("<resource_manager> update guest '%s' as deleted in batch '%s'", guest.Name, taskID)
}else{
unfinishedGuestCount++
}
}
}
if 0 == unfinishedGuestCount{
//all guest processed
task.Finished = true
manager.batchDeleteTasks[taskID] = task
log.Printf("<resource_manager> batch delete task '%s' finished", taskID)
}
}
}
if 0 != len(expired){
for _, taskID := range expired{
delete(manager.batchDeleteTasks, taskID)
log.Printf("<resource_manager> release expired batch delete task '%s'", taskID)
}
}
}
if 0 != len(manager.batchStopTasks){
//stop batch
var expired []string
for taskID, task := range manager.batchStopTasks{
if task.Finished {
if task.LatestUpdate.Before(expireTime){
//expired
expired = append(expired, taskID)
}
}else{
//unfinished
if task.LatestUpdate.Before(expireTime){
//expired
task.Finished = true
manager.batchStopTasks[taskID] = task
log.Printf("<resource_manager> mark batch stop task '%s' finished due to expire", taskID)
continue
}
//check all guest
var unfinishedGuestCount = 0
for guestIndex, guest := range task.Guests{
if guest.Status == BatchTaskStatusProcess{
if _, exists := manager.instances[guest.ID];!exists{
//already stopd
guest.Status = BatchTaskStatusSuccess
task.Guests[guestIndex] = guest
log.Printf("<resource_manager> update guest '%s' as stopped in batch '%s'", guest.Name, taskID)
}else{
unfinishedGuestCount++
}
}
}
if 0 == unfinishedGuestCount{
//all guest processed
task.Finished = true
manager.batchStopTasks[taskID] = task
log.Printf("<resource_manager> batch stop task '%s' finished", taskID)
}
}
}
if 0 != len(expired){
for _, taskID := range expired{
delete(manager.batchStopTasks, taskID)
log.Printf("<resource_manager> release expired batch stop task '%s'", taskID)
}
}
}
}
func (manager *ResourceManager) handleCommand(cmd resourceCommand) {
var err error
switch cmd.Type {
case cmdQueryAllComputePoolInfo:
err = manager.handleQueryAllPools(cmd.ResultChan)
case cmdGetComputePoolInfo:
err = manager.handleGetComputePool(cmd.Pool, cmd.ResultChan)
case cmdCreateComputePool:
err = manager.handleCreatePool(cmd.Pool, cmd.Storage, cmd.Address, cmd.Failover, cmd.ErrorChan)
case cmdModifyComputePool:
err = manager.handleModifyPool(cmd.Pool, cmd.Storage, cmd.Address, cmd.Failover, cmd.ErrorChan)
case cmdDeleteComputePool:
err = manager.handleDeletePool(cmd.Pool, cmd.ErrorChan)
case cmdQueryStoragePool:
err = manager.handleQueryStoragePool(cmd.ResultChan)
case cmdGetStoragePool:
err = manager.handleGetStoragePool(cmd.Storage, cmd.ResultChan)
case cmdCreateStoragePool:
err = manager.handleCreateStoragePool(cmd.Storage, cmd.StorageType, cmd.Host, cmd.Target, cmd.ErrorChan)
case cmdModifyStoragePool:
err = manager.handleModifyStoragePool(cmd.Storage, cmd.StorageType, cmd.Host, cmd.Target, cmd.ErrorChan)
case cmdDeleteStoragePool:
err = manager.handleDeleteStoragePool(cmd.Storage, cmd.ErrorChan)
case cmdQueryComputeCells:
err = manager.handleQueryCellsInPool(cmd.Pool, cmd.ResultChan)
case cmdAddComputeCell:
err = manager.handleAddCell(cmd.Pool, cmd.Cell, cmd.ErrorChan)
case cmdRemoveComputeCell:
err = manager.handleRemoveCell(cmd.Pool, cmd.Cell, cmd.ErrorChan)
case cmdEnableComputeCell:
err = manager.handleEnableCell(cmd.Pool, cmd.Cell, cmd.ErrorChan)
case cmdDisableComputeCell:
err = manager.handleDisableCell(cmd.Pool, cmd.Cell, cmd.Failover, cmd.ErrorChan)
case cmdFinishPurgeCell:
err = manager.handleFinishPurgeCell(cmd.Cell, cmd.ErrorChan)
case cmdQueryUnallocatedComputeCell:
err = manager.handleGetUnallocatedCells(cmd.ResultChan)
case cmdQueryZoneStatus:
err = manager.handleQueryZoneStatus(cmd.ResultChan)
case cmdQueryComputePoolStatus:
err = manager.handleQueryComputePoolStatus(cmd.ResultChan)
case cmdGetComputePoolStatus:
err = manager.handleGetComputePoolStatus(cmd.Pool, cmd.ResultChan)
case cmdQueryComputeCellStatus:
err = manager.handleQueryComputeCellStatus(cmd.Pool, cmd.ResultChan)
case cmdGetComputeCellStatus:
err = manager.handleGetComputeCellStatus(cmd.Pool, cmd.Cell, cmd.ResultChan)
case cmdUpdateCellInfo:
err = manager.handleUpdateCellInfo(cmd.Cell, cmd.Address, cmd.ErrorChan)
case cmdGetCellStatus:
err = manager.handleGetCellStatus(cmd.Cell, cmd.ResultChan)
case cmdSearchGuests:
err = manager.handleSearchGuests(cmd.SearchCondition, cmd.ResultChan)
case cmdUpdateAutoStart:
err = manager.handleUpdateGuestAutoStart(cmd.InstanceID, cmd.Flag, cmd.ErrorChan)
case cmdBatchUpdateInstanceStatus:
err = manager.handleBatchUpdateInstanceStatus(cmd.Pool, cmd.Cell, cmd.InstanceList, cmd.ErrorChan)
case cmdAllocateInstance:
err = manager.handleAllocateInstance(cmd.Pool, cmd.Instance, cmd.ResultChan)
case cmdConfirmInstance:
err = manager.handleConfirmInstance(cmd.InstanceID, cmd.MonitorPort, cmd.Secret, cmd.Hardware, cmd.ErrorChan)
case cmdDeallocateInstance:
err = manager.handleDeallocateInstance(cmd.InstanceID, cmd.Error, cmd.ErrorChan)
case cmdUpdateInstanceStatus:
err = manager.handleUpdateInstanceStatus(cmd.Instance, cmd.ErrorChan)
case cmdGetInstanceStatus:
err = manager.handleGetInstanceStatus(cmd.InstanceID, cmd.ResultChan)
case cmdQueryInstanceStatusInPool:
err = manager.handleQueryInstanceStatusInPool(cmd.Pool, cmd.ResultChan)
case cmdQueryInstanceStatusInCell:
err = manager.handleQueryInstanceStatusInCell(cmd.Pool, cmd.Cell, cmd.ResultChan)
case cmdUpdateInstanceAddress:
err = manager.handleUpdateInstanceAddress(cmd.InstanceID, cmd.Address, cmd.ErrorChan)
case cmdRenameInstance:
err = manager.handleRenameInstance(cmd.InstanceID, cmd.Name, cmd.ErrorChan)
case cmdUpdateInstancePriority:
err = manager.handleUpdateInstancePriority(cmd.InstanceID, cmd.Priority, cmd.ErrorChan)
case cmdUpdateInstanceMonitorSecret:
err = manager.handleUpdateInstanceMonitorSecret(cmd.InstanceID, cmd.Secret, cmd.ErrorChan)
case cmdUpdateInstanceNetworkThreshold:
err = manager.handleUpdateInstanceNetworkThreshold(cmd.InstanceID, cmd.ReceiveSpeed, cmd.SendSpeed, cmd.ErrorChan)
case cmdUpdateInstanceDiskThreshold:
err = manager.handleUpdateInstanceDiskThreshold(cmd.InstanceID, cmd.ReadSpeed, cmd.ReadIOPS, cmd.WriteSpeed, cmd.WriteIOPS, cmd.ErrorChan)
case cmdGetInstanceByName:
err = manager.handleGetInstanceByName(cmd.Pool, cmd.Name, cmd.ResultChan)
case cmdQueryGuestsByCondition:
err = manager.handleQueryGuestsByCondition(cmd.InstanceQuery, cmd.ResultChan)
case cmdAddImageServer:
err = manager.handleAddImageServer(cmd.Name, cmd.Host, cmd.Port)
case cmdGetImageServer:
err = manager.handleGetImageServer(cmd.ResultChan)
case cmdSetCellDead:
err = manager.handleSetCellStopped(cmd.Cell, cmd.ErrorChan)
case cmdQueryMigration:
err = manager.handleQueryMigration(cmd.ResultChan)
case cmdGetMigration:
err = manager.handleGetMigration(cmd.MigrationID, cmd.ResultChan)
case cmdCreateMigration:
err = manager.handleCreateMigration(cmd.Migration, cmd.ResultChan)
case cmdFinishMigration:
err = manager.handleFinishMigration(cmd.MigrationID, cmd.IDList, cmd.PortList, cmd.ErrorChan)
case cmdCancelMigration:
err = manager.handleCancelMigration(cmd.MigrationID, cmd.Error, cmd.ErrorChan)
case cmdBuildFailoverPlan:
err = manager.handleBuildFailoverPlan(cmd.Cell, cmd.ResultChan)
case cmdMigrationInstance:
err = manager.handleMigrateInstance(cmd.Cell, cmd.Target, cmd.IDList, cmd.PortList, cmd.ErrorChan)
case cmdPurgeInstance:
err = manager.handlePurgeInstance(cmd.Cell, cmd.ErrorChan)
case cmdQueryAddressPool:
err = manager.handleQueryAddressPool(cmd.ResultChan)
case cmdGetAddressPool:
err = manager.handleGetAddressPool(cmd.Address, cmd.ResultChan)
case cmdCreateAddressPool:
err = manager.handleCreateAddressPool(cmd.AddressPool, cmd.ErrorChan)
case cmdModifyAddressPool:
err = manager.handleModifyAddressPool(cmd.AddressPool, cmd.ResultChan)
case cmdDeleteAddressPool:
err = manager.handleDeleteAddressPool(cmd.Address, cmd.ErrorChan)
case cmdQueryAddressRange:
err = manager.handleQueryAddressRange(cmd.Address, cmd.Range, cmd.ResultChan)
case cmdGetAddressRange:
err = manager.handleGetAddressRange(cmd.Address, cmd.Range, cmd.Start, cmd.ResultChan)
case cmdAddAddressRange:
err = manager.handleAddAddressRange(cmd.Address, cmd.Range, cmd.AddressRange, cmd.ErrorChan)
case cmdRemoveAddressRange:
err = manager.handleRemoveAddressRange(cmd.Address, cmd.Range, cmd.Start, cmd.ErrorChan)
case cmdBeginResetSystem:
err = manager.handleBeginResetSystem(cmd.InstanceID, cmd.ErrorChan)
case cmdFinishResetSystem:
err = manager.handleFinishResetSystem(cmd.InstanceID, cmd.Error, cmd.ErrorChan)
case cmdStartBatchCreateGuest:
err = manager.handleStartBatchCreateGuest(cmd.BatchCreating, cmd.ResultChan)
case cmdSetBatchCreateGuestStart:
err = manager.handleSetBatchCreateGuestStart(cmd.BatchID, cmd.Name, cmd.InstanceID, cmd.ErrorChan)
case cmdSetBatchCreateGuestFail:
err = manager.handleSetBatchCreateGuestFail(cmd.BatchID, cmd.Name, cmd.Error, cmd.ErrorChan)
case cmdGetBatchCreateGuest:
err = manager.handleGetBatchCreateGuestStatus(cmd.BatchID, cmd.ResultChan)
case cmdStartBatchDeleteGuest:
err = manager.handleStartBatchDeleteGuest(cmd.IDList, cmd.ResultChan)
case cmdSetBatchDeleteGuestSuccess:
err = manager.handleSetBatchDeleteGuestSuccess(cmd.BatchID, cmd.InstanceID, cmd.ErrorChan)
case cmdSetBatchDeleteGuestFail:
err = manager.handleSetBatchDeleteGuestFail(cmd.BatchID, cmd.InstanceID, cmd.Error, cmd.ErrorChan)
case cmdGetBatchDeleteGuest:
err = manager.handleGetBatchDeleteGuestStatus(cmd.BatchID, cmd.ResultChan)
case cmdStartBatchStopGuest:
err = manager.handleStartBatchStopGuest(cmd.IDList, cmd.ResultChan)
case cmdSetBatchStopGuestSuccess:
err = manager.handleSetBatchStopGuestSuccess(cmd.BatchID, cmd.InstanceID, cmd.ErrorChan)
case cmdSetBatchStopGuestFail:
err = manager.handleSetBatchStopGuestFail(cmd.BatchID, cmd.InstanceID, cmd.Error, cmd.ErrorChan)
case cmdGetBatchStopGuest:
err = manager.handleGetBatchStopGuestStatus(cmd.BatchID, cmd.ResultChan)
case cmdQuerySystemTemplates:
err = manager.handleQuerySystemTemplates(cmd.ResultChan)
case cmdGetSystemTemplate:
err = manager.handleGetSystemTemplate(cmd.TemplateID, cmd.ResultChan)
case cmdCreateSystemTemplate:
err = manager.handleCreateSystemTemplate(cmd.TemplateConfig, cmd.ResultChan)
case cmdModifySystemTemplate:
err = manager.handleModifySystemTemplate(cmd.TemplateID, cmd.TemplateConfig, cmd.ErrorChan)
case cmdDeleteSystemTemplate:
err = manager.handleDeleteSystemTemplate(cmd.TemplateID, cmd.ErrorChan)
case cmdQuerySecurityPolicyGroups:
err = manager.handleQuerySecurityPolicyGroups(cmd.PolicyGroupQuery, cmd.ResultChan)
case cmdGetSecurityPolicyGroup:
err = manager.handleGetSecurityPolicyGroup(cmd.Group, cmd.ResultChan)
case cmdCreateSecurityPolicyGroup:
err = manager.handleCreateSecurityPolicyGroup(cmd.PolicyGroup, cmd.ResultChan)
case cmdModifySecurityPolicyGroup:
err = manager.handleModifySecurityPolicyGroup(cmd.Group, cmd.PolicyGroup, cmd.ErrorChan)
case cmdDeleteSecurityPolicyGroup:
err = manager.handleDeleteSecurityPolicyGroup(cmd.Group, cmd.ErrorChan)
case cmdGetSecurityPolicyRules:
err = manager.handleGetSecurityPolicyRules(cmd.Group, cmd.ResultChan)
case cmdAddSecurityPolicyRule:
err = manager.handleAddSecurityPolicyRule(cmd.Group, cmd.PolicyRule, cmd.ErrorChan)
case cmdModifySecurityPolicyRule:
err = manager.handleModifySecurityPolicyRule(cmd.Group, cmd.Index, cmd.PolicyRule, cmd.ErrorChan)
case cmdRemoveSecurityPolicyRule:
err = manager.handleRemoveSecurityPolicyRule(cmd.Group, cmd.Index, cmd.ErrorChan)
case cmdMoveSecurityPolicyRule:
err = manager.handleMoveSecurityPolicyRule(cmd.Group, cmd.Index, cmd.Flag, cmd.ErrorChan)
default:
log.Printf("<resource_manager> unsupported command type %d", cmd.Type)
break
}
if err != nil {
log.Printf("<resource_manager> handle command %s fail: %s", cmd.Type.toString(), err.Error())
}
}
func (manager *ResourceManager) onCellStatusUpdate(report CellStatusReport) {
var name = report.Name
cell, exists := manager.cells[name]
if !exists {
log.Printf("<resource_manager> ignore status update for invalid cell '%s' ", name)
return
}
//update status
cell.ResourceUsage = report.ResourceUsage
cell.LatestUpdate = time.Now()
cell.Alive = true
manager.cells[name] = cell
}
func (manager *ResourceManager) onUpdateSystemStatus() {
const (
LostThreshold = 10 * time.Second
)
//begin := time.Now()
var err error
lostTime := time.Now().Add(-LostThreshold)
manager.zone.PoolStatistic.Reset()
manager.zone.CellStatistic.Reset()
manager.zone.InstanceStatistic.Reset()
manager.zone.ResourceUsage.Reset()
var modifiedCells []ManagedComputeCell
var modifiedPools []ManagedComputePool
for _, pool := range manager.pools {
pool.CellStatistic.Reset()
pool.InstanceStatistic.Reset()
pool.ResourceUsage.Reset()
for cellName, _ := range pool.Cells {
cell, exists := manager.cells[cellName]
if !exists {
log.Printf("<resource_manager> can not update status with invalid cell '%s'", cellName)
continue
}
if cell.Alive && cell.LatestUpdate.Before(lostTime) {
//cell lost
log.Printf("<resource_manager> cell '%s' lost", cellName)
cell.Alive = false
pool.OfflineCells++
modifiedCells = append(modifiedCells, cell)
continue
}
pool.ResourceUsage.Accumulate(cell.ResourceUsage)
if !cell.isInstanceConsistent(){
if err = manager.syncInstanceStatistic(cell.Name); err != nil{
log.Printf("<resource_manager> warning: sync instance statistic on cell '%s' fail: %s", cell.Name, err.Error())
continue
}
log.Printf("<resource_manager> warning: instance statistic on cell '%s' resynced due to Inconsistent", cell.Name)
}
pool.InstanceStatistic.Accumulate(cell.InstanceStatistic)
pool.OnlineCells++
}
modifiedPools = append(modifiedPools, pool)
//update zone status
if pool.Enabled {
manager.zone.EnabledPools++
} else {
manager.zone.DisabledPools++
}
manager.zone.CellStatistic.Accumulate(pool.CellStatistic)
manager.zone.InstanceStatistic.Accumulate(pool.InstanceStatistic)
manager.zone.ResourceUsage.Accumulate(pool.ResourceUsage)
}
for _, cell := range modifiedCells {
manager.cells[cell.Name] = cell
}
for _, pool := range modifiedPools {
manager.pools[pool.Name] = pool
}
//elapsed := time.Now().Sub(begin)/time.Millisecond
//log.Printf("<resource_manager> resource usage accumulated in %d milliseconds", elapsed)
}
func (manager *ResourceManager) handleQueryAllPools(resp chan ResourceResult) error {
var result []ComputePoolInfo
var names []string
for name, _ := range manager.pools{
names = append(names, name)
}
sort.Stable(sort.StringSlice(names))
for _, poolName := range names {
pool, _ := manager.pools[poolName]
var info = ComputePoolInfo{poolName, pool.Enabled, pool.Network, pool.Storage, pool.Failover, uint64(len(pool.Cells))}
result = append(result, info)
}
resp <- ResourceResult{ComputePoolInfoList:result}
return nil
}
func (manager *ResourceManager) handleGetComputePool(poolName string, resp chan ResourceResult) error{
pool, exists := manager.pools[poolName]
if !exists{
err := fmt.Errorf("invalid pool '%s'", poolName)
resp <- ResourceResult{Error:err}
return err
}
resp <- ResourceResult{ComputePoolConfig: pool.ComputePoolInfo}
return nil
}
func (manager *ResourceManager) handleCreatePool(name, storage, addressPool string, failover bool, resp chan error) (err error) {
if _, exists := manager.pools[name]; exists {
err = fmt.Errorf("'%s' alrady exists", name)
resp <- err
return err
}
var newPool = ManagedComputePool{}
newPool.Enabled = true
newPool.Name = name
newPool.Cells = map[string]bool{}
newPool.InstanceNames = map[string]string{}
if "" != storage{
if _, exists := manager.storagePools[storage]; !exists{
err = fmt.Errorf("invalid storage pool '%s'", storage)
resp <- err
return err
}
newPool.Storage = storage
log.Printf("<resource_manager> new compute pool '%s' using storage '%s' created", name, storage)
}else{
if failover{
err = errors.New("using shared storage to enable Failover feature")
resp <- err
return err
}
log.Printf("<resource_manager> new compute pool '%s' using local storage created", name)
}
if "" != addressPool{
if _, exists := manager.addressPools[addressPool]; !exists{
err = fmt.Errorf("invalid address pool '%s'", addressPool)
resp <- err
return err
}
newPool.Network = addressPool
log.Printf("<resource_manager> address pool '%s' bound to '%s'", addressPool, name)
}
newPool.Failover = failover
manager.pools[name] = newPool
resp <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleModifyPool(poolName, storage, addressPool string, failover bool, resp chan error) (err error) {
pool, exists := manager.pools[poolName]
if !exists {
err = fmt.Errorf("invalid pool'%s'", poolName)
resp <- err
return err
}
if (pool.Storage == storage) && (pool.Failover == failover) && (pool.Network == addressPool){
err = errors.New("no need to change")
resp <- err
return err
}
var sharedStorage = "" != storage
if pool.Failover != failover{
//change failover
if failover{
//enable
if !sharedStorage{
err = errors.New("using shared storage to enable Failover feature")
resp <- err
return err
}
log.Printf("<resource_manager> failover enabled on pool '%s'", poolName)
}else{
log.Printf("<resource_manager> failover disabled on pool '%s'", poolName)
}
pool.Failover = failover
}
if pool.Storage != storage{
if 0 != len(pool.Cells) {
err = errors.New("must remove all cells before change storage")
resp <- err
return err
}
if sharedStorage{
if _, exists = manager.storagePools[storage]; !exists{
err = fmt.Errorf("invalid storage pool '%s'", storage)
resp <- err
return err
}
log.Printf("<resource_manager> compute pool '%s' change to storage pool '%s'", poolName, storage)
}else if pool.Failover{
err = errors.New("can not using local storage when failover enabled")
resp <- err
return err
}else{
log.Printf("<resource_manager> compute pool '%s' change to local storage", poolName)
}
pool.Storage = storage
}
if addressPool != pool.Network{
if "" != pool.Network{
//check previous addresses
if current, exists := manager.addressPools[pool.Network]; !exists{
err = fmt.Errorf("invalid current address pool '%s'", pool.Network)
resp <- err
return err
}else{
var allocated = 0
for _, addressRange := range current.ranges{
for allocatedAddress, instanceID := range addressRange.allocated{
ins, exists := manager.instances[instanceID]
if !exists{
err = fmt.Errorf("can't find instance '%s' allocated with address '%s' in current pool '%s'",
instanceID, allocatedAddress, pool.Network)
resp <- err
return err
}
if ins.Pool == poolName{
allocated++
}
}
}
if 0 != allocated{
err = fmt.Errorf("%d instance address(es) allocated in current pool '%s', remove or detach all address before change address pool",
allocated, current.name)
resp <- err
return err
}
}
}
if "" != addressPool{
if _, exists := manager.addressPools[addressPool]; !exists{
err = fmt.Errorf("invalid address pool '%s'", addressPool)
resp <- err
return err
}
log.Printf("<resource_manager> address pool of '%s' changed to '%s'", poolName, addressPool)
}else{
log.Printf("<resource_manager> address pool '%s' detached from '%s'", pool.Network, poolName)
}
pool.Network = addressPool
}
manager.pools[poolName] = pool
resp <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleDeletePool(name string, resp chan error) error {
pool, exists := manager.pools[name]
if !exists {
err := fmt.Errorf("invalid compute pool '%s'", name)
resp <- err
return err
}
if 0 != len(pool.Cells) {
err := errors.New("must remove all cells before delete")
resp <- err
return err
}
delete(manager.pools, name)
log.Printf("<resource_manager> compute pool '%s' deleted", name)
resp <- nil
return manager.saveConfig()
}
//storage pools
func (manager *ResourceManager) handleCreateStoragePool(name, storageType, host, target string, respChan chan error) (err error){
if _, exists := manager.storagePools[name]; exists{
err = fmt.Errorf("storage pool '%s' already exists", name)
respChan <- err
return err
}
switch storageType {
case StorageTypeNFS:
break
default:
err = fmt.Errorf("invalid storage type '%s'", storageType)
respChan <- err
return err
}
var newStorage = StoragePoolInfo{name, storageType, host, target}
manager.storagePools[name] = newStorage
log.Printf("<resource_manager> new storage pool '%s' created for %s://%s/%s",
name, storageType, host, target)
respChan <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleModifyStoragePool(name, storageType, host, target string, respChan chan error) (err error){
currentStorage, exists := manager.storagePools[name]
if !exists{
err = fmt.Errorf("invalid storage pool '%s'", name)
respChan <- err
return err
}
//check attached compute pool
for poolName, pool := range manager.pools{
if pool.Storage == name{
err = fmt.Errorf("compute pool '%s' still attached to storage '%s'", poolName, name)
respChan <- err
return err
}
}
switch storageType {
case StorageTypeNFS:
break
default:
err = fmt.Errorf("invalid storage type '%s'", storageType)
respChan <- err
return err
}
var isEqual = func(source, target StoragePoolInfo) bool{
if source.Type != target.Type{
return false
}
if source.Host != target.Host{
return false
}
if source.Target != target.Target{
return false
}
return true
}
var newStorage = StoragePoolInfo{name, storageType, host, target}
if isEqual(currentStorage, newStorage){
err = errors.New("no need to change")
respChan <- err
return err
}
manager.storagePools[name] = newStorage
log.Printf("<resource_manager> storage pool '%s' changed to %s://%s/%s",
name, storageType, host, target)
respChan <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleDeleteStoragePool(name string, respChan chan error) (err error){
if _, exists := manager.storagePools[name]; !exists{
err = fmt.Errorf("invalid storage pool '%s'", name)
respChan <- err
return err
}
//check attached compute pool
for poolName, pool := range manager.pools{
if pool.Storage == name{
err = fmt.Errorf("compute pool '%s' still attached to storage '%s'", poolName, name)
respChan <- err
return err
}
}
delete(manager.storagePools, name)
log.Printf("<resource_manager> storage pool '%s' deleted", name)
respChan <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleGetStoragePool(name string, respChan chan ResourceResult) (err error){
pool, exists := manager.storagePools[name]
if !exists{
err = fmt.Errorf("invalid storage pool '%s'", name)
respChan <- ResourceResult{Error:err}
return err
}
respChan <- ResourceResult{StoragePool: pool}
return nil
}
func (manager *ResourceManager) handleQueryStoragePool(respChan chan ResourceResult) (err error){
var result []StoragePoolInfo
var keys = make([]string, len(manager.storagePools))
var keyIndex = 0
for name, _ := range manager.storagePools{
keys[keyIndex] = name
keyIndex++
}
sort.Stable(sort.StringSlice(keys))
for _, poolName := range keys{
storage, exists := manager.storagePools[poolName]
if !exists{
err = fmt.Errorf("invalid storage pool '%s'", poolName)
respChan <- ResourceResult{Error:err}
return err
}
result = append(result, storage)
}
respChan <- ResourceResult{StoragePoolList: result}
return nil
}
func (manager *ResourceManager) handleQueryCellsInPool(poolName string, resp chan ResourceResult) error{
pool, exists := manager.pools[poolName]
if !exists {
err := fmt.Errorf("invalid compute pool '%s'", poolName)
resp <- ResourceResult{Error:err}
return err
}
var names []string
for name, _ := range pool.Cells{
names = append(names, name)
}
sort.Stable(sort.StringSlice(names))
var cells []ComputeCellInfo
for _, cellName:= range names{
if cell, exists := manager.cells[cellName];!exists{
err := fmt.Errorf("invalid compute cell '%s'", cellName)
resp <- ResourceResult{Error:err}
return err
}else {
var info = ComputeCellInfo{cell.Name, cell.Address, cell.Enabled, cell.Alive, cell.PurgeAppending}
cells = append(cells, info)
}
}
resp <- ResourceResult{ComputeCellInfoList:cells}
return nil
}
func (manager *ResourceManager) handleAddCell(poolName, cellName string, resp chan error) error {
pool, exists := manager.pools[poolName]
if !exists {
err := fmt.Errorf("invalid compute pool '%s'", poolName)
resp <- err
return err
}
if _, exists := manager.unallocatedCells[cellName]; !exists {
err := fmt.Errorf("cell '%s' already allocated", cellName)
resp <- err
return err
}
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid compute cell '%s'", cellName)
resp <- err
return err
}
if cell.Pool != "" {
err := fmt.Errorf("cell '%s' already in pool '%s'", cellName, cell.Pool)
resp <- err
return err
}
delete(manager.unallocatedCells, cellName)
cell.Pool = poolName
pool.Cells[cellName] = true
pool.CellCount = uint64(len(pool.Cells))
manager.cells[cellName] = cell
manager.pools[poolName] = pool
log.Printf("<resource_manager> new cell '%s' added into pool '%s'", cellName, poolName)
resp <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleRemoveCell(poolName, cellName string, resp chan error) error {
pool, exists := manager.pools[poolName]
if !exists {
err := fmt.Errorf("invalid compute pool '%s'", poolName)
resp <- err
return err
}
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid compute cell '%s'", cellName)
resp <- err
return err
}
if cell.Pool != poolName {
err := fmt.Errorf("cell '%s' not in pool '%s'", cellName, cell.Pool)
resp <- err
return err
}
var left = len(cell.Instances) + len(cell.Pending)
if 0 != left{
err := fmt.Errorf("%d instance(s) left in cell '%s', migrate or delete all instance(s) before remove cell", left, cellName,)
resp <- err
return err
}
delete(pool.Cells, cellName)
cell.Pool = ""
manager.unallocatedCells[cellName] = true
pool.CellCount = uint64(len(pool.Cells))
//update
manager.cells[cellName] = cell
manager.pools[poolName] = pool
log.Printf("<resource_manager> cell '%s' removed from pool '%s'", cellName, poolName)
resp <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleEnableCell(poolName, cellName string, respChan chan error) (err error){
_, exists := manager.pools[poolName]
if !exists {
err = fmt.Errorf("invalid compute pool '%s'", poolName)
respChan <- err
return err
}
cell, exists := manager.cells[cellName]
if !exists {
err = fmt.Errorf("invalid compute cell '%s'", cellName)
respChan <- err
return err
}
if cell.Pool != poolName {
err = fmt.Errorf("cell '%s' not in pool '%s'", cellName, cell.Pool)
respChan <- err
return err
}
if cell.Enabled{
err = fmt.Errorf("cell '%s' already enabled", cellName)
respChan <- err
return err
}
if cell.PurgeAppending{
log.Printf("<resource_manager> warning: appending purge canceled when cell '%s' in pool '%s' enabled", cellName, poolName)
}else{
log.Printf("<resource_manager> cell '%s' in pool '%s' enabled", cellName, poolName)
}
cell.PurgeAppending = false
cell.Enabled = true
manager.cells[cellName] = cell
respChan <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleDisableCell(poolName, cellName string, purge bool, respChan chan error) (err error){
_, exists := manager.pools[poolName]
if !exists {
err = fmt.Errorf("invalid compute pool '%s'", poolName)
respChan <- err
return err
}
cell, exists := manager.cells[cellName]
if !exists {
err = fmt.Errorf("invalid compute cell '%s'", cellName)
respChan <- err
return err
}
if cell.Pool != poolName {
err = fmt.Errorf("cell '%s' not in pool '%s'", cellName, cell.Pool)
respChan <- err
return err
}
if !cell.Enabled{
err = fmt.Errorf("cell '%s' already disabled", cellName)
respChan <- err
return err
}
if purge{
log.Printf("<resource_manager> cell '%s' in pool '%s' disabled with purge appending", cellName, poolName)
}else{
log.Printf("<resource_manager> cell '%s' in pool '%s' disabled", cellName, poolName)
}
cell.PurgeAppending = purge
cell.Enabled = false
manager.cells[cellName] = cell
respChan <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleFinishPurgeCell(cellName string, respChan chan error) (err error){
cell, exists := manager.cells[cellName]
if !exists {
err = fmt.Errorf("invalid compute cell '%s'", cellName)
respChan <- err
return err
}
if !cell.PurgeAppending{
err = fmt.Errorf("cell '%s' doesn't have purge appending", cellName)
respChan <- err
return err
}
cell.PurgeAppending = false
manager.cells[cellName] = cell
log.Printf("<resource_manager> appending purge in cell '%s' canceled ", cellName)
respChan <- nil
return manager.saveConfig()
}
func (manager *ResourceManager) handleGetUnallocatedCells(resp chan ResourceResult) error {
var cells []ComputeCellInfo
for name, _ := range manager.unallocatedCells {
if cell, exists := manager.cells[name];!exists{
err := fmt.Errorf("invalid cell '%s'", name)
resp <- ResourceResult{Error:err}
return err
}else{
var info = ComputeCellInfo{cell.Name, cell.Address, cell.Enabled, cell.Alive, cell.PurgeAppending}
cells = append(cells, info)
}
}
resp <- ResourceResult{ComputeCellInfoList:cells}
return nil
}
func (manager *ResourceManager) handleQueryZoneStatus(resp chan ResourceResult) error {
var s = ZoneStatus{Name:manager.zone.Name,
PoolStatistic: manager.zone.PoolStatistic, CellStatistic: manager.zone.CellStatistic,
InstanceStatistic: manager.zone.InstanceStatistic, ResourceUsage: manager.zone.ResourceUsage,
StartTime: manager.startTime}
resp <- ResourceResult{Zone: s}
return nil
}
func (manager *ResourceManager) handleQueryComputePoolStatus(resp chan ResourceResult) error {
var pools []ComputePoolStatus
var names []string
for name, _ := range manager.pools{
names = append(names, name)
}
sort.Stable(sort.StringSlice(names))
for _, poolName := range names {
pool, _ := manager.pools[poolName]
var s = ComputePoolStatus{
Name:pool.Name, Enabled:pool.Enabled,
CellStatistic:pool.CellStatistic, InstanceStatistic: pool.InstanceStatistic, ResourceUsage: pool.ResourceUsage}
pools = append(pools, s)
}
resp <- ResourceResult{ComputePoolList: pools}
return nil
}
func (manager *ResourceManager) handleGetComputePoolStatus(name string, resp chan ResourceResult) error {
pool, exists := manager.pools[name]
if !exists {
err := fmt.Errorf("invalid pool '%s'", name)
resp <- ResourceResult{Error: err}
return err
}
var s = ComputePoolStatus{
Name:pool.Name, Enabled:pool.Enabled,
CellStatistic:pool.CellStatistic, InstanceStatistic: pool.InstanceStatistic, ResourceUsage: pool.ResourceUsage}
resp <- ResourceResult{ComputePool: s}
return nil
}
func (manager *ResourceManager) handleQueryComputeCellStatus(poolName string, resp chan ResourceResult) error {
pool, exists := manager.pools[poolName]
if !exists{
err := fmt.Errorf("invalid pool '%s'", poolName)
resp <- ResourceResult{Error:err}
return err
}
var result []ComputeCellStatus
var names []string
for name, _ := range pool.Cells{
names = append(names, name)
}
sort.Stable(sort.StringSlice(names))
for _, cellName:= range names{
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid cell '%s' in pool %s", cellName, poolName)
resp <- ResourceResult{Error: err}
return err
}
var s = ComputeCellStatus{ComputeCellInfo:cell.ComputeCellInfo,
InstanceStatistic: cell.InstanceStatistic, ResourceUsage: cell.ResourceUsage}
result = append(result, s)
}
resp <- ResourceResult{ComputeCellList: result}
return nil
}
func (manager *ResourceManager) handleGetComputeCellStatus(poolName, cellName string, resp chan ResourceResult) error{
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid cell '%s'", cellName)
resp <- ResourceResult{Error:err}
return err
}
if cell.Pool != poolName{
err := fmt.Errorf("cell '%s' not in pool '%s'", cellName, poolName)
resp <- ResourceResult{Error:err}
return err
}
var s = ComputeCellStatus{ComputeCellInfo:cell.ComputeCellInfo, InstanceStatistic: cell.InstanceStatistic, ResourceUsage: cell.ResourceUsage}
resp <- ResourceResult{ComputeCell: s}
return nil
}
func (manager *ResourceManager) handleUpdateCellInfo(cellName, cellAddress string, respChan chan error) (err error){
cell, exists := manager.cells[cellName]
if !exists{
var cellStatus = ManagedComputeCell{}
cellStatus.LatestUpdate = time.Now()
cellStatus.Name = cellName
cellStatus.Address = cellAddress
cellStatus.Enabled = true
cellStatus.Alive = true
cellStatus.Instances = map[string]bool{}
cellStatus.Pending = map[string]bool{}
manager.unallocatedCells[cellName] = true
manager.cells[cellName] = cellStatus
log.Printf("<resource_manager> new unallocated cell '%s' (address %s) available", cellName, cellAddress)
}else{
cell.Alive = true
cell.LatestUpdate = time.Now()
if cell.Address != cellAddress{
log.Printf("<resource_manager> cell '%s' address changed to %s", cellName, cellAddress)
cell.Address = cellAddress
}
manager.cells[cellName] = cell
}
respChan <- nil
return nil
}
func (manager *ResourceManager) handleGetCellStatus(cellName string, respChan chan ResourceResult) error {
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid cell '%s'", cellName)
respChan <- ResourceResult{Error: err}
return err
}
var status = ComputeCellStatus{cell.ComputeCellInfo, cell.InstanceStatistic, cell.ResourceUsage}
respChan <- ResourceResult{Pool: cell.Pool, ComputeCell: status}
return nil
}
func (manager *ResourceManager) handleSetCellStopped(cellName string, respChan chan error) (err error){
cell, exists := manager.cells[cellName]
if !exists {
err = fmt.Errorf("invalid cell '%s'", cellName)
respChan <- err
return err
}
cell.Alive = false
log.Printf("<resource_manager> remote cell '%s' stopped", cellName)
if "" != cell.Pool{
//update resource statistic
cell.LostInstances = 0
for instanceID, _ := range cell.Instances{
if ins, exists := manager.instances[instanceID];exists{
if ins.Running{
cell.RunningInstances--
}else{
cell.StoppedInstances--
}
cell.LostInstances++
ins.Lost = true
manager.instances[instanceID] = ins
}else{
err = fmt.Errorf("invalid instance '%s' in cell '%s'", instanceID, cellName)
respChan <- err
return err
}
}
}
manager.cells[cellName] = cell
respChan <- nil
return nil
}
func (manager *ResourceManager) handleQueryGuestsByCondition(condition GuestQueryCondition, respChan chan ResourceResult) (err error) {
var idList []string
{
//build filter list
if condition.InCell {
if cell, exists := manager.cells[condition.Cell]; !exists {
err := fmt.Errorf("invalid cell '%s'", condition.Cell)
respChan <- ResourceResult{Error:err}
return err
} else {
for id, _ := range cell.Instances {
idList = append(idList, id)
}
}
} else if pool, exists := manager.pools[condition.Pool]; !exists {
err := fmt.Errorf("invalid pool '%s'", condition.Pool)
respChan <- ResourceResult{Error:err}
return err
} else {
for cellName, _ := range pool.Cells {
if cell, exists := manager.cells[cellName]; !exists {
err := fmt.Errorf("invalid cell '%s' in pool '%s'", condition.Cell, condition.Pool)
respChan <- ResourceResult{Error:err}
return err
} else {
for id, _ := range cell.Instances {
idList = append(idList, id)
}
}
}
}
}
var names []string
var nameToID = map[string]string{}
{
//filter by property
for _, id := range idList {
if instance, exists := manager.instances[id]; !exists {
err := fmt.Errorf("invalid instance '%s'", id)
respChan <- ResourceResult{Error:err}
return err
} else {
if !(condition.WithOwner && instance.User == condition.Owner) && !(condition.WithGroup && instance.Group == condition.Group) {
continue
}
if condition.WithStatus && (instance.Running != (condition.Status == InstanceStatusRunning)) {
continue
}
if condition.WithCreateFlag && (instance.Created != condition.Created) {
continue
}
if _, exists := nameToID[instance.Name];exists{
err = fmt.Errorf("encounter duplicate instance name '%s'", instance.Name)
respChan <- ResourceResult{Error:err}
return
}
names = append(names, instance.Name)
nameToID[instance.Name] = id
}
}
}
sort.Stable(sort.StringSlice(names))
var result []InstanceStatus
for _, name := range names{
id, exists := nameToID[name]
if !exists{
err = fmt.Errorf("no instance mapped with name '%s'", name)
respChan <- ResourceResult{Error:err}
return err
}
ins, exists := manager.instances[id]
if !exists{
err = fmt.Errorf("invalid instance '%s' with name '%s'", id, name)
respChan <- ResourceResult{Error:err}
return err
}
result = append(result, ins)
}
respChan <- ResourceResult{InstanceList: result}
return nil
}
func (manager *ResourceManager) handleBatchUpdateInstanceStatus(poolName, cellName string, instances []InstanceStatus, respChan chan error) error {
pool, exists := manager.pools[poolName]
if !exists{
err := fmt.Errorf("invalid pool '%s'", poolName)
respChan <- err
return err
}
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid cell '%s'", cellName)
respChan <- err
return err
}
if cell.Pool != poolName {
err := fmt.Errorf("cell '%s' not belong to '%s'", cellName, poolName)
respChan <- err
return err
}
if 0 != len(cell.Instances) {
for id, _ := range cell.Instances {
log.Printf("<resource_manager> clear expired instance status, id '%s'", id)
if ins, exists := manager.instances[id];exists{
delete(pool.InstanceNames, ins.Name)
}
delete(manager.instances, id)
}
}
cell.Instances = map[string]bool{}
cell.Pending = map[string]bool{}
cell.InstanceStatistic.Reset()
for _, config := range instances {
config.InternalNetwork.MonitorAddress = cell.Address
config.Host = cell.Address
manager.instances[config.ID] = config
cell.Instances[config.ID] = true
//todo: migrating
if config.Running{
cell.RunningInstances++
}else{
cell.StoppedInstances++
}
pool.InstanceNames[config.Name] = config.ID
}
manager.cells[cellName] = cell
manager.pools[poolName] = pool
log.Printf("<resource_manager> %d instance updated in cell '%s' ", len(cell.Instances), cellName)
respChan <- nil
return nil
}
func (manager *ResourceManager) handleAllocateInstance(poolName string, config InstanceStatus, respChan chan ResourceResult) (err error) {
pool, exists := manager.pools[poolName]
if !exists{
err = fmt.Errorf("invalid compute pool '%s'", poolName)
respChan <- ResourceResult{Error: err}
return err
}
if _, exists = pool.InstanceNames[config.Name];exists{
err = fmt.Errorf("instance '%s' already exists in pool '%s'", config.Name, poolName)
respChan <- ResourceResult{Error: err}
return err
}
var newID = uuid.NewV4()
config.ID = newID.String()
cellName, err := manager.selectCell(poolName, config.InstanceResource, true)
if err != nil {
log.Printf("<resource_manager> select cell fail: %s", err.Error())
respChan <- ResourceResult{Error: err}
return err
}
config.Cell = cellName
config.Pool = poolName
cell, exists := manager.cells[cellName]
if !exists {
err = fmt.Errorf("invalid cell '%s'", cellName)
respChan <- ResourceResult{Error: err}
return err
}
if "" != pool.Network{
//select address
internal, external, err := manager.allocateNetworkAddress(pool, config.ID)
if err != nil{
respChan <- ResourceResult{Error: err}
return err
}
if "" != external{
config.ExternalNetwork.AssignedAddress = external
log.Printf("<resource_manager> address '%s/%s' assigned for instance '%s'", internal, external, config.Name)
}else{
log.Printf("<resource_manager> internal address '%s' assigned for instance '%s'", internal, config.Name)
}
config.InternalNetwork.AssignedAddress = internal
}
config.InternalNetwork.MonitorAddress = cell.Address
config.Host = cell.Address
cell.Pending[config.ID] = true
pool.InstanceNames[config.Name] = config.ID
manager.instances[config.ID] = config
manager.cells[cellName] = cell
manager.pools[poolName] = pool
respChan <- ResourceResult{Instance: config}
log.Printf("<resource_manager> allocate cell '%s' for instance '%s'(%s)", cellName, config.Name, config.ID)
return nil
}
//running/create/progress/media only
func (manager *ResourceManager) handleUpdateInstanceStatus(status InstanceStatus, respChan chan error) error {
ins, exists := manager.instances[status.ID]
if !exists {
err := fmt.Errorf("invalid instance '%s'", status.ID)
respChan <- err
return err
}
if ins.Running != status.Running {
cell, exists := manager.cells[ins.Cell]
if !exists{
err := fmt.Errorf("invalid cell '%s' for instance '%s'", ins.Cell, status.ID)
respChan <- err
return err
}
if ins.Running{
//running => stopped
cell.StoppedInstances++
cell.RunningInstances--
}else{
//stopped => running
cell.StoppedInstances--
cell.RunningInstances++
}
ins.Running = status.Running
manager.cells[ins.Cell] = cell
}
if !ins.Created {
ins.Progress = status.Progress
ins.Created = status.Created
}
if ins.Cores != status.Cores{
ins.Cores = status.Cores
}
if ins.Memory != status.Memory{
ins.Memory = status.Memory
}
if len(ins.Disks) == len(status.Disks){
for index := 0; index < len(ins.Disks); index++{
if ins.Disks[index] != status.Disks[index]{
ins.Disks[index] = status.Disks[index]
}
}
}
if ins.MediaAttached != status.MediaAttached{
ins.MediaAttached = status.MediaAttached
ins.MediaSource = status.MediaSource
}
manager.instances[status.ID] = ins
respChan <- nil
return nil
}
func (manager *ResourceManager) handleConfirmInstance(id string, monitorPort uint, monitorSecret, ethernetAddress string, respChan chan error) error {
status, exists := manager.instances[id]
if !exists {
err := fmt.Errorf("invalid instance '%s'", id)
respChan <- err
return err
}
cell, exists := manager.cells[status.Cell]
if !exists {
err := fmt.Errorf("invalid cell '%s'", status.Cell)
respChan <- err
return err
}
if _, exists := cell.Pending[id]; !exists {
err := fmt.Errorf("instance '%s' already confirmed", id)
respChan <- err
return err
}
delete(cell.Pending, id)
cell.Instances[id] = true
cell.StoppedInstances++
status.InternalNetwork.MonitorPort = monitorPort
status.MonitorSecret = monitorSecret
status.HardwareAddress = ethernetAddress
status.Created = true
status.Progress = 0
status.CreateTime = time.Now().Format(TimeFormatLayout)
//update
manager.instances[id] = status
manager.cells[status.Cell] = cell
log.Printf("<resource_manager> instance '%s' confirmed, monitor port %d", id, status.InternalNetwork.MonitorPort)
respChan <- nil
return nil
}
func (manager *ResourceManager) handleDeallocateInstance(id string, err error, respChan chan error) error {
ins, exists := manager.instances[id]
if !exists {
err := fmt.Errorf("invalid instance '%s'", id)
respChan <- err
return err
}
if ins.Cell == "" {
err := fmt.Errorf("instance '%s' not allocated", id)
respChan <- err
return err
}
var cellName = ins.Cell
cell, exists := manager.cells[cellName]
if !exists {
err := fmt.Errorf("invalid cell '%s'", cellName)
respChan <- err
return err
}
if _, exists = cell.Pending[id]; exists {
delete(cell.Pending, id)
log.Printf("<resource_manager> pending instance '%s' deallocated in cell '%s'", id, cellName)
} else {
delete(cell.Instances, id)
log.Printf("<resource_manager> instance '%s' deallocated in cell '%s'", id, cellName)
}
if ins.Running{
cell.RunningInstances--
}else{
cell.StoppedInstances--
}
if pool, exists := manager.pools[ins.Pool];exists{
if "" != pool.Network{
manager.deallocateNetworkAddress(pool, ins.InternalNetwork.AssignedAddress, ins.ExternalNetwork.AssignedAddress)
}
delete(pool.InstanceNames, ins.Name)
manager.pools[ins.Pool] = pool
}
//update instance statistic
manager.cells[cellName] = cell
delete(manager.instances, id)
if err != nil{
manager.pendingError[id] = err
}
respChan <- nil
return nil
}
func (manager *ResourceManager) handleGetInstanceStatus(id string, respChan chan ResourceResult) (err error) {
var exists bool
var status InstanceStatus
if err, exists = manager.pendingError[id]; exists{
//fetch pending error
delete(manager.pendingError, id)
respChan <- ResourceResult{Error: err}
log.Printf("<resource_manager> pending error of instance '%s' fetched", id)
return nil
}else if status, exists = manager.instances[id]; exists{
respChan <- ResourceResult{Instance: status}
return nil
}else{
err = fmt.Errorf("invalid instance '%s'", id)
respChan <- ResourceResult{Error: err}
return err
}
}
func (manager *ResourceManager) handleQueryInstanceStatusInPool(poolName string, respChan chan ResourceResult) (err error){
pool, exists := manager.pools[poolName]
if !exists{
err = fmt.Errorf("invalid pool '%s'", poolName)
respChan <- ResourceResult{Error:err}
return err
}
var idList []string
for cellName, _ := range pool.Cells{
if cell, exists := manager.cells[cellName]; !exists{
err = fmt.Errorf("invalid cell '%s'", cellName)
respChan <- ResourceResult{Error:err}
return err
}else{
for instanceID, _ := range cell.Instances{
idList = append(idList, instanceID)
}
for instanceID, _ := range cell.Pending{
idList = append(idList, instanceID)
}
}
}
result, err := manager.getSortedInstances(idList)
if err != nil{
respChan <- ResourceResult{Error:err}
return err
}
respChan <- ResourceResult{InstanceList: result}
return nil
}
func (manager *ResourceManager) handleQueryInstanceStatusInCell(poolName, cellName string, respChan chan ResourceResult) (err error){
pool, exists := manager.pools[poolName]
if !exists{