Examples

Image Processing System

Below is an example of what an image processing system might look like. In this robot, there are three components. The robot’s computation flow is as below

  1. A camera takes an image and posts it to Redis

  2. A computer processes that image and posts the result to Redis. Here, the process is just to compute the mean value of the image.

  3. An actuator reads the result of the computation and does something with it. Here, it just logs it.

All the code and each component’s logs can be found in the repository

Camera

from reem import  PublishSpace
import numpy as np
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="camera.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()


# --------------------------- Main -----------------------------------


pspace = PublishSpace("localhost")

image_count = 0
while time.time() < start_time + TIME_TO_RUN:
    image = np.random.rand(640, 480, 3)
    data = {
        "image": image,
        "images_sent": image_count,
        "time_stamp": time.time(),
    }
    pspace["raw_image"] = data
    logger.info("Published Image {}".format(image_count))
    image_count += 1

Processor

from reem import PublishSpace, CallbackSubscriber
import numpy as np
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="processor.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds


# --------------------------- Main -----------------------------------


pspace = PublishSpace("localhost")


def callback(data, updated_path):
    pspace["processed_info"] = {
        "mean": np.mean(data["image"]),
        "time_stamp": time.time(),
        "images_sent": data["images_sent"]
    }
    logger.info("Processed image {}".format(data["images_sent"]))


subscriber = CallbackSubscriber(
    channel="raw_image",
    interface="localhost",
    callback_function=callback,
    kwargs={}
)

subscriber.listen()
time.sleep(TIME_TO_RUN)

Actuator

from reem import PublishSpace, CallbackSubscriber
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="actuator.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds


# --------------------------- Main -----------------------------------


pspace = PublishSpace("localhost")


def callback(data, updated_path):
    logger.info("Processed image {}".format(data["images_sent"]))


subscriber = CallbackSubscriber(
    channel="processed_info",
    interface="localhost",
    callback_function=callback,
    kwargs={}
)

subscriber.listen()
time.sleep(TIME_TO_RUN)

Arm Actuator

This examples tries to mimic a system where one computer is responsible for actuating motors at a specific frequency while the set point is controlled by another computer at a different frequency.

We have implemented in two ways - using a database paradigm and a publish/subscribe paradigm.

All the code and each component’s logs can be found in the repository

Database

Controller

from reem.datatypes import KeyValueStore, CallbackSubscriber
from reem.connection import RedisInterface
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="controller_kvs.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()


# --------------------------- Main -----------------------------------


interface = RedisInterface(host="localhost")
kvs = KeyValueStore(interface=interface)

set_frequency = 100  # Hz
set_period = 1.0/set_frequency

while time.time() < start_time + TIME_TO_RUN:
    next_iteration = time.time() + set_period
    command = time.time()
    kvs["set_point"] = command
    logger.info("Wrote Set Point: {}".format(command))
    time.sleep(max(0.0, next_iteration - time.time()))

Actuator

from reem import KeyValueStore
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="actuator_kvs.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 5.0  # seconds
start_time = time.time()

# --------------------------- Main -----------------------------------

kvs = KeyValueStore('localhost')

polling_frequency = 1000  # Hz
polling_period = 1.0/polling_frequency

while time.time() < start_time + TIME_TO_RUN:
    next_iteration = time.time() + polling_period
    command = kvs["set_point"].read()
    logger.info("Read Set Point: {}".format(command))
    time.sleep(max(0.0, next_iteration - time.time()))

Publish/Subscribe

Controller

from reem import PublishSpace
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="controller_silent_subcsriber.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 10.0  # seconds


# --------------------------- Main -----------------------------------


pspace = PublishSpace("localhost")

set_frequency = 100  # Hz
set_period = 1.0/set_frequency

print("Writting to channel 'command' for",TIME_TO_RUN,"seconds...")
print("(Run actuator.py at the same time)")
num_messages = 0
start_time = time.time()
next_iteration = time.time()
while time.time() < start_time + TIME_TO_RUN:
    #this does the publishing
    command = time.time()
    pspace["command"] = command
    logger.info("Published Set Point: {}".format(command))

    #intelligently sleeps
    next_iteration += set_period
    t_now = time.time()
    time.sleep(max(0.0, next_iteration - t_now))
    if next_iteration < t_now:
        next_iteration = t_now + set_period
    num_messages += 1
print("Quitting, published",num_messages,"messages")

Actuator

from reem.connection import SilentSubscriber
import time
import logging

# Logging Configuration
logging.basicConfig(
    format="%(asctime)20s %(filename)30s:%(lineno)3s  %(funcName)20s() %(levelname)10s     %(message)s",
    filename="actuator_silent_subscriber.log",
    filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)

TIME_TO_RUN = 10.0  # seconds
start_time = time.time()

# --------------------------- Main -----------------------------------

subscriber = SilentSubscriber(channel="command", interface="localhost")
subscriber.listen()

frequency = 100  # Hz
period = 1.0/frequency

print("Reading from channel 'command' for",TIME_TO_RUN,"seconds...")
print("(Run controller.py at the same time)")
while time.time() < start_time + TIME_TO_RUN:
    next_iteration = time.time() + period
    command = subscriber.value()
    logger.info("Read Set Point: {} at time {}".format(command,time.time()))
    time.sleep(max(0.0, next_iteration - time.time()))
print("Quitting.")