当前位置: > 质感生活
pipe是什么意思(进程间通信|IPC系列-管道|pipe)
2023-05-27 10:28:01
Mars
123
来源:Linux码农

pipe是什么意思?今天从源码的角度分析下进程间通信之管道(pipe)。

pipe是什么意思

什么是管道 ?

所谓管道,是指用于连接一个读进程和一个写进程,以实现它们之间通信的共享文件,又称 pipe 文件。

向管道(共享文件)提供输入的发送进程(即写进程),以字符流形式将大量的数据送入管道;而接收管道输出的接收进程(即读进程),可从管道中接收数据。由于发送进程和接收进程是利用管道进行通信的,故又称管道通信。

为了协调双方的通信,管道通信机制必须提供以下3 方面的协调能力。

互斥。当一个进程正在对 pipe 进行读/写操作时,另一个进程必须等待。

同步。当写(输入)进程把一定数量(如4KB)数据写入 pipe 后,便去睡眠等待,直到读(输出)进程取走数据后,再把它唤醒。当读进程读到一空 pipe 时,也应睡眠等待,直至写进程将数据写入管道后,才将它唤醒。

对方是否存在。只有确定对方已存在时,才能进行通信。

管道的应用

管道是利用 pipe() 系统调用而不是利用 open() 系统调用建立的。pipe() 调用的原型是:

int pipe(int fd[2])

我们看到,有两个文件描述符与管道结合在一起,一个文件描述符用于管道的read() 端,一个文件描述符用于管道的 write() 端。

由于一个函数调用不能返回两个值,pipe() 的参数是指向两个元素的整型数组的指针,它将由调用两个所要求的文件描述符填入。

fd[0] 元素将含有管道 read() 端的文件描述符,而 fd[1] 含有管道 write() 端的文件描述符。系统可根据 fd[0] 和 fd[1] 分别找到对应的 file 结构。

注意,在 pipe 的参数中,没有路径名,这表明,创建管道并不像创建文件一样,要为它创建一个目录连接。这样做的好处是,其他现存的进程无法得到该管道的文件描述符,从而不能访问它。

那么,两个进程如何使用一个管道来通信呢?

我们知道,fork() 和 exec() 系统调用可以保证文件描述符的复制品既可供双亲进程使用,也可供它的子女进程使用。也就是说,一个进程用 pipe() 系统调用创建管道,然后用 fork() 调用创建一个或多个进程,那么,管道的文件描述符将可供所有这些进程使用。

这里更明确的含义是:一个普通的管道仅可供具有共同祖先的两个进程之间共享,并且这个祖先必须已经建立了供它们使用的管道。

注意,在管道中的数据始终以和写数据相同的次序来进行读,这表示 lseek()系统调用对管道不起作用。

下面给出在两个进程之间设置和使用管道的简单程序:

include

include

fcntl(fd,F_SETFL,O_NONBlOCK);

上述例子如下图:

管道的源码分析

pipefs 初始化

pipefs 是一种简单的、虚拟的文件系统类型,因为它没有对应的物理设备,因此其安装时不需要块设备。大部分文件系统是以模块的形式来实现的。该文件系统相关的代码在 fs/pipe.c 中:

static struct file_system_type pipe_fs_type = {

.name = "pipefs",

.get_sb = pipefs_get_sb,

.kill_sb = kill_anon_super,

};

static int __init init_pipe_fs(void)

{

/*

pipe_fs_type 链接到file_systems 链表可以通过读/proc/filesystems 找到

“pipefs”入口点,在那里,“nodev”标志表示没有设置

FS_REQUIRES_DEV 标志,即该文件系统没有对应的物理设备。

*/

int err = register_filesystem(&pipe_fs_type);

if (!err) {

//安装pipefs 文件系统

pipe_mnt = kern_mount(&pipe_fs_type);

if (IS_ERR(pipe_mnt)) {

err = PTR_ERR(pipe_mnt);

unregister_filesystem(&pipe_fs_type);

}

}

return err;

}

