Skip to main content
Version: 3.4.x

Integration Examples

info

This guide assumes that a Moth server is already deployed. For installation and operations, refer to the separate documents provided with CoBiz delivery.

Use the examples below as a starting point for custom Agent and web UI development. Full code is available in the example repository on GitHub, which also includes a LiDAR example.

Robot integration example

This example subscribes to a ROS topic, receives video frames, encodes them with GStreamer using H.264, and forwards them to Moth.

Initialization

Constructor

This is the constructor of the Video object used in this example. Set the following values.

  • host_: Moth server address
  • port_: Moth server port
  • ROS topic (/your/image_raw/topic): the ROS topic that publishes video frames from the robot
// Video node constructor: configures the Moth connection parameters, brings
// up the WebSocket link and the GStreamer pipeline, then subscribes to the
// robot's ROS image topic.
Video::Video() : Node("video")
{
// Moth server address and port -- replace with your deployment values.
host_ = "your_host";
port_ = "your_port";
// MIME string sent to Moth before any binary data; it describes the H.264
// stream that follows (see initWebSocket).
text = "video/h264;width=640;height=480;framerate=30;codecs=avc1.42002A";

// Block until the WebSocket connection succeeds, retrying once per second.
while (!initWebSocket()){
std::cout << "reconnecting..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}

// Build and start the GStreamer encoding pipeline (appsrc -> ... -> appsink).
initVideo();

RCLCPP_INFO(this->get_logger(), "Node has been started.");
// Subscribe to the raw image topic (queue depth 10); every received frame is
// handed to video_callback, which feeds it into the encoder.
subscription_ = this->create_subscription<sensor_msgs::msg::Image>("/your/image_raw/topic", 10, std::bind(&Video::video_callback, this, std::placeholders::_1));
}

WebSocket initialization

This code initializes the WebSocket connection to the Moth server. Set the following value.

  • "your_path": the path below the domain used to connect to Moth over WebSocket. See the Service API for details.
// Establish the WebSocket connection to the Moth server and announce the
// stream's MIME type. Returns true on success, false on any failure so the
// caller's retry loop can run again.
bool Video::initWebSocket(){
try{
tcp::resolver resolver{ioc}; // resolver that converts HOST and PORT into an IP
ws_ptr = std::make_shared<websocket::stream<tcp::socket>>(ioc); // websocket pointer object

auto const results = resolver.resolve(host_, port_); // resolve IP
auto ep = net::connect(ws_ptr->next_layer(), results); // connect to the IP

// The Host header must carry the actually-connected port.
std::string host = host_ + ':' + std::to_string(ep.port());

ws_ptr->set_option(websocket::stream_base::decorator( // set HTTP headers
[](websocket::request_type& req)
{
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-coro");
}));
ws_ptr->handshake(host, "your_path"); // set websocket PATH; see the Service API

ws_ptr->write(net::buffer(text)); // send MIME (currently a string)
ws_ptr->binary(true); // send subsequent data as binary

std::cout << "connect" << std::endl;
return true;
} catch (const std::exception& e){
// Report the concrete failure reason (DNS, TCP connect, or handshake)
// instead of swallowing it -- makes the reconnect loop debuggable.
std::cout << "ERR initWebSocket(): " << e.what() << std::endl;
return false;
} catch (...){
std::cout << "ERR initWebSocket()" << std::endl;
return false;
}
}

GStreamer initialization

This code initializes GStreamer for video encoding. Frames pushed into the appsrc pipeline are encoded by GStreamer. Encoded frames are delivered to appsink, and appsink_callback() sends them to the server.

void Video::initVideo(){
// appsrc: source that accepts raw frames, videoconvert: converts video format, openh264enc: hardware H.264 encoder, h264parse: parser for NAL units and SPS/PPS, appsink: sink that lets the app read output directly
const char* pipeline_cmd = "appsrc name=src do-timestamp=true is-live=true emit-signals=true format=time caps=video/x-raw,format=RGB,width=640,height=480 ! videoconvert ! openh264enc bitrate=1000000 ! video/x-h264, profile=high, alignment=au, stream-format=byte-stream ! h264parse config-interval=1 ! queue leaky=2 ! appsink name=sink sync=false drop=true emit-signals=true max-buffers=3";
pipeline = gst_parse_launch(pipeline_cmd, NULL); // create pipeline from pipeline_cmd
appsrc = gst_bin_get_by_name(GST_BIN(pipeline), "src"); // fetch the object named src into appsrc
appsink = gst_bin_get_by_name(GST_BIN(pipeline), "sink"); // fetch the object named sink into appsink
g_signal_connect(appsink, "new-sample", G_CALLBACK(appsink_callback), this); // connect appsink output to appsink_callback
gst_element_set_state(pipeline, GST_STATE_PLAYING); // set pipeline state to PLAYING, i.e. start it
}

Receiving ROS frames

This callback subscribes to the ROS topic and receives frames. The received frame is pushed into the GStreamer appsrc pipeline.

// ROS subscription callback: wraps the incoming raw frame bytes in a
// GstBuffer and pushes it into the appsrc end of the encoding pipeline.
void Video::video_callback(const sensor_msgs::msg::Image::SharedPtr msg){
GstBuffer* buffer = gst_buffer_new_allocate(NULL, msg->data.size(), NULL); // allocate a GstBuffer sized to the incoming ROS message
GstMapInfo map; // mapping object for buffer data
if (!gst_buffer_map(buffer, &map, GST_MAP_WRITE)) { // map buffer for writing; bail out instead of writing through a bad mapping
g_printerr("Failed to map GstBuffer.\n");
gst_buffer_unref(buffer);
return;
}
std::memcpy(map.data, msg->data.data(), msg->data.size()); // copy ROS message data into the buffer
gst_buffer_unmap(buffer, &map); // finish the write mapping before handing the buffer off
GstFlowReturn ret = GST_FLOW_ERROR; // defensive default in case the signal is not handled
g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret); // push the buffer to the GStreamer appsrc pipeline
gst_buffer_unref(buffer); // push-buffer takes its own reference; drop ours
if (ret != GST_FLOW_OK) {
g_printerr("Failed to push buffer to appsrc.\n"); // was "\\n", which printed a literal backslash-n instead of a newline
}
}

