混合编程之多线程


前言: 小白一个, 没有系统性的学习过python网络通信,只是单纯的cv加修改代码,仅作留念以及参考用,感谢互联网博主们和bito插件,使得chatGPT得以免费使用. 另外该多线程传输图片的速度比没有多线程执行还满,后续不对python服务端做优化,而改为C++服务端实现.写出来继续再分享把

前篇博客地址

#python客户端

采用生存者消费者模式模式2joinablequeue库. 客户端实现还是比较简单的,麻烦在server端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import pickle
import time
from multiprocessing import Process, JoinableQueue
from queue import Queue

from multiprocessing.connection import Client, Listener

from client_apart import draw_box
from image import plot_boxes, img_encode
import os
from natsort import ns, natsorted

host = 'localhost'
port = 9006
total_time = 0


def img_product(img_queue, path, path_mode='image'):
    if path_mode == 'image':
        img = img_encode(path)
        img_obj = {'frame_num': 1, 'image': img}  # need frame_num?
        img_queue.put(img_obj)
    elif path_mode == 'dir':
        dir_list = os.listdir(path)
        files = natsorted(dir_list, alg=ns.PATH)  # 顺序读取文件名
        i = 1
        for filename in files:
            img_path = path + '/' + filename
            img = img_encode(img_path)
            img_obj = {'frame_num': i, 'image': img}  # need frame_num?
            i += 1
            img_queue.put(img_obj)
        img_queue.put({'frame_num': 0, 'image': "end"})  # end signal 
    img_queue.join()


def server_consumer(img_queue):
    # 1. send data
    while True:
        img_obj = img_queue.get()
        if img_obj is None:
            client.close()  # avoid connection-reset-by-peer
            break  # exit end
        data_bytes = pickle.dumps(img_obj)
        start = int(round(time.time() * 1000))
        start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        client.send(data_bytes)  # 40ms/per send img
        # print('send cost time: ', (end - start))
        img_queue.task_done()
        try:
            det_result = client.recv()
            end = int(round(time.time() * 1000))
            end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            print('recv cost time: ', (end-start))
        except EOFError:
            break
        det_result = pickle.loads(det_result)
        draw_box(det_result, img_obj)


if __name__ == '__main__':
    client = Client((host, port))
    Listener()
    img_dir = './data'
    one_img = './data/Enterprise001.jpg'
    mode = 'dir'

    img_jq = JoinableQueue()
    producer = Process(target=img_product, args=(img_jq, img_dir, mode,))
    consumer = Process(target=server_consumer, args=(img_jq,))
    consumer.daemon = True  # set daemon but not set join()

    producer.start()
    consumer.start()

    producer.join()

#Python服务端

此处接受图片和发送结果线程处在一个类内,使得可以共享一个Listener,来自chatGPT的idea.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from ctypes import *
import ctypes
import time
import pickle
from PHeader import *

import cv2
import numpy as np
from multiprocessing.connection import Listener
from multiprocessing import JoinableQueue, Process, Queue, connection
import threading


