Browse Source

sync disk/media images

dev
akumas 11 months ago
parent
commit
30e43ad111
  1. 3
      CHANGELOG.md
  2. 2
      core_service.go
  3. 16
      core_transaction_manager.go
  4. 186
      src/imageserver/image_manager.go
  5. 2
      src/imageserver/image_service.go
  6. 44
      src/imageserver/sync_disk_images.go
  7. 44
      src/imageserver/sync_media_images.go
  8. 17
      src/imageserver/task_manager.go
  9. 63
      src/modules/api_module.go
  10. 64
      src/task/sync_disk_images.go
  11. 63
      src/task/sync_media_images.go

3
CHANGELOG.md

@ -1,12 +1,13 @@
# Change Log
## [1.3.0] - 2020-11-06
## [1.3.0] - 2020-11-07
### Added
- Manage security policy of instance
- Manage security policy group
- Allow address pool bind IP using Cloud-Init
- Sync disk/media images from local path
### Changed

2
core_service.go

@ -129,12 +129,14 @@ func (core *CoreService)OnMessageReceived(msg framework.Message){
case framework.CreateDiskImageRequest:
case framework.DeleteDiskImageRequest:
case framework.ModifyDiskImageRequest:
case framework.SynchronizeDiskImageRequest:
case framework.QueryMediaImageRequest:
case framework.GetMediaImageRequest:
case framework.CreateMediaImageRequest:
case framework.DeleteMediaImageRequest:
case framework.ModifyMediaImageRequest:
case framework.SynchronizeMediaImageRequest:
case framework.QuerySnapshotRequest:
case framework.GetSnapshotRequest:

16
core_transaction_manager.go

@ -599,5 +599,21 @@ func CreateTransactionManager(sender framework.MessageSender, resourceModule mod
err = fmt.Errorf("register delete security policy group fail: %s", err.Error())
return
}
if err = manager.RegisterExecutor(framework.SynchronizeMediaImageRequest,
&task.SyncMediaImagesExecutor{
Sender: sender,
ResourceModule: resourceModule,
}); err != nil{
err = fmt.Errorf("register sync media images fail: %s", err.Error())
return
}
if err = manager.RegisterExecutor(framework.SynchronizeDiskImageRequest,
&task.SyncDiskImagesExecutor{
Sender: sender,
ResourceModule: resourceModule,
}); err != nil{
err = fmt.Errorf("register sync disk images fail: %s", err.Error())
return
}
return manager, nil
}

186
src/imageserver/image_manager.go

