混合编程python与C++


上个版本: 只是用到ctypes进行传输, 这次将python服务端更改为C++服务端,方便后续维护. 本文实现功能: python传输图片给C++, C++接受图片后对图片进行处理,并将结果返回给python客户端, pass image from python to C++

#C++ 服务端

#.h文件

注意文中的model

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// .h
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <opencv2/opencv.hpp>

using namespace std;
using namespace cv;

class ModelManager;

class ServerManager
{
private:
	int m_port;
	char *m_addr;
	cv::VideoCapture m_cap;
	int m_server;
	int m_accept; // client conn
public:
	bool initialization(const int &port, const cv::VideoCapture &cap, char *addr = nullptr);
	bool initialization(const int &port, char *addr = nullptr);
	bool build_connect();
	bool acceptClient();
	void error_print(const char *ptr);
	bool free_connect();

	bool send_data_frame(ModelManager& model);
	bool receive_data_frame(cv::Mat &frame, ModelManager& model);
};

#.cpp文件

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include "ServerManager.h"
#include "ModelManager.h"
#define BUFFER_SIZE 65538

void ServerManager::error_print(const char * ptr) {
        perror(ptr);
        exit(EXIT_FAILURE);
}

bool ServerManager::initialization(const int& port, const cv::VideoCapture& cap, char* addr){
    m_port = htons(port);
	m_addr = addr;
	m_cap = cap;
	return true;
}

bool ServerManager::initialization(const int& port, char* addr){
    m_port = htons(port);
    m_addr = addr;
	return true;
}

