SQL 테이블을 python에서 JSON으로 반환
저는 web.py에서 작은 웹 앱을 가지고 놀고 있으며, JSON 오브젝트를 반환하기 위한 URL을 설정하고 있습니다.python을 사용하여 SQL 테이블을 JSON으로 변환하는 가장 좋은 방법은 무엇입니까?
import json
import psycopg2
def db(database_name='pepe'):
return psycopg2.connect(database=database_name)
def query_db(query, args=(), one=False):
cur = db().cursor()
cur.execute(query, args)
r = [dict((cur.description[i][0], value) \
for i, value in enumerate(row)) for row in cur.fetchall()]
cur.connection.close()
return (r[0] if r else None) if one else r
my_query = query_db("select * from majorroadstiger limit %s", (3,))
json_output = json.dumps(my_query)
JSON 오브젝트의 배열을 얻을 수 있습니다.
>>> json_output
'[{"divroad": "N", "featcat": null, "countyfp": "001",...
또는 다음과 같은 경우:
>>> j2 = query_db("select * from majorroadstiger where fullname= %s limit %s",\
("Mission Blvd", 1), one=True)
단일 JSON 오브젝트가 생성됩니다.
>>> j2 = json.dumps(j2)
>>> j2
'{"divroad": "N", "featcat": null, "countyfp": "001",...
import sqlite3
import json
DB = "./the_database.db"
def get_all_users( json_str = False ):
conn = sqlite3.connect( DB )
conn.row_factory = sqlite3.Row # This enables column access by name: row['column_name']
db = conn.cursor()
rows = db.execute('''
SELECT * from Users
''').fetchall()
conn.commit()
conn.close()
if json_str:
return json.dumps( [dict(ix) for ix in rows] ) #CREATE JSON
return rows
메서드 no json 호출 중...
print get_all_users()
인쇄:
[(1, u'orvar', u'password123'), (2, u'kalle', u'password123')]
json을 사용하여 메서드를 호출하는 중...
print get_all_users( json_str = True )
인쇄:
[{"password": "password123", "id": 1, "name": "orvar"}, {"password": "password123", "id": 2, "name": "kalle"}]
개인적으로 나는 이런 종류의 SQLObject를 선호한다.빨리 더러운 테스트 코드를 수정해서 이걸 얻었어요
import simplejson
from sqlobject import *
# Replace this with the URI for your actual database
connection = connectionForURI('sqlite:/:memory:')
sqlhub.processConnection = connection
# This defines the columns for your database table. See SQLObject docs for how it
# does its conversions for class attributes <-> database columns (underscores to camel
# case, generally)
class Song(SQLObject):
name = StringCol()
artist = StringCol()
album = StringCol()
# Create fake data for demo - this is not needed for the real thing
def MakeFakeDB():
Song.createTable()
s1 = Song(name="B Song",
artist="Artist1",
album="Album1")
s2 = Song(name="A Song",
artist="Artist2",
album="Album2")
def Main():
# This is an iterable, not a list
all_songs = Song.select().orderBy(Song.q.name)
songs_as_dict = []
for song in all_songs:
song_as_dict = {
'name' : song.name,
'artist' : song.artist,
'album' : song.album}
songs_as_dict.append(song_as_dict)
print simplejson.dumps(songs_as_dict)
if __name__ == "__main__":
MakeFakeDB()
Main()
데이터를 전송하기 전에 어떻게 작업할 것인지에 대한 자세한 정보가 도움이 될 것입니다.json 모듈은 2.6 이후를 사용하는 경우 도움이 되는 덤프 및 로드 메서드를 제공합니다.http://docs.python.org/library/json.html
--편집필 --
어떤 라이브러리를 사용하고 있는지 모르기 때문에 그런 방법을 찾을 수 있을지 확실히 말씀드릴 수 없습니다.보통 쿼리 결과는 다음과 같이 처리됩니다(현재 작업 중인 kinterbasdb의 예).
qry = "Select Id, Name, Artist, Album From MP3s Order By Name, Artist"
# Assumes conn is a database connection.
cursor = conn.cursor()
cursor.execute(qry)
rows = [x for x in cursor]
cols = [x[0] for x in cursor.description]
songs = []
for row in rows:
song = {}
for prop, val in zip(cols, row):
song[prop] = val
songs.append(song)
# Create a string representation of your array of songs.
songsJSON = json.dumps(songs)
기입된 루프의 필요성을 없애기 위해 리스트 컴플리션을 할 수 있는 뛰어난 전문가가 있을 것입니다.그러나, 이것은 유효하고, 레코드를 취득하는 모든 라이브러리에 적응할 수 있는 것이어야 합니다.
모든 테이블에서 모든 데이터를 덤프하는 짧은 스크립트를 작성했습니다.이 스크립트는 name: value 컬럼의 딕트입니다.다른 솔루션과 달리 테이블이나 컬럼에 대한 정보는 필요 없습니다.모든 것을 찾아서 덤프하기만 하면 됩니다.누가 유용하게 써주길 바래!
from contextlib import closing
from datetime import datetime
import json
import MySQLdb
DB_NAME = 'x'
DB_USER = 'y'
DB_PASS = 'z'
def get_tables(cursor):
cursor.execute('SHOW tables')
return [r[0] for r in cursor.fetchall()]
def get_rows_as_dicts(cursor, table):
cursor.execute('select * from {}'.format(table))
columns = [d[0] for d in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def dump_date(thing):
if isinstance(thing, datetime):
return thing.isoformat()
return str(thing)
with closing(MySQLdb.connect(user=DB_USER, passwd=DB_PASS, db=DB_NAME)) as conn, closing(conn.cursor()) as cursor:
dump = {
table: get_rows_as_dicts(cursor, table)
for table in get_tables(cursor)
}
print(json.dumps(dump, default=dump_date, indent=2))
Postgres JSON 기능을 사용하여 Postgresql 서버에서 직접 JSON을 가져오는 옵션을 제공한 사람은 없는 것 같습니다.https://www.postgresql.org/docs/9.4/static/functions-json.html
python측에서 구문 분석, 루프 또는 메모리 소비는 없습니다.이것은, 100,000 행 또는 수백만 개의 행을 취급하는 경우에 정말로 고려할 필요가 있습니다.
from django.db import connection
sql = 'SELECT to_json(result) FROM (SELECT * FROM TABLE table) result)'
with connection.cursor() as cursor:
cursor.execute(sql)
output = cursor.fetchall()
다음과 같은 테이블:
id, value
----------
1 3
2 7
Python JSON 개체를 반환합니다.
[{"id": 1, "value": 3},{"id":2, "value": 7}]
그 후 사용json.dumps
JSON 문자열로 덤프하다
가장 간단한 방법은
사용하다json.dumps
단, datetime을 json serializer로 해석해야 하는 경우.
여기 내 것이 있다.
import MySQLdb, re, json
from datetime import date, datetime
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime, date)):
return obj.isoformat()
raise TypeError ("Type %s not serializable" % type(obj))
conn = MySQLdb.connect(instance)
curr = conn.cursor()
curr.execute("SELECT * FROM `assets`")
data = curr.fetchall()
print json.dumps(data, default=json_serial)
json 덤프를 반환합니다.
json 덤프를 사용하지 않는 간단한 방법이 하나 더 있습니다. 여기서 헤더를 가져오고 zip to map을 사용하여 각각이 최종적으로 json으로 만들어졌지만 datetime은 json serializer로 변경되지 않습니다.
data_json = []
header = [i[0] for i in curr.description]
data = curr.fetchall()
for i in data:
data_json.append(dict(zip(header, i)))
print data_json
더 뎀즈의 답변을 psycopg2 버전으로 보충하겠습니다.
import psycopg2
import psycopg2.extras
import json
connection = psycopg2.connect(dbname=_cdatabase, host=_chost, port=_cport , user=_cuser, password=_cpassword)
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) # This line allows dictionary access.
#select some records into "rows"
jsonout= json.dumps([dict(ix) for ix in rows])
sqlite의 경우 connection.row_factory에 호출 가능한 값을 설정하고 쿼리 결과의 형식을 python 사전 개체로 변경할 수 있습니다.메뉴얼을 참조해 주세요.다음은 예를 제시하겠습니다.
import sqlite3, json
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
# col[0] is the column name
d[col[0]] = row[idx]
return d
def get_data_to_json():
conn = sqlite3.connect("database.db")
conn.row_factory = dict_factory
c = conn.cursor()
c.execute("SELECT * FROM table")
rst = c.fetchall() # rst is a list of dict
return jsonify(rst)
는, MSSQL Server 2008 을 할 수 .SELECT
.FOR JSON AUTO
EC. E..
SELECT name, surname FROM users FOR JSON AUTO
Json을 반환한다.
[{"name": "Jane","surname": "Doe" }, {"name": "Foo","surname": "Samantha" }, ..., {"name": "John", "surname": "boo" }]
10년 후 :) . 목록 이해 없음
다음과 같은 선택 쿼리에서 단일 행의 값을 반환합니다.
"select name,userid, address from table1 where userid = 1"
json
{ name : "name1", userid : 1, address : "adress1, street1" }
코드
cur.execute(f"select name,userid, address from table1 where userid = 1 ")
row = cur.fetchone()
desc = list(zip(*cur.description))[0] #To get column names
rowdict = dict(zip(desc,row))
jsondict = jsonify(rowdict) #Flask jsonify
cur.description
아래와 같은 튜플입니다. unzip
★★★★★★★★★★★★★★★★★」zip
을
(('name', None, None, None, None, None, None), ('userid', None, None, None, None, None, None), ('address', None, None, None, None, None, None))
위의 코드가 json 날짜 시간 필드를 내보낼 수 없음을 알려드리게 되어 죄송합니다.다음과 같이 표시됩니다.
....
, 'TimeCreated': datetime.datetime(2019, 6, 17, 9, 2, 17), 'Created': 1,
....
이로 인해 출력이 유효하지 않은 json이 됩니다.
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
Base = declarative_base()
metadata = Base.metadata
class UserTable(Base):
__tablename__ = 'UserTable'
Id = Column("ID", Integer, primary_key=True)
Name = Column("Name", String(100))
class UserTableDTO:
def __init__(self, ob):
self.Id = ob.Id
self.Name = ob.Name
rows = dbsession.query(Table).all()
json_string = [json.loads(json.dumps(UserTableDTO(ob).__dict__, default=lambda x: str(x)))for ob in rows]
print(json_string)
SQL 테이블을 포맷된 JSON으로 반환하고 @Whitecat과 같은 오류를 수정하는 간단한 예를 제시하겠습니다.
오류 datetime.datetime(1941, 10, 31, 0, 0)이 JSON 시리얼화 가능하지 않습니다.
이 예에서는 JSONEncoder를 사용해야 합니다.
import json
import pymssql
# subclass JSONEncoder
class DateTimeEncoder(JSONEncoder):
#Override the default method
def default(self, obj):
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.isoformat()
def mssql_connection():
try:
return pymssql.connect(server="IP.COM", user="USERNAME", password="PASSWORD", database="DATABASE")
except Exception:
print("\nERROR: Unable to connect to the server.")
exit(-1)
def query_db(query):
cur = mssql_connection().cursor()
cur.execute(query)
r = [dict((cur.description[i][0], value) for i, value in enumerate(row)) for row in cur.fetchall()]
cur.connection.close()
return r
def write_json(query_path):
# read sql from file
with open("../sql/my_sql.txt", 'r') as f:
sql = f.read().replace('\n', ' ')
# creating and writing to a json file and Encode DateTime Object into JSON using custom JSONEncoder
with open("../output/my_json.json", 'w', encoding='utf-8') as f:
json.dump(query_db(sql), f, ensure_ascii=False, indent=4, cls=DateTimeEncoder)
if __name__ == "__main__":
write_json()
# You get formatted my_json.json, for example:
[
{
"divroad":"N",
"featcat":null,
"countyfp":"001",
"date":"2020-08-28"
}
]
이 간단한 코드는 Flask api에서 작동했습니다.
@app.route('/output/json/')
def outputJson():
conn = sqlite3.connect('database.db')
conn.row_factory = sqlite3.Row
temp = conn.execute('SELECT * FROM users').fetchall()
conn.close()
result = [{k: item[k] for k in item.keys()} for item in temp]
return jsonify(result)
언급URL : https://stackoverflow.com/questions/3286525/return-sql-table-as-json-in-python
'sourcecode' 카테고리의 다른 글
asp.net core 1.0 web api는 camelcase를 사용합니다. (0) | 2023.02.11 |
---|---|
웹 사이트 섹션의 도메인이 다릅니다. (0) | 2023.02.11 |
Retrofit & Gson을 사용한 JSON 어레이 응답 해석 (0) | 2023.02.11 |
Angularjs 동적으로 설정된 속성 (0) | 2023.02.11 |
스프링 부트 Tomcat 액세스 로그 (0) | 2023.02.11 |