Token exchange is a powerful feature in Keycloak that allows you to trade one token for another. This is particularly useful in microservices architectures where you need to exchange tokens between different clients or services while maintaining security and user context.
In this guide, we'll implement a complete token exchange flow step-by-step, covering both the Keycloak configuration and the application code.
Token exchange (RFC 8693) allows a client to exchange one type of token for another. Common use cases include:
Before starting, ensure you have:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Client │ │ Keycloak │ │ Service B │ │ (Service A)│────────▶│ Server │────────▶│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ 1. Request Token │ │ │ Exchange │ │ │───────────────────────▶│ │ │ │ │ │ 2. Validate & Exchange│ │ │ │ │ │ 3. Return New Token │ │ │◀───────────────────────│ │ │ │ │ │ 4. Call Service B │ │ │────────────────────────────────────────────────▶│
# Login to Keycloak Admin Console # Navigate to: Master dropdown → Create Realm
Configuration:
token-exchange-demoThis is the client that will initiate the token exchange.
# Navigate to: Clients → Create Client
Basic Settings:
service-aCapability Config:
Settings:
http://localhost:8080/*http://localhost:8080Save the client and note down the Client Secret from the Credentials tab.
This is the client for which we want to exchange tokens.
# Navigate to: Clients → Create Client
Basic Settings:
service-bCapability Config:
Settings:
This is the most critical step. Keycloak uses fine-grained permissions to control token exchange.
# Navigate to: Clients → service-b → Permissions → Permissions Enabled: ON
# Navigate to: Clients → service-b → Permissions → token-exchange
Create a Policy:
service-a-client-policyservice-aApply Policy to Permission:
token-exchange permissionservice-a-client-policySome Keycloak versions require explicit feature enablement:
# Edit standalone.xml or standalone-ha.xml <subsystem xmlns="urn:jboss:domain:keycloak-server:1.1"> <web-context>auth</web-context> <features> <feature>token-exchange</feature> </features> </subsystem>
Or start Keycloak with:
./kc.sh start --features=token-exchange
# Navigate to: Users → Add user
Configuration:
testuser[email protected]Set Password:
password123First, authenticate as the user to get an initial token:
curl -X POST \ 'http://localhost:8080/realms/token-exchange-demo/protocol/openid-connect/token' \ -H 'Content-Type: application/x-www-form-urlencoded' \ -d 'grant_type=password' \ -d 'client_id=service-a' \ -d 'client_secret=YOUR_SERVICE_A_SECRET' \ -d 'username=testuser' \ -d 'password=password123' \ -d 'scope=openid profile email'
Response:
{ "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...", "expires_in": 300, "refresh_expires_in": 1800, "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type": "Bearer" }
Save the access_token value.
Now exchange the token from Service A for a token valid for Service B:
curl -X POST \ 'http://localhost:8080/realms/token-exchange-demo/protocol/openid-connect/token' \ -H 'Content-Type: application/x-www-form-urlencoded' \ -d 'grant_type=urn:ietf:params:oauth:grant-type:token-exchange' \ -d 'client_id=service-a' \ -d 'client_secret=YOUR_SERVICE_A_SECRET' \ -d 'subject_token=YOUR_ACCESS_TOKEN_FROM_STEP_4.1' \ -d 'requested_token_type=urn:ietf:params:oauth:token-type:access_token' \ -d 'audience=service-b'
Response:
{ "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9...", "expires_in": 300, "token_type": "Bearer", "issued_token_type": "urn:ietf:params:oauth:token-type:access_token" }
import requests from typing import Dict, Optional class KeycloakTokenExchange: def __init__(self, base_url: str, realm: str, client_id: str, client_secret: str): self.base_url = base_url self.realm = realm self.client_id = client_id self.client_secret = client_secret self.token_endpoint = f"{base_url}/realms/{realm}/protocol/openid-connect/token" def get_user_token(self, username: str, password: str) -> Optional[Dict]: """Get initial user token""" data = { 'grant_type': 'password', 'client_id': self.client_id, 'client_secret': self.client_secret, 'username': username, 'password': password, 'scope': 'openid profile email' } response = requests.post(self.token_endpoint, data=data) if response.status_code == 200: return response.json() else: print(f"Error: {response.status_code} - {response.text}") return None def exchange_token(self, subject_token: str, target_audience: str) -> Optional[Dict]: """Exchange token for another service""" data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange', 'client_id': self.client_id, 'client_secret': self.client_secret, 'subject_token': subject_token, 'requested_token_type': 'urn:ietf:params:oauth:token-type:access_token', 'audience': target_audience } response = requests.post(self.token_endpoint, data=data) if response.status_code == 200: return response.json() else: print(f"Error: {response.status_code} - {response.text}") return None # Usage Example if __name__ == "__main__": # Initialize client kc_client = KeycloakTokenExchange( base_url="http://localhost:8080", realm="token-exchange-demo", client_id="service-a", client_secret="YOUR_SECRET_HERE" ) # Step 1: Get initial token print("Getting initial token...") token_response = kc_client.get_user_token("testuser", "password123") if token_response: access_token = token_response['access_token'] print(f"Access Token (first 50 chars): {access_token[:50]}...") # Step 2: Exchange token print("\nExchanging token for service-b...") exchanged_token = kc_client.exchange_token( subject_token=access_token, target_audience="service-b" ) if exchanged_token: new_token = exchanged_token['access_token'] print(f"Exchanged Token (first 50 chars): {new_token[:50]}...") print("\nToken exchange successful!")
const axios = require('axios'); class KeycloakTokenExchange { constructor(baseUrl, realm, clientId, clientSecret) { this.baseUrl = baseUrl; this.realm = realm; this.clientId = clientId; this.clientSecret = clientSecret; this.tokenEndpoint = `${baseUrl}/realms/${realm}/protocol/openid-connect/token`; } async getUserToken(username, password) { try { const params = new URLSearchParams({ grant_type: 'password', client_id: this.clientId, client_secret: this.clientSecret, username: username, password: password, scope: 'openid profile email' }); const response = await axios.post(this.tokenEndpoint, params.toString(), { headers: { 'Content-Type': 'application/x-www-form-urlencoded' } }); return response.data; } catch (error) { console.error('Error getting user token:', error.response?.data || error.message); return null; } } async exchangeToken(subjectToken, targetAudience) { try { const params = new URLSearchParams({ grant_type: 'urn:ietf:params:oauth:grant-type:token-exchange', client_id: this.clientId, client_secret: this.clientSecret, subject_token: subjectToken, requested_token_type: 'urn:ietf:params:oauth:token-type:access_token', audience: targetAudience }); const response = await axios.post(this.tokenEndpoint, params.toString(), { headers: { 'Content-Type': 'application/x-www-form-urlencoded' } }); return response.data; } catch (error) { console.error('Error exchanging token:', error.response?.data || error.message); return null; } } } // Usage Example (async () => { const kcClient = new KeycloakTokenExchange( 'http://localhost:8080', 'token-exchange-demo', 'service-a', 'YOUR_SECRET_HERE' ); // Get initial token console.log('Getting initial token...'); const tokenResponse = await kcClient.getUserToken('testuser', 'password123'); if (tokenResponse) { const accessToken = tokenResponse.access_token; console.log(`Access Token (first 50 chars): ${accessToken.substring(0, 50)}...`); // Exchange token console.log('\nExchanging token for service-b...'); const exchangedToken = await kcClient.exchangeToken(accessToken, 'service-b'); if (exchangedToken) { const newToken = exchangedToken.access_token; console.log(`Exchanged Token (first 50 chars): ${newToken.substring(0, 50)}...`); console.log('\nToken exchange successful!'); } } })();
package main import ( "encoding/json" "fmt" "io" "net/http" "net/url" "strings" ) type KeycloakClient struct { BaseURL string Realm string ClientID string ClientSecret string TokenEndpoint string } type TokenResponse struct { AccessToken string `json:"access_token"` ExpiresIn int `json:"expires_in"` RefreshToken string `json:"refresh_token,omitempty"` TokenType string `json:"token_type"` IssuedTokenType string `json:"issued_token_type,omitempty"` } func NewKeycloakClient(baseURL, realm, clientID, clientSecret string) *KeycloakClient { return &KeycloakClient{ BaseURL: baseURL, Realm: realm, ClientID: clientID, ClientSecret: clientSecret, TokenEndpoint: fmt.Sprintf("%s/realms/%s/protocol/openid-connect/token", baseURL, realm), } } func (kc *KeycloakClient) GetUserToken(username, password string) (*TokenResponse, error) { data := url.Values{ "grant_type": {"password"}, "client_id": {kc.ClientID}, "client_secret": {kc.ClientSecret}, "username": {username}, "password": {password}, "scope": {"openid profile email"}, } resp, err := http.Post(kc.TokenEndpoint, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) if err != nil { return nil, err } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("error: %d - %s", resp.StatusCode, string(body)) } var tokenResp TokenResponse if err := json.Unmarshal(body, &tokenResp); err != nil { return nil, err } return &tokenResp, nil } func (kc *KeycloakClient) ExchangeToken(subjectToken, targetAudience string) (*TokenResponse, error) { data := url.Values{ "grant_type": {"urn:ietf:params:oauth:grant-type:token-exchange"}, "client_id": {kc.ClientID}, "client_secret": {kc.ClientSecret}, "subject_token": {subjectToken}, "requested_token_type": {"urn:ietf:params:oauth:token-type:access_token"}, "audience": {targetAudience}, } resp, err := http.Post(kc.TokenEndpoint, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) if err != nil { return nil, err } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("error: %d - %s", resp.StatusCode, string(body)) } var tokenResp TokenResponse if err := json.Unmarshal(body, &tokenResp); err != nil { return nil, err } return &tokenResp, nil } func main() { // Initialize client kcClient := NewKeycloakClient( "http://localhost:8080", "token-exchange-demo", "service-a", "YOUR_SECRET_HERE", ) // Get initial token fmt.Println("Getting initial token...") tokenResp, err := kcClient.GetUserToken("testuser", "password123") if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("Access Token (first 50 chars): %s...\n", tokenResp.AccessToken[:50]) // Exchange token fmt.Println("\nExchanging token for service-b...") exchangedToken, err := kcClient.ExchangeToken(tokenResp.AccessToken, "service-b") if err != nil { fmt.Printf("Error: %v\n", err) return } fmt.Printf("Exchanged Token (first 50 chars): %s...\n", exchangedToken.AccessToken[:50]) fmt.Println("\nToken exchange successful!") }
Use jwt.io or decode programmatically:
import jwt def decode_token(token): # Decode without verification (for inspection only) decoded = jwt.decode(token, options={"verify_signature": False}) return decoded # Original token original = decode_token(access_token) print("Original Token Claims:") print(f" Audience: {original.get('aud')}") print(f" Issued for: {original.get('azp')}") print(f" Subject: {original.get('sub')}") # Exchanged token exchanged = decode_token(new_token) print("\nExchanged Token Claims:") print(f" Audience: {exchanged.get('aud')}") print(f" Issued for: {exchanged.get('azp')}") print(f" Subject: {exchanged.get('sub')}")
The exchanged token should show:
aud): Changed to service-bazp): Still service-a (the requester)sub): Same user ID (preserved identity)Allow Service A to impersonate users:
# Enable impersonation permission # Navigate to: Clients → service-b → Permissions → impersonate # Apply the same service-a-client-policy
Exchange with impersonation:
curl -X POST \ 'http://localhost:8080/realms/token-exchange-demo/protocol/openid-connect/token' \ -d 'grant_type=urn:ietf:params:oauth:grant-type:token-exchange' \ -d 'client_id=service-a' \ -d 'client_secret=YOUR_SECRET' \ -d 'subject_token=SERVICE_A_TOKEN' \ -d 'requested_subject=testuser' \ -d 'audience=service-b'
Exchange tokens from external providers:
# Configure Identity Provider in Keycloak # Navigate to: Identity Providers → Add provider (Google, GitHub, etc.) # Exchange external token curl -X POST \ 'http://localhost:8080/realms/token-exchange-demo/protocol/openid-connect/token' \ -d 'grant_type=urn:ietf:params:oauth:grant-type:token-exchange' \ -d 'client_id=service-a' \ -d 'client_secret=YOUR_SECRET' \ -d 'subject_token=GOOGLE_ACCESS_TOKEN' \ -d 'subject_issuer=google' \ -d 'subject_token_type=urn:ietf:params:oauth:token-type:access_token' \ -d 'audience=service-b'
Request tokens with limited scopes:
curl -X POST \ 'http://localhost:8080/realms/token-exchange-demo/protocol/openid-connect/token' \ -d 'grant_type=urn:ietf:params:oauth:grant-type:token-exchange' \ -d 'client_id=service-a' \ -d 'client_secret=YOUR_SECRET' \ -d 'subject_token=ORIGINAL_TOKEN' \ -d 'audience=service-b' \ -d 'scope=profile email'
Always validate tokens before trusting them:
from jose import jwt, JWTError import requests def validate_token(token, realm_url): # Get public keys from Keycloak certs_url = f"{realm_url}/protocol/openid-connect/certs" certs = requests.get(certs_url).json() try: # Validate and decode decoded = jwt.decode( token, certs, algorithms=['RS256'], audience='service-b', options={'verify_aud': True} ) return decoded except JWTError as e: print(f"Token validation failed: {e}") return None
1. "Token exchange not permitted" Error
Solution: Verify that: - Token exchange permission is enabled for target client - Proper policy is created and applied - Source client is included in the policy
2. "Feature not enabled" Error
Solution: Enable token-exchange feature: ./kc.sh start --features=token-exchange
3. "Invalid audience" Error
Solution: Ensure the audience parameter matches the target client ID exactly
4. "Expired token" Error
Solution: Check token expiration times and implement token refresh
Enable debug logging in Keycloak:
<!-- standalone.xml --> <logger category="org.keycloak.protocol.oidc"> <level name="DEBUG"/> </logger>
Token exchange in Keycloak provides a secure way to propagate identity across microservices while maintaining fine-grained access control. Key takeaways:
This implementation guide gives you a production-ready starting point for implementing token exchange in your applications. Adjust security settings and token lifetimes based on your specific requirements.