Example Application

Because Strong-RL takes care of the overall framework for your application and provides a host of default, configurable components, you can develop an entire, highly scalable reinforcement learning application in just a single Python script:

import datetime
import tempfile
import shutil
import os
import numpy as np
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DoubleType, FloatType, BooleanType

from strong_rl.base.app import Application, AppConfig
from strong_rl.base.environment import Environment
from strong_rl.actions.actionspace import ActionSpace
from strong_rl.actions.set import ActionSet
from strong_rl.actions.action import NullAction
from strong_rl.events.event import Event
from strong_rl.actions.action import Action
from strong_rl.events.set import EventSet
from strong_rl.models.model import Model
from strong_rl.models.set import ModelSet
from strong_rl.storage.local import LocalStorage
from strong_rl.cache.file import FileCache
from strong_rl.batch.datalog import BatchDataLog
from strong_rl.batch.datamodeler import BatchDataModeler
from strong_rl.batch.targeter import BatchTargeter
from strong_rl.batch.actor import BatchActor
from strong_rl.internals.base_data import DataField

from strong_rl.algorithms.agents.set import AgentSet
from strong_rl.algorithms.states.state import AgentState
from strong_rl.algorithms.memories.experience_replay import ExperienceReplay
from strong_rl.algorithms.preprocessors.scaling_preprocessor import ScalingPreprocessor
from strong_rl.algorithms.agents.traart_dqn import DQN
from strong_rl.algorithms.models.nn import MultiActionDQN
from strong_rl.algorithms.policies.epsilon_greedy import EpsilonGreedy


"""
Events, Models, and Actions
----
Describe the data your application collects (events),
the data models you will build (models), and the
actions your agent can take (actions).
"""


class Impression(Event):
    """
    Register a single ad impression to a user, and whether
    or not they clicked on it (our behavior of interest).

    Every attribute declared with ``DataField`` pairs a Spark SQL type with
    a second positional flag, which is ``False`` for all fields here
    (presumably "nullable" -- confirm against ``DataField``).
    """
    # NOTE(review): ``name`` is bound twice in this class body. In a plain
    # Python class the ``DataField`` assignment below replaces the string
    # "impression" -- confirm that the ``Event`` base class expects this.
    name = "impression"

    name = DataField(StringType(), False)
    # Two timestamps are declared; judging by the names, ``happened_at`` is
    # when the impression occurred and ``created_at`` when the record was
    # created -- TODO confirm with the ``Event`` base class.
    happened_at = DataField(TimestampType(), False)
    created_at = DataField(TimestampType(), False)
    # Identifiers of the user who saw the ad and of the advertised product.
    user_id = DataField(LongType(), False)
    product_id = DataField(LongType(), False)
    # Whether the user clicked on the ad.
    click = DataField(BooleanType(), False)

    def validate(self):
        """
        Reject impressions that reference a product ID above 100.

        Raises:
            Exception: if ``product_id`` is greater than 100.
        """
        # Only the upper bound is checked; IDs of zero or below pass through.
        if self.product_id > 100:
            raise Exception("Impossible product ID received. We only have 100 products.")


class User(Model):
    """
    Build our target model (of users), summarising their most recent
    10 impressions.

    The query yields one row per ``user_id`` with:

    * ``num_impressions`` -- how many impressions the user has received in
      total (not just the 10 that are summarised),
    * ``recent_product`` -- the product shown in their latest impression,
    * ``click_rate`` -- the share of their 10 latest impressions that were
      clicked.
    """
    name = "user"

    key = "user_id"

    # The summary columns are selected explicitly rather than with LAST():
    # inside a GROUP BY there is no guaranteed row order, so LAST() may
    # return a different row from one run to the next.
    #   - the highest ascending row number is the total impression count;
    #   - the row with n_back = 1 is, by construction, the latest impression.
    query = """
        WITH ranked_impressions AS (
          SELECT user_id,
                 product_id AS recent_product,
                 click,
                 ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY happened_at DESC) as n_back,
                 ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY happened_at ASC) as cum_num_impressions
          FROM impressions
        )

        SELECT user_id,
               MAX(cum_num_impressions) AS num_impressions,
               MAX(CASE WHEN n_back = 1 THEN recent_product END) AS recent_product,
               AVG(CAST(click AS INTEGER)) AS click_rate
        FROM ranked_impressions
        WHERE n_back <= 10
        GROUP BY user_id
    """

    # Output columns of the query above, in the order they are selected.
    user_id = DataField(LongType(), False)
    num_impressions = DataField(IntegerType(), False)
    recent_product = DataField(LongType(), False)
    click_rate = DataField(DoubleType(), False)


