Summarized From
http://www.sqlalchemy.org/docs/core/tutorial.html
This covers the basics of non-ORM (Core) usage of SQLAlchemy.
# Set up the engine every later statement runs against: an in-memory
# SQLite database, created with echo=True.
from sqlalchemy import create_engine

engine = create_engine(
    'sqlite:///:memory:',
    echo=True,
)
# Declare the schema: two tables registered on one shared MetaData object.
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

metadata = MetaData()

users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('fullname', String),
)

# addresses.user_id is a foreign key to users.id; no explicit column type is
# given for it (None is passed in the type position).
addresses = Table(
    'addresses',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('user_id', None, ForeignKey('users.id')),
    Column('email_address', String, nullable=False),
)

# Create the tables registered on `metadata` using the engine.
metadata.create_all(engine)
# Insert a single row into the users table.
# Insert.values() does not modify the statement in place; it returns a new
# statement carrying the values, so the returned object is the one that must
# be kept and executed.
ins = users.insert().values(name='jack', fullname='Jack the Pumpkin king')

conn = engine.connect()
result = conn.execute(ins)

# Primary key of the row that was just inserted. This is a bare expression:
# it is displayed in an interactive session and discarded when run as a script.
result.inserted_primary_key
# Insert several address rows in one execute() call by passing a list of
# parameter dictionaries, one dictionary per row.
conn.execute(
    addresses.insert(),
    [
        {'user_id': 1, 'email_address': 'jack@yahoo.com'},
        {'user_id': 1, 'email_address': 'jack@msn.com'},
        {'user_id': 2, 'email_address': 'www@www.org'},
        {'user_id': 2, 'email_address': 'wendy@aol.com'},
    ],
)

# Insert one more user, this time executing directly on the engine and
# supplying the column values as keyword arguments.
result = engine.execute(
    users.insert(),
    name='fred',
    fullname="Fred Flintstone",
)
# Query the tables
from sqlalchemy.sql import select

# Every column of the users table
s = select([users])
result = conn.execute(s)
list(result)

# Only the name and fullname columns of the users table
# (the statement is built here but not executed)
s = select([users.c.name, users.c.fullname])

# Combine the users and addresses tables
# Without a WHERE clause: the Cartesian product of the two tables
list(
    conn.execute(
        select([users, addresses])
    )
)

# Restricted to rows where addresses.user_id equals users.id
list(
    conn.execute(
        select([users, addresses], users.c.id == addresses.c.user_id)
    )
)
# Build conditions for the WHERE clause.
# print is written in its single-argument parenthesised form throughout so
# the same lines run under both Python 2 and Python 3.
from sqlalchemy.sql import and_, or_, not_

# Combine a LIKE test on users.name with the users/addresses id match.
print(and_(users.c.name.like('j%'), users.c.id == addresses.c.user_id))

# Concatenate fullname, "--" and email_address into one column labelled
# 'title', then fetch every resulting row.
s = select([(users.c.fullname + "--" + addresses.c.email_address).label('title')])
print(conn.execute(s).fetchall())

# Join users to addresses without giving an explicit join condition.
print(users.join(addresses))

# Join with an explicit condition: email_address LIKE (users.name + '%').
print(users.join(addresses, addresses.c.email_address.like(users.c.name + '%')))

# Select fullname from that explicit join by passing it as from_obj.
s = select(
    [users.c.fullname],
    from_obj=[
        users.join(addresses, addresses.c.email_address.like(users.c.name + '%')),
    ],
)
print(conn.execute(s).fetchall())
# Delete the users rows matching a condition: here, every row whose name
# compares greater than 'm'.
conn.execute(
    users.delete().where(users.c.name > 'm')
)
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment