API_Server_and_Client_Example/apiserver_with_sqlite.py

160 lines
4.6 KiB
Python

from flask import Flask, request, jsonify, g
import sqlite3
make_table = r'''
CREATE TABLE IF NOT EXISTS counters
(counter_name TEXT PRIMARY KEY, counter_value INTEGER)
'''
add_counter = r'''
INSERT INTO counters
VALUES(?, ?)
'''
get_all_counters = r'''
SELECT * FROM counters;
'''
increment_counter = r'''
UPDATE counters
SET counter_value = (SELECT counter_value FROM counters WHERE counter_name = ?) + 1
WHERE counter_name = ?
'''
decrement_counter = r'''
UPDATE counters
SET counter_value = (SELECT counter_value FROM counters WHERE counter_name = ?) - 1
WHERE counter_name = ?
'''
app = Flask(__name__)
def get_db():
db = getattr(g, "_database", None)
if db is None:
db = g._database = sqlite3.connect("counters.db")
db.execute(make_table)
return db
def get_all_counters_as_dict():
cursor = get_db().execute(get_all_counters)
all_counters = cursor.fetchall()
print(all_counters)
return {counter[0]:counter[1] for counter in all_counters}
def add_counter_to_db(counter_name, counter_initial_value):
cursor = get_db().execute(add_counter, [counter_name, counter_initial_value])
get_db().commit()
def modify_counter_in_db(counter_name, increment):
if increment:
get_db().execute(increment_counter, [counter_name, counter_name])
else:
get_db().execute(decrement_counter, [counter_name, counter_name])
get_db().commit()
@app.route("/counters", methods=['GET', 'POST'])
def handle_counters_endpoint():
if request.method == 'GET':
response_dict = {
'all_counters': get_all_counters_as_dict()
}
return jsonify(response_dict), 200
elif request.method == 'POST':
if request.content_type == 'application/json':
body = request.get_json()
if not "new_counter_name" in body.keys() or not "new_counter_starting_value" in body.keys():
error_response = {
'status': 'ERROR',
'reason': 'Body for posting to /counters endpoint must contain both new_counter_name and new_counter_starting_value',
'input_body': body
}
return jsonify(error_response), 400
add_counter_to_db(request.get_json()['new_counter_name'], request.get_json()['new_counter_starting_value'])
return jsonify({'status': 'OK'}), 200
else:
error_response = {
'status': 'ERROR',
'reason': 'Content Type for endpoint /counters must be application/json'
}
return jsonify(error_response), 400
@app.route("/counters/<counter_name>", methods=['GET'])
def handle_counters_subcounter_endpoint(counter_name):
all_counters = get_all_counters_as_dict()
if not counter_name in all_counters.keys():
error_response = {
'status': 'ERROR',
'reason': 'Counter name {counter} does not exist'.format(counter=counter_name)
}
return jsonify(error_response), 404
data_response = {
'counter_name': counter_name,
'counter_value': all_counters[counter_name]
}
return jsonify(data_response), 200
@app.route("/counters/<counter_name>/increment", methods=['POST'])
def handle_counters_subcounter_increment(counter_name):
all_counters = get_all_counters_as_dict()
if not counter_name in all_counters.keys():
error_response = {
'status': 'ERROR',
'reason': 'Counter name {counter} does not exist'.format(counter=counter_name)
}
return jsonify(error_response), 404
data_response = {
'counter_name': counter_name,
'old_value': all_counters[counter_name],
'new_value': all_counters[counter_name] + 1
}
modify_counter_in_db(counter_name, True)
return jsonify(data_response), 200
@app.route("/counters/<counter_name>/decrement", methods=['POST'])
def handle_counters_subcounter_decrement(counter_name):
all_counters = get_all_counters_as_dict()
if not counter_name in all_counters.keys():
error_response = {
'status': 'ERROR',
'reason': 'Counter name {counter} does not exist'.format(counter=counter_name)
}
return jsonify(error_response), 404
data_response = {
'counter_name': counter_name,
'old_value': all_counters[counter_name],
'new_value': all_counters[counter_name] - 1
}
modify_counter_in_db(counter_name, False)
return jsonify(data_response), 200
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
if __name__ == '__main__':
app.run(debug=True)