//pipefs 文件系统是作为一个模块来安装的

fs_initcall(init_pipe_fs);

module_exit(exit_pipe_fs); //模块卸载函数

上述就是初始化时注册 pipefs 文件系统的过程,操作如下:

把 pipe_fs_type 链接到 file_systems 链表中。

创建一个struct vfsmount 结构,然后把该结构赋值给全局变量pipe_mnt。该 vfsmount 结构通过 super_block 与 pipefs 文件系统进行了关联。

pipe 的创建

pipefs 文件系统的入口点就是pipe()系统调用,其内核实现函数为sys_pipe(),而真正的工作是调用 do_pipe() 函数来完成的,其代码在 fs/pipe.c 中:

int do_pipe(int *fd)

{

struct file *fw, *fr;

int fdw, fdr;

//创建管道写端的file结构

fw = create_write_pipe();

//在写端的file结构基础上构建读端

fr = create_read_pipe(fw);

//创建读端fd

fdr = get_unused_fd();

//创建写端fd

fdw = get_unused_fd();

//fd 和 file进行关联

fd_install(fdr, fr);

fd_install(fdw, fw);

//返回读写端fd

fd[0] = fdr;

fd[1] = fdw;

...

return 0;

}

struct file *create_write_pipe(void)

{

...

struct qstr name = { .name = "" };

//创建 file 结构

f = get_empty_filp();

//创建一个pipe相关的 inode

inode = get_pipe_inode();

//创建一个dentry结构

dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &name);

//inode 和 dentry 相关联

d_instantiate(dentry, inode);

// pipe 和 pipe_mnt 关联

f->f_path.mnt = mntget(pipe_mnt);

//file 与 dentry 相关联

f->f_path.dentry = dentry;

//该file是只写的

f->f_flags = O_WRONLY;

//该pipe的可操作方法

f->f_op = &write_pipe_fops;

...

return f;

}

struct file *create_read_pipe(struct file *wrf)

{

//创建一个file结构用于读

struct file *f = get_empty_filp();

//file 与 已有的dentry、inode、struct vfsmount 相关联

f->f_path.mnt = mntget(wrf->f_path.mnt);

f->f_path.dentry = dget(wrf->f_path.dentry);

f->f_mapping = wrf->f_path.dentry->d_inode->i_mapping;

//该file是只读的

f->f_flags = O_RDONLY;

f->f_op = &read_pipe_fops;

f->f_mode = FMODE_READ;

f->f_version = 0;

return f;

}

do_pipe() 的操作也很简单,操作如下:

创建管道的读写端关联的 file、inode、dentry 结构。

把 fd 和 file 进行联系,并返回给用户关于管道的读写描述符。

创建一个 pipe_inode_info 对象,该结构指向具体的数据页内存缓冲区。后续所有向管道的读写均操作于该数据页内存缓冲区中。

上述 pipe 的初始化中,创建的 pipe_inode_info 对象记录了 pipe 读写过程中所有的得数据。pipe_inode_info 对象的结构如下

struct pipe_inode_info {

wait_queue_head_t wait;//存储等待读写进程的等待队列

/* nrbufs: 写入但还未被读取的数据占用缓冲区的页数

curbuf:当前正在读取环形缓冲区中的页节点

*/

unsigned int nrbufs, curbuf;

struct page *tmp_page; //临时缓冲区页面

unsigned int readers; //正在读取pipe的读进程数目

unsigned int writers; //正在写pipe的写进程数目

unsigned int waiting_writers; //等待管道可以写的进程数目

...

struct inode *inode; //pipe 对应的inode结构

struct pipe_buffer bufs[PIPE_BUFFERS]; //环形缓冲区,每个元素对应一个内存页

};

经过上述的一系列初始化,整个管道的内存结构如下图所示

pipe_read

static ssize_t pipe_read(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t pos)

