Program Listing for File connection.h#

Return to documentation for file (ipc_shared_mem\connection.h)

#ifndef CHANNEL_H
#define CHANNEL_H

#define ENABLE_REPORTING 0

#define ENABLE_DECOUPLING 0

#if ENABLE_REPORTING > 0
#define ENABLE_TRACE 1
#endif

#include <thread>
#include <algorithm>
#include "in_process_mem_buffer.h"
#include "shared_mem_buffer_posix.h"
#include "shared_mem_buffer_windows.h"
#include <interfaces/ipc.h>
#include <interfaces/process.h>
#include <support/interface_ptr.h>
#include <support/local_service_access.h>
#include <support/component_impl.h>
#include <queue>
#include <list>
#include "../../global/trace.h"

#ifdef _MSC_VER
#pragma comment(lib, "Ws2_32.lib")
#endif

// Forward declaration
class CWatchDog;

class CConnection : public std::enable_shared_from_this<CConnection>, public sdv::IInterfaceAccess, public sdv::IObjectDestroy,
    public sdv::ipc::IDataSend, public sdv::ipc::IConnect
{
public:
    CConnection(CWatchDog& rWatchDog, uint32_t uiSize, const std::string& rssName, bool bServer);

    CConnection(CWatchDog& rWatchDog, const std::string& rssConnectionString);

    virtual ~CConnection();

    BEGIN_SDV_INTERFACE_MAP()
        SDV_INTERFACE_ENTRY(sdv::ipc::IDataSend)
        SDV_INTERFACE_ENTRY(sdv::ipc::IConnect)
        SDV_INTERFACE_ENTRY(sdv::IObjectDestroy)
    END_SDV_INTERFACE_MAP()

    std::string GetConnectionString();

    virtual bool SendData(/*inout*/ sdv::sequence<sdv::pointer<uint8_t>>& seqData) override;

    virtual bool AsyncConnect(/*in*/ sdv::IInterfaceAccess* pReceiver) override;

    virtual bool WaitForConnection(/*in*/ uint32_t uiWaitMs) override;

    virtual void CancelWait() override;

    // Suppress cppcheck warning. The destructor calls Disconnect without dynamic binding. This is correct so.
    // cppcheck-suppress virtualCallInConstructor
    virtual void Disconnect() override;

    virtual uint64_t RegisterStatusEventCallback(/*in*/ sdv::IInterfaceAccess* pEventCallback) override;

    virtual void UnregisterStatusEventCallback(/*in*/ uint64_t uiCookie) override;

    virtual sdv::ipc::EConnectStatus GetStatus() const override;

    virtual void DestroyObject() override;

    void SetStatus(sdv::ipc::EConnectStatus eStatus);

    bool IsServer() const;

#ifdef TIME_TRACKING
    std::chrono::high_resolution_clock::time_point GetLastSentTime() const { return m_tpLastSent; }

    std::chrono::high_resolution_clock::time_point GetLastReceiveTime() const { return m_tpLastReceived; }

    std::chrono::duration<double> GetLargestReceiveLoopDuration() const { return m_durationLargestDeltaReceived; }
#endif

private:
#if ENABLE_REPORTING > 0
    template <typename... TArgs>
    void Trace(TArgs... tArgs) const
    {
        return ::Trace("this=", static_cast<const void*>(this), " ", tArgs...);
    }
#endif

    enum class EMsgType : uint32_t
    {
        sync_request = 0,
        sync_answer = 1,
        connect_request = 10,
        connect_answer = 11,
        connect_term = 90,
        data = 0x10000000,
        data_fragment = 0x10000001,
    };

    struct SMsgHdr
    {
        uint32_t    uiVersion;
        EMsgType    eType;
    };

    struct SConnectMsg : SMsgHdr
    {
        sdv::process::TProcessID    tProcessID;
    };

    struct SFragmentedMsgHdr : SMsgHdr
    {
        uint32_t            uiTotalLength;
        uint32_t            uiOffset;
    };

    struct SEventCallback
    {
        uint64_t                            uiCookie = 0;
        sdv::ipc::IConnectEventCallback*    pCallback = nullptr;
    };

    CWatchDog&                              m_rWatchDog;
    sdv::CLifetimeCookie                    m_cookie = sdv::CreateLifetimeCookie();
    CSharedMemBufferTx                      m_sender;
    CSharedMemBufferRx                      m_receiver;
    std::thread                             m_threadReceive;
    std::atomic<sdv::ipc::EConnectStatus>   m_eStatus = sdv::ipc::EConnectStatus::uninitialized;
    sdv::ipc::IDataReceiveCallback*         m_pReceiver = nullptr;
    std::shared_mutex                       m_mtxEventCallbacks;
    std::list<SEventCallback>               m_lstEventCallbacks;
    mutable std::mutex                      m_mtxSend;
    std::mutex                              m_mtxConnect;
    std::condition_variable                 m_cvConnect;
    std::condition_variable                 m_cvStartConnect;
    bool                                    m_bStarted = false;
    bool                                    m_bServer = false;
#if ENABLE_DECOUPLING > 0
    std::mutex                              m_mtxReceive;
    std::queue<sdv::sequence<sdv::pointer<uint8_t>>> m_queueReceive;
    std::thread                             m_threadDecoupleReceive;
    std::condition_variable                 m_cvReceiveAvailable;
    std::condition_variable                 m_cvReceiveProcessed;
#endif
#ifdef TIME_TRACKING
    std::chrono::high_resolution_clock::time_point  m_tpLastSent{};
    std::chrono::high_resolution_clock::time_point  m_tpLastReceived{};
    std::chrono::duration<double>           m_durationLargestDeltaReceived;
#endif

    uint32_t Send(const void* pData, uint32_t uiDataLength);

    template <typename T>
    bool Send(const T& rt)
    {
        return Send(&rt, sizeof(rt)) == sizeof(rt);
    }

    void ReceiveMessages();

    class CMessage : public CAccessorRxPacket
    {
    public:
        CMessage(CAccessorRxPacket&& rPacket);

        ~CMessage();

        bool IsValid() const;

        SMsgHdr GetMsgHdr() const;

        SConnectMsg GetConnectHdr() const;

        SFragmentedMsgHdr GetFragmentedHdr() const;

        //union
        //{
        //    SMsgHdr                 sMsgHdr;        ///< Current message header
        //    SConnectMsg             sConnectHdr;    ///< Connect header
        //    SFragmentedMsgHdr       sFragmentHdr;   ///< Fragment header
        //    uint8_t                 rgData[std::max(sizeof(sConnectHdr), sizeof(sFragmentHdr))];
        //};
        //uint32_t                uiSize = 0;     ///< Complete size of the message (incl. size of the header)
        //uint32_t                uiOffset = 0;   ///< Current read offset within the message. Complete message when offset == size.

        void PrintHeader(const CConnection& rConnection) const;
    };

    struct SDataContext
    {
        uint32_t        uiTotalSize = 0;
        uint32_t        uiCurrentOffset = 0;
        size_t          nChunkIndex = 0;
        uint32_t        uiChunkOffset = 0;
        sdv::sequence<sdv::pointer<uint8_t>> seqDataChunks;
    };

    uint32_t ReadDataTable(CMessage& rMessage, SDataContext& rsDataCtxt);

    bool ReadDataChunk(CMessage& rMessage, uint32_t uiOffset, SDataContext& rsDataCtxt);

#if ENABLE_DECOUPLING > 0
    void DecoupleReceive();
#endif

    void ReceiveSyncRequest(const CMessage& rMessage);

    void ReceiveConnectRequest(const CMessage& rMessage);

    void ReceiveSyncAnswer(const CMessage& rMessage);

    void ReceiveConnectAnswer(const CMessage& rMessage);

    void ReceiveConnectTerm(CMessage& rMessage);

    void ReceiveDataMessage(CMessage& rMessage, SDataContext& rsDataCtxt);

    void ReceiveDataFragementMessage(CMessage& rMessage, SDataContext& rsDataCtxt);
};

#endif // !define CHANNEL_H