Python Vehicle App Development

Learn how to develop and test a Vehicle App using Python.

We recommend that you familiarize yourself with the Vehicle App SDK before going through this tutorial.

The following information describes how to develop and test the sample Vehicle App that is included in the Python template repository. You will learn how to use the Vehicle App Python SDK and how to interact with the Vehicle Model.

Once you have completed all steps, you will have a solid understanding of the development workflow and you will be able to reuse the template repository for your own Vehicle App development project.

Develop your first Vehicle App

This section describes how to develop your first Vehicle App. Before you start building a new Vehicle App, make sure you have already read this manual:

Once you have established your development environment, you will be able to start developing your first Vehicle App.

For this tutorial, you will recreate the Vehicle App that is included with the SDK repository: the Vehicle App allows you to change the position of the driver’s seat in the car and also provides its current position to other applications. A detailed explanation of the use case and the example is available here.

Setting up the basic skeleton of your app

First, create the main Python script called main.py in /app/src. All the relevant code for your new Vehicle App goes there.

If you’ve created your app development repository from our Python template repository, with the Velocitas CLI create command, or via digital.auto prototyping, a file with this name is already present and can be adjusted to your needs.

Setting up the basic skeleton of an app consists of the following steps:

  1. Manage your imports
  2. Enable logging
  3. Initialize your class
  4. Define the entry point of your app

Manage your imports

Before you start development in the main.py you just created, add the required imports. Their purpose will become clear as the tutorial progresses:

import asyncio
import json
import logging
import signal

from velocitas_sdk.util.log import (  # type: ignore
    get_opentelemetry_log_factory,
    get_opentelemetry_log_format,
)
from velocitas_sdk.vdb.reply import DataPointReply
from velocitas_sdk.vehicle_app import VehicleApp, subscribe_data_points, subscribe_topic
from vehicle import Vehicle, vehicle  # type: ignore

Enable logging

The following logging configuration applies the default log format provided by the SDK and sets the log level to INFO:

logging.setLogRecordFactory(get_opentelemetry_log_factory())
logging.basicConfig(format=get_opentelemetry_log_format())
logging.getLogger().setLevel("INFO")
logger = logging.getLogger(__name__)
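
To see what the standard-library calls above do independently of the SDK, the following sketch swaps in a hypothetical record factory and format string. The names tagging_factory and app_name and the format itself are assumptions made for illustration; the SDK's get_opentelemetry_log_factory() and get_opentelemetry_log_format() supply their own equivalents.

```python
import io
import logging

# Keep the previous factory so the custom one can delegate to it.
_default_factory = logging.getLogRecordFactory()


def tagging_factory(*args, **kwargs):
    """Hypothetical factory: adds an extra attribute to every log record."""
    record = _default_factory(*args, **kwargs)
    record.app_name = "MyVehicleApp"  # illustrative field, not an SDK field
    return record


def demo_logging() -> str:
    """Log one INFO and one DEBUG message and return what was emitted."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(app_name)s %(levelname)s %(message)s")
    )

    demo_logger = logging.getLogger("logging_demo")
    demo_logger.addHandler(handler)
    demo_logger.setLevel("INFO")
    demo_logger.propagate = False

    logging.setLogRecordFactory(tagging_factory)
    try:
        demo_logger.info("seat position updated")
        demo_logger.debug("dropped: below the INFO level")
    finally:
        # Restore global state so other code is unaffected.
        logging.setLogRecordFactory(_default_factory)
        demo_logger.removeHandler(handler)
    return stream.getvalue()
```

The record factory decides which attributes exist on each record, the format string decides which of them are printed, and the level filters out anything below INFO.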

Initialize your class

The main class of your new Vehicle App needs to inherit from the VehicleApp class provided by the Python SDK.

class MyVehicleApp(VehicleApp):

In class initialization, you have to pass an instance of the Vehicle Model:

def __init__(self, vehicle_client: Vehicle):
    super().__init__()
    self.Vehicle = vehicle_client

We save the vehicle object to use it in our app. Now, you have initialized the app and can continue developing relevant methods.

Entry point of your app

Here’s an example of an entry point to the MyVehicleApp that we just developed:

async def main():
    """Main function"""
    logger.info("Starting my VehicleApp...")
    vehicle_app = MyVehicleApp(vehicle)
    await vehicle_app.run()

LOOP = asyncio.get_event_loop()
LOOP.add_signal_handler(signal.SIGTERM, LOOP.stop)
LOOP.run_until_complete(main())
LOOP.close()

With this, your app can now be started. To give the app some meaningful behaviour, we will enhance it with more features in the next sections.
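
The event loop handling of the entry point can be tried out without the SDK. The sketch below is a minimal stand-in under stated assumptions: StubEntryApp and run_until_done are hypothetical names, and the stub's run() returns immediately instead of serving requests. It uses asyncio.new_event_loop() rather than asyncio.get_event_loop(), which newer Python releases deprecate when no loop is already set.

```python
import asyncio
import signal


class StubEntryApp:
    """Hypothetical stand-in for a VehicleApp; run() returns immediately."""

    def __init__(self):
        self.started = False

    async def run(self):
        self.started = True


async def main(app):
    """Same shape as the entry point above: create nothing, just run the app."""
    await app.run()


def run_until_done(app):
    """Create a loop, register a SIGTERM handler if possible, run main()."""
    loop = asyncio.new_event_loop()
    try:
        try:
            # Only supported on Unix event loops and in the main thread.
            loop.add_signal_handler(signal.SIGTERM, loop.stop)
        except (NotImplementedError, RuntimeError, ValueError):
            pass
        loop.run_until_complete(main(app))
    finally:
        loop.close()
```

Closing the loop in a finally block ensures it is released even if main() raises.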

Vehicle Model Access

In order to facilitate the implementation, the whole vehicle is abstracted into model classes. Please check the tutorial about creating models for more details about this topic. In this section, the focus is on using the model.

The first thing you need to do is to get access to the Vehicle Model. If you derived your project repository from our template, we already provide a generated model installed as a Python package named vehicle. Hence, in most cases no additional setup is necessary. How to tailor the model to your needs or how you could get access to vehicle services is described in the tutorial linked above.

If you want to access a single DataPoint, e.g. the vehicle speed, this can be done via:

vehicle_speed = (await self.Vehicle.Speed.get()).value

Because the get() method of the DataPoint class is a coroutine, you have to use the await keyword when calling it and then access the .value of the result.

If you want to go deeper into the vehicle, for example to access a single seat, just follow the model chain down:

self.DriverSeatPosition = await self.Vehicle.Cabin.Seat.Row1.DriverSide.Position.get()
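
The await-then-.value pattern can be illustrated with stub classes. This is only a sketch: StubSpeedResult, StubSpeedDataPoint and StubVehicle are hypothetical stand-ins, not SDK classes, and the value 42 is made up.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubSpeedResult:
    """Stand-in for the result object returned by get()."""

    value: int


class StubSpeedDataPoint:
    """Hypothetical data point whose get() is a coroutine."""

    def __init__(self, value: int):
        self._value = value

    async def get(self) -> StubSpeedResult:
        return StubSpeedResult(self._value)


class StubVehicle:
    """Tiny model with a single top-level data point."""

    def __init__(self):
        self.Speed = StubSpeedDataPoint(42)


async def read_speed(vehicle: StubVehicle) -> int:
    # Calling get() only creates a coroutine; await runs it and yields
    # the result object, whose .value holds the actual reading.
    result = await vehicle.Speed.get()
    return result.value
```

Forgetting await would leave read_speed holding a coroutine object, which has no .value attribute.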

Subscription to Data Points

If you want to get notified about changes of a specific DataPoint, you can subscribe to this event, e.g. as part of the on_start() method in your app.

    async def on_start(self):
        """Run when the vehicle app starts"""
        await self.Vehicle.Cabin.Seat.Row1.DriverSide.Position.subscribe(
            self.on_seat_position_changed
        )

Every DataPoint provides a .subscribe() method that takes a callback function, which is invoked on every data point update. Subscribed data is available in the respective DataPointReply object and needs to be accessed via the reference to the subscribed data point. The returned object is of type TypedDataPointResult, which holds the value of the data point and the timestamp at which the value was captured by the Databroker. Therefore, the on_seat_position_changed callback function needs to be implemented like this:

    async def on_seat_position_changed(self, data: DataPointReply):
        # handle the event here
        response_topic = "seatadjuster/currentPosition"
        position = data.get(self.Vehicle.Cabin.Seat.Row1.DriverSide.Position).value
        # ...

Subscription using Annotations

The Python SDK also supports annotations for subscribing to data point changes: @subscribe_data_points takes the full path to the DataPoint of interest. This would replace the implementation shown in Subscription to Data Points:

@subscribe_data_points("Vehicle.Cabin.Seat.Row1.DriverSide.Position")
async def on_seat_position_changed(self, data: DataPointReply):
    response_topic = "seatadjuster/currentPosition"
    response_data = {"position": data.get(self.Vehicle.Cabin.Seat.Row1.DriverSide.Position).value}

    await self.publish_event(response_topic, json.dumps(response_data))

Similarly, subscribed data is available in the respective DataPointReply object and needs to be accessed via the reference to the subscribed data point.
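
One common way to build annotation-based registration in Python is to let the decorator tag the function and to collect the tagged methods later. The sketch below shows that general technique only. It is not the SDK's implementation, and subscribe_to, MiniApp and find_handlers are hypothetical names.

```python
import asyncio


def subscribe_to(path: str):
    """Hypothetical decorator: records the data point path on the function."""

    def decorator(func):
        func.subscribed_path = path
        return func

    return decorator


class MiniApp:
    def __init__(self):
        self.positions = []

    @subscribe_to("Vehicle.Cabin.Seat.Row1.DriverSide.Position")
    async def on_seat_position_changed(self, value):
        self.positions.append(value)

    def find_handlers(self):
        """Map each annotated path to its bound method."""
        handlers = {}
        for name in dir(self):
            member = getattr(self, name)
            path = getattr(member, "subscribed_path", None)
            if path is not None:
                handlers[path] = member
        return handlers


async def demo_annotation():
    app = MiniApp()
    handlers = app.find_handlers()
    # A framework would call the handler whenever the data point changes.
    await handlers["Vehicle.Cabin.Seat.Row1.DriverSide.Position"](275)
    return list(handlers), app.positions
```

The benefit of this style is that the subscription sits next to the handler it belongs to, so no separate registration code in on_start() is needed.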

Services

Services are used to communicate with other parts of the vehicle via remote function calls (RPC). Please read the basics about them here.

You can skip the following step if you only want to reach the final implementation of the example.
The following code snippet shows how to use the MoveComponent() method of the SeatService from the vehicle model:

location = SeatLocation(row=1, index=1)
await self.Vehicle.Cabin.SeatService.MoveComponent(
    location, BASE, data["position"]
)

In order to define which seat you like to move, you have to pass a SeatLocation object as the first parameter. The second argument specifies the component of the seat to be moved. The possible components are defined in the proto files. The last parameter to be passed into the method is the desired position of the component.

Make sure to use the await keyword when calling service methods, since these methods are coroutines.
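
The three arguments described above can be traced with a recording stub. Everything in this sketch is a stand-in: StubSeatLocation, StubSeatComponent and StubSeatService are hypothetical classes that only mimic the call shape and do not reflect the generated model or the proto definitions.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class StubSeatLocation:
    """Stand-in for SeatLocation: identifies which seat to move."""

    row: int
    index: int


class StubSeatComponent(Enum):
    """Stand-in for the seat components; only BASE is modelled here."""

    BASE = 0


class StubSeatService:
    """Records every MoveComponent call instead of moving a real seat."""

    def __init__(self):
        self.calls = []

    async def MoveComponent(self, location, component, position):
        self.calls.append((location, component, position))


async def move_driver_seat(service, position):
    location = StubSeatLocation(row=1, index=1)
    # Without await, the coroutine would be created but never executed.
    await service.MoveComponent(location, StubSeatComponent.BASE, position)
```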

MQTT

Interaction with other Vehicle Apps or with the cloud is enabled by using the Mosquitto MQTT Broker. The MQTT broker runs inside a Docker container, which is started as part of one of our predefined runtimes.

In the quickstart section about the Vehicle App, you already tested sending MQTT messages to the app. In the previous sections, you generally saw how to use Vehicle Models, DataPoints and Services. In this section, you will learn how to combine them with MQTT.

In order to receive and process MQTT messages inside your app, simply use the @subscribe_topic annotation from the SDK on an additional method, on_set_position_request_received(), which you have to implement:

    @subscribe_topic("seatadjuster/setPosition/request")
    async def on_set_position_request_received(self, data_str: str) -> None:
        logger.info(f"Got message: {data_str!r}")
        data = json.loads(data_str)
        response_topic = "seatadjuster/setPosition/response"
        response_data = {"requestId": data["requestId"], "result": {}}

        # ...

The on_set_position_request_received method will now be invoked every time a message is published to the subscribed topic "seatadjuster/setPosition/request". The message data (string) is provided as a parameter. In the example above, the data is parsed from JSON (data = json.loads(data_str)).
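
The parsing step is a natural place for input validation. The following sketch shows one possible way to turn the incoming string into a response body. The helper name build_set_position_response and the status and message fields inside "result" are assumptions for illustration, and only the "requestId" and "position" keys come from the snippets in this tutorial.

```python
import json


def _error(request_id, message: str) -> dict:
    """Build a response body that reports a failure (hypothetical shape)."""
    return {"requestId": request_id, "result": {"status": 1, "message": message}}


def build_set_position_response(data_str: str) -> dict:
    """Parse a request payload and return the response body to publish."""
    try:
        data = json.loads(data_str)
    except json.JSONDecodeError:
        return _error(None, "Invalid JSON")

    if not isinstance(data, dict):
        return _error(None, "Payload must be a JSON object")

    request_id = data.get("requestId")
    position = data.get("position")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(position, int) or isinstance(position, bool):
        return _error(request_id, "Missing or non-integer position")

    return {
        "requestId": request_id,
        "result": {"status": 0, "message": f"Set seat position to: {position}"},
    }
```

Keeping this logic in a plain function, separate from the annotated handler, makes it easy to unit test without a running broker.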

In order to publish data to topics, the SDK provides the convenience method self.publish_event(), which will be added to the on_seat_position_changed callback function from before.

    async def on_seat_position_changed(self, data: DataPointReply):
        response_topic = "seatadjuster/currentPosition"
        position = data.get(self.Vehicle.Cabin.Seat.Row1.DriverSide.Position).value
        await self.publish_event(
            response_topic,
            json.dumps({"position": position}),
        )

The above example illustrates how one can easily publish messages. In this case, every time the seat position changes, the new position is published to seatadjuster/currentPosition.

Your main.py should now have a full implementation of the class MyVehicleApp(VehicleApp) containing:

  • __init__()
  • on_start()
  • on_seat_position_changed()
  • on_set_position_request_received()

and, last but not least, a main() function to run the app.

Check the seat-adjuster example to see a more detailed implementation including error handling.

Unit Tests

Unit testing is an important part of the development, so let’s have a look at how to do that. You can find some example tests in /app/tests/unit. First, you have to import the relevant packages for unit testing and everything you need for your implementation:

from unittest import mock

import pytest
from velocitas_sdk.vehicle_app import VehicleApp
from sdv_model.Cabin.SeatService import SeatService  # type: ignore
from sdv_model.proto.seats_pb2 import BASE, SeatLocation  # type: ignore


@pytest.mark.asyncio
async def test_for_publish_to_topic():
    # Disable no-value-for-parameter, seems to be false positive with mock lib

    with mock.patch.object(
        VehicleApp, "publish_event", new_callable=mock.AsyncMock, return_value=-1
    ):
        response = await VehicleApp.publish_event(
            str("sampleTopic"), get_sample_request_data()  # type: ignore
        )
        assert response == -1


def get_sample_request_data():
    return {"position": 330, "requestId": 123}

Looking at the test, you will notice the annotation @pytest.mark.asyncio. This is required if the test is defined as a coroutine. The next step is to create a mock for the external dependencies. Here, mock.patch.object() is called with four arguments: the object to be mocked, the name of the method whose return value you want to modify, a callable that creates the mock (new_callable), and the return value. After creating the mock, you can call the method under test and check the response. Use asserts to make your test fail if the response does not match. Check the seat-adjuster unit tests to see a more detailed implementation.
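
The same mocking pattern can be tried without pytest or the SDK by using only the standard library. In this sketch, StubPublisherApp is a hypothetical stand-in for VehicleApp, and asyncio.run() takes the place of the @pytest.mark.asyncio annotation.

```python
import asyncio
from unittest import mock


class StubPublisherApp:
    """Stand-in for VehicleApp with a single coroutine method."""

    async def publish_event(self, topic: str, payload: str) -> None:
        raise RuntimeError("would contact a real MQTT broker")


async def check_publish_is_mocked() -> int:
    with mock.patch.object(
        StubPublisherApp,
        "publish_event",
        new_callable=mock.AsyncMock,
        return_value=-1,
    ) as mocked:
        app = StubPublisherApp()
        response = await app.publish_event("sampleTopic", '{"position": 330}')
        # Besides the return value, the mock records how it was awaited.
        mocked.assert_awaited_once_with("sampleTopic", '{"position": 330}')
        return response
```

Checking the recorded call with assert_awaited_once_with() verifies that the code under test passed the expected topic and payload, which the return value alone cannot show.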

See the results

Once the implementation is done, it is time to run and debug the app.

Run your App

To run the app, choose one of the following options to start the VehicleApp under development:

  1. Press F5

or:

  1. Press F1
  2. Select command Tasks: Run Task
  3. Select Local Runtime - Run VehicleApp

Debug your Vehicle App

In the introduction about debugging, you saw how to start a debugging session. In this section, you will learn what is happening in the background.

The debug session launch settings are already prepared for the VehicleApp in /.vscode/launch.json.

{
    "configurations": [
        {
            "type": "python",
            "justMyCode": false,
            "request": "launch",
            "name": "VehicleApp",
            "program": "${workspaceFolder}/app/src/main.py",
            "console": "integratedTerminal",
            "env": {
                "SDV_MIDDLEWARE_TYPE": "native",
                "SDV_VEHICLEDATABROKER_ADDRESS": "grpc://127.0.0.1:55555",
                "SDV_MQTT_ADDRESS": "mqtt://127.0.0.1:1883"
            }
        }
    ]
}

The program key specifies which Python script to run.
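
The env entries of the launch configuration are ordinary environment variables holding URL-style addresses. How the SDK consumes them is not shown in this tutorial, so the following is only a sketch of reading and splitting such an address with the standard library. The helper name read_endpoint and its default handling are assumptions.

```python
import os
from urllib.parse import urlsplit


def read_endpoint(var_name: str, default: str, env=None) -> tuple:
    """Return (scheme, host, port) for an address such as mqtt://host:1883."""
    env = os.environ if env is None else env
    parts = urlsplit(env.get(var_name, default))
    return parts.scheme, parts.hostname, parts.port


# Falls back to the default when the variable is not set.
mqtt_endpoint = read_endpoint(
    "SDV_MQTT_ADDRESS", "mqtt://127.0.0.1:1883", env={}
)
```

Passing the environment as a parameter keeps the helper easy to test with different port settings.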

You can adapt the configuration in /.vscode/launch.json and in /.vscode/tasks.json to your needs (e.g., change the ports, add new tasks) or even add a completely new configuration for another Vehicle App.

Once you are done, switch to the debugging tab (sidebar on the left) and select your configuration using the dropdown at the top. You can now start the debug session by clicking the play button or pressing F5. Debugging is now as simple as in any other IDE: just place your breakpoints and follow the flow of your Vehicle App.

Next steps

Last modified October 30, 2024: Improve service documentation (#129) (5a3f592)