技术漫谈 | 集群中容器间的通迅

2017-10-11 ReafeMerb

作者:周益

相关技术

redis 服务发现

mysql 数据持久化

golang 原生rpc-json

我们在k8s中启动3个restful api service实例,当对外提供服务的时候,这个api service 可能出现实例的内存数据不一致的情况。

原本consul作为服务发现是最佳的。为了进一步减少组件,我们使用redis来完成服务发现的功能。

APService 3个实例如下

  • Instance1 192.168.0.101
  • Instance2 192.168.0.102
  • Instance3192.168.0.103

1.instance 启动之后,每秒发送一次心跳数据包。如果3秒之类没有发起,redis自动过期。

我们可以认为instance下线了。

2.获得在线的服务器列表

Response:

  • Instance1
  • Instance2
  • Instance3

3.获得真实的

Response:

  • 192.168.0.101
  • 192.168.0.102
  • 192.168.0.103

当外部客户端调用如果Instance 有写入数据的操作,有以下的4个步骤:

  1. 获得除了自己之外的在线服务器ip地址。
  2. 通过RPC调用直接修改其它服务器的内存数据 Instance1--rpc-->Instance2
    修改其内存数据 Instance1--rpc-->Instance3 修改其内存数据。
    3.RPC调用都成功后,再将数据写入mysql数据库中,否则对外报告失败了。
    4.仅由当前的Instance1下发配置信息到agent。

容器之间的通迅可以使用golang 原生的rpc-json 来实现peer to peer调用

每个instance 都作为服务器,又都作为客户端。

1.instance 启动时,监听着7771端口。

globalClusterServer := NewClusterServer()  

globalClusterServer.Init()

2.instance 在需要的同步数据的地方直接调用。

globalClusterClient = NewClusterClient()

globalClusterClient.Init()if err := globalClusterClient.StoreIngressRule(request, response); err!=nil {    log.Error(err.Error())

}if err := globalClusterClient.StoreIngressConfig(reqeust, response); err!=nil {    log.Error(err.Error())

}

cluster_server.go

package ingress



import ("fmt"

"errors"

"net"

"net/rpc"

"net/rpc/jsonrpc"

"github.com/astaxie/beego/utils"

"wise2c/wisecloud-ingress-controller/log")



type ICAPIServer intfunc (t *ICAPIServer) StoreIngressRule(request *RequestStoreIngressRule, response *ResponseStoreIngressRule) error {if request == nil {

return errors.New("request is empty.")

}if response == nil {

return errors.New("response is empty.")

}



err := globalIngresProcess.MemStoreIngressRule(&request.IngressRule)if err != nil {

response.Set(0, fmt.Sprintf("MemStoreIngressRule is failed.%s", err.Error()))

return err

}



response.Set(1, "ok")

return nil

}//////////////////////////////////////////////////////////////////////////////////////type ClusterServer struct {

Portocol string

Port     string

Listener   *net.TCPListener

}func NewClusterServer() *ClusterServer {

return &ClusterServer{

Portocol  : "tcp",

Port      : common.ClusterPort,

Listener    : nil,

}

}func (this *ClusterServer) Init() {

icapi_server := new(ICAPIServer)

rpc.Register(icapi_server)



addr, _ := net.ResolveTCPAddr("tcp", this.Port)

var err error

this.Listener, err = net.ListenTCP("tcp", addr)if err != nil {

panic(err)

}

log.Info("The Controller's cluster server is listened[%s].", addr)



go this.Listen()

}func (this *ClusterServer) Listen() {for {

conn, e := this.Listener.Accept()if e != nil {continue

}

go jsonrpc.ServeConn(conn)

}

}





globalClusterServer := NewClusterServer()

globalClusterServer.Init()

cluster_client.go

"net/rpc/jsonrpc"

"wise2c/wisecloud-ingress-controller/log"

