Skip to content

LV005-消息队列

本文主要是进程通信——消息队列的相关笔记,若笔记中有错误或者不合适的地方,欢迎批评指正😃。

一、System V 消息队列

前边我们知道System V IPC经过优化出现了POSIX IPC,消息队列的话也就会有两种,这里所学习的是System V IPC的消息队列。

1. 相关概念

消息队列是System V IPC中的一种,它的底层是一个链队列,与共享内存的不同是,内核要保证消息队列的FIFO性质,因此当有多个接收方进程接收消息队列中的消息的时候,不会产生冲突,由内核来协调他们的执行顺序。由于队列性质,队尾写,队头读,所以读写也不会存在冲突。

进程间通过IPC消息队列通信时,进程产生的每条消息都被发送到一个IPC消息队列中,这个消息一直存放在队列中,直到另外一个进程将其读走,故消息只适用于两个进程间通信。消息是由固定大小的首部,该首部是一个long int 型,用来存放消息类型和可变长度的正文组成。

2. 消息队列特点

消息队列有如下特点:

  • 消息队列由消息队列ID来唯一标识。

  • 消息队列就是一个消息的列表,用户可以在消息队列中添加消息、读取消息等。

  • 消息队列可以按照类型来发送或者接收消息。

3. 消息队列在内核的形式

image-20220609181315642

4. 如何使用消息队列?

4.1 发送消息端

消息队列发送端的使用步骤一般如下:

(1)申请key,用于不同的进程的话,可以使用ftok()函数实现,若是用于具有血缘关系的进程,则也可以是IPC_PRIVATE

(2)打开或者创建消息队列,通过msgget()函数实现。

(3)向消息队列发送消息,通过msgsnd实现。

4.2 接收消息端

消息队列接收端的是哦用步骤一般如下:

(1)申请key,用于不同的进程的话,可以使用ftok()函数实现,若是用于具有血缘关系的进程,则也可以是IPC_PRIVATE。保证与发送端相同,以使用同一个IPC对象ID

(2)打开或者创建消息队列,通过msgget()函数实现。

(3)从消息队列接收消息,通过 msgrcv()函数实现。

(4)控制(删除)消息队列,通过msgctl()函数实现。

二、消息队列基本操作

1. 打开或创建消息队列

1.1 msgget()

linux下可以使用man 2 msgget命令查看该函数的帮助手册。

c
/* 需包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* 函数声明 */
int msgget(key_t key, int msgflg);

函数说明】该函数打开或者创建一个消息队列,获取消息队列的IPC对象ID

函数参数

  • keykey_t类型,通过ftok创建的key(键)值或者是IPC_PRIVATE(这样的话只能用于具有血缘关系的进程通信)。
  • shmflgint类型,共享内存标志位,使用时需要与IPC对象存取权限(如0666)进行|运算来确定读写权限。一般是需要创建共享内存时为 IPC_CREAT|0666,若不需要创建,直接打开时,使用0666即可。常用可取的值及含义:
IPC_CREAT如果共享内存不存在,则创建一个共享内存,否则打开操作。
IPC_EXCL 只有在共享内存不存在的时候,新的共享内存才建立,否则就产生错误。
如果单独使用`IPC_CREAT`,`msgget()`函数要么返回一个已经存在的消息队列的标识符,要么返回一个新建的消息队列的标识符。如果将`IPC_CREAT`和`IPC_EXCL`标志一起使用,`msgget()`将返回一个新建的消息队列的标识符;如果该消息队列已存在,就会返回错误。

返回值int类型,成功返回消息队列的标识符,也就是消息队列ID,失败返回-1,并设置errno

使用格式】一般情况下基本使用格式如下:

c
/* 需要包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
/* 至少应该有的语句 */
key_t key;
int msgid;
key = ftok("./key.txt", 100);
msgid = msgget(key, IPC_CREAT|0666);

注意事项none

1.2 使用实例

暂无。

2. 发送消息

2.1 msgsnd()

linux下可以使用man 2 msgsnd命令查看该函数的帮助手册。

