sonic在orchagent应用与syncd应用之间,从是否等待最终结果来看,提供了同步与异步两种模式进行交互:
异步模式:每一步操作的终点到写入数据库,写入成功即为成功同步模式:orchagent写入asic_db以后,等待syncd向asic_db写入执行结果
从进程交互来看,提供了两种RPC的交互方式:一种是通过redis的订阅/发布一种是使用ZeroMQ机制
下面分别从RPC交互来看。
1. 基本流程
1.1 基于redis机制
如上图所示,如果是异步模式,那么每一个步骤终点为写入数据库,写入即成功,如上图的蓝色线流程。如果是同步模式,那么在每一个需要调用sai的步骤,orchagent应用都需要死循环等待syncd应用的响应。
应用初始化:
orchagent应用:订阅config_db相关表项的事件通知,使用的redis的pub/sub机制
orchagent应用初始化sairedis抽象层:通过asic_db发布通知,使用redis的list队列,显式调用publish命令,发布频道订阅的相关通知;订阅asic_db相关频道,通过redis的list队列获取response;根据配置信息,初始化应用之间的交互通道–redis channel
如果是同步模式:所有操作都会通过订阅redis的事件通知得到响应,设置超时阻塞等待,获取到对应的response如果是异步模式:只有get操作才会等待syncd应用的返回
syncd应用:订阅asic_db相关频道,通过redis的list队列获取命令;通过asic_db发布通知,使用redis的list队列,显式调用publish命令,发布response
如果是同步模式:所有操作会向asic_db中写入操作结果如果是异步模式:只有get操作会向asic_db写入结果
下发流程为:
4. cli应用调用hiredis库直接将相关信息写入config_db对应表
5. orchagent应用接收到config_db相关表项的事件通知,针对通知信息做一些预处理,主要是将通知信息转化为与sai接口对应的属性对象
6. orchagent应用调用sairedis接口
7. sairedis应用序列化得到的通知信息,主要是Object id、attribute list,将事件写入redis list,显式调用publish发布频道订阅通知
- 如果是同步模式:所有操作都会阻塞等待redis的相关通知,直到获取到对应通知或者超时
- 如果是异步模式:只有get操作会阻塞等待redis的相关通知,直到获取到对应通知或者超时
syncd应用接收到来自asic_db的事件通知,从redis list取出具体的通知信息,进行预处理,主要是做一些属性验证,以及将操作信息写入asic_db的asic_state表中
如果是同步模式:得到response以后,所有操作都会向redis相关表项写入操作结果如果是异步模式:只有get操作会向redis相关表项写入查询到的信息
syncd应用调用sai接口
通过sai接口进一步调用厂商提供的具体实现
sdk会将得到的Object下发到asic
同步模式下的响应流程为:
sdk得到asic的responsesai接口得到responsesyncd应用得到responsesyncd应用将response写入asic_db相关表项sairedis应用接收到来自asic_db的response通知orchagent应用得到response,写相关Log
1.2 基于ZeroMQ机制
同理,上图的蓝色线流程是异步模式下的流程,红色线是同步模式下的响应流程。(当前交换机是默认模式,即redis的异步模式,不能百分百确定流程是正确的) – ZeroMQ机制只支持同步模式
应用初始化:
orchagent应用:订阅config_db相关表项的事件通知,使用的redis的pub/sub机制
orchagent应用初始化sairedis抽象层:与syncd之间构建ZeroMQ socket
如果是同步模式:所有操作都会设置超时阻塞,等待获取socket的response如果是异步模式:只有get操作才会等待syncd应用的返回
syncd应用:与sairedis之间创建ZeroMQ Socket,等待接收来自sairedis的请求;通过socket进行响应
如果是同步模式:所有操作会通过socket写入response如果是异步模式:只有get操作会向socket写入response
下发流程为:
cli应用调用hiredis库直接将相关信息写入config_db对应表orchagent应用接收到config_db相关表项的事件通知,针对通知信息做一些预处理,主要是将通知信息转化为与sai接口对应的属性对象orchagent应用调用sairedis接口sairedis应用序列化得到的通知信息,主要是Object id、attribute list,将事件通过ZeroMQ socket写入到syncd
如果是同步模式:所有操作都会阻塞等待来自socket的response,直到获取到对应数据或者超时如果是异步模式:只有get操作会阻塞等待来自socket的response,直到获取到对应数据或者超时 syncd应用接收到来自socket的请求,获取信息进行预处理,主要是做一些属性验证 – 暂时不确定这一步会不会将信息写入asic_db中的asic_state表中
如果是同步模式:得到response以后,所有操作都会向socket写入操作结果如果是异步模式:只有get操作会向socket写入查询到的信息 syncd应用调用sai接口通过sai接口进一步调用厂商提供的具体实现sdk会将得到的Object下发到asic
同步模式下的响应流程为:
sdk得到asic的responsesai接口得到responsesyncd应用得到responsesyncd应用将response写入socketsairedis应用接收到来自socket的responseorchagent应用得到response,写相关Log
以异步模式下的redis通信流程为例进行详细介绍,同步模式的redis以及ZeroMQ大体是差不多的,命令执行结果向上响应的过程与命令下发的过程是一样的。
2. 异步模式下的redis通信流程
cli写入config_dborch应用接收并处理通知sairedis写入asic_dbsyncd应用接收并处理通知
2.1 cli写入config_db
上述流程是cli写入数据库的基本流程,与redis数据库的连接使用hiredis库,命令处理函数入口位于
:
sonic-utilities/config/main.py
调用hiredis库连接数据库
ConfigDBConnector_Native::db_connect() ->
SonicV2Connector_Native::connect() ->
DBInterface::connect() ->
DBInterface::_onetime_connect() ->
DBConnector::DBConnector() ->
RedisContext::initContext() ->
hiredis::redisConnect()
调用hiredis库选择config_db库:
SELECT CONFIG_DB_INDEX
DBConnector::select() -> //构造命令参数
RedisReply::RedisReply() -> //构造函数,直接执行了写入与获取响应
hiredis::redisAppendFormattedCommand() -> //写入redis命令
hiredis::redisGetReply() //获取redis命令反馈
调用hiredis配置订阅模式:
CONFIG SET notify-keyspace-events AKE
DBConnector::config_set() ->
RedisCommand::format() -> //格式化redis config命令
RedisReply::RedisReply() -> //构造函数,直接执行了写入与获取响应
hiredis::redisAppendFormattedCommand() -> //写入redis命令
hiredis::redisGetReply() //获取redis命令反馈
从config_db获取初始化标志位:
GET CONFIG_DB_INITIALIZED
DBConnector::get() ->
RedisCommand::format() -> //格式化redis config命令
RedisReply::RedisReply() -> //构造函数,直接执行了写入与获取响应
hiredis::redisAppendFormattedCommand() -> //写入redis命令
hiredis::redisGetReply() //获取redis命令反馈
判断是否已经初始化: 如果没有初始化,执行下一步;如果已经初始化,执行第11步
调用hiredis库执行订阅:
。
PSUBSCRIBE __keyspace@4__:CONFIG_DB_INITIALIZED
这个过程会创建一个新的连接专门用于接收订阅的事件通知: 连接成功得到的结果是hiredis提供的
对象,其中包含连socket的fd,用于epoll循环监听通知信息。
redisContext
PubSub::psubscribe() -> //模式订阅
RedisSelect::psubscribe() ->
DBConnector::newConnector() -> //根据当前连接创建一个新的连接,用于得到一个新的fd,专门用于监听订阅事件
DBConnector::psubscribe() -> //构造命令: `PSUBSCRIBE __keyspace@4__:CONFIG_DB_INITIALIZED`
RedisReply::RedisReply()
将第6步得到的消费者对象加入
– 一个
m_select
类,用于管理当前的所有消费者: 将socket对应的描述符fd加入红黑树开始监听
Select
epoll循环获取订阅的事件通知
PubSub::listen_message() ->
PubSub::get_message_internal() ->
Select::select() -- important,执行epoll_wait()
上述流程图是
的过程,epoll循环监听事件通知,然后读取。
Select::select()
调用epoll_wait获取事件通知信息遍历每一个有事件通知的fd对应的消费者对象
读取数据
Selectable::readData()
是一个虚类,
Selectable
的具体实现需要结合上下文,看当前消费者对象是使用哪一个子类实例化的。将当前消费者对象保存到
readData()
中 –
m_ready
是一个待处理的消费者对象列表 从
m_ready
中获取第一个消费者对象删除该对象更新该对象的时间戳判断当前对象是否存在待处理事件,如果存在则继续,否则执行第3步将该对象赋值给指针参数: 传址得到要处理的消费者对象判断当前消费者是否存在不只一个事件通知需要处理,如果是则继续,否则执行第10步将该对象重新插入到
m_ready
中,还需要继续等待轮询处理更新当前对象的待处理消息数量
m_ready
如果得到通知则执行下一步,否则执行上一步
从config_db获取初始化标志位 – 参考第4步
判断是否已经初始化: 如果没有初始化,执行第6步;如果已经初始化,执行下一步
调用hiredis库取消订阅:
PUNSUBSCRIBE __keyspace@4__:CONFIG_DB_INITIALIZED
PubSub::punsubscribe() ->
RedisSelect::punsubscribe() ->
DBConnector::punsubscribe()
调用hiredis库写入config_db
ConfigDBConnector_Native::set_entry()
构建hash值:
,如
to_upper(table)|key
如果命令行写入参数为空,则执行下一步,否则执行第4步调用hireds从数据库中删除这一条记录,结束:
ACL_TABLE|TABLE_NAME
DEL ACL_TABLE|TABLE_NAME
DBConnector::del()
调用hieredis获取hash值对应的记录信息:
HGETALL ACL_TABLE|TABLE_NAME
ConfigDBConnector_Native::get_entry() ->
RedisClient::hgetall() ->
DBConnector::hgetall() ->
调用hiredis创建或者覆盖hash值对应的记录:
HMSET ACL_TABLE|TABLE_NAME FIELD1 VALUE1 ...FIELDN VALUEN
DBConnector::hmset()
如果hash值对应记录原本就存在,那么针对每一个field进行判断,如果原来存在但是更新的不存在,那么执行删除字段操作:
HDEL ACL_TABLE|TABLE_NAME FIELD1
DBConnector::hdel()
2.2 orchagent处理
2.2.1 订阅
与cli写入config_db小节的第6步订阅是一样的操作,调用hiredis向redis写入订阅命令,如
。每一个
PSUBSCRIBE __keyspace@4__:ACL_TABLE|*
都会继承同一个父类
agent
,在父类的构造函数中执行订阅操作:
Orch()
Orch::Orch() ->
Orch::addConsumer() ->
SubscriberStateTable::SubscriberStateTable() -> //不同的db会构造不同的子类,config_db是通过该子类构造消费者对象
RedisSelect::subscribe()
2.2.2 事件监听
这里存在多个子代理,所以在进程中对所有子代理的消费者对象通过
对象进行统一管理,与cli写入config_db小节的第8步是一样的,根据时间戳大小轮询选择消费者对象进行处理。 –
Select
OrchDaemon::start()
2.2.3 事件预处理
首先说明,这里的订阅对象从内核中读取到事件通知以后存到一个缓冲队列
–
m_keyspace_event_buffer
,还存在一个待处理队列
readData()
。
m_toSync
事件处理之前需要将
同步到
m_keyspace_event_buffer
中:
m_toSync
Consumer::execute() ->
SubscriberStateTable::pops() ->
Consumer::addToSync()
如上图所示,预处理会做一些简单的判断:
从
中获取event –
m_keyspace_event_buffer
,如果为空则结束
SubscriberStateTable::popEventBuffer()
event信息可以看成是一个json数据(map对象):
判断订阅的模式是否匹配当前消费者对象的订阅模式判断频道是否匹配如果是set操作,执行下一步,否则执行第7步调用hiredis从数据库中获取该条记录对应的详细信息:
event={"type":"pmessage","pattern":"__keyspace@4__:ACL_TABLE|*","channel":"__keyspace@4__:ACL_TABLE|TEST","data":"set"}
HGETALL ACL_TABLE|TABLE_NAME
Table::get()
构建
操作的对象,形如:
SET
构建
<key, "set", <<field1,value1>,<field2,value2>,...>
操作的对象,形如:
DEL
<key, "set/del",null>
上述7步是
操作,下面是
SubscriberStateTable::pops()
操作,同步events信息到
Consumer::addToSync()
(multimap对象) – 事件处理队列,守护进程处理的是这个map中订阅对象的events。如果通知信息中key不在m_toSync中,则直接插入,结束,否则执行下一步判断如果是一条DEL操作的通知,擦除m_toSync对应key的原有记录,插入新的通知,结束,否则执行下一步如果是一条SET操作的通知,找到m_toSync中对应key的记录且是SET操作的数据,使用新的通知中表字段信息取代原来的信息,且保留新的信息中不存在但是原来信息中就有的字段信息
m_toSync
2.2.4 事件处理
进行事件处理,调用sairedis api将信息写入asic_db – `AclOrch::doAclTableTask():
上述流程是处理SET操作的基本流程:
创建一个AclTable对象,根据通知信息设置table属性 –
AclTable::AclTable()
descriptiontable type –
ports: PortsOrch初始化时会调用sai获取ports信息 –
AclOrch::processAclTableType()
AclOrch::processAclTablePorts()
判断需要绑定的port是否处于ready状态,如果不是则跳过,如果是则进一步获取port id将port id加入table对象的port成员 stage: 仅支持INGRESS&EGRESS –
验证table类型是否支持 –
AclOrch::processAclTableStage()
针对table类型添加相关stage的必要属性 –
AclTable::validateAddType()
针对stage类型添加必要的acl action –
AclTable::addStageMandatoryMatchFields()
再次验证table的stage信息以及acl action信息 –
AclTable::addMandatoryActions()
获取
AclTable::validate()
–
object id
AclOrch::getTableById()
如果
(
table id
)之前已经创建过,那么将直接获取到对应的oid如果创建一个新的
table name
,那么
table
返回
object id
,等于0
SAI_NULL_OBJECT_ID
是否已经存在,即
table
是否等于
object id
:
SAI_NULL_OBJECT_ID
,存在,则采取update操作 –
object id!=SAI_NULL_OBJECT_ID
AclOrch::updateAclTable()
,不存在,则采取create操作 –
object id==SAI_NULL_OBJECT_ID
AclOrch::addAclTable()
上述两个操作都是通过调用
写入
sairedis api
asic_db
2.3 sairedis
是一层抽象层,用于与syncd应用构建通信通道,因为存在两种通信通道:
sairedis
&
Redis Channel
。
ZeroMQ Channel
实际上存在两个步骤:
addAclTable()
–
create table
AclTable::create()
–
bind ports
AclOrch::bindAclTable()
2.3.1 create table
构建与sai对应的属性信息
:
SAI_ACL_TABLE_ATTR_ACL_BIND_POINT_TYPE_LIST
typedef struct _sai_s32_list_t
{
uint32_t count;
int32_t *list;
} sai_s32_list_t;
对应的
key
类型为上述代码所示,假设是
value
类型,那么其中:
L3
: 2 – 表示可绑定的point类型的数量
count
:
list
– 表示可绑定的point类型
{SAI_ACL_BIND_POINT_TYPE_PORT, SAI_ACL_BIND_POINT_TYPE_LAG}
:
SAI_ACL_TABLE_ATTR_ACL_ACTION_TYPE_LIST
typedef struct _sai_s32_list_t
{
uint32_t count;
int32_t *list;
} sai_s32_list_t;
对应的
key
类型为上述代码所示,其中:
value
: 表示可绑定的action类型的数量
count
: 表示可绑定的action类型
list
如果是
类型,不存在
L3
,所以该属性不会发送给syncd应用
action list
:
SAI_ACL_TABLE_ATTR_ACL_STAGE
typedef int32_t sai_int32_t;
可设置为:
ingress
egress
每一种类型可macth的字段是不一样的,假设是
类型,那么:
L3
typedef struct _sai_attribute_t
{
/** @passparam meta */
sai_attr_id_t id;
/** @passparam meta */
sai_attribute_value_t value;
} sai_attribute_t;
每一种可匹配类型转换成为上述格式,分为两种匹配类型,一种是基本匹配信息,其中
表示属性
id
,
id
选择
value
类型,表示是否支持该属性值,全部设置为
bool
:
true
SAI_ACL_TABLE_ATTR_FIELD_ETHER_TYPE
SAI_ACL_TABLE_ATTR_FIELD_OUTER_VLAN_ID
SAI_ACL_TABLE_ATTR_FIELD_ACL_IP_TYPE
SAI_ACL_TABLE_ATTR_FIELD_SRC_IP
SAI_ACL_TABLE_ATTR_FIELD_DST_IP
SAI_ACL_TABLE_ATTR_FIELD_ICMP_TYPE
SAI_ACL_TABLE_ATTR_FIELD_ICMP_CODE
SAI_ACL_TABLE_ATTR_FIELD_IP_PROTOCOL
SAI_ACL_TABLE_ATTR_FIELD_L4_SRC_PORT
SAI_ACL_TABLE_ATTR_FIELD_L4_DST_PORT
SAI_ACL_TABLE_ATTR_FIELD_TCP_FLAGS
一种是范围匹配信息,其中
表示属性
id
,
id
选择
value
类型,对应
sai_s32_list_t
为
key
,其中:
SAI_ACL_TABLE_ATTR_FIELD_ACL_RANGE_TYPE
: 2 – 表示支持的
count
的数量
range type
:
list
– 表示支持的
{SAI_ACL_RANGE_TYPE_L4_SRC_PORT_RANGE, SAI_ACL_RANGE_TYPE_L4_DST_PORT_RANGE}
最终实现是
range type
– 下面的步骤是
RedisRemoteSaiInterface::create()
对象的处理生成oid –
RedisRemoteSaiInterface
VirtualObjectIdManager::allocateNewObjectId()
检查
是否位于范围内,否则抛出错误检查
object type
是否有效,否则抛出错误生成
switch id
: 调用
oid
的
redis
命令增加
incr
中
asic_db
的值如果
VIDCOUNTER
超过
oid
的最大值,抛出错误构建最终
object id
:
oid
,
switch_id
,
oid
序列化
guid
–
oid
序列化属性列表,循环属性列表
"oid:0x%"
根据
以及
objectType
获取对应的详细的属性信息 – sdk,是通过perl语言拿到的变量信息如果不存在,抛出错误存在则获取
attr id
根据
attr name
的类型进行序列化构成
attr.value
:
pair
序列化
<attr_name,serialized_attr_value>
: 根据
object type
获取对应的详细的类型信息,然后得到type name – sdk,是通过perl语言拿到的变量信息组成
object type
:
key
通知发布: 调用
serialized_object_type:oid:0x%
写入
m_communicationChannel->set()
,参数信息为:
asic_db
– 当前配置的通信模式为
{"key":"", "entry":"", "op":"create"}
,所以使用的子类
RedisChannel Async Mode
初始化通信通道
RedisChannel
将详细的通知信息写入
的
asic_db
:
list
LPUSH ASIC_STATE_KEY_VALUE_OP_QUEUE key value op
: 如
key
SAI_OBJECT_TYPE_ACL_TABLE:oid:0x%
value
: 如
op
–
REDIS_ASIC_STATE_COMMAND_CREATE
发布通知:
create
等待response –
PUBLISH ASIC_STATE_CHANNEL G
RedisRemoteSaiInterface::waitForResponse()
如果成员
为false,是异步模式,直接返回success如果成员
m_syncMode
为true,是同步模式,开启对频道
m_syncMode
的订阅与监听,获取通知信息输出LOG
ASIC_STATE_CHANNEL@asic_db_index
2.3.2 bind ports
最终调用的是Port的守护进程进行绑定 –
,分为几个大步骤:
gPortsOrch->bindAclTable()
创建table group
构建table group属性
:
SAI_ACL_TABLE_GROUP_ATTR_ACL_STAGE
SAI_ACL_STAGE_INGRESS/SAI_ACL_STAGE_EGRESS
:
SAI_ACL_TABLE_GROUP_ATTR_ACL_BIND_POINT_TYPE_LIST
{count:1, list:SAI_ACL_BIND_POINT_TYPE_PORT}
:
SAI_ACL_TABLE_GROUP_ATTR_TYPE
调用
SAI_ACL_TABLE_GROUP_TYPE_PARALLEL
创建 – 与上一小节的2-9步骤是一样的 将table group与port绑定
sai_acl_api->create_acl_table_group()
属性信息:
调用
{attr.id=SAI_PORT_ATTR_INGRESS_ACL/SAI_PORT_ATTR_EGRESS_ACL, atr.value.oid=m_ingress_acl_table_group_id/m_egress_acl_table_group_id}
设置port对应的table group – 与上一小节的2-9步骤是类似的 创建group member
sai_port_api->set_port_attribute()
属性信息
:
SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_GROUP_ID
group_oid
:
SAI_ACL_TABLE_GROUP_MEMBER_ATTR_ACL_TABLE_ID
table_oid
: 100 调用
SAI_ACL_TABLE_GROUP_MEMBER_ATTR_PRIORITY
创建 – 与上一小节的2-9步骤是一样的
sai_acl_api->create_acl_table_group_member()
创建一个acl table并绑定到端口的过程,实际上需要通过acl table group进行连接。如上图所示,table1和table2是ingress的一个group的member,而该group又与port1/2/3相连,即table1与table2的规则会应用到port1/2/3的入口流量,而table3的规则会应用到port1/2/3的出口流量。
上述步骤描述的是基于redis机制的模式,如果是ZeroMQ机制,只支持同步模式,那么将直接与syncd应用建立通信,与类
相关。
ZeroMQChannel
2.4 syncd处理
2.4.1 订阅
与cli写入config_db小节的第6步订阅是一样的操作,调用hiredis向redis写入订阅命令,但是这里是使用频道订阅,订阅的是asic_db中asic_state表的变化 –
。在syncd应用初始化的时候进行订阅:
SUBSCRIBE ASIC_STATE_CHANNEL@asic_db_index
Syncd::Syncd() ->
RedisSelectableChannel::RedisSelectableChannel() ->
ConsumerTable::ConsumerTable() ->
RedisSelect::subscribe()
2.4.2 监听
这里存在多个不同的消费者对象,订阅多个数据库,所以在主进程中对所有子代理的消费者对象通过
对象进行统一管理,与cli写入config_db小节的第8步是一样的,根据时间戳大小轮询选择消费者对象进行处理。 –
Select
Syncd::run()
2.4.3 事件预处理
这里的事件预处理与orchagent不同,因为使用的是不同的消息队列,这里使用redis的list存储通知的详细信息 –
:
ConsumerTable::pops()
成员
是待处理的事件通知队列,如果为空,则首先访问redis数据库获取详细的事件通知
m_buffer
使用
命令调用
EVALSHA
脚本: 当前配置是读取redis的
consumer_table_pops.lua
队列的事件通知,与orchagent处理章节中的事件处理小节的第8步发布的通知相对应,然后将信息写入
ASIC_STATE_KEY_VALUE_OP_QUEUE
的
ASIC_DB
表中,记录的
ASIC_STATE
值形如
key
ASIC_STATE:SAI_OBJECT_TYPE_ACL_TABLE:oid:0x%
LRANGE ASIC_STATE_KEY_VALUE_OP_QUEUE start-index end-index
然后解析事件通知,每一个事件通知格式为
,存储为一个
<key,op,values>
– 存储的是序列化的值
vector
如果选择了这个消费者对象,从
中取出事件通知进行处理
m_buffer
2.4.4 事件处理
处理就是调用switch sai,进一步调用sdk的具体实现写入asic。