{

...

//要读的数据长度

total_len = iov_length(iov, nr_segs);

do_wakeup = 0;

ret = 0;

//读之前先加锁

mutex_lock(&inode->i_mutex);

//循环读数据

for (;;) {

int bufs = pipe->nrbufs;

if (bufs) {

int curbuf = pipe->curbuf;

struct pipe_buffer *buf = pipe->bufs + curbuf;

size_t chars = buf->len;

//待读的数据比要读的数据多,则设置要读的长度

if (chars > total_len)

chars = total_len;

error = ops->confirm(pipe, buf);

if (error) {

if (!ret)

error = ret;

break;

}

atomic = !iov_fault_in_pages_write(iov, chars);

redo:

//把数据拷贝到用户缓冲区

addr = ops->map(pipe, buf, atomic);

error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);

ops->unmap(pipe, buf, addr);

if (unlikely(error)) {

...

}

ret += chars;

buf->offset += chars;

buf->len -= chars;

//该页的数据全部读完,释放该page

if (!buf->len) {

buf->ops = NULL;

ops->release(pipe, buf);

curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);

pipe->curbuf = curbuf;

pipe->nrbufs = --bufs;

do_wakeup = 1;

}

total_len -= chars;

if (!total_len)

break; /* common path: read succeeded */

} //if (bufs)

//若该页没读完,继续循环读,若该page读完,则读下一个page

if (bufs) /* More to do? */

continue;

if (!pipe->writers)

break;

if (!pipe->waiting_writers) {

if (ret)

break;

//非阻塞跳出循环,不进行休眠

if (filp->f_flags & O_NONBLOCK) {

ret = -EAGAIN;

break;

}

}

if (signal_pending(current)) {

if (!ret)

ret = -ERESTARTSYS;

break;

}

//唤醒写进程

if (do_wakeup) {

wake_up_interruptible_sync(&pipe->wait);

kill_fasync(&pipe->fasync_writers, SIGIO,POLL_OUT);

}

//让出cpu,进行休眠,等待条件唤醒

pipe_wait(pipe);

} // for

mutex_unlock(&inode->i_mutex);

if (do_wakeup) {

wake_up_interruptible_sync(&pipe->wait);

kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);

}

if (ret > 0)

file_accessed(filp);

return ret;

}

读的操作很简单,操作如下:

读之前先锁定内存。

从缓冲区中获取数据页,把页中数据拷贝到用户缓冲区中。

数据全部被读完,则发送信号唤醒写进程,同时读进程让出 CPU 进行休眠。

读取数据的过程如下

pipe_write

static ssize_t pipe_write(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t ppos)

{

...

//需要写入数据的长度

total_len = iov_length(iov, nr_segs);

do_wakeup = 0;

mutex_lock(&inode->i_mutex);

//管道读端已关闭,返回SIGPIPE信号

if (!pipe->readers) {

send_sig(SIGPIPE, current, 0);

ret = -EPIPE;

goto out;

}

/* We try to merge small writes

curbuf:当前的pipe缓冲节点

nrbufs:非空的pipe缓冲节点数目

buffers:buf缓冲区总数目

buf->offset:页内可用数据的偏移量

buf->len :可用数据的长度

buf->offset + buf->len :页内可以往有效数据后追加数据的下标

*/

//获取完整页之外的数量

chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */

if (pipe->nrbufs && chars != 0) {

//获取下一个可用的缓冲区,比如缓冲区是0~5, 有效数据起始buff是3,有效数据是4,那么存储buff数据依次为3,4,5,0,下一个可用的buff为1 ((3+4-1)/5)

int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) & (PIPE_BUFFERS-1);

//获取下一个可用的buff,lastbuf为获取到的索引

struct pipe_buffer *buf = pipe->bufs + lastbuf;

const struct pipe_buf_operations *ops = buf->ops;

/*offset: page中数据的偏移量。len:page中数据的长度

目前数据在page中的有效起始地址 + 有效数据长度 = 下一个可存放数据的地址。

管道是从前往后读的,并没规定读写大小,有可能只读取了page的前一部分,中间部分尚未读取,但是写的时候必须从中间有效数据后继续写

*/

int offset = buf->offset + buf->len;

//当前需要写入的数据 + 已有的数据若没有超过PAGE_SIZE大小,则拷贝到page中

if (ops->can_merge && offset + chars <= PAGE_SIZE) {

...

redo1:

addr = ops->map(pipe, buf, atomic);

//将用户数据拷贝到page中

error = pipe_iov_copy_from_user(offset + addr, iov, chars, atomic);

ops->unmap(pipe, buf, addr);

ret = error;

do_wakeup = 1;

if (error) {

...

}

//更新有效数据

buf->len += chars;

total_len -= chars;

ret = chars;

//全拷贝完则跳出

if (!total_len)

goto out;

}

}