class Coupon(Action):
    """
    Describe the only kind of action we can take:
    sending a coupon of a particular value.

    Instances are created with a ``value`` keyword argument, e.g.
    ``Coupon(value=50)``.
    """
    # NOTE(review): the action is registered as "Offer" although the class is
    # called ``Coupon`` -- confirm that the differing name is intended.
    name = "Offer"

    # Face value of the coupon, stored as a double (presumably dollars, going
    # by the "$50" wording used elsewhere in this file).
    value = DataField(DoubleType(), False)


class CouponActionSpace(ActionSpace):
    """
    The full menu of moves open to the agent.

    Each entry of ``actions`` is a one-element tuple: either doing nothing
    (a ``NullAction``) or sending a coupon worth 10 or 50.
    """
    actions = [
        (NullAction(),),
        (Coupon(value=10),),
        (Coupon(value=50),),
    ]

    def constrain(self, target):
        """
        Narrow ``self.actions`` to what is permitted for ``target``.

        Coupons worth 50 or more are reserved for targets that have
        already seen at least 5 ads; null actions and cheaper coupons
        are always kept.
        """
        def is_permitted(combo):
            candidate = combo[0]
            # doing nothing is always an option
            if candidate.null:
                return True
            # users with enough impressions may receive any coupon
            if target.num_impressions >= 5:
                return True
            # everybody else is limited to coupons below $50
            return candidate.value < 50

        self.actions = list(filter(is_permitted, self.actions))


class SimulationEnvironment(Environment):
    """
    A simulated world for the agent to act in.

    One hundred users are created, each remembering the last set of
    actions applied to them. A user whose last actions include a $50
    coupon clicks on the next ad with probability 0.8; any other user
    clicks with probability 0.2. The click is our target behavior and
    therefore our reward.
    """
    def __init__(self, app):
        super().__init__(app=app)

        self.num_users = 100

        # every user starts out having received no coupon at all
        initial_states = {}
        for user_id in range(self.num_users):
            initial_states[user_id] = {'last_actions': (NullAction(),)}
        self.users = initial_states

    def current_reward(self):
        """Draw one reward per user and return the mean of those draws."""
        outcomes = []
        for user_state in self.users.values():
            outcomes.append(1.0 if self.reward(user_state) else 0.0)

        return np.mean(outcomes)

    def reward(self, state):
        """
        Sample whether a user in ``state`` clicks on the next ad.

        Returns a bool drawn from a single Bernoulli trial whose success
        probability depends on whether a $50 coupon is among the user's
        last actions.
        """
        got_big_coupon = Coupon(value=50) in state['last_actions']
        click_probability = .8 if got_big_coupon else .2

        return bool(np.random.binomial(1, p=click_probability))

    def simulate_events(self):
        """
        Produce one ``Impression`` per user, stamped one second before the
        application's current time cursor, with a freshly sampled click.
        """
        impressions = []
        for user_id, user_state in self.users.items():
            impressions.append(
                Impression(
                    happened_at=self.app.config.time_cursor.get() - datetime.timedelta(seconds=1),
                    user_id=user_id,
                    product_id=99,
                    click=self.reward(user_state),
                )
            )

        return impressions

    def act(self, recommended_actions):
        """Store each recommendation's actions as its target user's last actions."""
        for recommendation in recommended_actions:
            chosen = recommendation.actions
            self.users[recommendation.target.id()]['last_actions'] = chosen


