Program Listing for File connection.h
↰ Return to documentation for file (ipc_shared_mem\connection.h)
// Include guard. NOTE(review): the guard name (CHANNEL_H) does not match the file name (connection.h); make sure no other
// header in the project uses the same guard.
#ifndef CHANNEL_H
#define CHANNEL_H
// Compile-time switch for diagnostic reporting. When greater than 0, ENABLE_TRACE is defined below and the
// CConnection::Trace helper is compiled in.
#define ENABLE_REPORTING 0
// Compile-time switch for decoupled receiving. When greater than 0, CConnection gets an additional receive queue, a second
// thread member and the DecoupleReceive function.
#define ENABLE_DECOUPLING 0
// Reporting needs the trace facility: ENABLE_TRACE is defined before "../../global/trace.h" is included further down.
#if ENABLE_REPORTING > 0
#define ENABLE_TRACE 1
#endif
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <string>
#include <thread>
#include "in_process_mem_buffer.h"
#include "shared_mem_buffer_posix.h"
#include "shared_mem_buffer_windows.h"
#include <interfaces/ipc.h>
#include <interfaces/process.h>
#include <support/interface_ptr.h>
#include <support/local_service_access.h>
#include <support/component_impl.h>
#include "../../global/trace.h"
// When building with the Microsoft compiler, instruct the linker to pull in the Winsock library (Ws2_32.lib) so that no
// separate linker setting is needed.
#ifdef _MSC_VER
#pragma comment(lib, "Ws2_32.lib")
#endif
// Forward declaration. CConnection only refers to the watchdog by reference (constructor parameters and the m_rWatchDog
// member), so the full class definition is not required in this header.
class CWatchDog;
/**
 * @brief Connection object of the shared-memory IPC component.
 * @details Implements the sdv::ipc::IDataSend and sdv::ipc::IConnect interfaces and exposes sdv::IObjectDestroy through the
 * interface map. The object owns one transmit buffer (CSharedMemBufferTx), one receive buffer (CSharedMemBufferRx) and a
 * receive thread member. Only the declarations are visible in this header; behavioural notes marked "presumably" must be
 * confirmed against the implementation file.
 */
