多线程编程-通过fork方式创建任务进程

3/8/2017来源:ASP.NET技巧人气:438

背景

在多进程中需要创建一个任务进程进行待处理任务的分发工作。也就是说,需要在主进程中detach剥离出一个进程,且该子进程独立于主进程,进行独立的任务发送。如果是单纯在主进程中fork一个子进程的话,如果进行了wait等待操作,那么该任务进程就失去了独立性,无法实现边取任务边分发任务。而如果是不进行wait等待操作的话,那么子进程可能也会执行后续创建多个处理任务的进程,使得出现的并行进程数量为预期的double。当然这也是可以通过在并发多进程wait之前对任务分配进程进行wait操作来达到预期要求。本文采取两种方式来实现任务分配进程和任务处理进程的共存。

方案1:

双fork方式,以孙进程作为detach主进程的任务分配进程,由于该进程是游离于主进程的,所以其回收是通过系统实现的。

代码

void childPRocnew_hash_test() { struct msgstrunew_hash_align msgs; int msgid,ret_value; char str[512]; /* First, we set up the message queue. */ // msgid = msgget((key_t)MSGKEY, 0666 | ipC_CREAT);//该键值则唯一对应一个消息队列 while(1) { msgid = msgget(MSGKEY,IPC_EXCL );/*检查消息队列是否存在 */ if(msgid < 0){ printf("msq not existed! errno=%d [%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } /*接收消息队列*/ ret_value = msgrcv(msgid,&msgs,sizeof(struct msgstrunew_hash_align),0,0); std::cout<<"get task pid="<<getpid()<<","<<msgs.msgtext<<std::endl; usleep(100); if(ret_value == -1) { fprintf(stderr, "msgrcv failed with error: %d\n", errno);//消息队列中的信息被取完?? exit(EXIT_FAILURE);//消息队列为空的时候,就跳出。也可以设计成,消息队列为空时,不跳出,而是等待。 } else { std::string hashline = msgs.msgtext;//接收到的是一行的信息,需要将该行的数据进行切分 // std::cout<<hashline.c_str()<<std::endl;// if (hashline.size()<10) { printf("datasize=%d,data=%s\n",hashline.size(),hashline.c_str()); exit(EXIT_SUCCESS);//换成break的效果呢???是不一样的啊 } } //因为在send的时候,只send了一个end,当该标志信息被读取之后,其他的进程自然是读取不到信息的, } return; } void SendHashLine() { struct msgstrunew_hash_align msgs; int msg_type; char str[80]; int ret_value; int msqid; msqid=msgget(MSGKEY,IPC_EXCL ); /*检查消息队列是否存在*/ if(msqid < 0){ msqid = msgget(MSGKEY,IPC_CREAT|0666);/*创建消息队列*/ if(msqid <0){ printf("failed to create msq | errno=%d [%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } } ifstream m_InStream("hash_list.txt"); string oneline; int running=1; int linenun=0; while (running) { if(!getline(m_InStream, oneline,'\n')) { running = 0; for(int i = 0; i<ChildNum;++i) { msgs.msgtype = linenun; snprintf(msgs.msgtext,sizeof(msgs.msgtext),"%s","nil");//strncpy(response.m_ClientIp, request.m_ClientIp, sizeof(response.m_ClientIp));//记录客户端的ip,昵称服务需要 // msgs.msgtext = "nil"; // strcpy(msgs.msgtext, str); /* 发送消息队列 */ std::cout<<"task deliver pid="<<getpid()<<","<<msgs.msgtext<<std::endl; ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstrunew_hash_align),0);//消息队列标识符,准备发现信息的指针,信息的长度,控制标志位 if ( ret_value < 0 ) { printf("msgsnd() write msg failed,errno=%d[%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } } break; } linenun++; strncpy(str,oneline.c_str(),80); msgs.msgtype = linenun; // msgs.msgtext = str; strncpy(msgs.msgtext,str,sizeof(msgs.msgtext)); // strcpy(msgs.msgtext, str); /* 发送消息队列 */ std::cout<<"task deliver pid="<<getpid()<<","<<msgs.msgtext<<std::endl; int test_len = sizeof(struct msgstrunew_hash_align); ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstrunew_hash_align),0);//消息队列标识符,准备发现信息的指针,信息的长度,控制标志位 // sleep(1); if ( ret_value < 0 ) { printf("msgsnd() write msg failed,errno=%d[%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } } } void MultiOldAligndata_test() { //先用一个进程进行任务的创建,即用一个进程做消息队列的创建 pid_t fpid; //fpid表示fork函数返回的值 int count=0; fpid=fork(); if (fpid < 0) printf("error in fork!"); else if (fpid == 0) { printf("i am the child process, my process id is %d\n",getpid()); printf("子进程\n"); count++; pid_t fpid_2; fpid_2 = fork(); if(fpid_2 <0) { printf("error in fork_second!\n"); } else if(fpid_2 == 0) { //该孙进程处理自己想要做的事情,即做任务的分配 SendHashLine();// } exit(EXIT_SUCCESS); } else { printf("i am the parent process, my process id is %d\n",getpid()); count++; } int status_1 =0; int mpid_1 =0; mpid_1 = wait(&status_1); //基于消息队列的多进程版本----------begin---------------------------- time_t start,stop; start = time(NULL); int data_processed; string some_data; int i,cpid; // /* create 5 child process */ for (i=0;i<ChildNum;i++){ cpid = fork(); if (cpid < 0) printf("fork failed\n"); else if (cpid ==0) /*child process*/ { childprocnew_hash_test(); } } int status =0; int mpid =0; for(int i=0;i<ChildNum;i++) { mpid = wait(&status); printf("pid[%d] is exit with status[%d]\n",mpid,status); } stop = time(NULL); std::cout<<"cost:"<<double(stop-start)<<std::endl; //基于消息队列的多进程版本----------end----------------------------

如果采用主进程直接fork一个子进程作为任务分配进程的话,该任务分配子进程也会执行后期的任务处理进程的多进程创建,使得并行的进程数是预期的double。 且由于主进程的等待操作并没有考虑到这多出来的一倍进程,导致这部分进程没有退出,僵死状态。同时,任务分配进程也没有退出。所以此时通过ps命令查看,发现进程数为7(=5并行进程+2(任务分配进程和主进程)) 并行执行过程输出界面信息: 这里写图片描述 ps查看进程情况: 可以查看对应的进程号是与上面多出来的一倍进程相对应的。 这里写图片描述

方案2:

通过主进程fork一个子进程作为任务分配进程。主进程创建多个并发的任务处理进程。且在主进程wait各个任务处理进程之前进行一次任务进程的wait操作,使得任务进程得以回收,再进程并发进程的wait操作。

代码:

//exit with status void childprocnew_hash_test() { struct msgstrunew_hash_align msgs; int msgid,ret_value; char str[512]; /* First, we set up the message queue. */ // msgid = msgget((key_t)MSGKEY, 0666 | IPC_CREAT);//该键值则唯一对应一个消息队列 while(1) { msgid = msgget(MSGKEY,IPC_EXCL );/*检查消息队列是否存在 */ if(msgid < 0){ printf("msq not existed! errno=%d [%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } /*接收消息队列*/ ret_value = msgrcv(msgid,&msgs,sizeof(struct msgstrunew_hash_align),0,0); std::cout<<"get task pid="<<getpid()<<","<<msgs.msgtext<<std::endl; usleep(100); if(ret_value == -1) { fprintf(stderr, "msgrcv failed with error: %d\n", errno);//消息队列中的信息被取完?? exit(EXIT_FAILURE);//消息队列为空的时候,就跳出。也可以设计成,消息队列为空时,不跳出,而是等待。 } else { std::string hashline = msgs.msgtext;//接收到的是一行的信息,需要将该行的数据进行切分 if (hashline.size()<10) { printf("datasize=%d,data=%s\n",hashline.size(),hashline.c_str()); exit(EXIT_SUCCESS);//换成break的效果呢???是不一样的啊 } } } return; } void SendHashLine() { struct msgstrunew_hash_align msgs; int msg_type; char str[80]; int ret_value; int msqid; msqid=msgget(MSGKEY,IPC_EXCL ); /*检查消息队列是否存在*/ if(msqid < 0){ msqid = msgget(MSGKEY,IPC_CREAT|0666);/*创建消息队列*/ if(msqid <0){ printf("failed to create msq | errno=%d [%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } } ifstream m_InStream("hash_list.txt"); string oneline; int running=1; int linenun=0; while (running) { if(!getline(m_InStream, oneline,'\n')) { running = 0; for(int i = 0; i<ChildNum;++i) { msgs.msgtype = linenun; snprintf(msgs.msgtext,sizeof(msgs.msgtext),"%s","nil"); /* 发送消息队列 */ std::cout<<"task deliver pid="<<getpid()<<","<<msgs.msgtext<<std::endl; ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstrunew_hash_align),0); if ( ret_value < 0 ) { printf("msgsnd() write msg failed,errno=%d[%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } } break;//可以对比下述两种退出方式 //exit(EXIT_SUCCESS);//EXIT_SUCCESS } linenun++; SplitString(oneline.c_str(),oneline.size(),"\t",scidlist); strncpy(str,oneline.c_str(),80); msgs.msgtype = linenun; strncpy(msgs.msgtext,str,sizeof(msgs.msgtext)); /* 发送消息队列 */ std::cout<<"task deliver pid="<<getpid()<<","<<msgs.msgtext<<std::endl; int test_len = sizeof(struct msgstrunew_hash_align); ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstrunew_hash_align),0); if ( ret_value < 0 ) { printf("msgsnd() write msg failed,errno=%d[%s]\n",errno,strerror(errno)); exit(EXIT_FAILURE); } } } void MultiOldAligndata_test() { pid_t fpid; int count=0; fpid=fork(); if (fpid < 0) printf("error in fork!"); else if (fpid == 0) { printf("i am the child process, my process id is %d\n",getpid()); printf("子进程\n"); SendHashLine();//任务分配进程 } else { printf("i am the parent process, my process id is %d\n",getpid()); count++; //基于消息队列的多进程版本----------begin---------------------------- time_t start,stop; start = time(NULL); int data_processed; string some_data; int i,cpid; // /* create 5 child process */ for (i=0;i<ChildNum;i++){ cpid = fork(); if (cpid < 0) printf("fork failed\n"); else if (cpid ==0) /*child process*/ { childprocnew_hash_test(); } } //需要等待任务分配的进程 int status_1 =0; int mpid_1 =0; mpid_1 = wait(&status_1); int status =0; int mpid =0; for(int i=0;i<ChildNum;i++) { mpid = wait(&status); printf("pid[%d] is exit with status[%d]\n",mpid,status); } stop = time(NULL); std::cout<<"cost:"<<double(stop-start)<<std::endl; //基于消息队列的多进程版本----------end---------------------------- } // int status_1 =0; // int mpid_1 =0; // mpid_1 = wait(&status_1);//不等待的话??? // sleep(10);//不加sleep操作的话则任务进程会先退出???因为任务进程最先执行结束 }