Examples

Image Processing System

Below is an example of what an image processing system might look like. This robot has three components, and its computation flow is as follows:

  1. A camera takes an image and posts it to Redis

  2. A computer processes that image and posts the result to Redis. Here, the process is just to compute the mean value of the image.

  3. An actuator reads the result of the computation and does something with it. Here, it just logs it.

All the code and each component’s logs can be found in the repository.

Camera

from reem.datatypes import PublishSpace
from reem.connection import RedisInterface
import numpy as np
import time
import logging

# Logging Configuration
# Records are written to camera.log; filemode "w" starts the file fresh on every run.
LOG_FORMAT = "%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s"
logging.basicConfig(filename="camera.log", filemode="w", format=LOG_FORMAT)
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()
stop_time = start_time + TIME_TO_RUN  # wall-clock instant at which publishing ends


# --------------------------- Main -----------------------------------


interface = RedisInterface(host="localhost")
pspace = PublishSpace(interface=interface)

# Publish randomly generated frames back-to-back (no rate limiting) until the
# time budget is spent. Each message carries the frame, a running counter and
# the time at which the message was assembled.
image_count = 0
while time.time() < stop_time:
    pspace["raw_image"] = {
        "image": np.random.rand(640, 480, 3),
        "images_sent": image_count,
        "time_stamp": time.time(),
    }
    logger.info("Published Image {}".format(image_count))
    image_count += 1

Processor

from reem.datatypes import PublishSpace, CallbackSubscriber
from reem.connection import RedisInterface
import numpy as np
import time
import logging

# Logging Configuration
# Records are written to processor.log; filemode "w" starts the file fresh on every run.
LOG_FORMAT = "%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s"
logging.basicConfig(filename="processor.log", filemode="w", format=LOG_FORMAT)
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds


# --------------------------- Main -----------------------------------


# One Redis connection is shared by the publisher below and by the subscriber
# created later in this script.
interface = RedisInterface(host="localhost")
pspace = PublishSpace(interface=interface)


def callback(data, updated_path):
    """Publish the mean of an incoming image and log which image was handled.

    ``data`` is indexed with the "image" and "images_sent" keys.
    ``updated_path`` is accepted but not used by this function.
    """
    mean_value = np.mean(data["image"])
    processed_at = time.time()
    result = {
        "mean": mean_value,
        "time_stamp": processed_at,
        "images_sent": data["images_sent"],
    }
    pspace["processed_info"] = result
    logger.info("Processed image {}".format(data["images_sent"]))


# Hook callback() up to the "raw_image" channel. An empty kwargs dict is passed;
# presumably these would be extra keyword arguments for the callback - confirm
# against the REEM documentation.
subscriber = CallbackSubscriber(channel="raw_image",
                                interface=interface,
                                callback_function=callback,
                                kwargs=dict())
subscriber.listen()

# NOTE(review): listen() is followed by a plain sleep, so it presumably returns
# immediately and delivers messages in the background - confirm.
time.sleep(TIME_TO_RUN)

Actuator

from reem.datatypes import PublishSpace, CallbackSubscriber
from reem.connection import RedisInterface
import time
import logging

# Logging Configuration
# Records are written to actuator.log; filemode "w" starts the file fresh on every run.
LOG_FORMAT = "%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s"
logging.basicConfig(filename="actuator.log", filemode="w", format=LOG_FORMAT)
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds


# --------------------------- Main -----------------------------------


interface = RedisInterface(host="localhost")
# NOTE(review): nothing else in this actuator script reads or writes pspace.
pspace = PublishSpace(interface=interface)


def callback(data, updated_path):
    """Log the "images_sent" counter carried by an incoming message.

    ``updated_path`` is accepted but not used by this function.
    """
    message = "Processed image {}".format(data["images_sent"])
    logger.info(message)


# Hook callback() up to the "processed_info" channel. An empty kwargs dict is
# passed; presumably these would be extra keyword arguments for the callback -
# confirm against the REEM documentation.
subscriber = CallbackSubscriber(channel="processed_info",
                                interface=interface,
                                callback_function=callback,
                                kwargs=dict())
subscriber.listen()

# NOTE(review): listen() is followed by a plain sleep, so it presumably returns
# immediately and delivers messages in the background - confirm.
time.sleep(TIME_TO_RUN)

Arm Actuator

This example tries to mimic a system where one computer is responsible for actuating motors at a specific frequency while the set point is controlled by another computer at a different frequency.