bool ServerManager::build_connect() {
	struct sockaddr_in server_addr;
    bzero(&server_addr,sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = m_addr?inet_addr(m_addr):INADDR_ANY;  
	server_addr.sin_port = m_port;
	// create socket 
	m_server = socket(AF_INET, SOCK_STREAM, 0);
	if(m_server < 0)
        error_print("socket bind error");
	// can reuse port
    int on = 1;
    if(setsockopt(m_server,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
        error_print("setsockopt error");
    // bind addr
    if(bind(m_server, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
        error_print("bind error");
    // listen only one client
    if(listen(m_server, 1) < 0)
        error_print("listen failed");
    
	cout << "ServerManager is listening, plesae wait..." << endl;
	
	return true;
}

bool ServerManager::acceptClient(){
	struct sockaddr_in accept_addr;
	socklen_t accept_len = sizeof(accept_addr);
	bzero(&accept_addr,sizeof(accept_addr));
	// accept client connection
    if((m_accept = accept(m_server,(struct sockaddr*)&accept_addr,&accept_len)) < 0)
        error_print("accept error");
	std::cout << "Connection established" << std::endl;
	return true;
}

bool ServerManager::send_data_frame(ModelManager& model) {
	char *json_output = nullptr;
	json_output = model.createJson();
	if (json_output == nullptr) {
		return false;
	}
	
	// printf("send data %s\n", json_output);
	// just send json_output, dont memcpy new char*!!! it wastes me two hours
	// send json
	int result = send(m_accept, json_output, strlen(json_output), 0);
	if (result == -1) {
		cout << "send fail" << endl;
		return false;
	}
	return true;
}


bool ServerManager::receive_data_frame(Mat& frame, ModelManager& model) {
	// recv frame size
	int data_size;
	if (recv(m_accept, &data_size, sizeof(data_size), 0) != sizeof(data_size)) {
		// when client close, then close connection
		close(m_accept);
		cout << "close connection to client" << endl;
		acceptClient(); // restart a new accept, to accept new connection
		return false;
	}
	cout << data_size << endl;
	// recv frame data
    
	// char buf[data_size];
	// std::vector<uchar> decode;
	// int bytes_received = 0;
	// do
	// {
	// 	int nBytes = recv(m_accept, buf, data_size - bytes_received, 0);
	// 	for (int i = 0; i < nBytes; i++)  // maybe can use memcpy, maybe faster
	// 	{
	// 		decode.emplace_back(buf[i]);
	// 	}
	// 	cout << bytes_received << endl;
	// 	bytes_received += nBytes;
	// } while (bytes_received < data_size);
    char *recv_char = new char[data_size];
	std::vector<uchar> decode(data_size, 0);
	int index = 0;
	int bytes_received = 0;
	int count = data_size;
	while (count > 0)// if count >= 0, dead loop
	{
		int iRet = recv(m_accept, recv_char, count, 0);
		if (index >= data_size) index = data_size;
		memcpy(&decode[index], recv_char , iRet);
		index += iRet;
		if (!iRet) { return -1; }
		count -= iRet;
	}
	// decode message
	frame = imdecode(decode, cv::IMREAD_COLOR);
	// push into Model's queueMat
	model.mtxQueueImg.lock();
	model.queueMat.push(frame);
	model.mtxQueueImg.unlock();
	return true;
}

bool ServerManager::free_connect() {
	m_cap.release();
	close(m_accept);
    close(m_server);
	return true;
}

#C++ model部分代码

#.h文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#pragma once
#include "CV_Classify.h"
#include "CV_Detect.h"
#include "ServerManager.h"
#include <opencv2/opencv.hpp>
#include <mutex>
#include <queue>
#include <unistd.h> // usleep
#include <thread>
#include "cJSON.h"
#include <string>

using namespace std;
using namespace cv;

class ModelManager{
public:
    Detect objdetect;
    Classify objclassify;
    std::mutex mtxQueueDet;  // mutex for detect queue
    std::mutex mtxQueueImg;  // mutex for image queue
    std::mutex mtxQueueCls;  // mutex for classify queue
    std::queue<cv::Mat> queueMat;
    std::queue<ObjDetectOutput> queueDetOut;// Detect queue
    std::queue<ObjClassifyOutput> queueClsOut;// Classify queue
    
    bool DetectFlag = true;
    bool ClassifyFlag = true;
    bool empty_flag = false;
    friend class ServerManager;
public:
    void initDetectModel() ;
    void initClassifyModel() ;
    void DetectImg();
    void ClassifyImg();
    void getClsResult(ObjClassifyOutput &output);
    // ObjClassifyOutput getClsResult();
    char* createJson();
};

#.cpp文件

部分有删减,createJson可参考使用,利用json来传递值

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "ModelManager.h"

void ModelManager::initDetectModel() 
{
    std::string config_path = "DetectConfig.yaml";
    objdetect.Init(config_path, 1);
}
void ModelManager::initClassifyModel() 
{
    std::string config_path = "ClassiflyConfig.yaml";
    objclassify.Init(config_path, 1);
}

void ModelManager::DetectImg()
{
    DetectInput detect_input;
    DetectOutput detect_output;
    cv::Mat frame;
    size_t mm = 0;
    while(1)
    { 
        if (queueMat.empty()) 
        {
            if(!DetectFlag)
            {
                break;
            }
			usleep(2000);
            continue;
		}
        // get image from queueMat
        mtxQueueImg.lock();
        frame = queueMat.front();
        queueMat.pop();
        mtxQueueImg.unlock();
        // run model
        objdetect.Run(detect_input, detect_output);
        // push detect result into queueDetOut
        mtxQueueDet.lock();
        queueDetOut.push(detect_output);
        // cout << "detect run !!" << endl;
        mtxQueueDet.unlock();
    }
    return;
}
void ModelManager::ClassifyImg()
{
    ObjClassifyInput input;
    ObjClassifyOutput output;
    cv::Mat frame;
    Detoutput detect_result;
    while(1)
    {
        if (queueDetOut.empty()) 
        {
            if(!ClassifyFlag)
            {
                break;
            }
			usleep(2000);
            continue;
		}
        // get detect from queueDetOut
        mtxQueueDet.lock();
        detect_result = queueDetOut.front();
        queueDetOut.pop();
        mtxQueueDet.unlock();
        // run model
        objclassify.Run(input, output);
        // push cls result into queueClsOut
        mtxQueueCls.lock();
        queueClsOut.push(output);
        mtxQueueCls.unlock();  
    }
    return;
}

void ModelManager::getClsResult(ObjClassifyOutput& output){
    if (queueClsOut.empty()){
        output.object_list.object_num = -1;  // -1 is now empty;
        return; // must return in thread otherwise cant use &output
    }
    output = queueClsOut.front();
    queueClsOut.pop();
    return;
}

char* ModelManager::createJson()  // dont know why cant use &value, need return value
{
    mtxQueueCls.lock();
    ObjClassifyOutput output;
    getClsResult(output); 
    mtxQueueCls.unlock();
	
	if (output.object_list.object_num == -1){
		return nullptr;
	}
	// prepare send data json
	cJSON* json_object_list = NULL;
	cJSON* json_ObjClassifyOutput = NULL;
	
	json_ObjClassifyOutput = cJSON_CreateObject();
	json_object_list = cJSON_CreateObject();
	cJSON_AddItemToObject(json_ObjClassifyOutput, "object_list", json_object_list);
	
	int obj_num = output.object_list.object_num;
	cJSON_AddNumberToObject(json_object_list, "object_num", obj_num);
	for (int i = 0; i < obj_num; ++i){
		cJSON* json_object = cJSON_CreateObject();
		cJSON* json_box = cJSON_CreateObject();
		cJSON_AddNumberToObject(json_box,"x", output.object_list.object[i].bbox.x);
		cJSON_AddNumberToObject(json_box,"y", output.object_list.object[i].bbox.y);
		cJSON_AddNumberToObject(json_box,"w", output.object_list.object[i].bbox.w);
		cJSON_AddNumberToObject(json_box,"h", output.object_list.object[i].bbox.h);
		cJSON_AddItemToObject(json_object,"bbox", json_box);
		cJSON_AddNumberToObject(json_object, "classes", output.object_list.object[i].classes);
		cJSON_AddNumberToObject(json_object, "objectness", output.object_list.object[i].objectness);
        // double prob = output.object_list.object[i].prob;
		// cJSON_AddNumberToObject(json_object, "prob", prob); // pointer cant use?
		string str = "object" + to_string(i);
        cJSON_AddItemToObject(json_object_list, str.c_str(), json_object);
        // printf("prob: %f", output.object_list.object[i].prob);
	}
	
	char* json_output = cJSON_Print(json_ObjClassifyOutput);
    cJSON_Delete(json_ObjClassifyOutput);
    return json_output;
}

#C++ 服务端运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <../include/ServerManager.h>
#include <../include/ModelManager.h>
#include <thread>
#define PORT 8080
void recvServer(ServerManager& s, ModelManager& model){
    int idx = 0;
    
    while (true){
        // auto start = std::chrono::steady_clock::now();
        cv::Mat frame;
        s.receive_data_frame(frame, model);
        // cal time cost
        // auto end = std::chrono::steady_clock::now();
        // std::chrono::duration<double, std::milli> elapsed = end - start;
        // std::cout << "recv execution time: " << elapsed.count() << " ms\n";
        if (frame.empty()) {
            usleep(2000);
            continue;
        }
        // cv::imwrite("image"+to_string(idx++)+".jpg", frame);
        std::cout << "Image " << idx++ <<" received !!" << std::endl;
    }
}

void sendServer(ServerManager& s, ModelManager& model){
    while (true){
        if (s.send_data_frame(model)) {
            cout << "send success!!" << endl;
            cout << endl;
        }else{
            // cout << "send fail!!" << endl;
            usleep(2000);
        }
    }
}
 
int main()
{
    ServerManager s;
    ModelManager model;
    model.initDetectModel();
    model.initClassifyModel();
    cout << endl;
    s.initialization(PORT);
    s.build_connect();
    s.acceptClient();
    
    thread recv_server(recvServer, std::ref(s), std::ref(model));
    thread send_server(sendServer, std::ref(s), std::ref(model));
    thread detect(&ModelManager::DetectImg, &model);
    thread classfy(&ModelManager::ClassifyImg, &model);
    detect.join();
    classfy.join();
    recv_server.join();
    send_server.join();
    return 0;
}

#python客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import json
import socket
import struct
import time
from multiprocessing import JoinableQueue
from threading import Thread

import os
from natsort import ns, natsorted

host = '192.168.0.2'  # '192.168.0.2' 'localhost'
port = 8080


def img_encode(img_path):
    img = cv2.imread(img_path)
    # img = cv2.resize(img, (500, 500), interpolation=cv2.INTER_CUBIC)
    img_param = [95]  # 图片压缩率0-100
    _, img = cv2.imencode('.jpg', img, img_param)
    img = img.tobytes()
    return img


def img_product(img_queue, path, path_mode='image'):
    if path_mode == 'image':
        image = img_encode(path)
        img_queue.put(image)
    elif path_mode == 'dir':
        dir_list = os.listdir(path)
        files = natsorted(dir_list, alg=ns.PATH)  # 顺序读取文件名
        for filename in files:
            img_path = path + '/' + filename
            image = img_encode(img_path)
            img_queue.put(image)
        img_queue.put('E')
    img_queue.join()


def server_consumer(img_queue):
    while True:
        start = int(round(time.time() * 1000))
        # 1. get img from queue
        img_obj = img_queue.get()
        img_queue.task_done()
        # get end signal
        if img_obj[0] == 'E':
        client.close()
        break
        # 2. send package(img_bytes_size, img_bytes)
        pack_size = struct.pack("i", len(img_obj))
        client.send(pack_size + img_obj)
        end = int(round(time.time() * 1000))

        data = client.recv(65536)
        json_str = data.decode('utf8', 'ignore').strip(b'\x00'.decode())
        results = json.loads(json_str)
        end = int(round(time.time() * 1000))
        end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print('send and recv cost time: ', (end - start))
        print(results)


        if __name__ == '__main__':
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect((host, port))
        img_dir = 'data'
        one_img = './data/image.jpg'
        mode = 'dir'

        img_jq = JoinableQueue()
        producer = Thread(target=img_product, args=(img_jq, img_dir, mode,))
        consumer = Thread(target=server_consumer, args=(img_jq,))
        producer.daemon = True  # set daemon but not set join()

        producer.start()
        consumer.start()

    # producer.join() // 让生产者先关闭,防止close错误
        consumer.join()

#总结

  • 其实这个项目真正做完感觉还是挺简单, 就是对socket通信不太熟悉, 以及传图片没做过.
  • 实际上传图片只需要读取图片后,imencode,然后tobytes,最后发送size和data即可.而接受端只需要拼接数组,然后imdecode即可.
  • 另外传输结果的话利用json传输可以让结果可读性可高, 传输也比较方便, 当时copy别人的发送代码, 没有细看,导致使用memcpy让json格式乱码,导致无法解码json.
  • 如果你感觉接收端没问题,一定要看看发送端.
  • 之后的新任务就是视频传输利用rtsp流,敬请期待

#参考博客