class ListenerThread(threading.Thread):
    def __init__(self, address, data_handler, lib):
        super().__init__()
        self.daemon = True
        self.address = address
        self.data_handler = data_handler
        self.lib = lib
        self.listener = Listener(self.address, backlog=5)
        self.conn = None
        self.is_running = True
        
    def run(self):
        print('Listening on', self.listener.address)
        while self.is_running:
            try:
                self.conn = self.listener.accept()  # ready to accept data continually
                print('Connected by', self.listener.last_accepted)
                t1 = threading.Thread(target=self.receive_data)
                t2 = threading.Thread(target=self.send_result)
                t1.start()
                t2.start()
                t1.join()
                t2.join()
            except OSError as e:
                if e.errno != 98:  # Address already in use
                    raise
                print('Address already in use, retrying in 1 second...')
                time.sleep(1)
        

    def destroy_Model(self):  # when qt send a signal
        self.lib.interface_destoryThread()

    def receive_data(self):
        time_cost1 = 0
        while True:
            try:
                start = int(round(time.time() * 1000))
                received_bytes = self.conn.recv()  # recv Client data
                end = int(round(time.time() * 1000))
                print('recv time cost: ', end-start)
                time_cost1 += end-start
                img_dict = pickle.loads(received_bytes)
                if img_dict["frame_num"] == 0:
                    print("receive's thread already receive all data, close thread!!")
                    print("recv: ", time_cost1)
                    self.lib.interface_setEmptyFlag()  # make send thread break
                    break
                img_dict['image'] = cv2.imdecode(img_dict['image'], cv2.IMREAD_COLOR)
                self.data_handler.sendImgtoC(self.lib, img_dict, 0)  # prepare to send img
            except EOFError:
                print('Connection closed')
                self.conn.close()
                break

    def send_result(self):
        time_cost1 = 0
        time_cost2 = 0
        while True:
            self.lib.interface_getClsQueue.restype = ObjClassifyOutput
            start = int(round(time.time() * 1000))
            output = self.lib.interface_getClsQueue()  # get result from model
            end = int(round(time.time() * 1000))
            time_cost1 += end-start
            print('get cls time cost: ', end-start)
            if output.object_list.object_num >= 0:
                cls_result = self.data_handler.CtoP(output)
                cls_result = pickle.dumps(cls_result)
                start = int(round(time.time() * 1000))
                self.conn.send(cls_result)
                end = int(round(time.time() * 1000))
                print('send time cost: ', end-start)
                time_cost2 += end-start
            elif output.object_list.object_num == -1:   # queue is empty for now
                time.sleep(0.04)
                continue
            elif output.object_list.object_num == -2:   # all data is classify
                print("send's thread alreay handle all data, close thread!!")
                print("cls: ", time_cost1, ' send: ', time_cost2)
                # self.close()
                break

    def close(self):  # useless for now
        self.conn.close()
        # self.listener.close()
        self.run()


class DataHandler:
    def __init__(self):
        self.data = None

    def CtoP(self, output):  # 将模型结果解析为python列表
        # [cv_object_list: [cv_object: [cv_box: [] ]]]
        cv_object_list = []
        cls_out = []
        obj_list = output.object_list
        if obj_list.object_num != 0:
            for i in range(obj_list.object_num):
                cv_box = []
                cv_object = []
                obj = obj_list.object[i]
                # bbox
                cv_box.append(obj.bbox.left_top_x)
                cv_box.append(obj.bbox.left_top_y)
                cv_box.append(obj.bbox.w)
                cv_box.append(obj.bbox.h)
                cv_object.append(cv_box)
                # classes/objectness/prob
                cv_object.append(obj.classes)
                cv_object.append(obj.objectness)
                prob = POINTER(c_float)(obj.prob)
                cv_object.append(prob.contents.value)
                # cv_object
                cv_object_list.append(cv_object)
            cv_object_list.append(obj_list.object_num)

            # cv_object_list
            cls_out.append(cv_object_list)
        return cls_out

    def sendImgtoC(self, lib, img_dict, i):
        lib.interface_receive.argtypes = [PythonMat]
        # 1. combine info to img
        pi = PythonMat()
        pi.frame_num[0] = img_dict["frame_num"]
        img = img_dict['image']
        # 1.1 set width/height
        PARAM = c_int * 32
        height = PARAM()
        height[0] = img.shape[0]
        pi.height = height
        width = PARAM()
        width[0] = img.shape[1]
        pi.width = width
        # 1.2 set Mat
        frame_data = np.asarray(img, dtype=np.uint8)
        frame_data = frame_data.ctypes.data_as(c_char_p)
        pi.frame[0] = frame_data
        # 2. send img to detect model
        lib.interface_receive(pi)


if __name__ == '__main__':
    address = ('localhost', 9006)
    data_handler = DataHandler()
    ll = ctypes.cdll.LoadLibrary
    lib = ll("../../lib/libDetClsController.so")  # create a C lib
    listener_thread = ListenerThread(address, data_handler, lib)
    listener_thread.start()
    try:
        det_process = threading.Thread(target=lib.interface_initDetectImage)
        cls_process = threading.Thread(target=lib.interface_initClassifyImage)
        det_process.start()
        cls_process.start()
        det_process.join()  # need a break signal
        cls_process.join()
    except KeyboardInterrupt:
        # 程序被强制关闭
        print('Program terminated')
        # 关闭ListenerThread对象
        listener_thread.is_running = False
        listener_thread.join()
    finally:
        # 关闭ListenerThread对象
        listener_thread.is_running = False
        listener_thread.join()