class ExampleRLApplication:
    """
    Build our application. To allow for simple and side-effect free testing,
    we build the application in a context and then tear it down (i.e.,
    delete all data) when the context is exited.

    Entering the context returns the configured ``Application``; leaving it
    removes the temporary directory that holds all application and agent
    data.
    """
    def __init__(self):
        self.temp_dir = None
        self.app = None

    def __enter__(self):
        """
        Create the temporary data directory and build the application.

        Returns:
            The fully wired ``Application`` instance.
        """
        # create temporary folder
        self.temp_dir = tempfile.mkdtemp()

        try:
            self.app = self._build_app()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the folder
            # has to be removed here; cleanup errors are ignored so that the
            # original exception is the one that propagates.
            self._remove_temp_dir(ignore_errors=True)
            raise

        return self.app

    def __exit__(self, *args):
        """Delete all data written during the context."""
        self._remove_temp_dir()

    def _remove_temp_dir(self, ignore_errors=False):
        """Delete the temporary directory, if one currently exists."""
        if self.temp_dir is None:
            return

        shutil.rmtree(self.temp_dir, ignore_errors=ignore_errors)
        self.temp_dir = None

    def _build_app(self):
        """Configure every component and wire them into an ``Application``."""
        # configure our data store
        storage = LocalStorage(base_path=os.path.join(self.temp_dir, 'app'))

        # configure the application
        eventset = EventSet(custom_events=(Impression,))

        modelset = ModelSet(custom_models=(),
                            target_model=User)

        actionset = ActionSet(actions=(Coupon,))

        config = AppConfig(eventset=eventset,
                           modelset=modelset,
                           actionset=actionset,
                           earliest_time=datetime.datetime(year=2018, month=1, day=1))

        # import a default Strong-RL agent (normally, you would extend a default Strong-RL agent to build your own)
        agent_storage = LocalStorage(base_path=os.path.join(self.temp_dir, 'agent'))
        agent = DQN(
            config=config,
            state=AgentState(name="my_agent_v1", cache=FileCache(storage=agent_storage)),
            memory=ExperienceReplay(buffer_size=10000),
            preprocessor=ScalingPreprocessor(reward_col='click_rate',
                                             state_cols=['recent_product']),
            model=MultiActionDQN(action_dim=len(CouponActionSpace.actions)),
            policy=EpsilonGreedy(epsilon_start=.5, epsilon_step=.05, epsilon_stop=.1),
            action_space=CouponActionSpace,
            update_size=10,
            update_iters=5,
            discount=.1
        )

        agentset = AgentSet(agent)

        # wire all of the application components together
        app = (
            Application(config)
            .set_datalog(BatchDataLog(storage=storage))
            .set_datamodeler(BatchDataModeler())
            .set_targeter(BatchTargeter(query="SELECT * FROM users"))
            .set_actor(BatchActor(agentset=agentset, batch_size=32))
        )

        # SimulationEnvironment.__init__ takes the application as a required
        # argument, so the environment can only be created once the
        # application object exists. This assumes each set_* call returns
        # the application, as the chained calls imply.
        return app.set_environment(SimulationEnvironment(app=app))


if __name__ == "__main__":
    # Run our application for 90 days and watch the agent learn which coupons to send!
    with ExampleRLApplication() as app:
        current_date = app.config.earliest_time

        # The end of the simulation is computed once, before the loop. If the
        # bound were derived from the moving cursor, the condition would hold
        # forever and the loop would never stop. The half-open range gives
        # exactly 90 daily steps.
        end_date = current_date + datetime.timedelta(days=90)
        one_day = datetime.timedelta(days=1)

        while current_date < end_date:
            app.config.time_cursor.set(current_date)

            # one simulated day: log events, rebuild models, pick targets, act
            app.datalog.write(events=app.environment.simulate_events())
            app.datamodeler.run()
            app.targeter.run()
            app.actor.run(observe_since=(app.config.time_cursor.get() - one_day))

            current_date += one_day