@ -1,16 +1,17 @@
package imageserver
import (
"encoding/json"
"fmt"
"github.com/project-nano/framework"
"log"
"github.com/satori/go.uuid"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"encoding/json"
"io/ioutil"
"time"
"sort"
"strings"
"time"
)
type MediaConfig struct {
@ -82,7 +83,7 @@ const (
cmdUnlockMediaImage
cmdGetMediaImageFile
cmdGetMediaImage
cmdSyncMediaImages
cmdQueryDiskImage
cmdCreateDiskImage
cmdModifyDiskImage
@ -93,6 +94,7 @@ const (
cmdGetDiskImage
cmdGetDiskImageFile
cmdUpdateDiskImageProgress
cmdSyncDiskImages
)
type ImageResult struct {
@ -120,7 +122,11 @@ type ImageManager struct {
}
const (
TimeFormatLayout = "2006-01-02 15:04:05"
TimeFormatLayout = "2006-01-02 15:04:05"
FormatExtQCOW2 = "qcow2"
FormatExtISO = "iso"
DefaultDiskFormat = FormatExtQCOW2
DefaultMediaFormat = FormatExtISO
)
func CreateImageManager(dataPath string) (manager *ImageManager, err error){
@ -264,7 +270,6 @@ func (manager *ImageManager) handleCommand(cmd imageCommand){
err = manager.handleGetMediaImage(cmd.ID, cmd.ResultChan)
case cmdModifyMediaImage:
err = manager.handleModifyMediaImage(cmd.ID, cmd.MediaImageConfig, cmd.ErrorChan)
case cmdQueryDiskImage:
err = manager.handleQueryDiskImage(cmd.User, cmd.Group, cmd.Tags, cmd.ResultChan)
case cmdCreateDiskImage:
@ -285,7 +290,10 @@ func (manager *ImageManager) handleCommand(cmd imageCommand){
err = manager.handleGetDiskImageFile(cmd.ID, cmd.ResultChan)
case cmdUpdateDiskImageProgress:
err = manager.handleUpdateDiskImageProgress(cmd.ID, cmd.Progress, cmd.ErrorChan)
case cmdSyncMediaImages:
err = manager.handleSyncMediaImages(cmd.User, cmd.Group, cmd.ErrorChan)
case cmdSyncDiskImages:
err = manager.handleSyncDiskImages(cmd.User, cmd.Group, cmd.ErrorChan)
default:
log.Printf("<image> unsupported command type %d", cmd.Type)
break
@ -384,11 +392,18 @@ func (manager * ImageManager) GetDiskImageFile(id string, respChan chan ImageRes
manager.commands <- cmd
}
func (manager * ImageManager) UpdateDiskImageProgress(id string, progress uint, respChan chan error){
manager.commands <- imageCommand{Type: cmdUpdateDiskImageProgress, ID:id, Progress: progress, ErrorChan: respChan}
}
func (manager * ImageManager) SyncMediaImages(owner, group string, respChan chan error){
manager.commands <- imageCommand{Type: cmdSyncMediaImages, User: owner, Group: group, ErrorChan: respChan}
}
func (manager * ImageManager) SyncDiskImages(owner, group string, respChan chan error){
manager.commands <- imageCommand{Type: cmdSyncDiskImages, User: owner, Group: group, ErrorChan: respChan}
}
func (manager *ImageManager) handleQueryMediaImage(owner, group string, respChan chan ImageResult) (err error){
var result []MediaStatus
var names []string
@ -428,9 +443,6 @@ func (manager *ImageManager) handleQueryMediaImage(owner, group string, respChan
}
func (manager *ImageManager) handleCreateMediaImage(config MediaConfig, respChan chan ImageResult) (err error){
const (
MediaImageFormat = "iso"
)
var nameWithGroup = fmt.Sprintf("%s.%s", config.Group, config.Name)
if _, exists := manager.mediaImageNames[nameWithGroup]; exists{
err = fmt.Errorf("media image '%s' already exists in group '%s'", config.Name, config.Group)
@ -444,7 +456,7 @@ func (manager *ImageManager) handleCreateMediaImage(config MediaConfig, respChan
image.Size = 0
image.Version = 0
image.Locked = false
image.Format = MediaImageFormat
image.Format = DefaultMediaFormat
image.CreateTime = time.Now().Format(TimeFormatLayout)
manager.mediaImages[image.ID] = image
manager.mediaImageNames[nameWithGroup] = true
@ -686,9 +698,6 @@ func (manager *ImageManager) handleQueryDiskImage(owner, group string, tags []st
}
func (manager *ImageManager) handleCreateDiskImage(config DiskConfig, respChan chan ImageResult) (err error){
const (
DiskImageFormat = "qcow2"
)
var nameWithGroup = fmt.Sprintf("%s.%s", config.Group, config.Name)
if _, exists := manager.diskImageNames[nameWithGroup]; exists{
err = fmt.Errorf("disk image '%s' already exists in group '%s'", config.Name, config.Group)
@ -706,7 +715,7 @@ func (manager *ImageManager) handleCreateDiskImage(config DiskConfig, respChan c
image.Created = false
image.Progress = 0
//todo: more format support
image.Format = DiskImageFormat
image.Format = DefaultDiskFormat
image.CreateTime = time.Now().Format(TimeFormatLayout)
manager.diskImages[image.ID] = image
manager.diskImageNames[nameWithGroup] = true
@ -920,3 +929,144 @@ func (manager * ImageManager) handleGetDiskImageFile(id string, respChan chan Im
return nil
}
func (manager * ImageManager) handleSyncMediaImages(owner, group string, respChan chan error) (err error){
var existsKeys = map[string]bool{}
for id, _ := range manager.mediaImages{
existsKeys[id] = true
}
var filenames []string
if filenames, err = findAbsentFile(manager.mediaPath, DefaultMediaFormat, existsKeys); err != nil{
err = fmt.Errorf("find absent media images fail: %s", err.Error())
respChan <- err
return
}
if 0 == len(filenames){
respChan <- nil
log.Println("<image> all media images synchronized, no absent file discovered")
return
}
for _, filename := range filenames{
var now = time.Now()
var timestamp = now.Format(TimeFormatLayout)
var image MediaStatus
image.Owner = owner
image.Group = group
image.Name = fmt.Sprintf("%s_%d", filename, now.Second())
image.Description = fmt.Sprintf("generated by synchronize media images on %s", timestamp)
image.ID = uuid.NewV4().String()
image.Version = 1
image.Format = DefaultMediaFormat
image.Locked = false
image.CreateTime = timestamp
image.ModifyTime = timestamp
image.Path = filepath.Join(manager.mediaPath, fmt.Sprintf("%s_v%d.%s", image.ID, image.Version, image.Format))
var info os.FileInfo
var sourceFile = filepath.Join(manager.mediaPath, fmt.Sprintf("%s.%s", filename, DefaultMediaFormat))
if info, err = os.Stat(sourceFile); err != nil{
err = fmt.Errorf("check source media file '%s' fail: %s", sourceFile, err.Error())
respChan <- err
return
}
image.Size = uint(info.Size())
if err = os.Rename(sourceFile, image.Path); err != nil{
err = fmt.Errorf("rename '%s' to '%s' fail: %s", sourceFile, image.Path, err.Error())
respChan <- err
return
}
var nameWithGroup = fmt.Sprintf("%s.%s", group, image.Name)
manager.mediaImages[image.ID] = image
manager.mediaImageNames[nameWithGroup] = true
log.Printf("<image> synchronize %s to media image '%s'(%s)", filename, image.Name, image.ID)
}
respChan <- nil
log.Printf("<image> %d new media image(s) synchronized", len(filenames))
return manager.SaveData()
}
func (manager * ImageManager) handleSyncDiskImages(owner, group string, respChan chan error) (err error){
var existsKeys = map[string]bool{}
for id, _ := range manager.diskImages{
existsKeys[id] = true
}
var filenames []string
if filenames, err = findAbsentFile(manager.diskPath, DefaultDiskFormat, existsKeys); err != nil{
err = fmt.Errorf("find absent disk images fail: %s", err.Error())
respChan <- err
return
}
if 0 == len(filenames){
respChan <- nil
log.Println("<image> all disk images synchronized, no absent file discovered")
return
}
for _, filename := range filenames{
var now = time.Now()
var timestamp = now.Format(TimeFormatLayout)
var image DiskStatus
image.Owner = owner
image.Group = group
image.Name = fmt.Sprintf("%s_%d", filename, now.Second())
image.Description = fmt.Sprintf("generated by synchronize disk images on %s", timestamp)
image.ID = uuid.NewV4().String()
image.Version = 1
image.Format = DefaultDiskFormat
image.CreateTime = timestamp
image.ModifyTime = timestamp
image.Created = true
image.Locked = false
image.Path = filepath.Join(manager.mediaPath, fmt.Sprintf("%s_v%d.%s", image.ID, image.Version, image.Format))
var info os.FileInfo
var sourceFile = filepath.Join(manager.mediaPath, fmt.Sprintf("%s.%s", filename, DefaultDiskFormat))
if info, err = os.Stat(sourceFile); err != nil{
err = fmt.Errorf("check source disk file '%s' fail: %s", sourceFile, err.Error())
respChan <- err
return
}
image.Size = uint(info.Size())
log.Printf("<image> compute checksum for '%s'...", sourceFile)
if image.CheckSum, err = computeCheckSum(sourceFile); err != nil{
err = fmt.Errorf("compute checksum for '%s' fail: %s", sourceFile, err.Error())
respChan <- err
return
}
if err = os.Rename(sourceFile, image.Path); err != nil{
err = fmt.Errorf("rename '%s' to '%s' fail: %s", sourceFile, image.Path, err.Error())
respChan <- err
return
}
var nameWithGroup = fmt.Sprintf("%s.%s", group, image.Name)
manager.diskImages[image.ID] = image
manager.diskImageNames[nameWithGroup] = true
log.Printf("<image> synchronize %s to disk image '%s'(%s)", filename, image.Name, image.ID)
}
respChan <- nil
log.Printf("<image> %d new disk image(s) synchronized", len(filenames))
return manager.SaveData()
}
func findAbsentFile(targetPath, ext string, existsKeys map[string]bool) (names []string, err error){
var suffix = fmt.Sprintf(".%s", ext)
var exists bool
err = filepath.Walk(targetPath, func(currentFile string, info os.FileInfo, accessErr error) error {
if accessErr != nil{
return fmt.Errorf("access '%s' fail: %s", currentFile, accessErr.Error())
}
if targetPath == currentFile{
return nil
}
if info.IsDir(){
return filepath.SkipDir
}
var base = filepath.Base(currentFile)
if !strings.HasSuffix(base, suffix){
return nil
}
var filename = strings.TrimSuffix(base, suffix)
if _, exists = existsKeys[filename]; !exists{
names = append(names, filename)
}
return nil
})
return
}

2
src/imageserver/image_service.go

@ -44,12 +44,14 @@ func (service *ImageService) OnMessageReceived(msg framework.Message){
case framework.CreateDiskImageRequest:
case framework.DeleteDiskImageRequest:
case framework.ModifyDiskImageRequest:
case framework.SynchronizeDiskImageRequest:
case framework.QueryMediaImageRequest:
case framework.GetMediaImageRequest:
case framework.CreateMediaImageRequest:
case framework.DeleteMediaImageRequest:
case framework.ModifyMediaImageRequest:
case framework.SynchronizeMediaImageRequest:
case framework.DiskImageUpdatedEvent:
default:

44
src/imageserver/sync_disk_images.go

@ -0,0 +1,44 @@
package imageserver
import (
"fmt"
"github.com/project-nano/framework"
"log"
)
type SyncDiskImagesExecutor struct {
Sender framework.MessageSender
ImageServer *ImageManager
}
func (executor *SyncDiskImagesExecutor)Execute(id framework.SessionID, request framework.Message,
incoming chan framework.Message, terminate chan bool) (err error) {
var owner, group string
if owner, err = request.GetString(framework.ParamKeyUser); err != nil {
err = fmt.Errorf("get owner fail: %s", err.Error())
return err
}
if group, err = request.GetString(framework.ParamKeyGroup); err != nil {
err = fmt.Errorf("get group fail: %s", err.Error())
return err
}
log.Printf("[%08X] %s.[%08X] request synchronize disk images...",
id, request.GetSender(), request.GetFromSession())
var respChan = make(chan error, 1)
executor.ImageServer.SyncDiskImages(owner, group, respChan)
err = <- respChan
resp, _ := framework.CreateJsonMessage(framework.SynchronizeDiskImageResponse)
resp.SetSuccess(false)
resp.SetFromSession(id)
resp.SetToSession(request.GetFromSession())
if err != nil{
resp.SetError(err.Error())
log.Printf("[%08X] sync disk images fail: %s", id, err.Error())
}else{
log.Printf("[%08X] disk images synchronized", id)
resp.SetSuccess(true)
}
return executor.Sender.SendMessage(resp, request.GetSender())
}

44
src/imageserver/sync_media_images.go

@ -0,0 +1,44 @@
package imageserver
import (
"fmt"
"github.com/project-nano/framework"
"log"
)
type SyncMediaImagesExecutor struct {
Sender framework.MessageSender
ImageServer *ImageManager
}
func (executor *SyncMediaImagesExecutor)Execute(id framework.SessionID, request framework.Message,
incoming chan framework.Message, terminate chan bool) (err error) {
var owner, group string
if owner, err = request.GetString(framework.ParamKeyUser); err != nil {
err = fmt.Errorf("get owner fail: %s", err.Error())
return err
}
if group, err = request.GetString(framework.ParamKeyGroup); err != nil {
err = fmt.Errorf("get group fail: %s", err.Error())
return err
}
log.Printf("[%08X] %s.[%08X] request synchronize media images...",
id, request.GetSender(), request.GetFromSession())
var respChan = make(chan error, 1)
executor.ImageServer.SyncMediaImages(owner, group, respChan)
err = <- respChan
resp, _ := framework.CreateJsonMessage(framework.SynchronizeMediaImageResponse)
resp.SetSuccess(false)
resp.SetFromSession(id)
resp.SetToSession(request.GetFromSession())
if err != nil{
resp.SetError(err.Error())
log.Printf("[%08X] sync media images fail: %s", id, err.Error())
}else{
log.Printf("[%08X] media images synchronized", id)
resp.SetSuccess(true)
}
return executor.Sender.SendMessage(resp, request.GetSender())
}

17
src/imageserver/task_manager.go

@ -1,6 +1,7 @@
package imageserver
import (
"fmt"
"github.com/project-nano/framework"
)
@ -61,6 +62,22 @@ func CreateTaskManager(sender framework.MessageSender, imageManager *ImageManage
&DiskImageUpdateExecutor{sender, imageManager}); err != nil{
return nil, err
}
if err = manager.RegisterExecutor(framework.SynchronizeDiskImageRequest,
&SyncDiskImagesExecutor{
Sender: sender,
ImageServer: imageManager,
}); err != nil {
err = fmt.Errorf("register sync disk images fail: %s", err.Error())
return nil, err
}
if err = manager.RegisterExecutor(framework.SynchronizeMediaImageRequest,
&SyncMediaImagesExecutor{
Sender: sender,
ImageServer: imageManager,
}); err != nil {
err = fmt.Errorf("register sync disk images fail: %s", err.Error())
return nil, err
}
return &manager, nil
}

63
src/modules/api_module.go

@ -26,12 +26,12 @@ import (
)
type APIModule struct {
server http.Server
exitChan chan bool
currentImageHost string
currentImageURL string
server http.Server
exitChan chan bool
currentImageHost string
currentImageURL string
currentImageProxy *httputil.ReverseProxy
apiCredentials map[string]string
apiCredentials map[string]string
proxy *RequestProxy
resource ResourceModule
}
@ -436,7 +436,7 @@ func (module *APIModule) RegisterAPIHandler(router *httprouter.Router) {
router.POST(apiPath("/media_images/"), module.createMediaImage)
router.PUT(apiPath("/media_images/:id"), module.modifyMediaImage)
router.DELETE(apiPath("/media_images/:id"), module.deleteMediaImage)
router.PATCH(apiPath("/media_images/"), module.syncMediaImages)
router.POST(apiPath("/media_images/:id/file/"), module.redirectToImageServer)
@ -447,6 +447,7 @@ func (module *APIModule) RegisterAPIHandler(router *httprouter.Router) {
router.POST(apiPath("/disk_images/"), module.createDiskImage)
router.PUT(apiPath("/disk_images/:id"), module.modifyDiskImage)
router.DELETE(apiPath("/disk_images/:id"), module.deleteDiskImage)
router.PATCH(apiPath("/disk_images/"), module.syncDiskImages)
router.GET(apiPath("/disk_images/:id/file/"), module.redirectToImageServer)
router.POST(apiPath("/disk_images/:id/file/"), module.redirectToImageServer)//upload from web
@ -2803,7 +2804,6 @@ func (module *APIModule) modifyMediaImage(w http.ResponseWriter, r *http.Request
ResponseOK("", w)
}
func (module *APIModule) deleteMediaImage(w http.ResponseWriter, r *http.Request, params httprouter.Params){
if err := module.verifyRequestSignature(r); err != nil{
ResponseFail(ResponseDefaultError, err.Error(), w)
@ -2827,6 +2827,30 @@ func (module *APIModule) deleteMediaImage(w http.ResponseWriter, r *http.Request
ResponseOK("", w)
}
func (module *APIModule) syncMediaImages(w http.ResponseWriter, r *http.Request, params httprouter.Params){
if err := module.verifyRequestSignature(r); err != nil{
ResponseFail(ResponseDefaultError, err.Error(), w)
return
}
var filterOwner = r.URL.Query().Get("owner")
var filterGroup = r.URL.Query().Get("group")
msg, _ := framework.CreateJsonMessage(framework.SynchronizeMediaImageRequest)
msg.SetString(framework.ParamKeyUser, filterOwner)
msg.SetString(framework.ParamKeyGroup, filterGroup)
var respChan = make(chan ProxyResult, 1)
if err := module.proxy.SendRequest(msg, respChan); err != nil {
log.Printf("<api> send sync media images request fail: %s", err.Error())
ResponseFail(ResponseDefaultError, err.Error(), w)
return
}
_, errMsg, success := IsResponseSuccess(respChan)
if !success {
log.Printf("<api> sync media images fail: %s", errMsg)
ResponseFail(ResponseDefaultError, errMsg, w)
return
}
ResponseOK("", w)
}
func (module *APIModule) queryDiskImage(w http.ResponseWriter, r *http.Request, params httprouter.Params){
if err := module.verifyRequestSignature(r); err != nil{
@ -3129,6 +3153,31 @@ func (module *APIModule) deleteDiskImage(w http.ResponseWriter, r *http.Request,
ResponseOK("", w)
}
func (module *APIModule) syncDiskImages(w http.ResponseWriter, r *http.Request, params httprouter.Params){
if err := module.verifyRequestSignature(r); err != nil{
ResponseFail(ResponseDefaultError, err.Error(), w)
return
}
var filterOwner = r.URL.Query().Get("owner")
var filterGroup = r.URL.Query().Get("group")
msg, _ := framework.CreateJsonMessage(framework.SynchronizeDiskImageRequest)
msg.SetString(framework.ParamKeyUser, filterOwner)
msg.SetString(framework.ParamKeyGroup, filterGroup)
var respChan = make(chan ProxyResult, 1)
if err := module.proxy.SendRequest(msg, respChan); err != nil {
log.Printf("<api> send sync disk images request fail: %s", err.Error())
ResponseFail(ResponseDefaultError, err.Error(), w)
return
}
_, errMsg, success := IsResponseSuccess(respChan)
if !success {
log.Printf("<api> sync disk images fail: %s", errMsg)
ResponseFail(ResponseDefaultError, errMsg, w)
return
}
ResponseOK("", w)
}
func (module *APIModule) handleModifyGuestName(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
if err := module.verifyRequestSignature(r); err != nil{
ResponseFail(ResponseDefaultError, err.Error(), w)

64
src/task/sync_disk_images.go

@ -0,0 +1,64 @@
package task
import (
"github.com/project-nano/core/modules"
"github.com/project-nano/framework"
"log"
"time"
)
type SyncDiskImagesExecutor struct {
Sender framework.MessageSender
ResourceModule modules.ResourceModule
}
func (executor *SyncDiskImagesExecutor)Execute(id framework.SessionID, request framework.Message,
incoming chan framework.Message, terminate chan bool) (err error) {
var originSession = request.GetFromSession()
var respChan = make(chan modules.ResourceResult, 1)
executor.ResourceModule.GetImageServer(respChan)
var result = <- respChan
resp, _ := framework.CreateJsonMessage(framework.SynchronizeDiskImageResponse)
resp.SetSuccess(false)
resp.SetFromSession(id)
resp.SetToSession(request.GetFromSession())
if result.Error != nil{
err = result.Error
log.Printf("[%08X] get image server for sync disk images fail: %s", id, err.Error())
resp.SetError(err.Error())
return executor.Sender.SendMessage(resp, request.GetSender())
}
//forward to image server
request.SetFromSession(id)
request.SetToSession(0)
var imageServer = result.Name
if err = executor.Sender.SendMessage(request, imageServer); err != nil{
log.Printf("[%08X] forward sync disk images request fail: %s", id, err.Error())
resp.SetError(err.Error())
return executor.Sender.SendMessage(resp, request.GetSender())
}
//wait response
timer := time.NewTimer(modules.DefaultOperateTimeout)
select{
case forwardResp := <- incoming:
if !forwardResp.IsSuccess(){
log.Printf("[%08X] sync disk images fail: %s", id, forwardResp.GetError())
}
forwardResp.SetFromSession(id)
forwardResp.SetToSession(originSession)
forwardResp.SetTransactionID(request.GetTransactionID())
//forward
return executor.Sender.SendMessage(forwardResp, request.GetSender())
case <- timer.C:
//timeout
log.Printf("[%08X] sync disk images timeout", id)
resp.SetError("time out")
return executor.Sender.SendMessage(resp, request.GetSender())
}
}

63
src/task/sync_media_images.go

@ -0,0 +1,63 @@
package task
import (
"github.com/project-nano/core/modules"
"github.com/project-nano/framework"
"log"
"time"
)
type SyncMediaImagesExecutor struct {
Sender framework.MessageSender
ResourceModule modules.ResourceModule
}
func (executor *SyncMediaImagesExecutor)Execute(id framework.SessionID, request framework.Message,
incoming chan framework.Message, terminate chan bool) (err error) {
var originSession = request.GetFromSession()
var respChan = make(chan modules.ResourceResult, 1)
executor.ResourceModule.GetImageServer(respChan)
var result = <- respChan
resp, _ := framework.CreateJsonMessage(framework.SynchronizeMediaImageResponse)
resp.SetSuccess(false)
resp.SetFromSession(id)
resp.SetToSession(request.GetFromSession())
if result.Error != nil{
err = result.Error
log.Printf("[%08X] get image server for sync media images fail: %s", id, err.Error())
resp.SetError(err.Error())
return executor.Sender.SendMessage(resp, request.GetSender())
}
//forward to image server
request.SetFromSession(id)
request.SetToSession(0)
var imageServer = result.Name
if err = executor.Sender.SendMessage(request, imageServer); err != nil{
log.Printf("[%08X] forward sync media images request fail: %s", id, err.Error())
resp.SetError(err.Error())
return executor.Sender.SendMessage(resp, request.GetSender())
}
//wait response
timer := time.NewTimer(modules.DefaultOperateTimeout)
select{
case forwardResp := <- incoming:
if !forwardResp.IsSuccess(){
log.Printf("[%08X] sync media images fail: %s", id, forwardResp.GetError())
}
forwardResp.SetFromSession(id)
forwardResp.SetToSession(originSession)
forwardResp.SetTransactionID(request.GetTransactionID())
//forward
return executor.Sender.SendMessage(forwardResp, request.GetSender())
case <- timer.C:
//timeout
log.Printf("[%08X] sync media images timeout", id)
resp.SetError("time out")
return executor.Sender.SendMessage(resp, request.GetSender())
}
}
Loading…
Cancel
Save