c
/* 需包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* 函数声明 */
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

函数说明】该函数向消息队列发送一个消息。

函数参数

  • msqidint类型,消息队列ID
  • msgpvoid *类型,要发送的消息的缓冲区地址,一般是指向一个我们自定义特定格式的结构体变量,这个结构体变量中存储的是消息的类型和消息的正文。指向的结构体一般格式如下:
c
struct msgbuf 
{
	long mtype;       /* message type, must be > 0 */
	char mtext[1];    /* message data */
};
  • msgszsize_t类型,消息正文长度。
  • msgflgint类型,一些标志位,一般的话选择0。常用可取的值及含义如下:
0当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列
IPC_NOWAIT当消息队列已满的时候,msgsnd函数不等待立即返回
IPC_NOERROR若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程
如果队列中的可用空间不足,那么`msgsnd()`的缺省行为是阻塞,直到空间可用为止。如果在`msgflg`中指定了`IPC_NOWAIT`,那么消息队列已满的时候调用将会失败并返回错误`EAGAIN`。

返回值int类型,成功返回0,失败返回-1,并设置errno

使用格式】一般情况下基本使用格式如下:

c
/* 需要包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
/* 至少应该有的语句 */
msgT msg;/* msgT 下边有介绍,是自定义的一种消息结构*/
msgsnd(msgid, &msgp, MSGLEN, 0);

注意事项

(1)消息结构必须有long类型的msg_type字段,表示消息的类型。

(2)消息长度不包括首类型 long

(3)msgsnd()解除阻塞的条件三个条件:第一,不满足消息队列满或个数满两个条件,即消息队列中有容纳该消息的空间;第二,msqid代表的消息队列被删除。第三,调用msgsnd函数的进程被信号中断。

2.2 消息格式

我们发送消息之前,需要先了解一下消息的格式,有上边的消息发送函数以及消息在内核中的存储形式,我们知道,消息有类型和正文两个属性,于是我们就可以自定义一个结构体作为消息的格式,但是我们还需要知道消息正文内容的长度,这时候可以用宏定义。

c
/* 消息格式结构体 */
typedef struct
{
	long msg_type; /* 消息类型 */
	char buf[128]; /* 消息内容 */
} msgT;
#define MSGLEN  (sizeof(msgT)-sizeof(long))

2.3 使用实例

暂无。

3. 接收消息

3.1 msgrcv()

linux下可以使用man 2 msgrcv命令查看该函数的帮助手册。

c
/* 需包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* 函数声明 */
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

函数说明】该函数从消息队列读取一个消息。

函数参数

  • msqidint类型,消息队列ID
  • msgpvoid *类型,要发送的消息的缓冲区地址,一般是指向一个我们自定义特定格式的结构体变量,这个结构体变量中存储的是消息的类型和消息的正文。
  • msgszsize_t类型,指定要接收的消息正文长度。
  • msgtyplong类型,指定接收的消息类型。 msgtyp 取值范围的含义如下:
msgtype = 0收到的第一条消息,任意类型。
msgtype > 0收到的第一条 msg_type类型的消息。
msgtype < 0接收类型等于或者小于msgtype绝对值的第一个消息。
例如,如果`msgtype=-4`,则只接受类型是`1`、`2`、`3`、`4`的消息
  • msgflgint类型,一些标志位,一般的话选择0。常用可取的值及含义如下:
0阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待
IPC_NOWAIT如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG
MSG_EXCEPT与msgtype配合使用返回队列中第一个类型不为msgtype的消息
【**返回值**】`ssize_t`类型,成功时返回收到的消息长度,失败返回`-1`,并设置`errno`。

使用格式】一般情况下基本使用格式如下:

c
/* 需要包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
/* 至少应该有的语句 */
msgT msg;/* msgT 下边有介绍,是自定义的一种消息结构*/
msgrcv(msgid, &msg, MSGLEN, 0, 0);/* 阻塞式接收任意类型消息 */

注意事项