We have implemented it in two ways: using a database paradigm and using a publish/subscribe paradigm.

All the code and each component’s logs can be found in the repository.

Database

Controller

from reem.datatypes import KeyValueStore, CallbackSubscriber
from reem.connection import RedisInterface
import time
import logging

# Logging Configuration
# Records are written to controller_kvs.log; filemode "w" starts the file fresh on every run.
LOG_FORMAT = "%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s"
logging.basicConfig(filename="controller_kvs.log", filemode="w", format=LOG_FORMAT)
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()
stop_time = start_time + TIME_TO_RUN  # wall-clock instant at which the loop ends


# --------------------------- Main -----------------------------------


interface = RedisInterface(host="localhost")
kvs = KeyValueStore(interface=interface)

set_frequency = 100  # Hz
set_period = 1.0 / set_frequency

# Write a new set point (the current timestamp) once per period.
while time.time() < stop_time:
    cycle_end = time.time() + set_period
    command = time.time()
    kvs["set_point"] = command
    logger.info("Wrote Set Point: {}".format(command))
    # Sleep only for what is left of this period; never pass a negative duration.
    remaining = cycle_end - time.time()
    time.sleep(max(0.0, remaining))

Actuator

from reem.datatypes import KeyValueStore
from reem.connection import RedisInterface
import time
import logging

# Logging Configuration
# Records are written to actuator_kvs.log; filemode "w" starts the file fresh on every run.
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="actuator_kvs.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()

# --------------------------- Main -----------------------------------

interface = RedisInterface(host="localhost")
# Pass the interface by keyword, the same way the controller script builds its
# KeyValueStore, so both sides of the example read identically.
kvs = KeyValueStore(interface=interface)

polling_frequency = 1000  # Hz
polling_period = 1.0/polling_frequency

# Poll the "set_point" entry once per polling period and log what was read.
while time.time() < start_time + TIME_TO_RUN:
    next_iteration = time.time() + polling_period
    command = kvs["set_point"].read()
    logger.info("Read Set Point: {}".format(command))
    # Sleep only for what is left of this period; never pass a negative duration.
    time.sleep(max(0.0, next_iteration - time.time()))

Publish/Subscribe

Controller

from reem.datatypes import PublishSpace
from reem.connection import RedisInterface
import time
import logging

# Logging Configuration
# filemode "w" starts the log file fresh on every run.
# NOTE(review): "subcsriber" in the file name looks like a typo for
# "subscriber"; it is left unchanged here because it determines the name of
# the file written at runtime.
LOG_FORMAT = "%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s"
logging.basicConfig(filename="controller_silent_subcsriber.log", filemode="w", format=LOG_FORMAT)
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()
stop_time = start_time + TIME_TO_RUN  # wall-clock instant at which the loop ends


# --------------------------- Main -----------------------------------


interface = RedisInterface(host="localhost")
pspace = PublishSpace(interface=interface)

set_frequency = 100  # Hz
set_period = 1.0 / set_frequency

# Publish a new set point (the current timestamp) under "command" once per period.
while time.time() < stop_time:
    cycle_end = time.time() + set_period
    command = time.time()
    pspace["command"] = command
    logger.info("Published Set Point: {}".format(command))
    # Sleep only for what is left of this period; never pass a negative duration.
    remaining = cycle_end - time.time()
    time.sleep(max(0.0, remaining))

Actuator

from reem.datatypes import SilentSubscriber
from reem.connection import RedisInterface
import time
import logging

# Logging Configuration
# Records are written to actuator_silent_subscriber.log; filemode "w" starts the
# file fresh on every run.
LOG_FORMAT = "%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s"
logging.basicConfig(filename="actuator_silent_subscriber.log", filemode="w", format=LOG_FORMAT)
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()
stop_time = start_time + TIME_TO_RUN  # wall-clock instant at which the loop ends

# --------------------------- Main -----------------------------------

interface = RedisInterface(host="localhost")
subscriber = SilentSubscriber(channel="command", interface=interface)
subscriber.listen()

frequency = 1000  # Hz
period = 1.0 / frequency

# Sample the subscriber once per period and log the value it reports.
# NOTE(review): value() presumably returns the most recent data received on
# "command" - confirm against the REEM documentation.
while time.time() < stop_time:
    cycle_end = time.time() + period
    command = subscriber.value()
    logger.info("Read Set Point: {}".format(command))
    # Sleep only for what is left of this period; never pass a negative duration.
    remaining = cycle_end - time.time()
    time.sleep(max(0.0, remaining))