"wise2c/wisecloud-ingress-controller/common")type ICAPIClient struct {

Client     *rpc.Client

Retry      int}func NewICAPIClient(address string) (*ICAPIClient, error) {

api_client := &ICAPIClient{

Client :nil,

Retry  :3,

}var err error

api_client.Client, err = jsonrpc.Dial("tcp", address)if err != nil {return nil, errors.New(fmt.Sprintf("ICAPIClient connect %s is failed. %s", address, err.Error()))

}return api_client, nil}func (this *ICAPIClient) Close() error {if this.Client == nil {return nil

}



err := this.Client.Close()if err != nil {return errors.New(fmt.Sprintf("ICAPIClient close is failed. %s", err.Error()))

}



this.Client = nil

return nil}func (this *ICAPIClient) Handler(func_name string,request interface{}, response interface{}) error {if this.Client == nil {return errors.New(fmt.Sprintf("ICAPIClient's client is null"))

}for i:=0; i<this.Retry; i++ {var err error

err = this.Client.Call(func_name, request, response)if err != nil {

log.Error(fmt.Sprintf("%s is failed. %s", func_name, err.Error()))

time.Sleep(1*time.Second)continue

}break

}return nil}func (this *ICAPIClient) StoreIngressRule(request *RequestStoreIngressRule, response *ResponseStoreIngressRule) error {return this.Handler("ICAPIServer.StoreIngressRule", request, response)

}func (this *ICAPIClient) StoreIngressConfig(request *RequestStoreIngressConfig, response *ResponseStoreIngressConfig) error {return this.Handler("ICAPIServer.StoreIngressConfig", request, response)

}////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////type ClusterClient struct {

Portocol string

Port     string}func NewClusterClient() *ClusterClient {return &ClusterClient{}

}func (this *ClusterClient) Init() {

this.Portocol = "tcp"

this.Port     = common.ClusterPort

}type IngressRuleCallback func(request, response interface{}, client *ICAPIClient) errorfunc (client *ClusterClient) IngressRuleHandler(request, response interface{}, callback IngressRuleCallback )  (err error){

cluster_ips, err := globalRedisClient.GetClusterOtherIP(globalSelfPodName)if err != nil {return errors.New(fmt.Sprintf("GetOtherController() is failed. %s", err.Error()))

}



err_msg := ""

for _, ip := range cluster_ips {



address := fmt.Sprintf("%s%s", ip, client.Port)



rpc_client, err := NewICAPIClient(address)if err != nil {

err_msg += fmt.Sprintf("connect cluster %s is failed.%s", address, err.Error())continue

}defer rpc_client.Close()if callback != nil {

err = callback(request, response, rpc_client)if err != nil {

err_msg += fmt.Sprint("call cluster is failed.", address, err.Error())

}

}

}if err_msg != "" {return errors.New(err_msg)

}return}func (this *ClusterClient) StoreIngressRule(request *RequestStoreIngressRule, response *ResponseStoreIngressRule) error {return  this.IngressRuleHandler(request, response, func(request, response interface{}, client *ICAPIClient) error {return client.StoreIngressRule(request.(*RequestStoreIngressRule), response.(*ResponseStoreIngressRule))

})

}func (this *ClusterClient) StoreIngressConfig(request *RequestStoreIngressConfig, response *ResponseStoreIngressConfig) error {return  this.IngressRuleHandler(request, response, func(request, response interface{}, client *ICAPIClient) error {return client.StoreIngressConfig(request.(*RequestStoreIngressConfig), response.(*ResponseStoreIngressConfig))

})

}



globalClusterClient = NewClusterClient()

globalClusterClient.Init()

文末福利:请大家关注【Wise2C】并回复【进群】,睿云小助手会第一时间拉你进入【 Docker企业落地实践群】,我们分享的各个企业案例项目的技术专家与用户代表,正在敬候您的光临,期待大家就项目的更多细节与疑问与群里的大牛们进行咨询探讨。

需要了解更多有关睿云智合的客户项目细节,请在Wise2C公众号中最佳实践菜单中查看。

若需要了解更多有关Wise系列PaaS产品的详情,请与我们的市场团队联系: contact@wise2c.com


用户评论
开源开发学习小组列表