$$
\begin{align*}
x_A, x_B &= \text{number of rewards from users shown variant } A, B \\
x_A &\sim \text{Binomial}(n_A, r_A) \\
x_B &\sim \text{Binomial}(n_B, r_B) \\
r_A, r_B &\sim \text{Beta}(1, 1)
\end{align*}
$$

By Beta-Binomial conjugacy, the posteriors are

$$
\begin{align*}
r_A \mid n_A, x_A &\sim \text{Beta}(x_A + 1, n_A - x_A + 1) \\
r_B \mid n_B, x_B &\sim \text{Beta}(x_B + 1, n_B - x_B + 1).
\end{align*}
$$
Thompson sampling randomizes user/variant assignment according to the probability that each variant maximizes the posterior expected reward.
The probability that a user is assigned variant A is
$$
\begin{align*}
P(r_A > r_B \mid \mathcal{D})
  &= \int_0^1 P(r_A > r \mid \mathcal{D})\, \pi_B(r \mid \mathcal{D})\, dr \\
  &= \int_0^1 \left( \int_r^1 \pi_A(s \mid \mathcal{D})\, ds \right) \pi_B(r \mid \mathcal{D})\, dr \\
  &\propto \int_0^1 \left( \int_r^1 s^{\alpha_A - 1} (1 - s)^{\beta_A - 1}\, ds \right) r^{\alpha_B - 1} (1 - r)^{\beta_B - 1}\, dr.
\end{align*}
$$
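Rather than evaluate this integral directly, the probability can be estimated by Monte Carlo using draws from each posterior. The samples `a_samples` and `b_samples` used in the next cell are produced elsewhere in the notebook; a minimal sketch of how such draws might be generated is below (the counts `n_a`, `x_a`, `n_b`, `x_b` are hypothetical placeholders, so the value shown after the next cell reflects the notebook's own data, not this sketch).

```python
import scipy as sp
import scipy.stats

# hypothetical observed data: x rewards out of n sessions for each variant
n_a, x_a = 1000, 50
n_b, x_b = 1000, 60

# sample the reward rates from their Beta posteriors (uniform Beta(1, 1) priors)
a_samples = sp.stats.beta.rvs(x_a + 1, n_a - x_a + 1, size=10_000)
b_samples = sp.stats.beta.rvs(x_b + 1, n_b - x_b + 1, size=10_000)
```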
(a_samples > b_samples).mean()
0.24299999999999999
class BetaBinomial:
    """Conjugate Beta prior/posterior for a Bernoulli reward rate."""
    def __init__(self, a0=1., b0=1.):
        self.a = a0
        self.b = b0

    def sample(self):
        # draw one plausible reward rate from the current posterior
        return sp.stats.beta.rvs(self.a, self.b)

    def update(self, n, x):
        # condition on x rewards observed out of n trials
        self.a += x
        self.b += n - x
class Bandit:
    """Two-armed Thompson sampling bandit; arm 0 is variant A, arm 1 is variant B."""
    def __init__(self, a_post, b_post):
        self.a_post = a_post
        self.b_post = b_post

    def assign(self):
        # Thompson sampling: draw one sample from each posterior and
        # assign the variant whose sample is larger
        return 1 * (self.a_post.sample() < self.b_post.sample())

    def update(self, arm, reward):
        arm_post = self.a_post if arm == 0 else self.b_post
        arm_post.update(1, reward)
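The simulation below consumes a reward generator, `generate_rewards`, that is not defined in this excerpt. A minimal sketch consistent with how it is used, where each session yields a pair of Bernoulli rewards indexed by arm (`arm_rewards[0]` for variant A, `arm_rewards[1]` for variant B):

```python
import numpy as np

def generate_rewards(a_rate, b_rate, n):
    """Yield (reward if shown variant A, reward if shown variant B) for n sessions."""
    for _ in range(n):
        yield np.random.binomial(1, a_rate), np.random.binomial(1, b_rate)
```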
A_RATE, B_RATE = 0.05, 0.1
N = 1000
rewards_gen = generate_rewards(A_RATE, B_RATE, N)
bandit = Bandit(BetaBinomial(), BetaBinomial())
arms = np.empty(N, dtype=np.int64)
rewards = np.empty(N)
for t, arm_rewards in tqdm(enumerate(rewards_gen), total=N):
arms[t] = bandit.assign()
rewards[t] = arm_rewards[arms[t]]
bandit.update(arms[t], rewards[t])
100%|██████████| 1000/1000 [00:00<00:00, 6266.30it/s]
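`simulate_bandit`, called in the next cell, is also not defined in this excerpt; it presumably wraps the assignment/update loop above into a function. A sketch under that assumption (the optional `prior` factory argument anticipates the decayed posterior introduced later):

```python
def simulate_bandit(rewards_gen, n, prior=BetaBinomial):
    """Run one Thompson sampling bandit for n sessions; return arms and rewards."""
    bandit = Bandit(prior(), prior())
    arms = np.empty(n, dtype=np.int64)
    rewards = np.empty(n)

    for t, arm_rewards in enumerate(rewards_gen):
        arms[t] = bandit.assign()
        rewards[t] = arm_rewards[arms[t]]
        bandit.update(arms[t], rewards[t])

    return arms, rewards
```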
N_BANDIT = 100
arms = np.empty((N_BANDIT, N), dtype=np.int64)
rewards = np.empty((N_BANDIT, N))
for i in trange(N_BANDIT):
arms[i], rewards[i] = simulate_bandit(
generate_rewards(A_RATE, B_RATE, N), N
)
100%|██████████| 100/100 [00:13<00:00, 7.36it/s]
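Similarly, `simulate_bandits` is assumed to repeat `simulate_bandit` across many independent runs, wrapping the loop from the previous cell; a sketch:

```python
from tqdm import trange

def simulate_bandits(rewards_gen_factory, n, n_bandit, prior=BetaBinomial):
    """Simulate n_bandit independent bandits, each for n sessions."""
    arms = np.empty((n_bandit, n), dtype=np.int64)
    rewards = np.empty((n_bandit, n))

    for i in trange(n_bandit):
        arms[i], rewards[i] = simulate_bandit(rewards_gen_factory(), n, prior=prior)

    return arms, rewards
```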
N = 2000
arms, rewards = simulate_bandits(
lambda: generate_rewards(A_RATE, A_RATE, N),
N, N_BANDIT
)
100%|██████████| 100/100 [00:23<00:00, 4.18it/s]
For $0 < \varepsilon \ll 1$,

$$
\underbrace{\tilde{\pi}_{t + 1}(\theta \mid \mathcal{D}_{t + 1}, \mathcal{D}_{1, t})}_{\text{decayed posterior after time } t + 1}
\propto \left( f(\mathcal{D}_{t + 1} \mid \theta) \cdot \tilde{\pi}_t(\theta \mid \mathcal{D}_{1, t}) \right)^{1 - \varepsilon} \cdot \left( \underbrace{\pi_0(\theta)}_{\text{no-data prior}} \right)^{\varepsilon}.
$$
If we stop collecting data at time T,
$$
\tilde{\pi}_t(\theta \mid \mathcal{D}_{1, T}) \overset{t \to \infty}{\longrightarrow} \pi_0(\theta).
$$
If
$$
\begin{align*}
\pi_0(r) &= \text{Beta}(\alpha_0, \beta_0) \\
\tilde{\pi}_t(r \mid \mathcal{D}_{1, t}) &= \text{Beta}(\alpha_t, \beta_t)
\end{align*}
$$
and no data is observed at time t+1, then
$$
\tilde{\pi}_{t + 1}(r \mid \mathcal{D}_{t + 1} = \varnothing, \mathcal{D}_{1, t}) = \text{Beta}\left( (1 - \varepsilon) \alpha_t + \varepsilon \alpha_0, (1 - \varepsilon) \beta_t + \varepsilon \beta_0 \right).
$$
class DecayedBetaBinomial(BetaBinomial):
    """Beta-Binomial posterior that decays towards its prior at rate eps."""
    def __init__(self, eps, a0=1., b0=1.):
        super(DecayedBetaBinomial, self).__init__(a0=a0, b0=b0)

        self.eps = eps
        self.a0 = a0
        self.b0 = b0

    def update(self, n, x):
        # standard conjugate update, then shrink the parameters towards
        # those of the no-data prior
        super(DecayedBetaBinomial, self).update(n, x)

        self.a = (1 - self.eps) * self.a + self.eps * self.a0
        self.b = (1 - self.eps) * self.b + self.eps * self.b0
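The non-stationary scenario below relies on per-session reward rates `A_RATES` and `B_RATES` and a generator `generate_changing_rewards`, none of which appear in this excerpt. A minimal sketch, with a hypothetical schedule in which the better variant switches halfway through:

```python
# hypothetical time-varying reward rates (length-N arrays)
A_RATES = np.where(np.arange(N) < N // 2, 0.05, 0.1)
B_RATES = np.where(np.arange(N) < N // 2, 0.1, 0.05)

def generate_changing_rewards(a_rates, b_rates, n):
    """Yield (reward if shown variant A, reward if shown variant B) with per-session rates."""
    for t in range(n):
        yield np.random.binomial(1, a_rates[t]), np.random.binomial(1, b_rates[t])
```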
arms, rewards = simulate_bandits(
lambda: generate_changing_rewards(A_RATES, B_RATES, N),
N, N_BANDIT,
)
decay_arms, decay_rewards = simulate_bandits(
lambda: generate_changing_rewards(A_RATES, B_RATES, N),
N, N_BANDIT,
prior=lambda: DecayedBetaBinomial(0.01)
)
100%|██████████| 100/100 [00:23<00:00, 4.18it/s]
100%|██████████| 100/100 [00:24<00:00, 4.02it/s]
$$
\hat{r}^{\text{IPS}}_A = \frac{1}{n} \sum_{t = 1}^{n} \frac{Y_t \cdot \mathbb{I}(A_t = 1)}{P(A_t = 1 \mid \mathcal{D}_{1, t - 1})},
$$
where
$$
A_t = \begin{cases}
1 & \text{session } t \text{ saw variant A} \\
0 & \text{session } t \text{ saw variant B}.
\end{cases}
$$
The propensity $P(A_t = 1 \mid \mathcal{D}_{1, t - 1})$ has no simple closed form, so we estimate it by Monte Carlo, sampling from the posteriors recorded at each step.
%%time
N_MC = 500

# draw N_MC posterior samples per bandit per time step from the stored
# posterior parameters
a_samples = sp.stats.beta.rvs(
    a_post_alpha[..., np.newaxis],
    a_post_beta[..., np.newaxis],
    size=(N_BANDIT, N, N_MC)
)
b_samples = sp.stats.beta.rvs(
    b_post_alpha[..., np.newaxis],
    b_post_beta[..., np.newaxis],
    size=(N_BANDIT, N, N_MC)
)

# P(A_t = 1 | D_{1, t-1}): a coin flip for the first session, the lagged
# Monte Carlo win probability of variant A afterwards
a_prob = np.empty((N_BANDIT, N))
a_prob[:, 0] = 0.5
a_prob[:, 1:] = (a_samples > b_samples).mean(axis=2)[:, :-1]
CPU times: user 21.2 s, sys: 650 ms, total: 21.9 s
Wall time: 21.9 s
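The running estimates below use `plot_t` and `a_prob_`, which are defined elsewhere in the notebook. A plausible sketch, assuming `plot_t` counts sessions and `a_prob_` is a clipped copy of `a_prob` that keeps the inverse propensity weights bounded:

```python
plot_t = np.arange(N) + 1                # 1, 2, ..., N sessions observed so far
a_prob_ = np.clip(a_prob, 0.01, 0.99)    # avoid dividing by (near-)zero propensities
```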
a_ips_est = 1. / plot_t * (rewards * (arms == 0) / a_prob_).cumsum(axis=1)
b_ips_est = 1. / plot_t * (rewards * (arms == 1) / (1 - a_prob_)).cumsum(axis=1)
Intuition: the variant with the highest reward rate should receive the most traffic.