class CConnection : public std::enable_shared_from_this<CConnection>, public sdv::IInterfaceAccess, public sdv::IObjectDestroy,
                    public sdv::ipc::IDataSend, public sdv::ipc::IConnect
{
public:
    /**
     * @brief Constructor taking the individual connection parameters.
     * @param[in] rWatchDog Reference to the watchdog object.
     * @param[in] uiSize Size value (presumably the size of the shared-memory buffers - confirm in the implementation).
     * @param[in] rssName Name string (presumably the name of the shared-memory objects - confirm in the implementation).
     * @param[in] bServer Server flag (see IsServer and m_bServer).
     */
    CConnection(CWatchDog& rWatchDog, uint32_t uiSize, const std::string& rssName, bool bServer);

    /**
     * @brief Constructor taking a connection string.
     * @param[in] rWatchDog Reference to the watchdog object.
     * @param[in] rssConnectionString Connection string (presumably in the format returned by GetConnectionString - confirm).
     */
    CConnection(CWatchDog& rWatchDog, const std::string& rssConnectionString);

    /**
     * @brief Destructor. According to the note at Disconnect, the destructor calls Disconnect (without dynamic binding).
     */
    virtual ~CConnection();

    // Interface map: interfaces that can be requested from this object.
    BEGIN_SDV_INTERFACE_MAP()
        SDV_INTERFACE_ENTRY(sdv::ipc::IDataSend)
        SDV_INTERFACE_ENTRY(sdv::ipc::IConnect)
        SDV_INTERFACE_ENTRY(sdv::IObjectDestroy)
    END_SDV_INTERFACE_MAP()

    /**
     * @brief Get the connection string of this connection.
     * @return The connection string.
     */
    std::string GetConnectionString();

    /**
     * @brief Send data. Overload of sdv::ipc::IDataSend::SendData.
     * @param[in,out] seqData Sequence of data buffers to send.
     * @return Boolean result (presumably 'true' on success - confirm in the implementation).
     */
    virtual bool SendData(/*inout*/ sdv::sequence<sdv::pointer<uint8_t>>& seqData) override;

    /**
     * @brief Start connecting asynchronously. Overload of sdv::ipc::IConnect::AsyncConnect.
     * @param[in] pReceiver Pointer to the receiver object (presumably exposing sdv::ipc::IDataReceiveCallback, see
     * m_pReceiver - confirm in the implementation).
     * @return Boolean result (presumably 'true' when the connection process could be started).
     */
    virtual bool AsyncConnect(/*in*/ sdv::IInterfaceAccess* pReceiver) override;

    /**
     * @brief Wait for the connection. Overload of sdv::ipc::IConnect::WaitForConnection.
     * @param[in] uiWaitMs Wait time (in milliseconds, judging by the parameter name).
     * @return Boolean result (presumably 'true' when connected within the wait time).
     */
    virtual bool WaitForConnection(/*in*/ uint32_t uiWaitMs) override;

    /**
     * @brief Cancel a pending wait. Overload of sdv::ipc::IConnect::CancelWait.
     */
    virtual void CancelWait() override;

    /**
     * @brief Disconnect. Overload of sdv::ipc::IConnect::Disconnect.
     */
    // Suppress cppcheck warning. The destructor calls Disconnect without dynamic binding. This is correct so.
    // cppcheck-suppress virtualCallInConstructor
    virtual void Disconnect() override;

    /**
     * @brief Register a status event callback. Overload of sdv::ipc::IConnect::RegisterStatusEventCallback.
     * @param[in] pEventCallback Pointer to the object receiving the events (presumably exposing
     * sdv::ipc::IConnectEventCallback, see SEventCallback - confirm in the implementation).
     * @return Cookie identifying the registration; to be passed to UnregisterStatusEventCallback.
     */
    virtual uint64_t RegisterStatusEventCallback(/*in*/ sdv::IInterfaceAccess* pEventCallback) override;

    /**
     * @brief Unregister a status event callback. Overload of sdv::ipc::IConnect::UnregisterStatusEventCallback.
     * @param[in] uiCookie Cookie identifying the registration.
     */
    virtual void UnregisterStatusEventCallback(/*in*/ uint64_t uiCookie) override;

    /**
     * @brief Get the connection status. Overload of sdv::ipc::IConnect::GetStatus.
     * @return The connection status.
     */
    virtual sdv::ipc::EConnectStatus GetStatus() const override;

    /**
     * @brief Destroy the object. Overload of sdv::IObjectDestroy::DestroyObject.
     */
    virtual void DestroyObject() override;

    /**
     * @brief Set the connection status.
     * @param[in] eStatus The new status.
     */
    void SetStatus(sdv::ipc::EConnectStatus eStatus);

    /**
     * @brief Query the server flag.
     * @return Boolean server flag (presumably the value of m_bServer).
     */
    bool IsServer() const;

#ifdef TIME_TRACKING
    /**
     * @brief Get the stored 'last sent' time point.
     * @return Value of m_tpLastSent.
     */
    std::chrono::high_resolution_clock::time_point GetLastSentTime() const { return m_tpLastSent; }

    /**
     * @brief Get the stored 'last received' time point.
     * @return Value of m_tpLastReceived.
     */
    std::chrono::high_resolution_clock::time_point GetLastReceiveTime() const { return m_tpLastReceived; }

    /**
     * @brief Get the stored largest receive duration.
     * @return Value of m_durationLargestDeltaReceived (zero until it is updated).
     */
    std::chrono::duration<double> GetLargestReceiveLoopDuration() const { return m_durationLargestDeltaReceived; }
#endif

private:
#if ENABLE_REPORTING > 0
    /**
     * @brief Trace helper; prefixes the arguments with "this=" and the address of this object and forwards everything to
     * the global ::Trace function.
     * @tparam TArgs Types of the arguments.
     * @param[in] tArgs The arguments to trace.
     */
    template <typename... TArgs>
    void Trace(TArgs... tArgs) const
    {
        return ::Trace("this=", static_cast<const void*>(this), " ", tArgs...);
    }
#endif

    /**
     * @brief Message type identifier, stored in SMsgHdr::eType.
     */
    enum class EMsgType : uint32_t
    {
        sync_request     = 0,          ///< Sync request message (see ReceiveSyncRequest)
        sync_answer      = 1,          ///< Sync answer message (see ReceiveSyncAnswer)
        connect_request  = 10,         ///< Connect request message (see ReceiveConnectRequest)
        connect_answer   = 11,         ///< Connect answer message (see ReceiveConnectAnswer)
        connect_term     = 90,         ///< Connection terminated message (see ReceiveConnectTerm)
        data             = 0x10000000, ///< Data message (see ReceiveDataMessage)
        data_fragment    = 0x10000001, ///< Data fragment message (see ReceiveDataFragementMessage)
    };

    /**
     * @brief Common message header; base of the more specific headers below.
     */
    struct SMsgHdr
    {
        uint32_t uiVersion; ///< Version value (presumably the protocol/interface version - confirm)
        EMsgType eType;     ///< Message type
    };

    /**
     * @brief Connect message; the common header extended with a process ID.
     */
    struct SConnectMsg : SMsgHdr
    {
        sdv::process::TProcessID tProcessID; ///< Process ID (presumably of the sending process - confirm)
    };

    /**
     * @brief Header of a fragmented message; the common header extended with length and offset information.
     */
    struct SFragmentedMsgHdr : SMsgHdr
    {
        uint32_t uiTotalLength; ///< Total length (presumably of the complete, unfragmented message - confirm)
        uint32_t uiOffset;      ///< Offset (presumably of this fragment within the complete message - confirm)
    };

    /**
     * @brief Registered event callback: cookie plus callback interface pointer.
     */
    struct SEventCallback
    {
        uint64_t                         uiCookie = 0;        ///< Registration cookie
        sdv::ipc::IConnectEventCallback* pCallback = nullptr; ///< Callback interface
    };

    // NOTE: the declaration order of the data members determines their initialization order; do not reorder.
    CWatchDog&                            m_rWatchDog;                    ///< Reference to the watchdog object
    sdv::CLifetimeCookie                  m_cookie = sdv::CreateLifetimeCookie(); ///< Lifetime cookie created with the object
    CSharedMemBufferTx                    m_sender;                       ///< Shared-memory transmit buffer
    CSharedMemBufferRx                    m_receiver;                     ///< Shared-memory receive buffer
    std::thread                           m_threadReceive;                ///< Receive thread (presumably runs ReceiveMessages)
    std::atomic<sdv::ipc::EConnectStatus> m_eStatus = sdv::ipc::EConnectStatus::uninitialized; ///< Connection status (atomic)
    sdv::ipc::IDataReceiveCallback*       m_pReceiver = nullptr;          ///< Receiver callback interface
    std::shared_mutex                     m_mtxEventCallbacks;            ///< Mutex (presumably guarding m_lstEventCallbacks)
    std::list<SEventCallback>             m_lstEventCallbacks;            ///< Registered event callbacks
    mutable std::mutex                    m_mtxSend;                      ///< Send mutex; mutable so const members can lock it
    std::mutex                            m_mtxConnect;                   ///< Connect mutex (presumably used with the condition variables below)
    std::condition_variable               m_cvConnect;                    ///< Condition variable for connecting
    std::condition_variable               m_cvStartConnect;               ///< Condition variable for starting the connect
    bool                                  m_bStarted = false;             ///< Started flag
    bool                                  m_bServer = false;              ///< Server flag
#if ENABLE_DECOUPLING > 0
    std::mutex                            m_mtxReceive;                   ///< Mutex (presumably guarding m_queueReceive)
    std::queue<sdv::sequence<sdv::pointer<uint8_t>>> m_queueReceive;      ///< Queue of received data sequences
    std::thread                           m_threadDecoupleReceive;        ///< Thread (presumably runs DecoupleReceive)
    std::condition_variable               m_cvReceiveAvailable;           ///< Condition variable: received data available
    std::condition_variable               m_cvReceiveProcessed;           ///< Condition variable: received data processed
#endif
#ifdef TIME_TRACKING
    std::chrono::high_resolution_clock::time_point m_tpLastSent{};        ///< Time point of the last send
    std::chrono::high_resolution_clock::time_point m_tpLastReceived{};    ///< Time point of the last receive
    // Value-initialized like the time points above: the default constructor of std::chrono::duration leaves the value
    // uninitialized, which would make GetLargestReceiveLoopDuration return an indeterminate value before the first update.
    std::chrono::duration<double>         m_durationLargestDeltaReceived{}; ///< Largest receive duration
#endif

    /**
     * @brief Send raw data.
     * @param[in] pData Pointer to the data.
     * @param[in] uiDataLength Length of the data in bytes.
     * @return Number of bytes (the templated Send overload treats a return value equal to the requested length as success).
     */
    uint32_t Send(const void* pData, uint32_t uiDataLength);

    /**
     * @brief Send an object using its in-memory representation (address and sizeof of the object).
     * @tparam T Type of the object to send.
     * @param[in] rt Reference to the object to send.
     * @return Returns 'true' when Send(const void*, uint32_t) reports exactly sizeof(T) bytes; 'false' otherwise.
     */
    template <typename T>
    bool Send(const T& rt)
    {
        // The raw Send overload works with 32-bit lengths; make the conversion from size_t explicit.
        const uint32_t uiSize = static_cast<uint32_t>(sizeof(rt));
        return Send(&rt, uiSize) == uiSize;
    }

    /**
     * @brief Receive messages (presumably the function executed by m_threadReceive - confirm in the implementation).
     */
    void ReceiveMessages();

    /**
     * @brief Received message; a receive packet (CAccessorRxPacket) with accessors for the message headers.
     */
    class CMessage : public CAccessorRxPacket
    {
    public:
        /**
         * @brief Constructor taking over a receive packet.
         * @param[in] rPacket The packet (rvalue reference).
         */
        CMessage(CAccessorRxPacket&& rPacket);

        /**
         * @brief Destructor.
         */
        ~CMessage();

        /**
         * @brief Check the validity of the message.
         * @return Boolean validity flag.
         */
        bool IsValid() const;

        /**
         * @brief Get the common message header.
         * @return Copy of the header.
         */
        SMsgHdr GetMsgHdr() const;

        /**
         * @brief Get the connect header.
         * @return Copy of the header.
         */
        SConnectMsg GetConnectHdr() const;

        /**
         * @brief Get the fragmented-message header.
         * @return Copy of the header.
         */
        SFragmentedMsgHdr GetFragmentedHdr() const;

        //union
        //{
        //    SMsgHdr             sMsgHdr;        ///< Current message header
        //    SConnectMsg         sConnectHdr;    ///< Connect header
        //    SFragmentedMsgHdr   sFragmentHdr;   ///< Fragment header
        //    uint8_t             rgData[std::max(sizeof(sConnectHdr), sizeof(sFragmentHdr))];
        //};
        //uint32_t uiSize = 0;    ///< Complete size of the message (incl. size of the header)
        //uint32_t uiOffset = 0;  ///< Current read offset within the message. Complete message when offset == size.

        /**
         * @brief Print the header of the message.
         * @param[in] rConnection Reference to the connection (presumably used for its Trace helper - confirm).
         */
        void PrintHeader(const CConnection& rConnection) const;
    };

    /**
     * @brief Context kept while receiving data messages; passed to the ReadData... and ReceiveData... functions.
     * @remarks NOTE(review): the member descriptions are derived from the member names - confirm in the implementation.
     */
    struct SDataContext
    {
        uint32_t                             uiTotalSize = 0;     ///< Total size of the data
        uint32_t                             uiCurrentOffset = 0; ///< Current offset within the data
        size_t                               nChunkIndex = 0;     ///< Index of the current chunk in seqDataChunks
        uint32_t                             uiChunkOffset = 0;   ///< Offset within the current chunk
        sdv::sequence<sdv::pointer<uint8_t>> seqDataChunks;       ///< The data chunks
    };

    /**
     * @brief Read the data table from a message.
     * @param[in] rMessage Reference to the message.
     * @param[in,out] rsDataCtxt Reference to the data context.
     * @return 32-bit value (presumably the offset following the table - confirm in the implementation).
     */
    uint32_t ReadDataTable(CMessage& rMessage, SDataContext& rsDataCtxt);

    /**
     * @brief Read a data chunk from a message.
     * @param[in] rMessage Reference to the message.
     * @param[in] uiOffset Offset (presumably within the message at which the chunk data starts - confirm).
     * @param[in,out] rsDataCtxt Reference to the data context.
     * @return Boolean result (presumably 'true' on success).
     */
    bool ReadDataChunk(CMessage& rMessage, uint32_t uiOffset, SDataContext& rsDataCtxt);

#if ENABLE_DECOUPLING > 0
    /**
     * @brief Decoupled receive function (presumably executed by m_threadDecoupleReceive - confirm in the implementation).
     */
    void DecoupleReceive();
#endif

    /**
     * @brief Handle a received sync request message.
     * @param[in] rMessage Reference to the message.
     */
    void ReceiveSyncRequest(const CMessage& rMessage);

    /**
     * @brief Handle a received connect request message.
     * @param[in] rMessage Reference to the message.
     */
    void ReceiveConnectRequest(const CMessage& rMessage);

    /**
     * @brief Handle a received sync answer message.
     * @param[in] rMessage Reference to the message.
     */
    void ReceiveSyncAnswer(const CMessage& rMessage);

    /**
     * @brief Handle a received connect answer message.
     * @param[in] rMessage Reference to the message.
     */
    void ReceiveConnectAnswer(const CMessage& rMessage);

    /**
     * @brief Handle a received connection terminated message.
     * @param[in] rMessage Reference to the message.
     */
    void ReceiveConnectTerm(CMessage& rMessage);

    /**
     * @brief Handle a received data message.
     * @param[in] rMessage Reference to the message.
     * @param[in,out] rsDataCtxt Reference to the data context.
     */
    void ReceiveDataMessage(CMessage& rMessage, SDataContext& rsDataCtxt);

    /**
     * @brief Handle a received data fragment message.
     * @remarks The function name contains a spelling error ("Fragement"); it is kept because the definition and the callers
     * are outside this header.
     * @param[in] rMessage Reference to the message.
     * @param[in,out] rsDataCtxt Reference to the data context.
     */
    void ReceiveDataFragementMessage(CMessage& rMessage, SDataContext& rsDataCtxt);
};
#endif // !defined(CHANNEL_H)