Examples
Image Processing System
Below is an example of what an image processing system might look like. In this robot, there are three components. The robot’s computation flow is as below
A camera takes an image and posts it to Redis
A computer processes that image and posts the result to Redis. Here, the process is just to compute the mean value of the image.
An actuator reads the result of the computation and does something with it. Here, it just logs it.
All the code and each component’s logs can be found in the repository
Camera
from reem import PublishSpace
import numpy as np
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="camera.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 5.0 # seconds
start_time = time.time()
# --------------------------- Main -----------------------------------
pspace = PublishSpace("localhost")
image_count = 0
while time.time() < start_time + TIME_TO_RUN:
image = np.random.rand(640, 480, 3)
data = {
"image": image,
"images_sent": image_count,
"time_stamp": time.time(),
}
pspace["raw_image"] = data
logger.info("Published Image {}".format(image_count))
image_count += 1
Processor
from reem import PublishSpace, CallbackSubscriber
import numpy as np
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="processor.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 5.0 # seconds
# --------------------------- Main -----------------------------------
pspace = PublishSpace("localhost")
def callback(data, updated_path):
pspace["processed_info"] = {
"mean": np.mean(data["image"]),
"time_stamp": time.time(),
"images_sent": data["images_sent"]
}
logger.info("Processed image {}".format(data["images_sent"]))
subscriber = CallbackSubscriber(
channel="raw_image",
interface="localhost",
callback_function=callback,
kwargs={}
)
subscriber.listen()
time.sleep(TIME_TO_RUN)
Actuator
from reem import PublishSpace, CallbackSubscriber
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="actuator.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 5.0 # seconds
# --------------------------- Main -----------------------------------
pspace = PublishSpace("localhost")
def callback(data, updated_path):
logger.info("Processed image {}".format(data["images_sent"]))
subscriber = CallbackSubscriber(
channel="processed_info",
interface="localhost",
callback_function=callback,
kwargs={}
)
subscriber.listen()
time.sleep(TIME_TO_RUN)
Arm Actuator
This examples tries to mimic a system where one computer is responsible for actuating motors at a specific frequency while the set point is controlled by another computer at a different frequency.
We have implemented in two ways - using a database paradigm and a publish/subscribe paradigm.
All the code and each component’s logs can be found in the repository
Database
Controller
from reem.datatypes import KeyValueStore, CallbackSubscriber
from reem.connection import RedisInterface
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="controller_kvs.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 5.0 # seconds
start_time = time.time()
# --------------------------- Main -----------------------------------
interface = RedisInterface(host="localhost")
kvs = KeyValueStore(interface=interface)
set_frequency = 100 # Hz
set_period = 1.0/set_frequency
while time.time() < start_time + TIME_TO_RUN:
next_iteration = time.time() + set_period
command = time.time()
kvs["set_point"] = command
logger.info("Wrote Set Point: {}".format(command))
time.sleep(max(0.0, next_iteration - time.time()))
Actuator
from reem import KeyValueStore
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="actuator_kvs.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 5.0 # seconds
start_time = time.time()
# --------------------------- Main -----------------------------------
kvs = KeyValueStore('localhost')
polling_frequency = 1000 # Hz
polling_period = 1.0/polling_frequency
while time.time() < start_time + TIME_TO_RUN:
next_iteration = time.time() + polling_period
command = kvs["set_point"].read()
logger.info("Read Set Point: {}".format(command))
time.sleep(max(0.0, next_iteration - time.time()))
Publish/Subscribe
Controller
from reem import PublishSpace
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="controller_silent_subcsriber.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 10.0 # seconds
# --------------------------- Main -----------------------------------
pspace = PublishSpace("localhost")
set_frequency = 100 # Hz
set_period = 1.0/set_frequency
print("Writting to channel 'command' for",TIME_TO_RUN,"seconds...")
print("(Run actuator.py at the same time)")
num_messages = 0
start_time = time.time()
next_iteration = time.time()
while time.time() < start_time + TIME_TO_RUN:
#this does the publishing
command = time.time()
pspace["command"] = command
logger.info("Published Set Point: {}".format(command))
#intelligently sleeps
next_iteration += set_period
t_now = time.time()
time.sleep(max(0.0, next_iteration - t_now))
if next_iteration < t_now:
next_iteration = t_now + set_period
num_messages += 1
print("Quitting, published",num_messages,"messages")
Actuator
from reem.connection import SilentSubscriber
import time
import logging
# Logging Configuration
logging.basicConfig(
format="%(asctime)20s %(filename)30s:%(lineno)3s %(funcName)20s() %(levelname)10s %(message)s",
filename="actuator_silent_subscriber.log",
filemode='w')
logger = logging.getLogger("script")
logger.setLevel(logging.INFO)
TIME_TO_RUN = 10.0 # seconds
start_time = time.time()
# --------------------------- Main -----------------------------------
subscriber = SilentSubscriber(channel="command", interface="localhost")
subscriber.listen()
frequency = 100 # Hz
period = 1.0/frequency
print("Reading from channel 'command' for",TIME_TO_RUN,"seconds...")
print("(Run controller.py at the same time)")
while time.time() < start_time + TIME_TO_RUN:
next_iteration = time.time() + period
command = subscriber.value()
logger.info("Read Set Point: {} at time {}".format(command,time.time()))
time.sleep(max(0.0, next_iteration - time.time()))
print("Quitting.")