Functional Python

MC706.io

Recently, for a weekend project, I built a simple Python script that looked up the current price of a cryptocurrency, looked up the current spot prices of AWS EC2 instances, and calculated whether any mining scenario with a spot fleet was cheaper than buying the coins on an exchange. For those interested in the answer: it only gets close, it never gets profitable.

As an exercise on the programming half, since the rest of the problem was rather simple, I wanted to stretch my functional programming muscles, see how far I could push Python toward functional programming, and find out what that would look like.

Starting point

To start out, here is what some of that original code looked like:

Calculating Profitability

For those of you who don't know, the simplified version of how to calculate the payout for a cryptocurrency is your share of the global hashrate multiplied by the last block reward. The reward moves slowly over time based on the global hashrate, but using the last one will get you very close.

To zero in a little more on the calculation: you get paid based on the portion of the mining pool's hashrate that you contribute, and the chance that the pool wins the reward is the ratio of the pool's hashrate to the global hashrate. Say the global hashrate of a coin is 100 MH/s, your mining pool has a hashrate of 10 MH/s, and you are contributing 500 KH/s. For a cryptocurrency with an average block solve time of 2 minutes, your pool has a 1/10 chance of solving each block. When that happens, you get 5% (500 KH/s divided by 10 MH/s) of the reward.

Multiply those two ratios by the block reward to get your expected reward per 2-minute block, then multiply by 30 to get the reward per hour. Multiply by the current market price to get dollars per hour, which can be compared directly to spot prices for profitability.
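As a sanity check on that arithmetic, here is a minimal sketch that plugs in the example numbers. The block reward of 5 coins and the price of 200 dollars are made-up placeholder values, and the function name is hypothetical; only the hashrates and the 2-minute block time come from the example above.

```python
def expected_usd_per_hour(my_hashrate, pool_hashrate, global_hashrate,
                          block_reward, block_seconds, price_usd):
    """Expected mining revenue in dollars per hour (simplified model)."""
    pool_win_chance = pool_hashrate / global_hashrate  # chance the pool solves a block
    my_share_of_pool = my_hashrate / pool_hashrate     # my cut when it does
    coins_per_block = block_reward * pool_win_chance * my_share_of_pool
    blocks_per_hour = 3600 / block_seconds
    return coins_per_block * blocks_per_hour * price_usd


# 500 KH/s in a 10 MH/s pool on a 100 MH/s network, 2-minute blocks.
# block_reward and price_usd are hypothetical placeholder values.
usd_per_hour = expected_usd_per_hour(
    my_hashrate=500e3,
    pool_hashrate=10e6,
    global_hashrate=100e6,
    block_reward=5.0,
    block_seconds=120,
    price_usd=200.0,
)
print(round(usd_per_hour, 2))  # 150.0
```

Note that the pool's hashrate cancels out of the expected value: joining a bigger pool smooths out the payouts, but it does not change the long-run average.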

import requests

def get_pool_hashrate() -> float:
    response = requests.get("https://monero.hashvault.pro/api/pool/stats")
    return response.json()['pool_statistics']['hashRate']

def get_monero_stats() -> dict:
    response = requests.get("https://moneroblocks.info/api/get_stats")
    return response.json()

def get_current_price() -> float:
    response = requests.get("https://api.coinmarketcap.com/v1/ticker/monero/")
    # cast explicitly, in case the ticker returns the price as a string
    return float(response.json()[0]['price_usd'])

def get_estimated_payout_per_hour(hashrate: float) -> float:
    monero_stats = get_monero_stats()
    last_reward = monero_stats["last_reward"] / 1E12 # convert to whole XMR
    global_hashrate = monero_stats["hashrate"]
    pool_hashrate = get_pool_hashrate()
    current_price = get_current_price()

    awarded_monero_per_second = last_reward / 120 # convert to XMR/s
    # our hashrate is added to both the pool and the network once we join
    percentage_of_pool = hashrate / (hashrate + pool_hashrate)
    pool_percentage_of_global = (hashrate + pool_hashrate) / (hashrate + global_hashrate)

    average_xmr_per_second = awarded_monero_per_second * percentage_of_pool * pool_percentage_of_global
    xmr_per_hour = average_xmr_per_second * 3600
    hourly_payout = xmr_per_hour * current_price
    return hourly_payout

Getting Spot Prices

To get the spot prices, I used boto3. I then looked up the hardware on all of the instance types specialized for GPUs, looked up the hashrate of each GPU, and multiplied it by the number of GPUs available on that instance type.

That looked a little bit like:

from typing import List, Optional, Tuple

from boto3 import client

ec2 = client(service_name='ec2')

# estimated hashrate per instance type (per-GPU hashrate times number of GPUs)
INSTANCE_TYPES = {
    'p3.2xlarge': 1677,
    'p3.8xlarge': 6708,
    'p3.16xlarge': 13416,
    'p2.xlarge': 480,
    'p2.8xlarge': 3840,
    'p2.16xlarge': 7680,
    'g2.2xlarge': 360,
    'g2.8xlarge': 720,
    'g3.4xlarge': 410,
    'g3.8xlarge': 820,
    'g3.16xlarge': 1640,
}

def get_spot_price_for_az(availability_zone: str, instance_type: str) -> Optional[float]:
    spot_price_history = ec2.describe_spot_price_history(
        InstanceTypes=[instance_type],
        AvailabilityZone=availability_zone
    )
    history = spot_price_history['SpotPriceHistory']
    if not history:
        # this instance type is not offered in this availability zone
        return None
    price = history[0]['SpotPrice']
    return float(price)

def get_availability_zones() -> List[str]:
    zones = ec2.describe_availability_zones()
    azs = zones['AvailabilityZones']
    return [zone['ZoneName'] for zone in azs]

def get_prices() -> List[Tuple[Tuple[str, str], float]]:
    zones = get_availability_zones()
    instances = INSTANCE_TYPES.keys()
    combos = [(az, instance) for az in zones for instance in instances]
    results = []
    for combo in combos:
        price = get_spot_price_for_az(*combo)
        if price:
            results.append((combo, price))
    return results

Putting it together

From here the rest is pretty straightforward: take the list of prices with their paired hashrates, feed each hashrate into the payout-per-hour calculation, and see if there are any where the payout is greater than the cost.

def main():
    for az_instance, price in get_prices():
        az, instance_type = az_instance
        hashrate = INSTANCE_TYPES[instance_type]
        revenue = get_estimated_payout_per_hour(hashrate)
        if revenue > price:
            print(az_instance, revenue, price)

Refactoring for Functional

At this point I had my answer: no, it was never profitable to rent hardware to mine Monero. The script was quick and dirty, so in the interest of increasing the signal-to-noise ratio of my code, and of flexing my functional programming muscles, I started refactoring.

Note: after a little while, the goal shifted from increasing the signal-to-noise ratio to making the code as functional in style as possible.

Memoization

One of the pillars that functional programming touts is referential transparency: any function call can be replaced with the value it would have returned without affecting the end result. As a consequence, pure functions can be easily memoized: their results for specific arguments can be stored and used in place of calling the function again.

Luckily, this is pretty easy to do in a number of ways in Python.

The first relies on behavior that is sometimes considered a bug: using a mutable object as the default value of a function argument. To explore this, let's look at an example that has some unexpected results for beginners:

def add_to_list(value, list=[]):
    list.append(value)
    return list

first = add_to_list(1)
print(first)
second = add_to_list(2)
print(second)
print(first)

What most beginners would expect is that the argument list defaults to a fresh empty list on each call, so first should be [1] and second should be [2].

However this is the output of the above:

[1]
[1, 2]
[1, 2]

So what is happening here? It turns out that a default argument value is evaluated only once, when the function is defined, so when the default is a mutable data structure, all invocations of the function share that one object. This means all calls of add_to_list that don't specify a list share the default list.
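The sharing can be observed directly. The following sketch shows that both results are the very same object stored on the function, next to the usual None-sentinel idiom for the case where a fresh list per call is wanted. The names items and add_to_fresh_list are only illustrative.

```python
def add_to_list(value, items=[]):
    # the default list is created once, at definition time
    items.append(value)
    return items


def add_to_fresh_list(value, items=None):
    # None as a sentinel: build a new list on every call that omits items
    if items is None:
        items = []
    items.append(value)
    return items


first = add_to_list(1)
second = add_to_list(2)
print(first is second)            # True: both names point at the default list
print(add_to_list.__defaults__)   # ([1, 2],)

fresh_first = add_to_fresh_list(1)
fresh_second = add_to_fresh_list(2)
print(fresh_first, fresh_second)  # [1] [2]
```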

We can use this oddity to our advantage for memoization:

def do_something_hard(arg, mem={}):
    if arg in mem:
        return mem[arg]
    mem[arg] = hard_work(arg)
    return mem[arg]

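It is tempting to write this more succinctly with the dictionary's setdefault. Be aware, though, that Python evaluates hard_work(arg) before setdefault is called, so this shorter version stores the result but still does the work on every call: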

def do_something_hard(arg, mem={}):
    return mem.setdefault(arg, hard_work(arg))

With the first version, repeated calls to do_something_hard with the same arguments return without calling hard_work again. This is extremely helpful for things like API calls, since calls made seconds apart from each other don't necessarily need to be repeated.
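One detail is worth verifying when choosing between the explicit check and setdefault: Python evaluates a call's arguments before making the call, so the default passed to setdefault is computed even when the key is already cached. A small sketch with a call counter makes the difference visible. Here hard_work is just a stand-in, and the names eager and lazy are hypothetical.

```python
calls = {"count": 0}


def hard_work(arg):
    # stand-in for an expensive computation or API call
    calls["count"] += 1
    return arg * 2


def eager(arg, mem={}):
    # hard_work(arg) runs before setdefault gets a chance to look in mem
    return mem.setdefault(arg, hard_work(arg))


def lazy(arg, mem={}):
    # hard_work(arg) only runs on a cache miss
    if arg not in mem:
        mem[arg] = hard_work(arg)
    return mem[arg]


eager(21)
eager(21)
eager_calls = calls["count"]

calls["count"] = 0
lazy(21)
lazy(21)
lazy_calls = calls["count"]

print(eager_calls, lazy_calls)  # 2 1
```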

The other way to do memoization in Python is with a decorator:

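from functools import partial

class memoized:
    """
    decorator for memoizing functions
    """

    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, *args):
        try:
            hash(args)
        except TypeError:
            # unhashable arguments (e.g. a list) cannot be used as a cache key
            return self.func(*args)
        if args not in self.cache:
            # only call the wrapped function on a cache miss
            self.cache[args] = self.func(*args)
        return self.cache[args]

    def __repr__(self):
        # fall back to the function's repr when it has no docstring
        return self.func.__doc__ or repr(self.func)

    def __get__(self, instance, owner):
        # support decorating methods by binding the instance as first argument
        return partial(self.__call__, instance)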

Now to implement our do_something_hard as a memoized function, we can just do:

@memoized
def do_something_hard(arg):
    return hard_work(arg)
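For comparison, the standard library already ships a memoizing decorator, functools.lru_cache. This is not what the script used, just a sketch of the same idea with a call counter to confirm that the second call is served from the cache. The body of do_something_hard here is a placeholder.

```python
from functools import lru_cache

call_count = 0


@lru_cache(maxsize=None)  # maxsize=None means the cache never evicts entries
def do_something_hard(arg):
    global call_count
    call_count += 1
    return arg ** 2  # placeholder for the real work


do_something_hard(4)
do_something_hard(4)
info = do_something_hard.cache_info()
print(call_count, info.hits, info.misses)  # 1 1 1
```

Like the hand-rolled class, lru_cache keys the cache on the arguments, so they must be hashable; unlike it, lru_cache raises a TypeError for unhashable arguments instead of silently skipping the cache.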

These two methods can greatly increase the performance of the original code by not pounding the AWS API or the Monero APIs. Instead, we can memoize all of the values we need and then do the calculations:

@memoized
def get_pool_hashrate() -> float:
    response = requests.get("https://monero.hashvault.pro/api/pool/stats")
    return response.json()['pool_statistics']['hashRate']

@memoized
def get_monero_stats() -> dict:
    response = requests.get("https://moneroblocks.info/api/get_stats")
    return response.json()

@memoized
def get_current_price() -> float:
    response = requests.get("https://api.coinmarketcap.com/v1/ticker/monero/")
    # cast explicitly, in case the ticker returns the price as a string
    return float(response.json()[0]['price_usd'])

Simplifying API Calls

All of those API calls to get the hashrates, prices, and rewards have a lot of repeated code, and the response from get_monero_stats needs two different keys pulled out. So let's clean up these nested JSON lookups with a functional tool called lenses.

A lens is a function that, given a key path, returns a function that, given a nested structure, returns the value at that key path in the structure. Lenses in general can also update a value at a path; in this post we only use the read half of that idea.

from typing import Any, List

def lens(keyList: List[str]):
    def wraps(data) -> Any:
        # rebinding the local name never mutates the caller's structure,
        # so no copy is needed
        _data = data
        for key in keyList:
            _data = _data[key]
        return _data
    return wraps

This would allow us to do something like:

response_json = {
    'Body': {
        "Foo": {
            "Bar": "baz"
        }
    }
}
get_bar = lens(["Body", "Foo", "Bar"])
bar = get_bar(response_json) # == "baz"

This lets us use a known structure, the lens, to clean up some of that lookup code. After playing with the command line tool jq, I went a little further by allowing a dot-separated path in place of a list, and by allowing list indexes to be passed:

from typing import List, NewType, Union

KeyList = NewType('KeyList', List[Union[str, int]])

def key_path(keyPath: str) -> KeyList:
    def clean_key(key: str) -> Union[str, int]:
        # purely numeric segments become list indexes
        return int(key) if key.isdigit() else key
    return KeyList(list(map(clean_key, keyPath.split('.'))))

def lens(keyList: KeyList):
    ...  # same body as before

Here I added a type, KeyList, to document (and let a type checker verify) that lens only gets what I expect, and a function, key_path, that takes a string and returns a KeyList, so I can pass things like "Body.0.price_usd" to it if there is a list in the JSON response.
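To make the behavior of the dotted path concrete, here is a small self-contained sketch of the same idea. The ticker data is made up, and the function bodies are simplified stand-ins for the ones above rather than the exact code from the script.

```python
from typing import Any, List, Union


def key_path(path: str) -> List[Union[str, int]]:
    # "0.price_usd" -> [0, "price_usd"]: numeric segments become list indexes
    return [int(key) if key.isdigit() else key for key in path.split(".")]


def lens(keys: List[Union[str, int]]):
    def getter(data: Any) -> Any:
        for key in keys:
            data = data[key]  # works for both dicts and lists
        return data
    return getter


# made-up response shaped like a list of ticker entries
ticker = [{"id": "monero", "price_usd": "123.45"}]

keys = key_path("0.price_usd")
price = lens(keys)(ticker)
print(keys, price)  # [0, 'price_usd'] 123.45
```

One consequence of converting every numeric segment to an int is that a dictionary whose keys are digit strings, such as {"0": ...}, cannot be addressed this way.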

Armed with memoized functions, lens, and KeyList, I can write a generic external data function:

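from typing import Any, Callable

def get_keypath_from_api(keyList: KeyList, api_url: str, mem={}) -> Callable[[], Any]:
    def fetch():
        if api_url not in mem:
            # only hit the API the first time this URL is needed
            mem[api_url] = requests.get(api_url).json()
        return lens(keyList)(mem[api_url])
    return fetch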

I didn't use @memoized here so that I can memoize keyed off of the URL and not off of keyList, allowing multiple keys to be pulled from a single API response without remaking the call.

This allows our calls to be refactored like so:

current_price = get_keypath_from_api(key_path('0.price_usd'), "https://api.coinmarketcap.com/v1/ticker/monero/")
pool_hashrate = get_keypath_from_api(key_path('pool_statistics.hashRate'), 'https://monero.hashvault.pro/api/pool/stats')
monero_hashrate = get_keypath_from_api(key_path('hashrate'), "https://moneroblocks.info/api/get_stats")
last_reward = get_keypath_from_api(key_path('last_reward'), "https://moneroblocks.info/api/get_stats")

Now each of these items can be called (like current_price()), and the value will be retrieved from the API on the first call or from memory if the call has already been made. Either way, we can now treat these functions as values, and we are sure that the minimum amount of work required will be done.
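The caching by URL can be checked without touching the network. The sketch below is a variation on the function above, under one assumption that is not in the original: the HTTP call is passed in as a hypothetical fetch_json parameter so that a fake can record how often it is invoked. The URL and the response values are made up.

```python
from typing import Any, Callable, Dict, List, Union


def get_keypath_from_api(keys: List[Union[str, int]], api_url: str,
                         fetch_json: Callable[[str], Any],
                         mem: Dict[str, Any] = {}) -> Callable[[], Any]:
    def fetch() -> Any:
        if api_url not in mem:
            # first request for this URL: fetch and remember the whole body
            mem[api_url] = fetch_json(api_url)
        data = mem[api_url]
        for key in keys:
            data = data[key]
        return data
    return fetch


requested = []


def fake_fetch_json(url: str) -> Any:
    # hypothetical stand-in for requests.get(url).json()
    requested.append(url)
    return {"hashrate": 1000, "last_reward": 5}


URL = "https://example.invalid/stats"  # placeholder URL
hashrate = get_keypath_from_api(["hashrate"], URL, fake_fetch_json)
last_reward = get_keypath_from_api(["last_reward"], URL, fake_fetch_json)

before = len(requested)  # nothing is fetched until a value is asked for
values = (hashrate(), last_reward(), hashrate())
print(before, values, len(requested))  # 0 (1000, 5, 1000) 1
```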

Python’s Partial

Let's readdress our lens function. Here we have a function of arity 1 that returns a callable. If we wanted its result right away, we would have to call it like this:

lens(key_path('0.price_usd'))(data)

However, the inner wraps function makes it a bit messy. Ideally, we could define the function to do what we want with all of its arguments, and if we pass fewer than all of the arguments, it would just return a function with the given arguments partially applied.

Unlike Elm, Python does not do this by default. However, functools has a function, partial, that allows us to do it explicitly. Let's use it to clean up our lens.

from functools import partial

def lens(keyList: KeyList, data: dict) -> Any:
    _data = data
    for key in keyList:
        _data = _data[key]
    return _data  # return the value we walked to, not the original structure

response_json = {
    'Body': {
        "Foo": {
            "Bar": "baz"
        }
    }
}
get_bar = partial(lens, key_path("Body.Foo.Bar"))
bar = get_bar(response_json) # == "baz"
# or
bar = lens(key_path("Body.Foo.Bar"), response_json)

Here we can see that we can define our functions more cleanly and use partial to provide partial application where we need it.
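A few more small uses of partial, as a sketch of how arguments get fixed. The function power and the variable names are only illustrative.

```python
from functools import partial


def power(base, exponent):
    return base ** exponent


two_to_the = partial(power, 2)         # fixes the first positional argument
squared = partial(power, exponent=2)   # fixes an argument by keyword
parse_binary = partial(int, base=2)    # works on builtins too

a = two_to_the(10)
b = squared(9)
c = parse_binary("101")
print(a, b, c)  # 1024 81 5

# a partial object remembers what it wraps
print(two_to_the.func is power, two_to_the.args)  # True (2,)
```

Positional arguments are always fixed from the left, which is why the argument order of a helper function matters once it is meant to be used with partial.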

Function Composition

One of the pillars of functional programming is function composition. With pure functions, you can take a chain of nested calls and pass the functions to compose to flatten out the structure. For instance, instead of f(g(h(1))), we would like to write compose(f, g, h)(1). This makes it a lot simpler to see what is happening.

Python's functools does not provide a compose function, so we can build a simple one with reduce:

from functools import reduce

def compose(*functions):
    return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)

Here we are reducing a list of functions, starting with a simple identity function, lambda x: x, and at each step wrapping the function built so far around the next one, so the last function in the list is the first one applied to the input.
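A quick way to convince yourself of the evaluation order is to compose three functions that do not commute. The functions f, g, and h below are arbitrary examples.

```python
from functools import reduce


def compose(*functions):
    return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)


def h(x):
    return x + 1


def g(x):
    return x * 2


def f(x):
    return x - 3


composed = compose(f, g, h)
result = composed(5)
print(result, f(g(h(5))))  # 9 9

# with no functions at all, compose falls back to the identity function
print(compose()(7))  # 7
```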

Pipelines

One of my favorite things about Elixir and Elm is pipelines. Being able to do something like

1
|> add 1
|> multiply 2
|> div 4
|> multiply 10

makes the signal-to-noise ratio very good: it is very easy to see what is going on here. If you look carefully, a pipeline is nothing but a reversed compose, where instead of the functions appearing outermost first, they appear innermost first.

def pipeline(*functions):
    return compose(*reversed(functions))

funcs = pipeline(h, g, f)
funcs(1)

This makes the order of the functions called a little bit clearer, but we still get a composed function back, whereas in Elm we can start with a value. So let's make an evaluate_pipeline that allows us to pipe a value in:

def evaluate_pipeline(start, *functions):
    return pipeline(*functions)(start)

This allows us to get pretty close to Elm:

evaluate_pipeline(
    1,
    partial(add, 1),
    partial(multiply, 2),
    partial(div, 4),
    partial(multiply, 10),
)
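A runnable version of this pipeline can be sketched with the arithmetic functions from the standard operator module standing in for add, multiply, and div (that substitution is an assumption for the sake of a self-contained example). The comments track the value after each step.

```python
from functools import partial, reduce
from operator import add, mul, truediv


def compose(*functions):
    return reduce(lambda f, g: lambda x: f(g(x)), functions, lambda x: x)


def pipeline(*functions):
    return compose(*reversed(functions))


def evaluate_pipeline(start, *functions):
    return pipeline(*functions)(start)


result = evaluate_pipeline(
    1,
    partial(add, 1),       # 1 + 1 = 2
    partial(mul, 2),       # 2 * 2 = 4
    partial(truediv, 4),   # 4 / 4 = 1.0 (the fixed 4 is the numerator)
    partial(mul, 10),      # 10 * 1.0 = 10.0
)
print(result)  # 10.0
```

Because partial fixes arguments from the left, the piped value always arrives as the last argument. For commutative operations that makes no difference, but for division it decides which side of the fraction the piped value lands on.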

Helper Functions and Exceptions

Elm has a Maybe type, which lets it represent a value that may be missing. To get around some pesky errors, we can use Python's Optional type, which is either something or None. To lift functions into this, we can write a function that catches exceptions and returns None:

def safe(func, *args) -> Optional[Any]:
    try:
        return func(*args)
    except Exception:
        return None

This allows us to write some nice pipeline casting functions like:

toFloat = partial(safe, float)
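As a quick illustration of what this buys us, here is a self-contained sketch of safe and toFloat applied to a few inputs. The sample inputs are arbitrary.

```python
from functools import partial
from typing import Any, Callable, Optional


def safe(func: Callable[..., Any], *args: Any) -> Optional[Any]:
    try:
        return func(*args)
    except Exception:
        return None


toFloat = partial(safe, float)

good = toFloat("1.5")     # a parseable string
bad = toFloat("abc")      # float("abc") raises ValueError
missing = toFloat(None)   # float(None) raises TypeError
zero = toFloat("0")       # a valid value that happens to be falsy
print(good, bad, missing, zero)  # 1.5 None None 0.0
```

Since 0.0 is falsy, a check such as `if value:` cannot tell a legitimate zero apart from a failed conversion; comparing with `is None` is the safer test when zero is a possible result.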

Then we can put toFloat in a pipeline to safely cast a result to a float. In a similar vein, we can write some helper functions for getting indexes:

def index(i, iterable) -> Optional[Any]:
    try:
        return iterable[i]
    except IndexError:
        return None

first = head = partial(index, 0)
second = partial(index, 1)
third = partial(index, 2)
last = partial(index, -1)

To make the math bits more functional, I defined the basic operations:

def add(a, b):
    return a + b

def _multiply(a, b):
    return toFloat(a) * toFloat(b)

product = partial(safe, _multiply)

def _div(a, b):
    return toFloat(a) / toFloat(b)

# under(a, b) == a / b, so partial(under, a) maps x to a / x
divided = under = partial(safe, _div)

def divides(b, a):
    # divides(b, a) == a / b, so partial(over, b) maps x to x / b
    return safe(_div, a, b)

over = divides
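The argument order of under and over is easy to mix up, so here is a self-contained sketch that exercises both with partial, following the definitions above. The sample values are arbitrary.

```python
from functools import partial


def safe(func, *args):
    try:
        return func(*args)
    except Exception:
        return None


toFloat = partial(safe, float)


def _div(a, b):
    return toFloat(a) / toFloat(b)


under = partial(safe, _div)   # under(a, b) == a / b


def over(b, a):               # over(b, a) == a / b
    return safe(_div, a, b)


halve = partial(over, 2)          # x -> x / 2
reciprocal = partial(under, 1)    # x -> 1 / x

print(halve(10), reciprocal(4))   # 5.0 0.25
print(over(0, 1))                 # None: division by zero is swallowed by safe
print(under("abc", 2))            # None: the failed cast makes the division raise
```

In a pipeline, partial(over, n) reads as "the piped value over n", while partial(under, n) reads as "n divided by the piped value".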

Moving all of our utility functions to their own package, we can convert our original script to a series of composed pipelines:

# map_lens and combinations are helpers from the utility package (not shown here)
current_price = get_keypath_from_api(key_path('0.price_usd'), "https://api.coinmarketcap.com/v1/ticker/monero/")
pool_hashrate = get_keypath_from_api(key_path('pool_statistics.hashRate'), 'https://monero.hashvault.pro/api/pool/stats')
monero_hashrate = get_keypath_from_api(key_path('hashrate'), "https://moneroblocks.info/api/get_stats")
last_reward = get_keypath_from_api(key_path('last_reward'), "https://moneroblocks.info/api/get_stats")

@memoized
def get_estimated_payout_per_hour(hashrate: float) -> float:
    return evaluate_pipeline(
        last_reward(),
        partial(over, 1E12), # convert to whole XMR
        partial(over, 120), # split reward into average reward per second (1 block every 2 minutes)
        partial(product, evaluate_pipeline(
            pool_hashrate(),
            partial(add, hashrate),
            partial(under, hashrate)
        )), # calculate pool percentage
        partial(product, divided(
            evaluate_pipeline(pool_hashrate(), partial(add, hashrate)),
            evaluate_pipeline(monero_hashrate(), partial(add, hashrate))
        )), # calculate network percentage
        partial(product, 3600), # calculate XMR per hour
        partial(product, current_price()), # calculate revenue in dollars per hour
        toFloat
    )

@memoized
def get_availability_zones() -> List[str]:
    return evaluate_pipeline(
        ec2.describe_availability_zones(),
        partial(lens, key_path('AvailabilityZones')),
        partial(map_lens, key_path('ZoneName'))
    )

@memoized
def get_spot_price_for_az(availability_zone: str, instance_type: str) -> Optional[float]:
    return evaluate_pipeline(
        ec2.describe_spot_price_history(
            InstanceTypes=[instance_type],
            AvailabilityZone=availability_zone
        ),
        partial(lens, key_path('SpotPriceHistory.0.SpotPrice')),
        toFloat,
    )

@memoized
def get_prices() -> Iterator[Tuple[Tuple[str, str], float]]:
    return evaluate_pipeline(
        get_availability_zones(),
        partial(combinations, b=INSTANCE_TYPES.keys()),
        partial(map, lambda key: (key, get_spot_price_for_az(*key))),
        partial(filter, second) # drop combinations without a price
    )

def get_opportunities() -> Iterator[Tuple[Tuple[str, str], float, float]]:
    # ((az, instance_type), price) -> instance_type -> hashrate -> revenue
    get_revenue = pipeline(
        first,
        second,
        partial(INSTANCE_TYPES.get),
        get_estimated_payout_per_hour,
    )
    return evaluate_pipeline(
        get_prices(),
        partial(map, lambda x: x + (get_revenue(x),)),
    )

def get_profit_opportunities() -> List[Tuple[Tuple[str, str], float, float]]:
    return evaluate_pipeline(
        get_opportunities(),
        partial(filter, lambda x: second(x) < last(x)), # price < revenue
        partial(list)
    )

With this final version, almost all of the functions are pure, memoized, and composed of pipelines. Since the collection steps are built from map and filter, which return lazy iterators in Python 3, results are only computed as they are consumed, which allows the final filter or things like any to short-circuit evaluation.
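The short-circuiting is easy to observe with a function that records when it is called. This is a minimal sketch with an arbitrary stand-in for the expensive step; expensive and the threshold of 10 are made up for the illustration.

```python
evaluated = []


def expensive(x):
    # stand-in for a costly step such as a price lookup
    evaluated.append(x)
    return x * x


squares = map(expensive, range(1, 1001))     # nothing is computed yet
big = filter(lambda s: s > 10, squares)      # still nothing
before = len(evaluated)

found = any(big)                             # stops at the first match (16)
print(before, found, evaluated)  # 0 True [1, 2, 3, 4]
```

The flip side of this laziness is that map and filter objects can only be consumed once, so a cached iterator comes back exhausted on a second use; materializing it with list or tuple avoids that when the result needs to be reused.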

Conclusion

I had a lot of fun pushing Python to the extreme of functional programming. Whether or not this is more readable and provides a better signal-to-noise ratio than regular procedural Python remains to be seen. Since Python doesn't make multi-threading or multi-processing as easy as some functional languages do, the ability of pure functions to be easily distributed across workers doesn't provide much of a benefit here. Guido historically hasn't been a fan of functional programming (hence reduce being relegated to functools in Python 3), and the optimizations made to the language are not focused on this paradigm.