Sending GStreamer-encoded frames

This callback sends frames encoded by GStreamer to the Moth server.

// Static trampoline: GStreamer invokes this C-style callback whenever appsink
// has a new encoded sample; recover the Video instance stashed in user_data
// (see g_signal_connect in initVideo) and forward to the member function.
GstFlowReturn Video::appsink_callback(GstAppSink *appsink, gpointer user_data){
return static_cast<Video *>(user_data)->send_frame(appsink);
}

// Pull one encoded sample from appsink and forward its bytes to the Moth
// server over the WebSocket. Returns GST_FLOW_OK unless the sample itself
// could not be pulled.
GstFlowReturn Video::send_frame(GstAppSink *appsink){
GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink)); // store appsink output into sample
if (!sample) {
std::cerr << "Failed to pull sample" << std::endl;
return GST_FLOW_ERROR;
}
GstBuffer *buffer_ = gst_sample_get_buffer(sample); // extract the GstBuffer from sample
GstMapInfo map_; // mapping object used to read buffer data
if (gst_buffer_map(buffer_, &map_, GST_MAP_READ)){ // map buffer_ into map_
try{
// Only write when the socket is open and there is no unread inbound
// data pending on the underlying TCP socket.
if (ws_ptr && ws_ptr->is_open() && (ws_ptr->next_layer().available()==0)){
ws_ptr->write(net::buffer(map_.data, map_.size)); // send mapped data to the Moth server over WebSocket
}
} catch (const beast::system_error& se) {
// Connection-level failure (e.g. broken pipe): release the GStreamer
// resources, then exit with a failure code so a supervisor can restart
// the node. The old code called exit(0) -- reporting success -- and
// leaked the mapped buffer and sample.
std::cerr << "Broken pipe error. reconnecting..." << " (" << se.what() << ")" << std::endl;
gst_buffer_unmap(buffer_, &map_);
gst_sample_unref(sample);
std::exit(EXIT_FAILURE);
} catch (...) {
std::cout << "unknown Error" << std::endl;
}
gst_buffer_unmap(buffer_, &map_);
}
gst_sample_unref(sample);
return GST_FLOW_OK;
}

Web integration example

This example receives H.264 video frames over WebSocket, decodes them, and displays them on screen.

script.js

This script connects to Moth over WebSocket, decodes incoming data, and renders it.

// The <video> element that displays the decoded stream (see index.html).
const videoElement = document.getElementById("videoElement");