for (;;) {

int bufs;

if (!pipe->readers) {

...

}

/*获取管道还有多少有效的buffer缓冲区*/

bufs = pipe->nrbufs;

if (bufs < PIPE_BUFFERS) { //有效的bufs小于缓冲区总数

int newbuf = (pipe->curbuf + bufs) & (PIPE_BUFFERS-1); //获取下一个可用的buf

struct pipe_buffer *buf = pipe->bufs + newbuf;

struct page *page = pipe->tmp_page;

if (!page) {

page = alloc_page(GFP_HIGHUSER);

if (unlikely(!page)) {

...

}

pipe->tmp_page = page;

}

do_wakeup = 1;

chars = PAGE_SIZE;

if (chars > total_len)

chars = total_len;

iov_fault_in_pages_read(iov, chars);

redo2:

error = pipe_iov_copy_from_user(src, iov, chars, atomic);

ret += chars;

//更新有效数据

buf->page = page;

buf->ops = &anon_pipe_buf_ops;

buf->offset = 0;

buf->len = chars;

pipe->nrbufs = ++bufs;

pipe->tmp_page = NULL;

total_len -= chars;

//数据写完,跳出循环结束

if (!total_len)

break;

}

//还有可用的缓冲区,继续写

if (bufs < PIPE_BUFFERS)

continue;

//缓冲区全部写完了,则判断是否需要阻塞休眠

if (filp->f_flags & O_NONBLOCK) {

if (!ret)

ret = -EAGAIN;

break;

}

if (signal_pending(current)) {

if (!ret)

ret = -ERESTARTSYS;

break;

}

//唤醒读进程

if (do_wakeup) {

wake_up_interruptible_sync(&pipe->wait);

kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);

do_wakeup = 0;

}

//写进程休眠

pipe->waiting_writers++;

pipe_wait(pipe);//将当前任务加入到等待列表,释放锁,让出CPU

pipe->waiting_writers--;

}

out:

...

return ret;

}

管道写操作流程如下:

写之前先锁定内存。

获取可写的缓冲区,若可写则循环写。

若不可写,则唤醒读进程,同时写进程进行休眠,让出 CPU。

往管道中写数据的过程如下

管道中最重要的2个方法就是管道的读写。从上述的分析来看,读写进程共同操作内核中的数据缓冲区,若有缓冲区可写,则进程往缓冲区中写,若条件不允许写,则进程休眠让出 CPU。读操作同理。

从上述管道读写操作可知,父子进程之所以能够通过 pipe 进行通信,是因为在内核中共同指向了同一个pipe_inode_info 对象,共同操作同一个内存页。

总结

管道也称无名管道,是 UNIX 系统中进程间通信(IPC)中的一种。

管道由于是无名管道,因此只能在有亲缘关系的进程间使用。

管道不是普通的文件,它是基于内存的。

管道属于半双工,数据只能从一方流向另一方,也即数据只能从一端写,从另一端读。

管道中读数据是一次性的操作,数据读取后就会释放空间,让出空间供更多的数据写。

管道写数据遵循先入先出的原则。

本文关键词: pipe是什么意思
本文标签: