Compare commits

..

1 Commits

Author SHA1 Message Date
wangobango
5cdabb14eb added tests 2021-11-30 16:13:45 +01:00
18 changed files with 75 additions and 425 deletions

3
.gitignore vendored
View File

@ -2,5 +2,4 @@ __pycache__/*
venv/*
*.egg-info
__pycache__
.pytest_cache
something/*
.pytest_cache

View File

@ -1,12 +1,12 @@
FROM python:3
ENV DOCKER_APP True
USER root
WORKDIR /app
RUN git clone https://git.wmi.amu.edu.pl/s415366/pbr-ayct-core
COPY . .
#RUN chmod u+x setup_core.sh
#RUN bash setup_core.sh
RUN chmod u+x setup_core.sh
RUN bash setup_core.sh
RUN pip3 install -r requirements.txt
EXPOSE 5000/udp
EXPOSE 5000/tcp
CMD ["python3","-m", "ayct_backend.app"]
CMD ["python3", "ayct-backend/app.py"]

22
Jenkinsfile vendored
View File

@ -1,22 +0,0 @@
pipeline {
agent {
docker {
image 'python:latest'
args '-u root --privileged'
}
}
stages {
stage('build') {
steps {
sh 'bash setup_core.sh'
sh 'pip install -r requirements.txt'
sh 'pip install --user -e .'
}
}
stage('test') {
steps {
sh 'pytest'
}
}
}
}

0
ayct-backend/__init__.py Normal file
View File

20
ayct-backend/app.py Normal file
View File

@ -0,0 +1,20 @@
from flask import Flask
from flask_restful import Api, Resource
import os
import pbrAyctCore.core as core
app = Flask(__name__)
api = Api(app)
class Hello(Resource):
def get(self):
return core.getTestString()
api.add_resource(Hello, "/hello")
def create_app():
return app
if __name__ == "__main__":
port = int(os.environ.get('PORT', 5000))
app.run(host = '0.0.0.0', port = port)

View File

@ -0,0 +1,20 @@
import os
import tempfile
import pytest
from app import create_app
@pytest.fixture
def client():
app = create_app()
app.run(host = '0.0.0.0', port = 5000)
with app.test_client() as client:
with app.app_context():
pass
yield client
def test_first_endpoint(client):
rv = client.get("/hello")
assert b'All You Can Tweet' in rv.data

View File

@ -1,29 +0,0 @@
import os
from flask import Flask
from ayct_backend.twitter import twitter
from ayct_backend.campaign import campaign
from ayct_backend.models import db
#import pbrAyctCore.core as core
def create_app():
app = Flask('ayct-backend')
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY')
database_uri = os.getenv('DATABASE_URL')
if database_uri and database_uri.startswith("postgres://"):
database_uri = database_uri.replace("postgres://", "postgresql://", 1)
app.config['SQLALCHEMY_DATABASE_URI'] = database_uri
app.config['TWITTER_CONSUMER_KEY'] = os.getenv('TWITTER_CONSUMER_KEY')
app.config['TWITTER_CONSUMER_SECERT'] = os.getenv('TWITTER_CONSUMER_SECERT')
db.init_app(app)
db.create_all(app=app)
app.register_blueprint(twitter, url_prefix='/twitter')
app.register_blueprint(campaign, url_prefix='/campaign')
@app.route('/hello')
def hello():
return "Hello world!"
return app

View File

@ -1,7 +0,0 @@
import os
from . import create_app
if __name__ == "__main__":
app = create_app()
port = int(os.environ.get('PORT', 5000))
app.run(host = '0.0.0.0', port = port)

View File

@ -1,164 +0,0 @@
import requests
import json
from uuid import uuid4
from requests_oauthlib import OAuth1Session
from flask import Blueprint, current_app, request, jsonify
from ayct_backend.models import *
from ayct_backend.firebase import verify_token
campaign = Blueprint('campaign', __name__)
@campaign.route('/', methods=['GET'])
def get_twitter_campaigns():
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
twitter_campaigns = TwitterCampaign.query.filter_by(user_id=user_id).all()
campaigns = []
for campaign in twitter_campaigns:
campaign_data = {}
campaign_data['campaign_id'] = campaign.id
campaign_data['campaign_name'] = campaign.campaign_name
campaign_data['twitter_account_id'] = campaign.twitter_account_id
campaigns.append(campaign_data)
return jsonify({
"twitter_campaigns": campaigns
}), 200
@campaign.route('/', methods=['POST'])
def add_twitter_campaign():
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
content_type = request.headers.get('Content-Type')
if content_type != 'application/json':
return "Content-type not supported!", 400
request_json = request.json
if 'campaign_name' not in request_json or 'user_input' not in request_json or 'twitter_account_id' not in request_json:
return "Invalid request!", 400
consumer_key = current_app.config["TWITTER_CONSUMER_KEY"]
consumer_secret = current_app.config["TWITTER_CONSUMER_SECERT"]
twitter_account = TwitterAccount.query.filter_by(twitter_account_id=request_json['twitter_account_id']).first()
# generate campaign content
core_url = 'http://65.108.80.28:4999/generate'
payload = {
"data": request_json['user_input'],
"length": 420
}
#response = requests.post(core_url, headers={"Content-Type":"application/json"}, data=json.dumps(payload))
#generated_content = response.content.decode("utf-8").replace('<|endoftext|>', '').replace(' ', ' ')
# trimmed_text = generated_content[:270]
# dot_index = trimmed_text.rfind(".")
# space_index = trimmed_text.rfind(" ")
# if dot_index > 0:
# tweet_text = trimmed_text[:dot_index + 1].capitalize()
# else:
# tweet_text = trimmed_text[:space_index + 1].capitalize()
random_part = str(uuid4().hex)
tweet_text = "This is placeholder content generated by our application. Normal generation doesn't work." + random_part
# create post on twitter
oauth = OAuth1Session(
consumer_key,
client_secret=consumer_secret,
resource_owner_key=twitter_account.access_token,
resource_owner_secret=twitter_account.access_token_secret,
)
response = oauth.post(
"https://api.twitter.com/2/tweets",
json={"text": tweet_text},
).json()['data']
# save campaign to database
twitter_campaign = TwitterCampaign(
id = str(uuid4()),
user_id = user_id,
campaign_name = request_json['campaign_name'],
twitter_account_id = request_json['twitter_account_id'],
user_input = request_json['user_input'],
)
twitter_campaign_post = TwitterPost(
id = str(uuid4()),
campaign_id = twitter_campaign.id,
user_id = user_id,
post_content = tweet_text,
twitter_post_id = response['id']
)
db.session.add(twitter_campaign)
db.session.add(twitter_campaign_post)
db.session.commit()
return "Campaign succesfully created.", 201
@campaign.route('/', methods=['DELETE'])
def delete_twitter_campaign():
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
content_type = request.headers.get('Content-Type')
if content_type != 'application/json':
return "Content-type not supported!", 400
request_json = request.json
if 'campaign_id' not in request_json:
return "Invalid request!", 400
twitter_campaign = TwitterCampaign.query.filter_by(user_id=user_id, id=request_json['campaign_id']).first()
if not twitter_campaign:
return "Capmaign not found!", 404
db.session.delete(twitter_campaign)
db.session.commit()
return "Twitter campaign succesfully deleted.", 200
@campaign.route('/<campaign_id>', methods=['GET'])
def get_twitter_campaign_details(campaign_id):
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
twitter_campaign = TwitterCampaign.query.filter_by(user_id=user_id, id=campaign_id).first()
campaign_posts = TwitterPost.query.filter_by(user_id=user_id, campaign_id=campaign_id).all()
posts = []
for post in campaign_posts:
post_data = {}
post_data['post_id'] = post.id
post_data['campaign_id'] = post.campaign_id
post_data['post_content'] = post.post_content
post_data['twitter_post_id'] = post.twitter_post_id
posts.append(post_data)
return jsonify({
"campaign_id": twitter_campaign.id,
"campaign_name": twitter_campaign.campaign_name,
"user_input": twitter_campaign.user_input,
"twitter_account_id": twitter_campaign.twitter_account_id,
"posts": posts
}), 200

View File

@ -1,16 +0,0 @@
import os
import google.oauth2.id_token
import google.auth.transport.requests
HTTP_REQUEST = google.auth.transport.requests.Request()
AUDIENCE = os.environ.get('GOOGLE_CLOUD_PROJECT')
def verify_token(headers):
id_token = headers['Authorization'].split(' ').pop()
try:
claims = google.oauth2.id_token.verify_firebase_token(id_token, HTTP_REQUEST, audience=AUDIENCE)
except:
return None
return claims

View File

@ -1,32 +0,0 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class TwitterAccount(db.Model):
__tablename__ = 'twitter_account'
id = db.Column(db.String(36), primary_key=True)
user_id = db.Column(db.String(64), nullable=False)
twitter_account_id = db.Column(db.String(32), nullable=False)
username = db.Column(db.String(16), nullable=False)
access_token = db.Column(db.String(256), nullable=False)
access_token_secret = db.Column(db.String(256), nullable=False)
class TwitterCampaign(db.Model):
__tablename__ = 'twitter_campaign'
id = db.Column(db.String(36), primary_key=True)
user_id = db.Column(db.String(64), nullable=False)
campaign_name = db.Column(db.String(64), nullable=False)
twitter_account_id = db.Column(db.String(32), nullable=False)
user_input = db.Column(db.String(100), nullable=False)
posts = db.relationship('TwitterPost', backref='campaign', lazy=True)
class TwitterPost(db.Model):
__tablename__ = 'twitter_post'
id = db.Column(db.String(36), primary_key=True)
campaign_id = db.Column(db.String(36), db.ForeignKey(TwitterCampaign.id), nullable=False)
user_id = db.Column(db.String(64), nullable=False)
post_content = db.Column(db.String(300), nullable=False)
twitter_post_id = db.Column(db.String(32), unique=True, nullable=False)

View File

@ -1,103 +0,0 @@
from uuid import uuid4
from requests_oauthlib import OAuth1Session
from flask import Blueprint, Response, current_app, request, jsonify
from ayct_backend.models import *
from ayct_backend.firebase import verify_token
twitter = Blueprint('twitter', __name__)
@twitter.route('/account', methods=['GET'])
def get_twitter_accounts():
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
twitter_accounts = TwitterAccount.query.filter_by(user_id=user_id).all()
accounts = []
for account in twitter_accounts:
account_data = {}
account_data['twitter_account_id'] = account.twitter_account_id
account_data['username'] = account.username
accounts.append(account_data)
return jsonify({
"twitter_accounts": accounts
}), 200
@twitter.route('/account', methods=['POST'])
def add_twitter_account():
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
content_type = request.headers.get('Content-Type')
if content_type != 'application/json':
return "Content-type not supported!", 400
request_json = request.json
if 'veryfier' not in request_json or 'oauth_token' not in request_json or 'oauth_token_secret' not in request_json:
return "Invalid request!", 400
consumer_key = current_app.config["TWITTER_CONSUMER_KEY"]
consumer_secret = current_app.config["TWITTER_CONSUMER_SECERT"]
access_token_url = "https://api.twitter.com/oauth/access_token"
oauth = OAuth1Session(
consumer_key,
client_secret = consumer_secret,
resource_owner_key = request_json['oauth_token'],
resource_owner_secret = request_json['oauth_token_secret'],
verifier = request_json['veryfier'],
)
oauth_tokens = oauth.fetch_access_token(access_token_url)
twitter_account = TwitterAccount.query.filter_by(user_id=user_id, twitter_account_id=oauth_tokens['user_id']).first()
if twitter_account:
return "Account already exists!", 409
twitter_account = TwitterAccount(
id = str(uuid4()),
user_id = user_id,
twitter_account_id = oauth_tokens['user_id'],
username = oauth_tokens['screen_name'],
access_token = oauth_tokens['oauth_token'],
access_token_secret = oauth_tokens['oauth_token_secret']
)
db.session.add(twitter_account)
db.session.commit()
return "Twitter account succesfully added.", 201
@twitter.route('/account', methods=['DELETE'])
def delete_twitter_account():
decoded_token = verify_token(request.headers)
if not decoded_token:
return "Not authorised!", 401
user_id = decoded_token['sub']
content_type = request.headers.get('Content-Type')
if content_type != 'application/json':
return "Content-type not supported!", 400
request_json = request.json
if 'twitter_account_id' not in request_json:
return "Invalid request!", 400
twitter_account = TwitterAccount.query.filter_by(user_id=user_id, twitter_account_id=request_json['twitter_account_id']).first()
if not twitter_account:
return "Account not found!", 404
db.session.delete(twitter_account)
db.session.commit()
return "Twitter account succesfully deleted.", 200

7
pyproject.toml Normal file
View File

@ -0,0 +1,7 @@
[build-system]
requires = [
"setuptools>=42",
"wheel",
"flask"
]
build-backend = "setuptools.build_meta"

View File

@ -1,7 +1,3 @@
Flask==2.0.2
flask_restful==0.3.9
flask_sqlalchemy==2.5.1
requests_oauthlib==1.3.0
psycopg2==2.9.3
pytest==6.2.5
google-api-python-client==2.36.0

View File

@ -1,5 +1,24 @@
[bdist_wheel]
universal = True
[metadata]
name = pbrAyctBackend
version = 0.0.1
author = Łukasz Ramon Piotrek
author_email = ramon.dyzman@gmail.com
description = A small example package
long_description = file: readme.md
long_description_content_type = text/markdown
url = https://git.wmi.amu.edu.pl/s434708/pbr-ayct-backend
project_urls =
Bug Tracker = https://github.com/pypa/sampleproject/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[tool:pytest]
testpaths = tests
[options]
package_dir =
= ayct-backend
packages = find:
python_requires = >=3.6
[options.packages.find]
where = src

View File

@ -1,18 +0,0 @@
from setuptools import find_packages
from setuptools import setup
setup(
name="ayct_backend",
version="1.0.0",
url="https://git.wmi.amu.edu.pl/s434708/pbr-ayct-backend",
license="BSD",
maintainer="RŁP",
maintainer_email="random@mail.com",
description="Backend for AYCT",
packages=find_packages(),
include_package_data=True,
zip_safe=False,
install_requires=["flask"],
extras_require={"test": ["pytest"]},
)

View File

@ -1,4 +1,3 @@
git clone https://git.wmi.amu.edu.pl/s415366/pbr-ayct-core
cd pbr-ayct-core
pip3 install setuptools
python3 -m pip install --upgrade build

View File

@ -1,19 +0,0 @@
import os
import tempfile
import pytest
from ayct_backend import create_app
@pytest.fixture
def app():
app = create_app()
yield app
@pytest.fixture
def client(app):
yield app.test_client()
def test_first_endpoint(client):
rv = client.get("/hello")
assert b'All You Can Tweet' in rv.data