// Encodes the "ping" keep-alive text into bytes before sending.
const textEncoder = new TextEncoder();
// Last MIME string received from Moth, and its parsed object form.
let mime, mimeObject;
// Track that decoded frames are written into, and its stream writer.
let videoStreamTrack, videoWriter;

const HOST = ""; // Enter the Moth host
const PORT = ""; // Enter the Moth port
const CHANNEL_ID = ""; // Enter the Moth channel
const TRACK = ""; // Enter the Moth track

// Main function executed on page load: wire a MediaStreamTrackGenerator to
// the <video> element, then open the WebSocket connection to Moth.
window.onload = async () => {
videoStreamTrack = new MediaStreamTrackGenerator({
kind: "video",
});
videoWriter = videoStreamTrack.writable.getWriter();
await videoWriter.ready; // wait until the writer is able to accept frames

// Frames written through videoWriter appear in the element via this stream.
const mediaStream = new MediaStream([videoStreamTrack]);
videoElement.srcObject = mediaStream;

connectWebsocket();
};

// Connect to Moth via WebSocket
// Connect to Moth via WebSocket; text messages carry the MIME description,
// binary messages carry encoded media data.
const connectWebsocket = () => {
const socket = new WebSocket(
`wss://${HOST}:${PORT}/pang/ws/sub?channel=${CHANNEL_ID}&track=${TRACK}&mode=bundle`
);
socket.binaryType = "arraybuffer";

let pingTimer = null; // keep-alive timer, cleared when the socket closes

socket.onopen = () => {
console.log("Connected to server");
// send ping every 10 seconds to keep the connection alive
pingTimer = setInterval(() => {
// never write to a socket that is no longer open
if (socket.readyState === WebSocket.OPEN) {
socket.send(textEncoder.encode("ping"));
}
}, 10000);
};

socket.onmessage = async (event) => {
if (typeof event.data === "string") {
// configure the video decoder when MIME is received
if (mime === event.data) return; // skip if MIME is unchanged

mime = event.data;
mimeObject = mimeStringToMimeObject(mime);

// write decoded video frames into the video stream
const handleVideoFrame = async (frame) => {
console.log("handleVideoFrame");
if (frame && videoStreamTrack) {
videoWriter.write(frame);
frame.close();
}
};

await setDecoder(mimeObject, handleVideoFrame);
} else if (typeof event.data === "object") {
// ignore data until MIME arrives & ignore ping responses
if (!mimeObject || new TextDecoder().decode(event.data) === "ping") {
return;
}
// decode incoming video data
await startDecode(processVideoData(event, mimeObject));
}
};

socket.onclose = () => {
// stop pinging: the old code left the interval running forever, so it
// kept sending on a closed socket after every disconnect
if (pingTimer !== null) clearInterval(pingTimer);
console.log("Disconnected from server");
};

socket.onerror = (error) => {
// NOTE(review): WebSocket error events usually carry no .message --
// this alert may show "undefined"; consider a fixed text instead
alert(`Error: ${error.message}`);
};
};

// Convert a MIME string into the MIME object used by the app
// Parse a MIME string such as "video/h264;width=640;codecs=avc1.42002A" into
// the option object consumed by the decoder setup.
const mimeStringToMimeObject = (mimeString) => {
const parts = mimeString.split(";");
const mimeType = parts[0];

// Collect the trailing "key=value" options into a plain object.
const options = {};
for (const part of parts.slice(1)) {
const [key, value] = part.trim().split("=");
options[key] = value;
}

// The stream announces "codecs", while the decoder config expects "codec".
if (!options.codec) options.codec = options.codecs;

// A subtype like "seq+h264" encodes the data mode before the "+"
// (whether metadata is prefixed to each payload).
if (mimeType.includes("+")) {
options.data_mode = mimeType.split("/")[1].split("+")[0];
}
options.data_type = mimeType.split("/")[0];

return options;
};

// Normalize video data for decoding
// Strip any metadata prefix from an incoming binary payload and wrap it for
// decoding: a plain object for image data, an EncodedVideoChunk for video.
const processVideoData = (event, mime) => {
// Number of prefix bytes to drop depends on the data mode.
let offset;
switch (mime.data_mode) {
case "seq":
offset = 1;
break;
case "ts":
offset = 8;
break;
default:
offset = 0;
}

const payload = new Uint8Array(event.data.slice(offset));

// Return image payloads as plain objects
if (mime.data_type === "image") {
return { data: payload, timestamp: event.timeStamp };
}

// Convert video payloads into EncodedVideoChunk; large frames are assumed
// to be keyframes (size heuristic).
return new EncodedVideoChunk({
type: payload.length > 100000 ? "key" : "delta",
data: payload,
timestamp: event.timeStamp,
duration: 0,
});
};