#C++模型调用

此处涉及其他人员写的代码,做修改处理

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
typedef struct 
{
int width[CV_MAX_BATCH_SIZE];
int height[CV_MAX_BATCH_SIZE];
char* frame[CV_MAX_BATCH_SIZE]; 
int frame_num[CV_MAX_BATCH_SIZE]; 
}PythonMat;

// 多线程控制相关
mutex mtxQueueDet;  // mutex for detect queue
mutex mtxQueueImg;  // mutex for image queue
mutex mtxQueueCls;  // mutex for classify queue
mutex mtxif;
queue<> queueDetOut;// Det output queue
queue<> queueClsOut;// Det classify queue
queue<cv::Mat> queueMat;
bool DetectFlag = true;
bool ClassifyFlag = true;
bool empty_flag = false;

void receiveImg(PythonMat &img)
{
    cv::Mat frame(img.height[0], img.width[0], CV_8UC3, img.frame[0]);

    mtxQueueImg.lock();
    queueMat.push(frame);
    cout << "frame num: "<<  img.frame_num[0] << endl;
    mtxQueueImg.unlock();
}

void DetectImage()
{
    Detect detect_output;
    Detect detectmodel;
    detectmodel.Init(config_path, 1);
    cv::Mat frame;
    while(1)
        { 
            if (queueMat.empty()) 
            {
                if(!DetectFlag)
                {
                    break;
                }
                usleep(2000);
                continue;
            }
            mtxQueueImg.lock();
            frame = queueMat.front();
            queueMat.pop();
            mtxQueueImg.unlock();

            detect_output = detectmodel.Run(detect_input);
            
            // lock
            mtxQueueDet.lock();
            queueDetOut.push(detect_output);
            cout << "detect run !!" << endl;
            mtxQueueDet.unlock();
        }
    return;
}

void ClassifyImage()
{
    Classify objclassify;
    objclassify.Init(config_path, 1);
    ClassifyInput input;
    ClassifyOutput output;
    cv::Mat frame;
    while(1)
    {
        if (queueDetOut.empty()) 
        {
            if(!ClassifyFlag)
            {
                break;
            }
			usleep(2000);
            continue;
		}
        
        mtxQueueDet.lock();
        detect_result = queueDetOut.front();
        queueDetOut.pop();
        mtxQueueDet.unlock();
        
        output = objclassify.Run(input);
        
        mtxQueueCls.lock();
        queueClsOut.push(output);
        mtxQueueCls.unlock();
    }
    return;
}


ClassifyOutput getClsQueue(){
    ObjClassifyOutput output;
    if (queueClsOut.empty()){
        usleep(2000);
        output.object_list.object_num = -1;  // -1 is now empty; -2 is all empty 
        if (empty_flag){
            output.object_list.object_num = -2;
            empty_flag = false;
        }
        return output;
    }
    mtxQueueCls.lock();
    output = queueClsOut.front();
    queueClsOut.pop();
    cout << "cls_out pop " << output.object_list.object_num << endl;
    mtxQueueCls.unlock();
    return output;
    
}

extern "C" {
int i = 0;
void interface_initDetectImage(){
    // if (flag ) exit thread detect/classify 
    DetectImage();
}
void interface_initClassifyImage(){
    ClassifyImage();
}
void interface_receive(PythonMat &img){
    printf("run %d times\n", i++);
    receiveImg(img); 
}
void interface_setEmptyFlag(){
    empty_flag = true;
}
void testThread(){
    printf("C++ method!!\n");
}
ClassifyOutput interface_getClsQueue(){
    ClassifyOutput output;
    output = getClsQueue();
    return output;
}

void interface_destoryThread(){
    DetectFlag = false;
    ClassifyFlag = false;
}
}

至此这个混合编程项目结束,但是server接收到client耗时居然300ms,需要优化...

#参考博客

  1. 共享内存
  2. Address already in use
  3. connection-reset-by-peer 我是把client.close()加上就不报错
  4. (最终版)linux下python和c++相互调用共享内存通信
  5. 常见python通信错误
    1. EOFError
    2. OSError: handle is closed
    3. Broken pipe