(1)消息结构必须有long类型的msg_type字段,表示消息的类型。

(2)消息长度不包括首类型 long

3.2 消息格式

我们读取的消息格式自然要与发送的消息格式相同啦。

c
/* 消息格式结构体 */
typedef struct
{
	long msg_type; /* 消息类型 */
	char buf[128]; /* 消息内容 */
} msgT;
#define MSGLEN  (sizeof(msgT)-sizeof(long))

3.3 使用实例

暂无。

4.  消息控制

4.1 msgctl()

linux下可以使用man 2 msgctl命令查看该函数的帮助手册。

c
/* 需包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
/* 函数声明 */
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

函数说明】该函数用于控制消息队列,例如,修改一个消息队列控制权限,删除一个消息队列。

函数参数

  • msqidint类型,消息队列ID
  • cmdint类型,表示要执行的操作。cmd 常用值及含义如下:
IPC_STAT把msqid_ds结构中的数据设置为消息队列的当前关联值,和*buf参数配合获取msqid消息队列信息
IPC_SET 在进程有足够权限的前提下,把消息队列的当前关联值设置为msqid_ds数据结构中给的值
IPC_RMID删除消息队列
- `buf`:`struct msqid_ds`类型结构体指针变量,指向存储消息队列相关信息的结构体,一般是设置为`NULL`。 struct msqid_ds 定义如下:
c
struct msqid_ds
{
	struct ipc_perm msg_perm;   /* Ownership and permissions */
	time_t          msg_stime;  /* Time of last msgsnd(2) */
	time_t          msg_rtime;  /* Time of last msgrcv(2) */
	time_t          msg_ctime;  /* Time of creation or last
                                              modification by msgctl() */
	unsigned long   msg_cbytes; /* # of bytes in queue */
	msgqnum_t       msg_qnum;   /* # number of messages in queue */
	msglen_t        msg_qbytes; /* Maximum # of bytes in queue */
	pid_t           msg_lspid;  /* PID of last msgsnd(2) */
	pid_t           msg_lrpid;  /* PID of last msgrcv(2) */
};

返回值int类型,成功时,cmdIPC_STAT, IPC_SET或者 IPC_RMID 返回0,失败返回-1,并设置errno

使用格式】一般情况下基本使用格式如下:

c
/* 需要包含的头文件 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
/* 至少应该有的语句 */
msgT msg;/* msgT 下边有介绍,是自定义的一种消息结构*/
msgrcv(msgid, &msg, MSGLEN, 0, 0);/* 阻塞式接收任意类型消息 */

注意事项】该函数的返回值还有其他的情况,只是我们一般是使用上边写出的三个参数,需要使用其他参数的话,可以查看帮助手册。

4.2 使用实例

暂无。

三、消息队列实例

1. msg_send.c

c
/* 头文件 */
#include <stdio.h>  /* perror sprintf*/
#include <sys/ipc.h>/* msgget msgsnd ftok */
#include <sys/msg.h>/* msgget msgsnd */
#include <string.h> /* strcpy */
#include <stdlib.h> /* system */

/* 消息格式结构体 */
typedef struct
{
	long msg_type; /* 消息类型 */
	char buf[128]; /* 消息内容 */
} msgT;
#define MSGLEN  (sizeof(msgT)-sizeof(long))

/* 主函数 */
int main(int argc, char *argv[])
{
	int ret;
	key_t key;
	int msgid;
	msgT msg;

	int i = 0;
	char buff[50];

	/* 1. 生成key */
	key = ftok("./key.txt", 100);
	if(key < 0)
	{
		perror("ftok");
		return -1;
	}
	/* 2. 打开或创建消息队列 */
	msgid = msgget(key, IPC_CREAT|0666);
	if(msgid < 0)
	{
		perror("msgget");
		return -1;
	}
	/* 3. 发送消息 */
	for(i = 0; i < 6; i++)/* 发送5 */
	{
		if(i > 3)
		{
			msg.msg_type = 3;/* 消息类型 */
		}
		else
		{
			msg.msg_type = 1;/* 消息类型 */
		}
		sprintf(buff, "This message type is %d!", (int)msg.msg_type);
		
		strcpy(msg.buf, buff);/* 消息内容 */
		ret = msgsnd(msgid, &msg, MSGLEN, 0);
		if(ret < 0)
		{
			perror("msgsnd");
			return -1;
		}
		printf("Send message [%s] success!\n", msg.buf);
	}
	system("ipcs -q");
	return 0;
}