decoder.js

Helper functions that decode codecs by using WebCodecs.

// Active WebCodecs VideoDecoder instance (null until setDecoder configures one).
let videoDecoder = null;
// Last decoder configuration, kept so the decoder can be rebuilt on error.
let decoderConfig = null;
// false when the stream carries still images instead of a video codec.
let useWebCodec = true;
// Callback invoked with each decoded frame (registered via setDecoder).
let handleVideoFrame = null;

// Initial decoder configuration
// Initial decoder configuration.
// config: parsed MIME object; handle: callback that receives decoded frames.
const setDecoder = async (config, handle) => {
handleVideoFrame = handle;
decoderConfig = config;

// check whether an image codec is used
if (decoderConfig.data_type === "image") {
useWebCodec = false;
} else {
useWebCodec = true;

// validate decoder configuration when a video codec is used.
// BUG FIX: the old code read `.supported` off the still-pending Promise
// (always undefined) and returned early when the config WAS supported,
// which inverted the check. Await the result, then bail only when the
// configuration is unsupported.
const { supported } = await isConfigSupported(decoderConfig);
if (!supported) {
console.error("Unsupported decoder configuration:", decoderConfig);
return;
}

videoDecoder = new VideoDecoder({
// process the decoded frame
output: async (frame) => {
handleFrame(frame);
},
// handle decoder errors
error: (error) => {
console.error("Decoder error:", error);
// BUG FIX: was `decoder.close()` -- an undefined identifier
videoDecoder.close();
},
});

// configure decoder
videoDecoder.configure(decoderConfig);
console.log("Decoder", videoDecoder);
}
};

// Thin async wrapper around the static WebCodecs support check; resolves to
// an object whose `supported` field indicates whether the config is usable.
const isConfigSupported = async (config) => VideoDecoder.isConfigSupported(config);

// Hand a decoded frame to the registered callback, then release it.
// NOTE(review): the handler in script.js also calls frame.close(); confirm
// the intended ownership of the frame between the two.
const handleFrame = async (decodedFrame) => {
handleVideoFrame(decodedFrame);
decodedFrame.close();
};

// Decode one unit of incoming data: video chunks go through the WebCodecs
// decoder, image payloads are rasterized directly into VideoFrames.
const startDecode = async (encodedVideoChunk) => {
if (useWebCodec && typeof encodedVideoChunk === "object") {
// decode through the decoder when using a video codec
try {
if (!videoDecoder) return; // decoder not configured yet (no MIME seen)

if (videoDecoder.state === "closed") {
// decoder was closed (e.g. after an error): rebuild it; note that
// this particular chunk is dropped rather than decoded
await setDecoder(decoderConfig, handleVideoFrame);
} else {
videoDecoder.decode(encodedVideoChunk);
}
} catch (error) {
// retry setup on decoder error
console.error("Decoder error:", error);
await setDecoder(decoderConfig, handleVideoFrame);
}
} else {
// when using an image codec, convert Blob to ImageBitmap and then create a frame
const videoChunk = encodedVideoChunk;
const blob = new Blob([videoChunk.data], {type: "image/jpeg"});

createImageBitmap(blob)
.then((imageBitmap) => {
const decodedChunk = new VideoFrame(imageBitmap, {
timestamp: videoChunk.timestamp,
});
handleVideoFrame(decodedChunk); // deliver the frame to the registered handler
})
.catch((error) => {
console.log("ImageBitmap creation error:", error);
});
}
};

index.html

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Video | CoBiz</title>
<style>
/* Remove default page margins so the video can fill the viewport. */
body {
margin: 0;
padding: 0;
box-sizing: border-box;
}

/* Black letterboxing behind the stream while no frame is showing. */
video {
width: 100%;
height: 100%;
background-color: black;
}
</style>
</head>
<body>
<!-- Target element that script.js attaches the MediaStream to. -->
<video id="videoElement" autoplay muted controls></video>
<!-- decoder.js must load first: script.js calls setDecoder/startDecode. -->
<script src="scripts/decoder.js"></script>
<script src="scripts/script.js"></script>
</body>
</html>