问题描述
发现在node-1,2,3节点sar不定时出现较大的rx/tx的统计远远超出网卡的正常带宽。
排除思路
- 首先怀疑是sar工具导致的,所以将sar升级到centos8的最新版本,目前还没有复现问题;
- 通过EMS的监控可以发现ovs的统计也出现较大的流量,所以也怀疑是不是底层驱动出现问题,不一定是sar的问题;
分析/proc/net/dev的统计
enp6s0f0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 ether f8:f2:1e:63:07:0c txqueuelen 1000 (Ethernet) RX packets 65159306397 bytes 77198981900481 (70.2 TiB) RX errors 0 dropped 105116416 overruns 0 frame 0 TX packets 69105547360 bytes 85071138279622 (77.3 TiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 Inter-| Receive | Transmit face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed enp6s0f0: 77198981900481 65159306397 0 105116416 0 0 0 541703498 85071138279622 69105547360 0 0 0 0 0
通过ifconfig和查看
/proc/net/dev
的统计发现 Receive的drop项有丢包105116416,Transmit errs项有错包69105547360,现在通过代码追一下它们的具体来源。
字段 | 解释 |
---|---|
bytes | The total number of bytes of data transmitted or received by the interface. |
packets | The total number of packets of data transmitted or received by the interface. |
errs | The total number of transmit or receive errors detected by the device driver. |
drop | The total number of packets dropped by the device driver. |
fifo | The number of FIFO buffer errors. |
frame | The number of packet framing errors. |
colls | The number of collisions detected on the interface. |
compressed | The number of compressed packets transmitted or received by the device driver. (This appears to be unused in the 2.2.15 kernel.) |
carrier | The number of carrier losses detected by the device driver. |
multicast | The number of multicast frames transmitted or received by the device driver. |
由上面的解释可以看到drop是发生在网卡的驱动层面,我们深入查看以下代码。
/proc/net/dev
的代码在net/core/net-procfs.c
,可以发现上面的打印主要是函数dev_seq_printf_stats
78 static void dev_seq_printf_stats(struct seq_file *seq, struct net_device *dev)
/* [previous][next][first][last][top][bottom][index][help] */
79 {
80 struct rtnl_link_stats64 temp;
81 const struct rtnl_link_stats64 *stats = dev_get_stats(dev, &temp);
82
83 seq_printf(seq, "%6s: %7llu %7llu %4llu %4llu %4llu %5llu %10llu %9llu "
84 "%8llu %7llu %4llu %4llu %4llu %5llu %7llu %10llu\n",
85 dev->name, stats->rx_bytes, stats->rx_packets,
86 stats->rx_errors,
87 stats->rx_dropped + stats->rx_missed_errors,
88 stats->rx_fifo_errors,
89 stats->rx_length_errors + stats->rx_over_errors +
90 stats->rx_crc_errors + stats->rx_frame_errors,
91 stats->rx_compressed, stats->multicast,
92 stats->tx_bytes, stats->tx_packets,
93 stats->tx_errors, stats->tx_dropped,
94 stats->tx_fifo_errors, stats->collisions,
95 stats->tx_carrier_errors +
96 stats->tx_aborted_errors +
97 stats->tx_window_errors +
98 stats->tx_heartbeat_errors,
99 stats->tx_compressed);
100 }
dev_seq_printf_stats
函数里,drop的部分是有两个统计组成:
stats->rx_dropped + stats->rx_missed_errors
。
继续查看dev_get_stats
的函数可知,rx_dropped
和rx_missed_errors
都是从设备中获取的,需要设备驱动实现。
8737 struct rtnl_link_stats64 *dev_get_stats(struct net_device *dev,
/* [previous][next][first][last][top][bottom][index][help] */
8738 struct rtnl_link_stats64 *storage)
8739 {
8740 const struct net_device_ops *ops = dev->netdev_ops;
8741
8742 if (ops->ndo_get_stats64) {
8743 memset(storage, 0, sizeof(*storage));
8744 ops->ndo_get_stats64(dev, storage);
8745 } else if (ops->ndo_get_stats) {
8746 netdev_stats_to_stats64(storage, ops->ndo_get_stats(dev));
8747 } else {
8748 netdev_stats_to_stats64(storage, &dev->stats);
8749 }
8750 storage->rx_dropped += (unsigned long)atomic_long_read(&dev->rx_dropped);
8751 storage->tx_dropped += (unsigned long)atomic_long_read(&dev->tx_dropped);
8752 storage->rx_nohandler += (unsigned long)atomic_long_read(&dev->rx_nohandler);
8753 return storage;
8754 }
结构体rtnl_link_stats64
定义如下:
57 /* The main device statistics structure */
58 struct rtnl_link_stats64 {
59 __u64 rx_packets; /* total packets received */
60 __u64 tx_packets; /* total packets transmitted */
61 __u64 rx_bytes; /* total bytes received */
62 __u64 tx_bytes; /* total bytes transmitted */
63 __u64 rx_errors; /* bad packets received */
64 __u64 tx_errors; /* packet transmit problems */
65 __u64 rx_dropped; /* no space in linux buffers */
66 __u64 tx_dropped; /* no space available in linux */
67 __u64 multicast; /* multicast packets received */
68 __u64 collisions;
69
70 /* detailed rx_errors: */
71 __u64 rx_length_errors;
72 __u64 rx_over_errors; /* receiver ring buff overflow */
73 __u64 rx_crc_errors; /* recved pkt with crc error */
74 __u64 rx_frame_errors; /* recv'd frame alignment error */
75 __u64 rx_fifo_errors; /* recv'r fifo overrun */
76 __u64 rx_missed_errors; /* receiver missed packet */
77
78 /* detailed tx_errors */
79 __u64 tx_aborted_errors;
80 __u64 tx_carrier_errors;
81 __u64 tx_fifo_errors;
82 __u64 tx_heartbeat_errors;
83 __u64 tx_window_errors;
84
85 /* for cslip etc */
86 __u64 rx_compressed;
87 __u64 tx_compressed;
88
89 __u64 rx_nohandler; /* dropped, no handler found */
90
91 #ifdef __KERNEL__
92 #ifndef __GENKSYMS__
93 char __rh_tail[0];
94 __u64 __rh_reserved_1;
95 __u64 __rh_reserved_2;
96 __u64 __rh_reserved_3;
97 __u64 __rh_reserved_4;
98 #endif
99 #endif
100 };
通过上面的代码我们大概知道了rx_dropped
是linux中的缓存区空间不足导致的丢包,rx_missed_errors
这一项没有具体说明,但是查了以下,rx_missed_errors
应该是fifo队列(rx_ring_buffer)满而丢弃的数量,其实和rx_fifo_errors
是等同的,对于ixgb是等同的(参考链接:stackexchager和 example)
[root@node-2 ~]# ethtool -S enp6s0f0
NIC statistics:
rx_packets: 65383120287
tx_packets: 69346021992
rx_bytes: 77463498567689
tx_bytes: 85363688445519
rx_pkts_nic: 65383120059
tx_pkts_nic: 69346021992
rx_bytes_nic: 78053991601893
tx_bytes_nic: 85918747980773
lsc_int: 4
tx_busy: 0
non_eop_descs: 0
rx_errors: 0
tx_errors: 0
rx_dropped: 0
tx_dropped: 0
multicast: 542040597
broadcast: 462600757
rx_no_buffer_count: 0
collisions: 0
rx_over_errors: 0
rx_crc_errors: 0
rx_frame_errors: 0
hw_rsc_aggregated: 0
hw_rsc_flushed: 0
fdir_match: 63740301136
fdir_miss: 1271748787
fdir_overflow: 10379
rx_fifo_errors: 0
rx_missed_errors: 105216339
tx_aborted_errors: 0
tx_carrier_errors: 0
tx_fifo_errors: 0
tx_heartbeat_errors: 0
tx_timeout_count: 0
可以发现我们的丢包主要是来自于rx_missed_errors: 105216339
,说明丢包是因为ring_buffer(fifo)满导致的,但奇怪的是fifo那一项没有统计数字,ixgbe驱动没有对这一项进行统计,我们继续看一下代码,了解一下网卡初始化及其接收数据包的过程。
网卡设备初始化
ixgbe_probe
除了必要的PCI的初始化工作,还会作如下的工作:
- 注册
struct net_device_ops
,网卡的操作函数; ethtool
的操作函数;- 从网卡获取默认的MAC address;
- 其他一些工作;
下面我们详细看一下。
struct net_device_ops
struct net_device_ops
包含了很多控制网卡设备的操作函数(drivers/net/ethernet/intel/ixgbe/ixgbe_main.c
)。
10717 static int ixgbe_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
/* [previous][next][first][last][top][bottom][index][help] */
10718 {
/*...*/
netdev->netdev_ops = &ixgbe_netdev_ops;
10356 static const struct net_device_ops ixgbe_netdev_ops = {
10357 .ndo_open = ixgbe_open,
10358 .ndo_stop = ixgbe_close,
10359 .ndo_start_xmit = ixgbe_xmit_frame,
10360 .ndo_set_rx_mode = ixgbe_set_rx_mode,
10361 .ndo_validate_addr = eth_validate_addr,
10362 .ndo_set_mac_address = ixgbe_set_mac,
10363 .ndo_change_mtu = ixgbe_change_mtu,
10364 .ndo_tx_timeout = ixgbe_tx_timeout,
10365 .ndo_set_tx_maxrate = ixgbe_tx_maxrate,
10366 .ndo_vlan_rx_add_vid = ixgbe_vlan_rx_add_vid,
10367 .ndo_vlan_rx_kill_vid = ixgbe_vlan_rx_kill_vid,
10368 .ndo_do_ioctl = ixgbe_ioctl,
10369 .ndo_set_vf_mac = ixgbe_ndo_set_vf_mac,
10370 .ndo_set_vf_vlan = ixgbe_ndo_set_vf_vlan,
10371 .ndo_set_vf_rate = ixgbe_ndo_set_vf_bw,
/*...*/
ethtool注册
ethtool
是通过系统调用ioctl
和设备驱动通信。网卡设备驱动注册一些函数用来执行ethtool
的命令(drivers/net/ethernet/intel/ixgbe/ixgbe_ethtool.c
)
10717 static int ixgbe_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
/* [previous][next][first][last][top][bottom][index][help] */
10718 {
/*...*/
ixgbe_set_ethtool_ops(netdev);
3445 static const struct ethtool_ops ixgbe_ethtool_ops = {
3446 .get_drvinfo = ixgbe_get_drvinfo,
3447 .get_regs_len = ixgbe_get_regs_len,
3448 .get_regs = ixgbe_get_regs,
3449 .get_wol = ixgbe_get_wol,
3450 .set_wol = ixgbe_set_wol,
3451 .nway_reset = ixgbe_nway_reset,
3452 .get_link = ethtool_op_get_link,
3453 .get_eeprom_len = ixgbe_get_eeprom_len,
3454 .get_eeprom = ixgbe_get_eeprom,
3455 .set_eeprom = ixgbe_set_eeprom,
3456 .get_ringparam = ixgbe_get_ringparam,
3457 .set_ringparam = ixgbe_set_ringparam,
3458 .get_pauseparam = ixgbe_get_pauseparam,
3459 .set_pauseparam = ixgbe_set_pauseparam,
3460 .get_msglevel = ixgbe_get_msglevel,
IRQ
当一个数据被DMA写到内存中,网卡怎么通知内核数据已经到达呢?
传统情况,网卡会产生一个中断(IRQ)表明接收到数据。一般有三个类型的中断:MSI-X、MSI和legacy IRQs。当数据已通过DMA写入RAM时,生成IRQ的设备非常简单,但是如果有大量数据帧到达,则可能导致生成大量IRQ。生成的IRQ越多,可用于其他任务(如用户进程)的CPU时间就越少,为优化这一问题,提出了一个NAPI(New Api)。
NAPI
NAPI作为减少数据包到达时网络设备生成的IRQ数量的机制。虽然NAPI减少了IRQ的数量,但它还是建立在中断机制上面,所以无法消除中断。
NAPI在几个重要方面不同于传统的数据收集方法。 NAPI允许设备驱动程序注册轮询功能,NAPI子系统将调用该轮询功能来收集数据帧。
NAPI在网络设备驱动程序中的用途如下:
- 驱动会打开NAPI,但是默认是关闭的;
- 数据到达并由NIC DMA存储到内存中;
- NIC会生成一个IRQ,它会在驱动程序中触发IRQ处理程序;
- 驱动程序使用softirq唤醒NAPI子系统。这将通过在单独的执行线程中调用驱动程序的注册
poll
函数来开始收集数据包; - 驱动程序应禁用来自NIC的其他IRQ。这样做是为了允许NAPI子系统在不中断设备的情况下处理数据包;
- 在收包完成后,NAPI子系统将被禁用,设备的IRQ将被重新启用;
- 返回步骤2。
使用NAPI与传统的方法相比,这种收集数据的方法减少了CPU的开销,因为一次可以处理很多数据。NAPI初始化(ixgbe driver)
ixgbe有如下的调用链:
ixgbe_probe --->ixgbe_init_interrupt_scheme --->ixgbe_set_interrupt_capability(adapter); --->ixgbe_alloc_q_vectors --->ixgbe_alloc_q_vector --->netif_napi_add
上面的调用链做了如下一些事情:
ixgbe_set_interrupt_capability
函数里面设置interrupt mode,会调用pci_enable_msix_range
或者pci_enable_msi
打开MSIX或者MSI;- 调用
ixgbe_alloc_q_vector
为tx和rx队列ring分配内存; -
每次对
ixgbe_alloc_q_vector
的调用都会调用netif_napi_add
来注册该队列的轮询函数以及struct napi_struct
实例,该实例获取数据包时传递给poll;829 static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter, /* [previous][next][first][last][top][bottom][index][help] */ 830 int v_count, int v_idx, 831 int txr_count, int txr_idx, 832 int xdp_count, int xdp_idx, 833 int rxr_count, int rxr_idx) 834 { 835 struct ixgbe_q_vector *q_vector; 836 struct ixgbe_ring *ring; 837 int node = NUMA_NO_NODE; 838 int cpu = -1; 839 int ring_count; 840 u8 tcs = adapter->hw_tcs; 855 /* allocate q_vector and rings */ 856 q_vector = kzalloc_node(struct_size(q_vector, ring, ring_count), 857 GFP_KERNEL, node); 858 if (!q_vector) 859 q_vector = kzalloc(struct_size(q_vector, ring, ring_count), 860 GFP_KERNEL); /*....*/ 874 /* initialize NAPI */ 875 netif_napi_add(adapter->netdev, &q_vector->napi, 876 ixgbe_poll, 64); /*....*/
上面的代码是用于接收队列的分配内存,并向NAPI注册了函数
ixgbe_poll
。它提供了与此新创建的RX队列(上面的&q_vector-napi
)相关联的struct napi_struct
的引用。当需要从此RX队列中收集数据包时,由NAPI调用时,它将被传递到ixgbe_poll
中。启动网络设备
当一个网络设备被bring up起来之后(例如:ifconfig eth0 up),会调用
ndo_open
成员的函数。
这个ixgbe_open
函数一般会做如下事情:- 分配tx、rx的队列buffer分配内存(和上面不同);
- 根据中断模式注册中断函数;
- 使能NAPI;
- 使能硬件中断;
-
等等其他事情;
准备从网络接收数据
一般情况下,网卡都是通过DMA将数据直接写到内存中,系统可以对其的数据进行处理,为此一般都ring buffer来实现。但是这样存在一个问题,就是如果数据包速率远远大于单个cpu的处理能力,就会出现丢包的问题,为了解决此问题,现在的网卡从硬件层面和软件层面解决此问题,RSS( Receive Side Scaling)和RPS(Receive Packet Steering),大概是根据包的五元组计算的hash值,将数据包分配到不同的cpu中,但是RSS并不是所有的网卡都支持该功能。
我们使用的intel 82599网卡支持多对列功能,在ixgbe的驱动里面,当网卡up起来的时候调用ixgbe_setup_all_rx_resources --->ixgbe_setup_rx_resources
6511 int ixgbe_setup_rx_resources(struct ixgbe_adapter *adapter, /* [previous][next][first][last][top][bottom][index][help] */ 6512 struct ixgbe_ring *rx_ring) 6513 { 6514 struct device *dev = rx_ring->dev; 6515 int orig_node = dev_to_node(dev); 6516 int ring_node = -1; 6517 int size; 6518 6519 size = sizeof(struct ixgbe_rx_buffer) * rx_ring->count; 6520 6521 if (rx_ring->q_vector) 6522 ring_node = rx_ring->q_vector->numa_node; 6523 6524 rx_ring->rx_buffer_info = vmalloc_node(size, ring_node); 6525 if (!rx_ring->rx_buffer_info) 6526 rx_ring->rx_buffer_info = vmalloc(size); 6527 if (!rx_ring->rx_buffer_info) 6528 goto err; /*...*/
228 struct ixgbe_rx_buffer { 229 struct sk_buff *skb; 230 dma_addr_t dma; 231 union { 232 struct { 233 struct page *page; 234 __u32 page_offset; 235 __u16 pagecnt_bias; 236 }; 237 struct { 238 void *addr; 239 u64 handle; 240 }; 241 }; 242 };
调用ixgbe_setup_rx_resources
为每个RX queue分配一个可以DMA的内存,网卡设备可以把输入数据写入其中。
注册中断函数
一般设备可以使用不同的方法来发出中断信号:MSI-X、MSI、legacy interrupt。使用什么中断模型取决于硬件是否支持,对于多对列网卡MSI-X是最优的选择,我们看一下代码:
3364 static int ixgbe_request_irq(struct ixgbe_adapter *adapter)
/* [previous][next][first][last][top][bottom][index][help] */
3365 {
3366 struct net_device *netdev = adapter->netdev;
3367 int err;
3368
3369 if (adapter->flags & IXGBE_FLAG_MSIX_ENABLED)
3370 err = ixgbe_request_msix_irqs(adapter);
3371 else if (adapter->flags & IXGBE_FLAG_MSI_ENABLED)
3372 err = request_irq(adapter->pdev->irq, ixgbe_intr, 0,
3373 netdev->name, adapter);
3374 else
3375 err = request_irq(adapter->pdev->irq, ixgbe_intr, IRQF_SHARED,
3376 netdev->name, adapter);
3377
3378 if (err)
3379 e_err(probe, "request_irq failed, Error %d\n", err);
3380
3381 return err;
3382 }
上面的代码可以看出ixgbe的driver对应MSI-X、MSI、legacy interrupt:
MSI-X --> ixgbe_request_msix_irqs
MSI --> ixgbe_intr
legacy interrupt --> ixgbe_intr
使能NAPI
当网络设备up起来后,驱动会是能NAPI,前面我们看到NAPI如何注册poll
,但是一般NAPI的使能是在网卡up的时候。
在调用ixgbe_up_complete
函数的时候会调用ixgbe_napi_enable_all
4992 static void ixgbe_napi_enable_all(struct ixgbe_adapter *adapter)
/* [previous][next][first][last][top][bottom][index][help] */
4993 {
4994 int q_idx;
4995
4996 for (q_idx = 0; q_idx < adapter->num_q_vectors; q_idx++)
4997 napi_enable(&adapter->q_vector[q_idx]->napi);
4998 }
使能中断
到这里基本都设置好了,只剩下使能中断等待收包,ixgbe
驱动里面调用ixgbe_irq_enable
使能中断。
软中断(softirq)
什么是softirq
Linux内核中的softirq是一种用于在驱动程序中实现的中断处理程序的上下文之外执行代码的机制。这种机制很重要,因为在执行中断处理程序的全部或部分过程中可能会禁用硬件中断。禁用的中断时间越长,事件丢失的机会就越大。因此,重要的是将所有长时间运行的操作推迟到中断处理程序之外,以便它可以尽快完成并重新启用设备中断。
可以将softirq系统想象为一系列内核线程(每个CPU一个),这些线程运行已为不同softirq事件注册的中断处理函数。我们查看top命令下面的ksoftirqd/0
,这就是运行在CPU 0上的softirq内核线程。
注册软中断处理函数
可以通过调用open_softirq
函数来注册softirq的中断函数,例如在net_dev_init
函数中
9558 open_softirq(NET_TX_SOFTIRQ, net_tx_action);
9559 open_softirq(NET_RX_SOFTIRQ, net_rx_action);
ksoftirqd
看一下ksoftirqd的初始化代码:
724 static struct smp_hotplug_thread softirq_threads = {
725 .store = &ksoftirqd,
726 .thread_should_run = ksoftirqd_should_run,
727 .thread_fn = run_ksoftirqd,
728 .thread_comm = "ksoftirqd/%u",
729 };
上面代码可以看出在struct smp_hotplug_thread
有两个函数被注册ksoftirqd_should_run
、和run_ksoftirqd
。在kernel/smpboot.c
的smpboot_thread_fn
函数首先调用ksoftirqd_should_run
检测是否还有需要处理的softirq,如果有调用run_ksoftirqd
,最终执行__do_softirq
函数。
106 static int smpboot_thread_fn(void *data)
/* [previous][next][first][last][top][bottom][index][help] */
107 {
108 struct smpboot_thread_data *td = data;
109 struct smp_hotplug_thread *ht = td->ht;
110
111 while (1) {
/*...*/
158 if (!ht->thread_should_run(td->cpu)) {
159 preempt_enable_no_resched();
160 schedule();
161 } else {
162 __set_current_state(TASK_RUNNING);
163 preempt_enable();
164 ht->thread_fn(td->cpu);
165 }
/*...*/
}
646 static void run_ksoftirqd(unsigned int cpu)
/* [previous][next][first][last][top][bottom][index][help] */
647 {
648 local_irq_disable();
649 if (local_softirq_pending()) {
650 /*
651 * We can safely run softirq on inline stack, as we are not deep
652 * in the task stack here.
653 */
654 __do_softirq();
655 local_irq_enable();
656 cond_resched();
657 return;
658 }
659 local_irq_enable();
660 }
__do_softirq
__do_softirq
函数处理了如下函数:
- 确定那个softirq需要处理;
- 关于softirq的时间统计和次数统计;
- 执行待处理的中断函数;
291 trace_softirq_entry(vec_nr); 292 h->action(h); 293 trace_softirq_exit(vec_nr);
总结一下初始化softirq如下:
spawn_ksoftirqd ---> smpboot_register_percpu_thread ---> __smpboot_create_thread ---> kthread_create_on_cpu
在
kthread_create_on_cpu
回调thread_fn
函数,ksoftirqd进程在loop里面执行之前注册的run_ksoftirqd函数;
接收数据包的流程
接收数据包的过程比较复杂,但大致需要如下几个步骤:
- NIC接收数据包;
- 将数据包从网卡的硬件buffer拷贝到系统内存中;
- 硬件发送中断通知内核处理;
- 执行网络设备驱动注册的中断处理函数;
- 调用
napi_schedule
函数添加驱动的poll函数; - 设置softieq的pending位,通知ksoftirqd处理数据;
- 运行__do_softirq执行收包函数
net_rx_action
;
网络数据处理开始阶段
net_rx_action
--->ixgbe_poll
--->ixgbe_clean_rx_irq
--->ixgbe_rx_skb
--->napi_gro_receive
--->napi_skb_finish
net_rx_action
6137 static __latent_entropy void net_rx_action(struct softirq_action *h)
/* [previous][next][first][last][top][bottom][index][help] */
6138 {
6139 struct softnet_data *sd = this_cpu_ptr(&softnet_data);
6140 unsigned long time_limit = jiffies +
6141 usecs_to_jiffies(netdev_budget_usecs);
6142 int budget = netdev_budget;
/*...*/
6149
6150 for (;;) {
6151 struct napi_struct *n;
6152
6153 if (list_empty(&list)) {
6154 if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
6155 goto out;
6156 break;
6157 }
6158
6159 n = list_first_entry(&list, struct napi_struct, poll_list);
6160 budget -= napi_poll(n, &repoll);
6161
6162 /* If softirq window is exhausted then punt.
6163 * Allow this to run for 2 jiffies since which will allow
6164 * an average latency of 1.5/HZ.
6165 */
6166 if (unlikely(budget <= 0 ||
6167 time_after_eq(jiffies, time_limit))) {
6168 sd->time_squeeze++;
6169 break;
6170 }
6171 }
6172
/*...*/
6182 out:
6183 __kfree_skb_flush();
6184 }
napi_skb_finish
5441 static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
/* [previous][next][first][last][top][bottom][index][help] */
5442 {
5443 switch (ret) {
5444 case GRO_NORMAL:
5445 if (netif_receive_skb_internal(skb))
5446 ret = GRO_DROP;
5447 break;
5448
5449 case GRO_DROP:
5450 kfree_skb(skb);
5451 break;
5452
5453 case GRO_MERGED_FREE:
5454 if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
5455 napi_skb_free_stolen_head(skb);
5456 else
5457 __kfree_skb(skb);
5458 break;
5459
5460 case GRO_HELD:
5461 case GRO_MERGED:
5462 case GRO_CONSUMED:
5463 break;
5464 }
5465
5466 return ret;
5467 }
主要做下面几件事情:
- 检查NAPI poll list
- 执行ixgbe注册的ixgbe_poll函数;
- 检查budget和已用时间,确保cpu不会被长时间占用;(budget = 300和两个时间片)
- 通过
napi_gro_receive
来处理接收的数据,数据保留给GRO,否么调用netif_receive_skb_internal
函数将数据直接传递给协议栈;
下面我们主要看一下netif_receive_skb_internal
网络数据处理中间阶段
netif_receive_skb_internal
5036 static int netif_receive_skb_internal(struct sk_buff *skb) /* [previous][next][first][last][top][bottom][index][help] */ 5037 { 5038 int ret; 5039 5040 net_timestamp_check(netdev_tstamp_prequeue, skb); 5041 5042 if (skb_defer_rx_timestamp(skb)) 5043 return NET_RX_SUCCESS; 5044 5045 if (static_branch_unlikely(&generic_xdp_needed_key)) { 5046 int ret; 5047 5048 preempt_disable(); 5049 rcu_read_lock(); 5050 ret = do_xdp_generic(rcu_dereference(skb->dev->xdp_prog), skb); 5051 rcu_read_unlock(); 5052 preempt_enable(); 5053 5054 if (ret != XDP_PASS) 5055 return NET_RX_DROP; 5056 } 5057 5058 rcu_read_lock(); 5059 #ifdef CONFIG_RPS 5060 if (static_key_false(&rps_needed)) { 5061 struct rps_dev_flow voidflow, *rflow = &voidflow; 5062 int cpu = get_rps_cpu(skb->dev, skb, &rflow); 5063 5064 if (cpu >= 0) { 5065 ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail); 5066 rcu_read_unlock(); 5067 return ret; 5068 } 5069 } 5070 #endif 5071 ret = __netif_receive_skb(skb); 5072 rcu_read_unlock(); 5073 return ret; 5074 }
网络处理函数
netif_receive_skb_internal
,根据RPS是否开启主要有两个处理路径:
RPS关闭情况:netif_receive_skb_internal --->__netif_receive_core --->__netif_receive_skb_core
__netif_receive_core
根据type类型例如TAP设备或者协议栈分发到不同的地方
RPS打开情况:netif_receive_skb_internal --->enqueue_to_backlog() --->__netif_receive_skb
- 数据被传递到
enqueue_to_backlog
,数据被放在每个cpu的queue里面; - 通过IPI唤醒remote cpu的softirq的进程,运行预先注册的poll函数,这里面的poll函数是
process_backlog
; __netif_receive_core
根据type类型例如TAP设备或者协议栈分发到不同的地方
- 数据被传递到
协议栈和用户空间
ip_rcv
upd_rcv
udp_queue_rcv_skb
参考链接:
https://blog.packagecloud.io/eng/2016/10/11/monitoring-tuning-linux-networking-stack-receiving-data-illustrated/
https://tech.meituan.com/2018/03/16/redis-high-concurrency-optimization.html