2. msg_receive.c

c
/* 头文件 */
#include <stdio.h>  /* perror */
#include <sys/ipc.h>/* msgget msgsnd ftok */
#include <sys/msg.h>/* msgget msgsnd */
#include <string.h> /* strcpy */
#include <stdlib.h> /* system */

/* 消息格式结构体 */
typedef struct
{
	long msg_type; /* 消息类型 */
	char buf[128]; /* 消息内容 */
} msgT;
#define MSGLEN  (sizeof(msgT)-sizeof(long))

/* 主函数 */
int main(int argc, char *argv[])
{
	int ret;
	int count = 0;
	key_t key;
	int msgid;
	msgT msg;

	/* 1. 生成key */
	key = ftok("./key.txt", 100);
	if(key < 0)
	{
		perror("ftok");
		return -1;
	}
	/* 2. 打开或创建消息队列 */
	msgid = msgget(key, IPC_CREAT|0666);
	if(msgid < 0)
	{
		perror("msgget");
		return -1;
	}
	/* 3. 接收消息 */
	while(1)
	{
		ret = msgrcv(msgid, &msg, MSGLEN, 0, 0);/* 阻塞式接收任意类型消息 */
		if(ret < 0)
		{
			perror("msgrcv");
			return -1;
		}
		printf("[count=%d]receive message type=%d,buf=%s\n", count, (int)msg.msg_type, msg.buf);
		count++;
		if(count > 5)
		{
			break;
		}
	}
	system("ipcs -q");
	/* 4. 删除消息队列 */
	printf("\n==============Delete=================\n");
	ret = msgctl(msgid, IPC_RMID, NULL);
	if(ret < 0)
	{
		perror("msgctl");
		return -1;
	}
	system("ipcs -q");
	return 0;
}

3. makefile

makefile
CC = gcc

DEBUG = -g -O2 -Wall
CFLAGS += $(DEBUG)

# 所有.c文件去掉后缀
TARGET_LIST = ${patsubst %.c, %, ${wildcard *.c}} 

all : $(TARGET_LIST)

%.o : %.c
	$(CC) $(CFLAGS) -c $< -o $@

.PHONY: all clean clean_o
clean : clean_o
	@rm -vf $(TARGET_LIST) 
	
clean_o :
	@rm -vf *.o

4. 编译与测试

在终端执行以下命令编译程序:

shell
make # 生成可执行文件
./msg_send        # 向消息队列发送消息
./msg_receive     # 从消息队列读取消息

然后,终端会有以下信息显示:

shell
# 执行 ./msg_send
Send message [This message type is 1!] success!
Send message [This message type is 1!] success!
Send message [This message type is 1!] success!
Send message [This message type is 1!] success!
Send message [This message type is 3!] success!
Send message [This message type is 3!] success!

--------- 消息队列 -----------
        msqid      拥有者  权限     已用字节数 消息      
0x643d0b4e 5          hk         666        768          6

# 执行 ./msg_receive
[count=0]receive message type=1,buf=This message type is 1!
[count=1]receive message type=1,buf=This message type is 1!
[count=2]receive message type=1,buf=This message type is 1!
[count=3]receive message type=1,buf=This message type is 1!
[count=4]receive message type=3,buf=This message type is 3!
[count=5]receive message type=3,buf=This message type is 3!

--------- 消息队列 -----------
        msqid      拥有者  权限     已用字节数 消息      
0x643d0b4e 5          hk         666        0            0           


==============Delete=================

--------- 消息队列 -----------
        msqid      拥有者  权限     已用字节数 消息